this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use std::str::FromStr; 16use tracing::error; 17const MAX_BLOB_SIZE: usize = 1_000_000; 18pub async fn upload_blob( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 if body.len() > MAX_BLOB_SIZE { 24 return ( 25 StatusCode::PAYLOAD_TOO_LARGE, 26 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})), 27 ) 28 .into_response(); 29 } 30 let token = match crate::auth::extract_bearer_token_from_header( 31 headers.get("Authorization").and_then(|h| h.to_str().ok()) 32 ) { 33 Some(t) => t, 34 None => { 35 return ( 36 StatusCode::UNAUTHORIZED, 37 Json(json!({"error": "AuthenticationRequired"})), 38 ) 39 .into_response(); 40 } 41 }; 42 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 43 Ok(user) => user, 44 Err(_) => { 45 return ( 46 StatusCode::UNAUTHORIZED, 47 Json(json!({"error": "AuthenticationFailed"})), 48 ) 49 .into_response(); 50 } 51 }; 52 let did = auth_user.did; 53 let mime_type = headers 54 .get("content-type") 55 .and_then(|h| h.to_str().ok()) 56 .unwrap_or("application/octet-stream") 57 .to_string(); 58 let size = body.len() as i64; 59 let data = body.to_vec(); 60 let mut hasher = Sha256::new(); 61 hasher.update(&data); 62 let hash = hasher.finalize(); 63 let multihash = match Multihash::wrap(0x12, &hash) { 64 Ok(mh) => mh, 65 Err(e) => { 66 error!("Failed to create multihash for blob: {:?}", e); 67 return ( 68 StatusCode::INTERNAL_SERVER_ERROR, 69 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 70 ) 71 .into_response(); 72 } 73 }; 74 let cid = Cid::new_v1(0x55, multihash); 75 let cid_str = cid.to_string(); 76 let storage_key = format!("blobs/{}", cid_str); 77 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 78 .fetch_optional(&state.db) 79 .await; 80 let user_id = match user_query { 81 Ok(Some(row)) => row.id, 82 _ => { 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError"})), 86 ) 87 .into_response(); 88 } 89 }; 90 let mut tx = match state.db.begin().await { 91 Ok(tx) => tx, 92 Err(e) => { 93 error!("Failed to begin transaction: {:?}", e); 94 return ( 95 StatusCode::INTERNAL_SERVER_ERROR, 96 Json(json!({"error": "InternalError"})), 97 ) 98 .into_response(); 99 } 100 }; 101 let insert = sqlx::query!( 102 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 103 cid_str, 104 mime_type, 105 size, 106 user_id, 107 storage_key 108 ) 109 .fetch_optional(&mut *tx) 110 .await; 111 let was_inserted = match insert { 112 Ok(Some(_)) => true, 113 Ok(None) => false, 114 Err(e) => { 115 error!("Failed to insert blob record: {:?}", e); 116 return ( 117 StatusCode::INTERNAL_SERVER_ERROR, 118 Json(json!({"error": "InternalError"})), 119 ) 120 .into_response(); 121 } 122 }; 123 if was_inserted { 124 if let Err(e) = state.blob_store.put_bytes(&storage_key, bytes::Bytes::from(data)).await { 125 error!("Failed to upload blob to storage: {:?}", e); 126 return ( 127 StatusCode::INTERNAL_SERVER_ERROR, 128 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 129 ) 130 .into_response(); 131 } 132 } 133 if let Err(e) = tx.commit().await { 134 error!("Failed to commit blob transaction: {:?}", e); 135 if was_inserted { 136 if let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 137 error!("Failed to cleanup orphaned blob {}: {:?}", storage_key, cleanup_err); 138 } 139 } 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError"})), 143 ) 144 .into_response(); 145 } 146 Json(json!({ 147 "blob": { 148 "ref": { 149 "$link": cid_str 150 }, 151 "mimeType": mime_type, 152 "size": size 153 } 154 })) 155 .into_response() 156} 157#[derive(Deserialize)] 158pub struct ListMissingBlobsParams { 159 pub limit: Option<i64>, 160 pub cursor: Option<String>, 161} 162#[derive(Serialize)] 163#[serde(rename_all = "camelCase")] 164pub struct RecordBlob { 165 pub cid: String, 166 pub record_uri: String, 167} 168#[derive(Serialize)] 169pub struct ListMissingBlobsOutput { 170 pub cursor: Option<String>, 171 pub blobs: Vec<RecordBlob>, 172} 173fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 174 if let Some(obj) = val.as_object() { 175 if let Some(type_val) = obj.get("$type") { 176 if type_val == "blob" { 177 if let Some(r) = obj.get("ref") { 178 if let Some(link) = r.get("$link") { 179 if let Some(s) = link.as_str() { 180 blobs.push(s.to_string()); 181 } 182 } 183 } 184 } 185 } 186 for (_, v) in obj { 187 find_blobs(v, blobs); 188 } 189 } else if let Some(arr) = val.as_array() { 190 for v in arr { 191 find_blobs(v, blobs); 192 } 193 } 194} 195pub async fn list_missing_blobs( 196 State(state): State<AppState>, 197 headers: axum::http::HeaderMap, 198 Query(params): Query<ListMissingBlobsParams>, 199) -> Response { 200 let token = match crate::auth::extract_bearer_token_from_header( 201 headers.get("Authorization").and_then(|h| h.to_str().ok()) 202 ) { 203 Some(t) => t, 204 None => { 205 return ( 206 StatusCode::UNAUTHORIZED, 207 Json(json!({"error": "AuthenticationRequired"})), 208 ) 209 .into_response(); 210 } 211 }; 212 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 213 Ok(user) => user, 214 Err(_) => { 215 return ( 216 StatusCode::UNAUTHORIZED, 217 Json(json!({"error": "AuthenticationFailed"})), 218 ) 219 .into_response(); 220 } 221 }; 222 let did = auth_user.did; 223 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 224 .fetch_optional(&state.db) 225 .await; 226 let user_id = match user_query { 227 Ok(Some(row)) => row.id, 228 _ => { 229 return ( 230 StatusCode::INTERNAL_SERVER_ERROR, 231 Json(json!({"error": "InternalError"})), 232 ) 233 .into_response(); 234 } 235 }; 236 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 237 let cursor_str = params.cursor.unwrap_or_default(); 238 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 239 let parts: Vec<&str> = cursor_str.split('|').collect(); 240 (parts[0].to_string(), parts[1].to_string()) 241 } else { 242 (String::new(), String::new()) 243 }; 244 let records_query = sqlx::query!( 245 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 246 user_id, 247 cursor_collection, 248 cursor_rkey, 249 limit 250 ) 251 .fetch_all(&state.db) 252 .await; 253 let records = match records_query { 254 Ok(r) => r, 255 Err(e) => { 256 error!("DB error fetching records: {:?}", e); 257 return ( 258 StatusCode::INTERNAL_SERVER_ERROR, 259 Json(json!({"error": "InternalError"})), 260 ) 261 .into_response(); 262 } 263 }; 264 let mut missing_blobs = Vec::new(); 265 let mut last_cursor = None; 266 for row in &records { 267 let collection = &row.collection; 268 let rkey = &row.rkey; 269 let record_cid_str = &row.record_cid; 270 last_cursor = Some(format!("{}|{}", collection, rkey)); 271 let record_cid = match Cid::from_str(&record_cid_str) { 272 Ok(c) => c, 273 Err(_) => continue, 274 }; 275 let block_bytes = match state.block_store.get(&record_cid).await { 276 Ok(Some(b)) => b, 277 _ => continue, 278 }; 279 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 280 Ok(v) => v, 281 Err(_) => continue, 282 }; 283 let mut blobs = Vec::new(); 284 find_blobs(&record_val, &mut blobs); 285 for blob_cid_str in blobs { 286 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id) 287 .fetch_optional(&state.db) 288 .await; 289 match exists { 290 Ok(None) => { 291 missing_blobs.push(RecordBlob { 292 cid: blob_cid_str, 293 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 294 }); 295 } 296 Err(e) => { 297 error!("DB error checking blob existence: {:?}", e); 298 } 299 _ => {} 300 } 301 } 302 } 303 // if we fetched fewer records than limit, we are done, so cursor is None. 304 // otherwise, cursor is the last one we saw. 305 // ...right? 306 let next_cursor = if records.len() < limit as usize { 307 None 308 } else { 309 last_cursor 310 }; 311 ( 312 StatusCode::OK, 313 Json(ListMissingBlobsOutput { 314 cursor: next_cursor, 315 blobs: missing_blobs, 316 }), 317 ) 318 .into_response() 319}