this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_BLOB_SIZE: usize = 1_000_000; 19 20pub async fn upload_blob( 21 State(state): State<AppState>, 22 headers: axum::http::HeaderMap, 23 body: Bytes, 24) -> Response { 25 if body.len() > MAX_BLOB_SIZE { 26 return ( 27 StatusCode::PAYLOAD_TOO_LARGE, 28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})), 29 ) 30 .into_response(); 31 } 32 33 let token = match crate::auth::extract_bearer_token_from_header( 34 headers.get("Authorization").and_then(|h| h.to_str().ok()) 35 ) { 36 Some(t) => t, 37 None => { 38 return ( 39 StatusCode::UNAUTHORIZED, 40 Json(json!({"error": "AuthenticationRequired"})), 41 ) 42 .into_response(); 43 } 44 }; 45 46 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 47 Ok(user) => user, 48 Err(_) => { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationFailed"})), 52 ) 53 .into_response(); 54 } 55 }; 56 let did = auth_user.did; 57 58 let mime_type = headers 59 .get("content-type") 60 .and_then(|h| h.to_str().ok()) 61 .unwrap_or("application/octet-stream") 62 .to_string(); 63 64 let size = body.len() as i64; 65 let data = body.to_vec(); 66 67 let mut hasher = Sha256::new(); 68 hasher.update(&data); 69 let hash = hasher.finalize(); 70 let multihash = match Multihash::wrap(0x12, &hash) { 71 Ok(mh) => mh, 72 Err(e) => { 73 error!("Failed to create multihash for blob: {:?}", e); 74 return ( 75 StatusCode::INTERNAL_SERVER_ERROR, 76 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 77 ) 78 .into_response(); 79 } 80 }; 81 let cid = Cid::new_v1(0x55, multihash); 82 let cid_str = cid.to_string(); 83 84 let storage_key = format!("blobs/{}", cid_str); 85 86 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 87 error!("Failed to upload blob to storage: {:?}", e); 88 return ( 89 StatusCode::INTERNAL_SERVER_ERROR, 90 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 91 ) 92 .into_response(); 93 } 94 95 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 96 .fetch_optional(&state.db) 97 .await; 98 99 let user_id = match user_query { 100 Ok(Some(row)) => row.id, 101 _ => { 102 return ( 103 StatusCode::INTERNAL_SERVER_ERROR, 104 Json(json!({"error": "InternalError"})), 105 ) 106 .into_response(); 107 } 108 }; 109 110 let insert = sqlx::query!( 111 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING", 112 cid_str, 113 mime_type, 114 size, 115 user_id, 116 storage_key 117 ) 118 .execute(&state.db) 119 .await; 120 121 if let Err(e) = insert { 122 error!("Failed to insert blob record: {:?}", e); 123 return ( 124 StatusCode::INTERNAL_SERVER_ERROR, 125 Json(json!({"error": "InternalError"})), 126 ) 127 .into_response(); 128 } 129 130 Json(json!({ 131 "blob": { 132 "ref": { 133 "$link": cid_str 134 }, 135 "mimeType": mime_type, 136 "size": size 137 } 138 })) 139 .into_response() 140} 141 142#[derive(Deserialize)] 143pub struct ListMissingBlobsParams { 144 pub limit: Option<i64>, 145 pub cursor: Option<String>, 146} 147 148#[derive(Serialize)] 149#[serde(rename_all = "camelCase")] 150pub struct RecordBlob { 151 pub cid: String, 152 pub record_uri: String, 153} 154 155#[derive(Serialize)] 156pub struct ListMissingBlobsOutput { 157 pub cursor: Option<String>, 158 pub blobs: Vec<RecordBlob>, 159} 160 161fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 162 if let Some(obj) = val.as_object() { 163 if let Some(type_val) = obj.get("$type") { 164 if type_val == "blob" { 165 if let Some(r) = obj.get("ref") { 166 if let Some(link) = r.get("$link") { 167 if let Some(s) = link.as_str() { 168 blobs.push(s.to_string()); 169 } 170 } 171 } 172 } 173 } 174 for (_, v) in obj { 175 find_blobs(v, blobs); 176 } 177 } else if let Some(arr) = val.as_array() { 178 for v in arr { 179 find_blobs(v, blobs); 180 } 181 } 182} 183 184pub async fn list_missing_blobs( 185 State(state): State<AppState>, 186 headers: axum::http::HeaderMap, 187 Query(params): Query<ListMissingBlobsParams>, 188) -> Response { 189 let token = match crate::auth::extract_bearer_token_from_header( 190 headers.get("Authorization").and_then(|h| h.to_str().ok()) 191 ) { 192 Some(t) => t, 193 None => { 194 return ( 195 StatusCode::UNAUTHORIZED, 196 Json(json!({"error": "AuthenticationRequired"})), 197 ) 198 .into_response(); 199 } 200 }; 201 202 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 203 Ok(user) => user, 204 Err(_) => { 205 return ( 206 StatusCode::UNAUTHORIZED, 207 Json(json!({"error": "AuthenticationFailed"})), 208 ) 209 .into_response(); 210 } 211 }; 212 213 let did = auth_user.did; 214 215 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 216 .fetch_optional(&state.db) 217 .await; 218 219 let user_id = match user_query { 220 Ok(Some(row)) => row.id, 221 _ => { 222 return ( 223 StatusCode::INTERNAL_SERVER_ERROR, 224 Json(json!({"error": "InternalError"})), 225 ) 226 .into_response(); 227 } 228 }; 229 230 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 231 let cursor_str = params.cursor.unwrap_or_default(); 232 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 233 let parts: Vec<&str> = cursor_str.split('|').collect(); 234 (parts[0].to_string(), parts[1].to_string()) 235 } else { 236 (String::new(), String::new()) 237 }; 238 239 let records_query = sqlx::query!( 240 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 241 user_id, 242 cursor_collection, 243 cursor_rkey, 244 limit 245 ) 246 .fetch_all(&state.db) 247 .await; 248 249 let records = match records_query { 250 Ok(r) => r, 251 Err(e) => { 252 error!("DB error fetching records: {:?}", e); 253 return ( 254 StatusCode::INTERNAL_SERVER_ERROR, 255 Json(json!({"error": "InternalError"})), 256 ) 257 .into_response(); 258 } 259 }; 260 261 let mut missing_blobs = Vec::new(); 262 let mut last_cursor = None; 263 264 for row in &records { 265 let collection = &row.collection; 266 let rkey = &row.rkey; 267 let record_cid_str = &row.record_cid; 268 269 last_cursor = Some(format!("{}|{}", collection, rkey)); 270 271 let record_cid = match Cid::from_str(&record_cid_str) { 272 Ok(c) => c, 273 Err(_) => continue, 274 }; 275 276 let block_bytes = match state.block_store.get(&record_cid).await { 277 Ok(Some(b)) => b, 278 _ => continue, 279 }; 280 281 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 282 Ok(v) => v, 283 Err(_) => continue, 284 }; 285 286 let mut blobs = Vec::new(); 287 find_blobs(&record_val, &mut blobs); 288 289 for blob_cid_str in blobs { 290 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id) 291 .fetch_optional(&state.db) 292 .await; 293 294 match exists { 295 Ok(None) => { 296 missing_blobs.push(RecordBlob { 297 cid: blob_cid_str, 298 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 299 }); 300 } 301 Err(e) => { 302 error!("DB error checking blob existence: {:?}", e); 303 } 304 _ => {} 305 } 306 } 307 } 308 309 // if we fetched fewer records than limit, we are done, so cursor is None. 310 // otherwise, cursor is the last one we saw. 311 // ...right? 312 let next_cursor = if records.len() < limit as usize { 313 None 314 } else { 315 last_cursor 316 }; 317 318 ( 319 StatusCode::OK, 320 Json(ListMissingBlobsOutput { 321 cursor: next_cursor, 322 blobs: missing_blobs, 323 }), 324 ) 325 .into_response() 326}