this repo has no description
1use crate::state::AppState; 2use crate::sync::util::assert_repo_availability; 3use axum::{ 4 Json, 5 body::Body, 6 extract::{Query, State}, 7 http::StatusCode, 8 http::header, 9 response::{IntoResponse, Response}, 10}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::error; 14 15#[derive(Deserialize)] 16pub struct GetBlobParams { 17 pub did: String, 18 pub cid: String, 19} 20 21pub async fn get_blob( 22 State(state): State<AppState>, 23 Query(params): Query<GetBlobParams>, 24) -> Response { 25 let did = params.did.trim(); 26 let cid = params.cid.trim(); 27 if did.is_empty() { 28 return ( 29 StatusCode::BAD_REQUEST, 30 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 31 ) 32 .into_response(); 33 } 34 if cid.is_empty() { 35 return ( 36 StatusCode::BAD_REQUEST, 37 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 38 ) 39 .into_response(); 40 } 41 42 let _account = match assert_repo_availability(&state.db, did, false).await { 43 Ok(a) => a, 44 Err(e) => return e.into_response(), 45 }; 46 47 let blob_result = sqlx::query!( 48 "SELECT storage_key, mime_type, size_bytes FROM blobs WHERE cid = $1", 49 cid 50 ) 51 .fetch_optional(&state.db) 52 .await; 53 match blob_result { 54 Ok(Some(row)) => { 55 let storage_key = &row.storage_key; 56 let mime_type = &row.mime_type; 57 let size_bytes = row.size_bytes; 58 match state.blob_store.get(storage_key).await { 59 Ok(data) => Response::builder() 60 .status(StatusCode::OK) 61 .header(header::CONTENT_TYPE, mime_type) 62 .header(header::CONTENT_LENGTH, size_bytes.to_string()) 63 .header("x-content-type-options", "nosniff") 64 .header("content-security-policy", "default-src 'none'; sandbox") 65 .body(Body::from(data)) 66 .unwrap(), 67 Err(e) => { 68 error!("Failed to fetch blob from storage: {:?}", e); 69 ( 70 StatusCode::NOT_FOUND, 71 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 72 ) 73 .into_response() 74 } 75 } 76 } 77 Ok(None) => ( 78 StatusCode::NOT_FOUND, 79 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 80 ) 81 .into_response(), 82 Err(e) => { 83 error!("DB error in get_blob: {:?}", e); 84 ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError"})), 87 ) 88 .into_response() 89 } 90 } 91} 92 93#[derive(Deserialize)] 94pub struct ListBlobsParams { 95 pub did: String, 96 pub since: Option<String>, 97 pub limit: Option<i64>, 98 pub cursor: Option<String>, 99} 100 101#[derive(Serialize)] 102pub struct ListBlobsOutput { 103 #[serde(skip_serializing_if = "Option::is_none")] 104 pub cursor: Option<String>, 105 pub cids: Vec<String>, 106} 107 108pub async fn list_blobs( 109 State(state): State<AppState>, 110 Query(params): Query<ListBlobsParams>, 111) -> Response { 112 let did = params.did.trim(); 113 if did.is_empty() { 114 return ( 115 StatusCode::BAD_REQUEST, 116 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 117 ) 118 .into_response(); 119 } 120 121 let account = match assert_repo_availability(&state.db, did, false).await { 122 Ok(a) => a, 123 Err(e) => return e.into_response(), 124 }; 125 126 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 127 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 128 let user_id = account.user_id; 129 130 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 131 sqlx::query_scalar!( 132 r#" 133 SELECT DISTINCT unnest(blobs) as "cid!" 134 FROM repo_seq 135 WHERE did = $1 AND rev > $2 AND blobs IS NOT NULL 136 "#, 137 did, 138 since 139 ) 140 .fetch_all(&state.db) 141 .await 142 .map(|mut cids| { 143 cids.sort(); 144 cids.into_iter() 145 .filter(|c| c.as_str() > cursor_cid) 146 .take((limit + 1) as usize) 147 .collect() 148 }) 149 } else { 150 sqlx::query!( 151 r#" 152 SELECT cid FROM blobs 153 WHERE created_by_user = $1 AND cid > $2 154 ORDER BY cid ASC 155 LIMIT $3 156 "#, 157 user_id, 158 cursor_cid, 159 limit + 1 160 ) 161 .fetch_all(&state.db) 162 .await 163 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 164 }; 165 match cids_result { 166 Ok(cids) => { 167 let has_more = cids.len() as i64 > limit; 168 let cids: Vec<String> = cids.into_iter().take(limit as usize).collect(); 169 let next_cursor = if has_more { cids.last().cloned() } else { None }; 170 ( 171 StatusCode::OK, 172 Json(ListBlobsOutput { 173 cursor: next_cursor, 174 cids, 175 }), 176 ) 177 .into_response() 178 } 179 Err(e) => { 180 error!("DB error in list_blobs: {:?}", e); 181 ( 182 StatusCode::INTERNAL_SERVER_ERROR, 183 Json(json!({"error": "InternalError"})), 184 ) 185 .into_response() 186 } 187 } 188}