this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::error; 13 14#[derive(Deserialize)] 15pub struct GetBlobParams { 16 pub did: String, 17 pub cid: String, 18} 19 20pub async fn get_blob( 21 State(state): State<AppState>, 22 Query(params): Query<GetBlobParams>, 23) -> Response { 24 let did = params.did.trim(); 25 let cid = params.cid.trim(); 26 27 if did.is_empty() { 28 return ( 29 StatusCode::BAD_REQUEST, 30 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 31 ) 32 .into_response(); 33 } 34 35 if cid.is_empty() { 36 return ( 37 StatusCode::BAD_REQUEST, 38 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 39 ) 40 .into_response(); 41 } 42 43 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 44 .fetch_optional(&state.db) 45 .await; 46 47 match user_exists { 48 Ok(None) => { 49 return ( 50 StatusCode::NOT_FOUND, 51 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 52 ) 53 .into_response(); 54 } 55 Err(e) => { 56 error!("DB error in get_blob: {:?}", e); 57 return ( 58 StatusCode::INTERNAL_SERVER_ERROR, 59 Json(json!({"error": "InternalError"})), 60 ) 61 .into_response(); 62 } 63 Ok(Some(_)) => {} 64 } 65 66 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid) 67 .fetch_optional(&state.db) 68 .await; 69 70 match blob_result { 71 Ok(Some(row)) => { 72 let storage_key = &row.storage_key; 73 let mime_type = &row.mime_type; 74 75 match state.blob_store.get(&storage_key).await { 76 Ok(data) => Response::builder() 77 .status(StatusCode::OK) 78 .header(header::CONTENT_TYPE, mime_type) 79 .body(Body::from(data)) 80 .unwrap(), 81 Err(e) => { 82 error!("Failed to fetch blob from storage: {:?}", e); 83 ( 84 StatusCode::NOT_FOUND, 85 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 86 ) 87 .into_response() 88 } 89 } 90 } 91 Ok(None) => ( 92 StatusCode::NOT_FOUND, 93 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 94 ) 95 .into_response(), 96 Err(e) => { 97 error!("DB error in get_blob: {:?}", e); 98 ( 99 StatusCode::INTERNAL_SERVER_ERROR, 100 Json(json!({"error": "InternalError"})), 101 ) 102 .into_response() 103 } 104 } 105} 106 107#[derive(Deserialize)] 108pub struct ListBlobsParams { 109 pub did: String, 110 pub since: Option<String>, 111 pub limit: Option<i64>, 112 pub cursor: Option<String>, 113} 114 115#[derive(Serialize)] 116pub struct ListBlobsOutput { 117 pub cursor: Option<String>, 118 pub cids: Vec<String>, 119} 120 121pub async fn list_blobs( 122 State(state): State<AppState>, 123 Query(params): Query<ListBlobsParams>, 124) -> Response { 125 let did = params.did.trim(); 126 127 if did.is_empty() { 128 return ( 129 StatusCode::BAD_REQUEST, 130 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 131 ) 132 .into_response(); 133 } 134 135 let limit = params.limit.unwrap_or(500).min(1000); 136 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 137 138 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 139 .fetch_optional(&state.db) 140 .await; 141 142 let user_id = match user_result { 143 Ok(Some(row)) => row.id, 144 Ok(None) => { 145 return ( 146 StatusCode::NOT_FOUND, 147 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 148 ) 149 .into_response(); 150 } 151 Err(e) => { 152 error!("DB error in list_blobs: {:?}", e); 153 return ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError"})), 156 ) 157 .into_response(); 158 } 159 }; 160 161 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 162 let since_time = chrono::DateTime::parse_from_rfc3339(since) 163 .map(|dt| dt.with_timezone(&chrono::Utc)) 164 .unwrap_or_else(|_| chrono::Utc::now()); 165 sqlx::query!( 166 r#" 167 SELECT cid FROM blobs 168 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 169 ORDER BY cid ASC 170 LIMIT $4 171 "#, 172 user_id, 173 cursor_cid, 174 since_time, 175 limit + 1 176 ) 177 .fetch_all(&state.db) 178 .await 179 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 180 } else { 181 sqlx::query!( 182 r#" 183 SELECT cid FROM blobs 184 WHERE created_by_user = $1 AND cid > $2 185 ORDER BY cid ASC 186 LIMIT $3 187 "#, 188 user_id, 189 cursor_cid, 190 limit + 1 191 ) 192 .fetch_all(&state.db) 193 .await 194 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 195 }; 196 197 match cids_result { 198 Ok(cids) => { 199 let has_more = cids.len() as i64 > limit; 200 let cids: Vec<String> = cids 201 .into_iter() 202 .take(limit as usize) 203 .collect(); 204 205 let next_cursor = if has_more { 206 cids.last().cloned() 207 } else { 208 None 209 }; 210 211 ( 212 StatusCode::OK, 213 Json(ListBlobsOutput { 214 cursor: next_cursor, 215 cids, 216 }), 217 ) 218 .into_response() 219 } 220 Err(e) => { 221 error!("DB error in list_blobs: {:?}", e); 222 ( 223 StatusCode::INTERNAL_SERVER_ERROR, 224 Json(json!({"error": "InternalError"})), 225 ) 226 .into_response() 227 } 228 } 229}