this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::error; 13 14#[derive(Deserialize)] 15pub struct GetBlobParams { 16 pub did: String, 17 pub cid: String, 18} 19 20pub async fn get_blob( 21 State(state): State<AppState>, 22 Query(params): Query<GetBlobParams>, 23) -> Response { 24 let did = params.did.trim(); 25 let cid = params.cid.trim(); 26 if did.is_empty() { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 30 ) 31 .into_response(); 32 } 33 if cid.is_empty() { 34 return ( 35 StatusCode::BAD_REQUEST, 36 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 37 ) 38 .into_response(); 39 } 40 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 41 .fetch_optional(&state.db) 42 .await; 43 match user_exists { 44 Ok(None) => { 45 return ( 46 StatusCode::NOT_FOUND, 47 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 48 ) 49 .into_response(); 50 } 51 Err(e) => { 52 error!("DB error in get_blob: {:?}", e); 53 return ( 54 StatusCode::INTERNAL_SERVER_ERROR, 55 Json(json!({"error": "InternalError"})), 56 ) 57 .into_response(); 58 } 59 Ok(Some(_)) => {} 60 } 61 let blob_result = sqlx::query!( 62 "SELECT storage_key, mime_type FROM blobs WHERE cid = $1", 63 cid 64 ) 65 .fetch_optional(&state.db) 66 .await; 67 match blob_result { 68 Ok(Some(row)) => { 69 let storage_key = &row.storage_key; 70 let mime_type = &row.mime_type; 71 match state.blob_store.get(storage_key).await { 72 Ok(data) => Response::builder() 73 .status(StatusCode::OK) 74 .header(header::CONTENT_TYPE, mime_type) 75 .body(Body::from(data)) 76 .unwrap(), 77 Err(e) => { 78 error!("Failed to fetch blob from storage: {:?}", e); 79 ( 80 StatusCode::NOT_FOUND, 81 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 82 ) 83 .into_response() 84 } 85 } 86 } 87 Ok(None) => ( 88 StatusCode::NOT_FOUND, 89 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 90 ) 91 .into_response(), 92 Err(e) => { 93 error!("DB error in get_blob: {:?}", e); 94 ( 95 StatusCode::INTERNAL_SERVER_ERROR, 96 Json(json!({"error": "InternalError"})), 97 ) 98 .into_response() 99 } 100 } 101} 102 103#[derive(Deserialize)] 104pub struct ListBlobsParams { 105 pub did: String, 106 pub since: Option<String>, 107 pub limit: Option<i64>, 108 pub cursor: Option<String>, 109} 110 111#[derive(Serialize)] 112pub struct ListBlobsOutput { 113 #[serde(skip_serializing_if = "Option::is_none")] 114 pub cursor: Option<String>, 115 pub cids: Vec<String>, 116} 117 118pub async fn list_blobs( 119 State(state): State<AppState>, 120 Query(params): Query<ListBlobsParams>, 121) -> Response { 122 let did = params.did.trim(); 123 if did.is_empty() { 124 return ( 125 StatusCode::BAD_REQUEST, 126 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 127 ) 128 .into_response(); 129 } 130 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 131 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 132 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 133 .fetch_optional(&state.db) 134 .await; 135 let user_id = match user_result { 136 Ok(Some(row)) => row.id, 137 Ok(None) => { 138 return ( 139 StatusCode::NOT_FOUND, 140 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 141 ) 142 .into_response(); 143 } 144 Err(e) => { 145 error!("DB error in list_blobs: {:?}", e); 146 return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response(); 151 } 152 }; 153 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 154 let since_time = chrono::DateTime::parse_from_rfc3339(since) 155 .map(|dt| dt.with_timezone(&chrono::Utc)) 156 .unwrap_or_else(|_| chrono::Utc::now()); 157 sqlx::query!( 158 r#" 159 SELECT cid FROM blobs 160 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 161 ORDER BY cid ASC 162 LIMIT $4 163 "#, 164 user_id, 165 cursor_cid, 166 since_time, 167 limit + 1 168 ) 169 .fetch_all(&state.db) 170 .await 171 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 172 } else { 173 sqlx::query!( 174 r#" 175 SELECT cid FROM blobs 176 WHERE created_by_user = $1 AND cid > $2 177 ORDER BY cid ASC 178 LIMIT $3 179 "#, 180 user_id, 181 cursor_cid, 182 limit + 1 183 ) 184 .fetch_all(&state.db) 185 .await 186 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 187 }; 188 match cids_result { 189 Ok(cids) => { 190 let has_more = cids.len() as i64 > limit; 191 let cids: Vec<String> = cids.into_iter().take(limit as usize).collect(); 192 let next_cursor = if has_more { cids.last().cloned() } else { None }; 193 ( 194 StatusCode::OK, 195 Json(ListBlobsOutput { 196 cursor: next_cursor, 197 cids, 198 }), 199 ) 200 .into_response() 201 } 202 Err(e) => { 203 error!("DB error in list_blobs: {:?}", e); 204 ( 205 StatusCode::INTERNAL_SERVER_ERROR, 206 Json(json!({"error": "InternalError"})), 207 ) 208 .into_response() 209 } 210 } 211}