this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::error; 13 14#[derive(Deserialize)] 15pub struct GetBlobParams { 16 pub did: String, 17 pub cid: String, 18} 19 20pub async fn get_blob( 21 State(state): State<AppState>, 22 Query(params): Query<GetBlobParams>, 23) -> Response { 24 let did = params.did.trim(); 25 let cid = params.cid.trim(); 26 if did.is_empty() { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 30 ) 31 .into_response(); 32 } 33 if cid.is_empty() { 34 return ( 35 StatusCode::BAD_REQUEST, 36 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 37 ) 38 .into_response(); 39 } 40 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 41 .fetch_optional(&state.db) 42 .await; 43 match user_exists { 44 Ok(None) => { 45 return ( 46 StatusCode::NOT_FOUND, 47 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 48 ) 49 .into_response(); 50 } 51 Err(e) => { 52 error!("DB error in get_blob: {:?}", e); 53 return ( 54 StatusCode::INTERNAL_SERVER_ERROR, 55 Json(json!({"error": "InternalError"})), 56 ) 57 .into_response(); 58 } 59 Ok(Some(_)) => {} 60 } 61 let blob_result = sqlx::query!( 62 "SELECT storage_key, mime_type FROM blobs WHERE cid = $1", 63 cid 64 ) 65 .fetch_optional(&state.db) 66 .await; 67 match blob_result { 68 Ok(Some(row)) => { 69 let storage_key = &row.storage_key; 70 let mime_type = &row.mime_type; 71 match state.blob_store.get(storage_key).await { 72 Ok(data) => Response::builder() 73 .status(StatusCode::OK) 74 .header(header::CONTENT_TYPE, mime_type) 75 .body(Body::from(data)) 76 .unwrap(), 77 Err(e) => { 78 error!("Failed to fetch blob from storage: {:?}", e); 79 ( 80 StatusCode::NOT_FOUND, 81 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 82 ) 83 .into_response() 84 } 85 } 86 } 87 Ok(None) => ( 88 StatusCode::NOT_FOUND, 89 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 90 ) 91 .into_response(), 92 Err(e) => { 93 error!("DB error in get_blob: {:?}", e); 94 ( 95 StatusCode::INTERNAL_SERVER_ERROR, 96 Json(json!({"error": "InternalError"})), 97 ) 98 .into_response() 99 } 100 } 101} 102 103#[derive(Deserialize)] 104pub struct ListBlobsParams { 105 pub did: String, 106 pub since: Option<String>, 107 pub limit: Option<i64>, 108 pub cursor: Option<String>, 109} 110 111#[derive(Serialize)] 112pub struct ListBlobsOutput { 113 pub cursor: Option<String>, 114 pub cids: Vec<String>, 115} 116 117pub async fn list_blobs( 118 State(state): State<AppState>, 119 Query(params): Query<ListBlobsParams>, 120) -> Response { 121 let did = params.did.trim(); 122 if did.is_empty() { 123 return ( 124 StatusCode::BAD_REQUEST, 125 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 126 ) 127 .into_response(); 128 } 129 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 130 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 131 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 132 .fetch_optional(&state.db) 133 .await; 134 let user_id = match user_result { 135 Ok(Some(row)) => row.id, 136 Ok(None) => { 137 return ( 138 StatusCode::NOT_FOUND, 139 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 140 ) 141 .into_response(); 142 } 143 Err(e) => { 144 error!("DB error in list_blobs: {:?}", e); 145 return ( 146 StatusCode::INTERNAL_SERVER_ERROR, 147 Json(json!({"error": "InternalError"})), 148 ) 149 .into_response(); 150 } 151 }; 152 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 153 let since_time = chrono::DateTime::parse_from_rfc3339(since) 154 .map(|dt| dt.with_timezone(&chrono::Utc)) 155 .unwrap_or_else(|_| chrono::Utc::now()); 156 sqlx::query!( 157 r#" 158 SELECT cid FROM blobs 159 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 160 ORDER BY cid ASC 161 LIMIT $4 162 "#, 163 user_id, 164 cursor_cid, 165 since_time, 166 limit + 1 167 ) 168 .fetch_all(&state.db) 169 .await 170 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 171 } else { 172 sqlx::query!( 173 r#" 174 SELECT cid FROM blobs 175 WHERE created_by_user = $1 AND cid > $2 176 ORDER BY cid ASC 177 LIMIT $3 178 "#, 179 user_id, 180 cursor_cid, 181 limit + 1 182 ) 183 .fetch_all(&state.db) 184 .await 185 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 186 }; 187 match cids_result { 188 Ok(cids) => { 189 let has_more = cids.len() as i64 > limit; 190 let cids: Vec<String> = cids.into_iter().take(limit as usize).collect(); 191 let next_cursor = if has_more { cids.last().cloned() } else { None }; 192 ( 193 StatusCode::OK, 194 Json(ListBlobsOutput { 195 cursor: next_cursor, 196 cids, 197 }), 198 ) 199 .into_response() 200 } 201 Err(e) => { 202 error!("DB error in list_blobs: {:?}", e); 203 ( 204 StatusCode::INTERNAL_SERVER_ERROR, 205 Json(json!({"error": "InternalError"})), 206 ) 207 .into_response() 208 } 209 } 210}