this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::error; 13 14#[derive(Deserialize)] 15pub struct GetBlobParams { 16 pub did: String, 17 pub cid: String, 18} 19 20pub async fn get_blob( 21 State(state): State<AppState>, 22 Query(params): Query<GetBlobParams>, 23) -> Response { 24 let did = params.did.trim(); 25 let cid = params.cid.trim(); 26 if did.is_empty() { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 30 ) 31 .into_response(); 32 } 33 if cid.is_empty() { 34 return ( 35 StatusCode::BAD_REQUEST, 36 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 37 ) 38 .into_response(); 39 } 40 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 41 .fetch_optional(&state.db) 42 .await; 43 match user_exists { 44 Ok(None) => { 45 return ( 46 StatusCode::NOT_FOUND, 47 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 48 ) 49 .into_response(); 50 } 51 Err(e) => { 52 error!("DB error in get_blob: {:?}", e); 53 return ( 54 StatusCode::INTERNAL_SERVER_ERROR, 55 Json(json!({"error": "InternalError"})), 56 ) 57 .into_response(); 58 } 59 Ok(Some(_)) => {} 60 } 61 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid) 62 .fetch_optional(&state.db) 63 .await; 64 match blob_result { 65 Ok(Some(row)) => { 66 let storage_key = &row.storage_key; 67 let mime_type = &row.mime_type; 68 match state.blob_store.get(&storage_key).await { 69 Ok(data) => Response::builder() 70 .status(StatusCode::OK) 71 .header(header::CONTENT_TYPE, mime_type) 72 .body(Body::from(data)) 73 .unwrap(), 74 Err(e) => { 75 error!("Failed to fetch blob from storage: {:?}", e); 76 ( 77 StatusCode::NOT_FOUND, 78 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 79 ) 80 .into_response() 81 } 82 } 83 } 84 Ok(None) => ( 85 StatusCode::NOT_FOUND, 86 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 87 ) 88 .into_response(), 89 Err(e) => { 90 error!("DB error in get_blob: {:?}", e); 91 ( 92 StatusCode::INTERNAL_SERVER_ERROR, 93 Json(json!({"error": "InternalError"})), 94 ) 95 .into_response() 96 } 97 } 98} 99 100#[derive(Deserialize)] 101pub struct ListBlobsParams { 102 pub did: String, 103 pub since: Option<String>, 104 pub limit: Option<i64>, 105 pub cursor: Option<String>, 106} 107 108#[derive(Serialize)] 109pub struct ListBlobsOutput { 110 pub cursor: Option<String>, 111 pub cids: Vec<String>, 112} 113 114pub async fn list_blobs( 115 State(state): State<AppState>, 116 Query(params): Query<ListBlobsParams>, 117) -> Response { 118 let did = params.did.trim(); 119 if did.is_empty() { 120 return ( 121 StatusCode::BAD_REQUEST, 122 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 123 ) 124 .into_response(); 125 } 126 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 127 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 128 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 129 .fetch_optional(&state.db) 130 .await; 131 let user_id = match user_result { 132 Ok(Some(row)) => row.id, 133 Ok(None) => { 134 return ( 135 StatusCode::NOT_FOUND, 136 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 137 ) 138 .into_response(); 139 } 140 Err(e) => { 141 error!("DB error in list_blobs: {:?}", e); 142 return ( 143 StatusCode::INTERNAL_SERVER_ERROR, 144 Json(json!({"error": "InternalError"})), 145 ) 146 .into_response(); 147 } 148 }; 149 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 150 let since_time = chrono::DateTime::parse_from_rfc3339(since) 151 .map(|dt| dt.with_timezone(&chrono::Utc)) 152 .unwrap_or_else(|_| chrono::Utc::now()); 153 sqlx::query!( 154 r#" 155 SELECT cid FROM blobs 156 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 157 ORDER BY cid ASC 158 LIMIT $4 159 "#, 160 user_id, 161 cursor_cid, 162 since_time, 163 limit + 1 164 ) 165 .fetch_all(&state.db) 166 .await 167 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 168 } else { 169 sqlx::query!( 170 r#" 171 SELECT cid FROM blobs 172 WHERE created_by_user = $1 AND cid > $2 173 ORDER BY cid ASC 174 LIMIT $3 175 "#, 176 user_id, 177 cursor_cid, 178 limit + 1 179 ) 180 .fetch_all(&state.db) 181 .await 182 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 183 }; 184 match cids_result { 185 Ok(cids) => { 186 let has_more = cids.len() as i64 > limit; 187 let cids: Vec<String> = cids 188 .into_iter() 189 .take(limit as usize) 190 .collect(); 191 let next_cursor = if has_more { 192 cids.last().cloned() 193 } else { 194 None 195 }; 196 ( 197 StatusCode::OK, 198 Json(ListBlobsOutput { 199 cursor: next_cursor, 200 cids, 201 }), 202 ) 203 .into_response() 204 } 205 Err(e) => { 206 error!("DB error in list_blobs: {:?}", e); 207 ( 208 StatusCode::INTERNAL_SERVER_ERROR, 209 Json(json!({"error": "InternalError"})), 210 ) 211 .into_response() 212 } 213 } 214}