this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::error; 13#[derive(Deserialize)] 14pub struct GetBlobParams { 15 pub did: String, 16 pub cid: String, 17} 18pub async fn get_blob( 19 State(state): State<AppState>, 20 Query(params): Query<GetBlobParams>, 21) -> Response { 22 let did = params.did.trim(); 23 let cid = params.cid.trim(); 24 if did.is_empty() { 25 return ( 26 StatusCode::BAD_REQUEST, 27 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 28 ) 29 .into_response(); 30 } 31 if cid.is_empty() { 32 return ( 33 StatusCode::BAD_REQUEST, 34 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 35 ) 36 .into_response(); 37 } 38 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 39 .fetch_optional(&state.db) 40 .await; 41 match user_exists { 42 Ok(None) => { 43 return ( 44 StatusCode::NOT_FOUND, 45 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 46 ) 47 .into_response(); 48 } 49 Err(e) => { 50 error!("DB error in get_blob: {:?}", e); 51 return ( 52 StatusCode::INTERNAL_SERVER_ERROR, 53 Json(json!({"error": "InternalError"})), 54 ) 55 .into_response(); 56 } 57 Ok(Some(_)) => {} 58 } 59 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid) 60 .fetch_optional(&state.db) 61 .await; 62 match blob_result { 63 Ok(Some(row)) => { 64 let storage_key = &row.storage_key; 65 let mime_type = &row.mime_type; 66 match state.blob_store.get(&storage_key).await { 67 Ok(data) => Response::builder() 68 .status(StatusCode::OK) 69 .header(header::CONTENT_TYPE, mime_type) 70 .body(Body::from(data)) 71 .unwrap(), 72 Err(e) => { 73 error!("Failed to fetch blob from storage: {:?}", e); 74 ( 75 StatusCode::NOT_FOUND, 76 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 77 ) 78 .into_response() 79 } 80 } 81 } 82 Ok(None) => ( 83 StatusCode::NOT_FOUND, 84 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 85 ) 86 .into_response(), 87 Err(e) => { 88 error!("DB error in get_blob: {:?}", e); 89 ( 90 StatusCode::INTERNAL_SERVER_ERROR, 91 Json(json!({"error": "InternalError"})), 92 ) 93 .into_response() 94 } 95 } 96} 97#[derive(Deserialize)] 98pub struct ListBlobsParams { 99 pub did: String, 100 pub since: Option<String>, 101 pub limit: Option<i64>, 102 pub cursor: Option<String>, 103} 104#[derive(Serialize)] 105pub struct ListBlobsOutput { 106 pub cursor: Option<String>, 107 pub cids: Vec<String>, 108} 109pub async fn list_blobs( 110 State(state): State<AppState>, 111 Query(params): Query<ListBlobsParams>, 112) -> Response { 113 let did = params.did.trim(); 114 if did.is_empty() { 115 return ( 116 StatusCode::BAD_REQUEST, 117 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 118 ) 119 .into_response(); 120 } 121 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 122 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 123 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 124 .fetch_optional(&state.db) 125 .await; 126 let user_id = match user_result { 127 Ok(Some(row)) => row.id, 128 Ok(None) => { 129 return ( 130 StatusCode::NOT_FOUND, 131 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 132 ) 133 .into_response(); 134 } 135 Err(e) => { 136 error!("DB error in list_blobs: {:?}", e); 137 return ( 138 StatusCode::INTERNAL_SERVER_ERROR, 139 Json(json!({"error": "InternalError"})), 140 ) 141 .into_response(); 142 } 143 }; 144 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 145 let since_time = chrono::DateTime::parse_from_rfc3339(since) 146 .map(|dt| dt.with_timezone(&chrono::Utc)) 147 .unwrap_or_else(|_| chrono::Utc::now()); 148 sqlx::query!( 149 r#" 150 SELECT cid FROM blobs 151 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 152 ORDER BY cid ASC 153 LIMIT $4 154 "#, 155 user_id, 156 cursor_cid, 157 since_time, 158 limit + 1 159 ) 160 .fetch_all(&state.db) 161 .await 162 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 163 } else { 164 sqlx::query!( 165 r#" 166 SELECT cid FROM blobs 167 WHERE created_by_user = $1 AND cid > $2 168 ORDER BY cid ASC 169 LIMIT $3 170 "#, 171 user_id, 172 cursor_cid, 173 limit + 1 174 ) 175 .fetch_all(&state.db) 176 .await 177 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 178 }; 179 match cids_result { 180 Ok(cids) => { 181 let has_more = cids.len() as i64 > limit; 182 let cids: Vec<String> = cids 183 .into_iter() 184 .take(limit as usize) 185 .collect(); 186 let next_cursor = if has_more { 187 cids.last().cloned() 188 } else { 189 None 190 }; 191 ( 192 StatusCode::OK, 193 Json(ListBlobsOutput { 194 cursor: next_cursor, 195 cids, 196 }), 197 ) 198 .into_response() 199 } 200 Err(e) => { 201 error!("DB error in list_blobs: {:?}", e); 202 ( 203 StatusCode::INTERNAL_SERVER_ERROR, 204 Json(json!({"error": "InternalError"})), 205 ) 206 .into_response() 207 } 208 } 209}