this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_BLOB_SIZE: usize = 1_000_000; 19 20pub async fn upload_blob( 21 State(state): State<AppState>, 22 headers: axum::http::HeaderMap, 23 body: Bytes, 24) -> Response { 25 if body.len() > MAX_BLOB_SIZE { 26 return ( 27 StatusCode::PAYLOAD_TOO_LARGE, 28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})), 29 ) 30 .into_response(); 31 } 32 33 let token = match crate::auth::extract_bearer_token_from_header( 34 headers.get("Authorization").and_then(|h| h.to_str().ok()) 35 ) { 36 Some(t) => t, 37 None => { 38 return ( 39 StatusCode::UNAUTHORIZED, 40 Json(json!({"error": "AuthenticationRequired"})), 41 ) 42 .into_response(); 43 } 44 }; 45 46 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 47 Ok(user) => user, 48 Err(_) => { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationFailed"})), 52 ) 53 .into_response(); 54 } 55 }; 56 let did = auth_user.did; 57 58 let mime_type = headers 59 .get("content-type") 60 .and_then(|h| h.to_str().ok()) 61 .unwrap_or("application/octet-stream") 62 .to_string(); 63 64 let size = body.len() as i64; 65 let data = body.to_vec(); 66 67 let mut hasher = Sha256::new(); 68 hasher.update(&data); 69 let hash = hasher.finalize(); 70 let multihash = match Multihash::wrap(0x12, &hash) { 71 Ok(mh) => mh, 72 Err(e) => { 73 error!("Failed to create multihash for blob: {:?}", e); 74 return ( 75 StatusCode::INTERNAL_SERVER_ERROR, 76 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 77 ) 78 .into_response(); 79 } 80 }; 81 let cid = Cid::new_v1(0x55, multihash); 82 let cid_str = cid.to_string(); 83 84 let storage_key = format!("blobs/{}", cid_str); 85 86 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 87 .fetch_optional(&state.db) 88 .await; 89 90 let user_id = match user_query { 91 Ok(Some(row)) => row.id, 92 _ => { 93 return ( 94 StatusCode::INTERNAL_SERVER_ERROR, 95 Json(json!({"error": "InternalError"})), 96 ) 97 .into_response(); 98 } 99 }; 100 101 let mut tx = match state.db.begin().await { 102 Ok(tx) => tx, 103 Err(e) => { 104 error!("Failed to begin transaction: {:?}", e); 105 return ( 106 StatusCode::INTERNAL_SERVER_ERROR, 107 Json(json!({"error": "InternalError"})), 108 ) 109 .into_response(); 110 } 111 }; 112 113 let insert = sqlx::query!( 114 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 115 cid_str, 116 mime_type, 117 size, 118 user_id, 119 storage_key 120 ) 121 .fetch_optional(&mut *tx) 122 .await; 123 124 let was_inserted = match insert { 125 Ok(Some(_)) => true, 126 Ok(None) => false, 127 Err(e) => { 128 error!("Failed to insert blob record: {:?}", e); 129 return ( 130 StatusCode::INTERNAL_SERVER_ERROR, 131 Json(json!({"error": "InternalError"})), 132 ) 133 .into_response(); 134 } 135 }; 136 137 if was_inserted { 138 if let Err(e) = state.blob_store.put_bytes(&storage_key, bytes::Bytes::from(data)).await { 139 error!("Failed to upload blob to storage: {:?}", e); 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 143 ) 144 .into_response(); 145 } 146 } 147 148 if let Err(e) = tx.commit().await { 149 error!("Failed to commit blob transaction: {:?}", e); 150 if was_inserted { 151 if let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 152 error!("Failed to cleanup orphaned blob {}: {:?}", storage_key, cleanup_err); 153 } 154 } 155 return ( 156 StatusCode::INTERNAL_SERVER_ERROR, 157 Json(json!({"error": "InternalError"})), 158 ) 159 .into_response(); 160 } 161 162 Json(json!({ 163 "blob": { 164 "ref": { 165 "$link": cid_str 166 }, 167 "mimeType": mime_type, 168 "size": size 169 } 170 })) 171 .into_response() 172} 173 174#[derive(Deserialize)] 175pub struct ListMissingBlobsParams { 176 pub limit: Option<i64>, 177 pub cursor: Option<String>, 178} 179 180#[derive(Serialize)] 181#[serde(rename_all = "camelCase")] 182pub struct RecordBlob { 183 pub cid: String, 184 pub record_uri: String, 185} 186 187#[derive(Serialize)] 188pub struct ListMissingBlobsOutput { 189 pub cursor: Option<String>, 190 pub blobs: Vec<RecordBlob>, 191} 192 193fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 194 if let Some(obj) = val.as_object() { 195 if let Some(type_val) = obj.get("$type") { 196 if type_val == "blob" { 197 if let Some(r) = obj.get("ref") { 198 if let Some(link) = r.get("$link") { 199 if let Some(s) = link.as_str() { 200 blobs.push(s.to_string()); 201 } 202 } 203 } 204 } 205 } 206 for (_, v) in obj { 207 find_blobs(v, blobs); 208 } 209 } else if let Some(arr) = val.as_array() { 210 for v in arr { 211 find_blobs(v, blobs); 212 } 213 } 214} 215 216pub async fn list_missing_blobs( 217 State(state): State<AppState>, 218 headers: axum::http::HeaderMap, 219 Query(params): Query<ListMissingBlobsParams>, 220) -> Response { 221 let token = match crate::auth::extract_bearer_token_from_header( 222 headers.get("Authorization").and_then(|h| h.to_str().ok()) 223 ) { 224 Some(t) => t, 225 None => { 226 return ( 227 StatusCode::UNAUTHORIZED, 228 Json(json!({"error": "AuthenticationRequired"})), 229 ) 230 .into_response(); 231 } 232 }; 233 234 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 235 Ok(user) => user, 236 Err(_) => { 237 return ( 238 StatusCode::UNAUTHORIZED, 239 Json(json!({"error": "AuthenticationFailed"})), 240 ) 241 .into_response(); 242 } 243 }; 244 245 let did = auth_user.did; 246 247 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 248 .fetch_optional(&state.db) 249 .await; 250 251 let user_id = match user_query { 252 Ok(Some(row)) => row.id, 253 _ => { 254 return ( 255 StatusCode::INTERNAL_SERVER_ERROR, 256 Json(json!({"error": "InternalError"})), 257 ) 258 .into_response(); 259 } 260 }; 261 262 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 263 let cursor_str = params.cursor.unwrap_or_default(); 264 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 265 let parts: Vec<&str> = cursor_str.split('|').collect(); 266 (parts[0].to_string(), parts[1].to_string()) 267 } else { 268 (String::new(), String::new()) 269 }; 270 271 let records_query = sqlx::query!( 272 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 273 user_id, 274 cursor_collection, 275 cursor_rkey, 276 limit 277 ) 278 .fetch_all(&state.db) 279 .await; 280 281 let records = match records_query { 282 Ok(r) => r, 283 Err(e) => { 284 error!("DB error fetching records: {:?}", e); 285 return ( 286 StatusCode::INTERNAL_SERVER_ERROR, 287 Json(json!({"error": "InternalError"})), 288 ) 289 .into_response(); 290 } 291 }; 292 293 let mut missing_blobs = Vec::new(); 294 let mut last_cursor = None; 295 296 for row in &records { 297 let collection = &row.collection; 298 let rkey = &row.rkey; 299 let record_cid_str = &row.record_cid; 300 301 last_cursor = Some(format!("{}|{}", collection, rkey)); 302 303 let record_cid = match Cid::from_str(&record_cid_str) { 304 Ok(c) => c, 305 Err(_) => continue, 306 }; 307 308 let block_bytes = match state.block_store.get(&record_cid).await { 309 Ok(Some(b)) => b, 310 _ => continue, 311 }; 312 313 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 314 Ok(v) => v, 315 Err(_) => continue, 316 }; 317 318 let mut blobs = Vec::new(); 319 find_blobs(&record_val, &mut blobs); 320 321 for blob_cid_str in blobs { 322 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id) 323 .fetch_optional(&state.db) 324 .await; 325 326 match exists { 327 Ok(None) => { 328 missing_blobs.push(RecordBlob { 329 cid: blob_cid_str, 330 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 331 }); 332 } 333 Err(e) => { 334 error!("DB error checking blob existence: {:?}", e); 335 } 336 _ => {} 337 } 338 } 339 } 340 341 // if we fetched fewer records than limit, we are done, so cursor is None. 342 // otherwise, cursor is the last one we saw. 343 // ...right? 344 let next_cursor = if records.len() < limit as usize { 345 None 346 } else { 347 last_cursor 348 }; 349 350 ( 351 StatusCode::OK, 352 Json(ListMissingBlobsOutput { 353 cursor: next_cursor, 354 blobs: missing_blobs, 355 }), 356 ) 357 .into_response() 358}