this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_BLOB_SIZE: usize = 1_000_000; 19 20pub async fn upload_blob( 21 State(state): State<AppState>, 22 headers: axum::http::HeaderMap, 23 body: Bytes, 24) -> Response { 25 if body.len() > MAX_BLOB_SIZE { 26 return ( 27 StatusCode::PAYLOAD_TOO_LARGE, 28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})), 29 ) 30 .into_response(); 31 } 32 let token = match crate::auth::extract_bearer_token_from_header( 33 headers.get("Authorization").and_then(|h| h.to_str().ok()), 34 ) { 35 Some(t) => t, 36 None => { 37 return ( 38 StatusCode::UNAUTHORIZED, 39 Json(json!({"error": "AuthenticationRequired"})), 40 ) 41 .into_response(); 42 } 43 }; 44 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 45 Ok(user) => user, 46 Err(_) => { 47 return ( 48 StatusCode::UNAUTHORIZED, 49 Json(json!({"error": "AuthenticationFailed"})), 50 ) 51 .into_response(); 52 } 53 }; 54 let did = auth_user.did; 55 let mime_type = headers 56 .get("content-type") 57 .and_then(|h| h.to_str().ok()) 58 .unwrap_or("application/octet-stream") 59 .to_string(); 60 let size = body.len() as i64; 61 let data = body.to_vec(); 62 let mut hasher = Sha256::new(); 63 hasher.update(&data); 64 let hash = hasher.finalize(); 65 let multihash = match Multihash::wrap(0x12, &hash) { 66 Ok(mh) => mh, 67 Err(e) => { 68 error!("Failed to create multihash for blob: {:?}", e); 69 return ( 70 StatusCode::INTERNAL_SERVER_ERROR, 71 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 72 ) 73 .into_response(); 74 } 75 }; 76 let cid = Cid::new_v1(0x55, multihash); 77 let cid_str = cid.to_string(); 78 let storage_key = format!("blobs/{}", cid_str); 79 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 80 .fetch_optional(&state.db) 81 .await; 82 let user_id = match user_query { 83 Ok(Some(row)) => row.id, 84 _ => { 85 return ( 86 StatusCode::INTERNAL_SERVER_ERROR, 87 Json(json!({"error": "InternalError"})), 88 ) 89 .into_response(); 90 } 91 }; 92 let mut tx = match state.db.begin().await { 93 Ok(tx) => tx, 94 Err(e) => { 95 error!("Failed to begin transaction: {:?}", e); 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError"})), 99 ) 100 .into_response(); 101 } 102 }; 103 let insert = sqlx::query!( 104 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 105 cid_str, 106 mime_type, 107 size, 108 user_id, 109 storage_key 110 ) 111 .fetch_optional(&mut *tx) 112 .await; 113 let was_inserted = match insert { 114 Ok(Some(_)) => true, 115 Ok(None) => false, 116 Err(e) => { 117 error!("Failed to insert blob record: {:?}", e); 118 return ( 119 StatusCode::INTERNAL_SERVER_ERROR, 120 Json(json!({"error": "InternalError"})), 121 ) 122 .into_response(); 123 } 124 }; 125 if was_inserted 126 && let Err(e) = state 127 .blob_store 128 .put_bytes(&storage_key, bytes::Bytes::from(data)) 129 .await 130 { 131 error!("Failed to upload blob to storage: {:?}", e); 132 return ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 135 ) 136 .into_response(); 137 } 138 if let Err(e) = tx.commit().await { 139 error!("Failed to commit blob transaction: {:?}", e); 140 if was_inserted 141 && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 142 error!( 143 "Failed to cleanup orphaned blob {}: {:?}", 144 storage_key, cleanup_err 145 ); 146 } 147 return ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError"})), 150 ) 151 .into_response(); 152 } 153 Json(json!({ 154 "blob": { 155 "$type": "blob", 156 "ref": { 157 "$link": cid_str 158 }, 159 "mimeType": mime_type, 160 "size": size 161 } 162 })) 163 .into_response() 164} 165 166#[derive(Deserialize)] 167pub struct ListMissingBlobsParams { 168 pub limit: Option<i64>, 169 pub cursor: Option<String>, 170} 171 172#[derive(Serialize)] 173#[serde(rename_all = "camelCase")] 174pub struct RecordBlob { 175 pub cid: String, 176 pub record_uri: String, 177} 178 179#[derive(Serialize)] 180pub struct ListMissingBlobsOutput { 181 pub cursor: Option<String>, 182 pub blobs: Vec<RecordBlob>, 183} 184 185fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 186 if let Some(obj) = val.as_object() { 187 if let Some(type_val) = obj.get("$type") 188 && type_val == "blob" 189 && let Some(r) = obj.get("ref") 190 && let Some(link) = r.get("$link") 191 && let Some(s) = link.as_str() { 192 blobs.push(s.to_string()); 193 } 194 for (_, v) in obj { 195 find_blobs(v, blobs); 196 } 197 } else if let Some(arr) = val.as_array() { 198 for v in arr { 199 find_blobs(v, blobs); 200 } 201 } 202} 203 204pub async fn list_missing_blobs( 205 State(state): State<AppState>, 206 headers: axum::http::HeaderMap, 207 Query(params): Query<ListMissingBlobsParams>, 208) -> Response { 209 let token = match crate::auth::extract_bearer_token_from_header( 210 headers.get("Authorization").and_then(|h| h.to_str().ok()), 211 ) { 212 Some(t) => t, 213 None => { 214 return ( 215 StatusCode::UNAUTHORIZED, 216 Json(json!({"error": "AuthenticationRequired"})), 217 ) 218 .into_response(); 219 } 220 }; 221 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 222 Ok(user) => user, 223 Err(_) => { 224 return ( 225 StatusCode::UNAUTHORIZED, 226 Json(json!({"error": "AuthenticationFailed"})), 227 ) 228 .into_response(); 229 } 230 }; 231 let did = auth_user.did; 232 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 233 .fetch_optional(&state.db) 234 .await; 235 let user_id = match user_query { 236 Ok(Some(row)) => row.id, 237 _ => { 238 return ( 239 StatusCode::INTERNAL_SERVER_ERROR, 240 Json(json!({"error": "InternalError"})), 241 ) 242 .into_response(); 243 } 244 }; 245 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 246 let cursor_str = params.cursor.unwrap_or_default(); 247 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 248 let parts: Vec<&str> = cursor_str.split('|').collect(); 249 (parts[0].to_string(), parts[1].to_string()) 250 } else { 251 (String::new(), String::new()) 252 }; 253 let records_query = sqlx::query!( 254 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 255 user_id, 256 cursor_collection, 257 cursor_rkey, 258 limit 259 ) 260 .fetch_all(&state.db) 261 .await; 262 let records = match records_query { 263 Ok(r) => r, 264 Err(e) => { 265 error!("DB error fetching records: {:?}", e); 266 return ( 267 StatusCode::INTERNAL_SERVER_ERROR, 268 Json(json!({"error": "InternalError"})), 269 ) 270 .into_response(); 271 } 272 }; 273 let mut missing_blobs = Vec::new(); 274 let mut last_cursor = None; 275 for row in &records { 276 let collection = &row.collection; 277 let rkey = &row.rkey; 278 let record_cid_str = &row.record_cid; 279 last_cursor = Some(format!("{}|{}", collection, rkey)); 280 let record_cid = match Cid::from_str(record_cid_str) { 281 Ok(c) => c, 282 Err(_) => continue, 283 }; 284 let block_bytes = match state.block_store.get(&record_cid).await { 285 Ok(Some(b)) => b, 286 _ => continue, 287 }; 288 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 289 Ok(v) => v, 290 Err(_) => continue, 291 }; 292 let mut blobs = Vec::new(); 293 find_blobs(&record_val, &mut blobs); 294 for blob_cid_str in blobs { 295 let exists = sqlx::query!( 296 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", 297 blob_cid_str, 298 user_id 299 ) 300 .fetch_optional(&state.db) 301 .await; 302 match exists { 303 Ok(None) => { 304 missing_blobs.push(RecordBlob { 305 cid: blob_cid_str, 306 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 307 }); 308 } 309 Err(e) => { 310 error!("DB error checking blob existence: {:?}", e); 311 } 312 _ => {} 313 } 314 } 315 } 316 // if we fetched fewer records than limit, we are done, so cursor is None. 317 // otherwise, cursor is the last one we saw. 318 // ...right? 319 let next_cursor = if records.len() < limit as usize { 320 None 321 } else { 322 last_cursor 323 }; 324 ( 325 StatusCode::OK, 326 Json(ListMissingBlobsOutput { 327 cursor: next_cursor, 328 blobs: missing_blobs, 329 }), 330 ) 331 .into_response() 332}