this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use sqlx::Row; 16use std::str::FromStr; 17use tracing::error; 18 19pub async fn upload_blob( 20 State(state): State<AppState>, 21 headers: axum::http::HeaderMap, 22 body: Bytes, 23) -> Response { 24 let auth_header = headers.get("Authorization"); 25 if auth_header.is_none() { 26 return ( 27 StatusCode::UNAUTHORIZED, 28 Json(json!({"error": "AuthenticationRequired"})), 29 ) 30 .into_response(); 31 } 32 let token = auth_header 33 .unwrap() 34 .to_str() 35 .unwrap_or("") 36 .replace("Bearer ", ""); 37 38 let session = sqlx::query( 39 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 40 ) 41 .bind(&token) 42 .fetch_optional(&state.db) 43 .await 44 .unwrap_or(None); 45 46 let (did, key_bytes) = match session { 47 Some(row) => ( 48 row.get::<String, _>("did"), 49 row.get::<Vec<u8>, _>("key_bytes"), 50 ), 51 None => { 52 return ( 53 StatusCode::UNAUTHORIZED, 54 Json(json!({"error": "AuthenticationFailed"})), 55 ) 56 .into_response(); 57 } 58 }; 59 60 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 61 return ( 62 StatusCode::UNAUTHORIZED, 63 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 64 ) 65 .into_response(); 66 } 67 68 let mime_type = headers 69 .get("content-type") 70 .and_then(|h| h.to_str().ok()) 71 .unwrap_or("application/octet-stream") 72 .to_string(); 73 74 let size = body.len() as i64; 75 let data = body.to_vec(); 76 77 let mut hasher = Sha256::new(); 78 hasher.update(&data); 79 let hash = hasher.finalize(); 80 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 81 let cid = Cid::new_v1(0x55, multihash); 82 let cid_str = cid.to_string(); 83 84 let storage_key = format!("blobs/{}", cid_str); 85 86 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 87 error!("Failed to upload blob to storage: {:?}", e); 88 return ( 89 StatusCode::INTERNAL_SERVER_ERROR, 90 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 91 ) 92 .into_response(); 93 } 94 95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 96 .bind(&did) 97 .fetch_optional(&state.db) 98 .await; 99 100 let user_id: uuid::Uuid = match user_query { 101 Ok(Some(row)) => row.get("id"), 102 _ => { 103 return ( 104 StatusCode::INTERNAL_SERVER_ERROR, 105 Json(json!({"error": "InternalError"})), 106 ) 107 .into_response(); 108 } 109 }; 110 111 let insert = sqlx::query( 112 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING" 113 ) 114 .bind(&cid_str) 115 .bind(&mime_type) 116 .bind(size) 117 .bind(user_id) 118 .bind(&storage_key) 119 .execute(&state.db) 120 .await; 121 122 if let Err(e) = insert { 123 error!("Failed to insert blob record: {:?}", e); 124 return ( 125 StatusCode::INTERNAL_SERVER_ERROR, 126 Json(json!({"error": "InternalError"})), 127 ) 128 .into_response(); 129 } 130 131 Json(json!({ 132 "blob": { 133 "ref": { 134 "$link": cid_str 135 }, 136 "mimeType": mime_type, 137 "size": size 138 } 139 })) 140 .into_response() 141} 142 143#[derive(Deserialize)] 144pub struct ListMissingBlobsParams { 145 pub limit: Option<i64>, 146 pub cursor: Option<String>, 147} 148 149#[derive(Serialize)] 150#[serde(rename_all = "camelCase")] 151pub struct RecordBlob { 152 pub cid: String, 153 pub record_uri: String, 154} 155 156#[derive(Serialize)] 157pub struct ListMissingBlobsOutput { 158 pub cursor: Option<String>, 159 pub blobs: Vec<RecordBlob>, 160} 161 162fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 163 if let Some(obj) = val.as_object() { 164 if let Some(type_val) = obj.get("$type") { 165 if type_val == "blob" { 166 if let Some(r) = obj.get("ref") { 167 if let Some(link) = r.get("$link") { 168 if let Some(s) = link.as_str() { 169 blobs.push(s.to_string()); 170 } 171 } 172 } 173 } 174 } 175 for (_, v) in obj { 176 find_blobs(v, blobs); 177 } 178 } else if let Some(arr) = val.as_array() { 179 for v in arr { 180 find_blobs(v, blobs); 181 } 182 } 183} 184 185pub async fn list_missing_blobs( 186 State(state): State<AppState>, 187 headers: axum::http::HeaderMap, 188 Query(params): Query<ListMissingBlobsParams>, 189) -> Response { 190 let auth_header = headers.get("Authorization"); 191 if auth_header.is_none() { 192 return ( 193 StatusCode::UNAUTHORIZED, 194 Json(json!({"error": "AuthenticationRequired"})), 195 ) 196 .into_response(); 197 } 198 199 let token = auth_header 200 .unwrap() 201 .to_str() 202 .unwrap_or("") 203 .replace("Bearer ", ""); 204 205 let session = sqlx::query( 206 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 207 ) 208 .bind(&token) 209 .fetch_optional(&state.db) 210 .await 211 .unwrap_or(None); 212 213 let (did, key_bytes) = match session { 214 Some(row) => ( 215 row.get::<String, _>("did"), 216 row.get::<Vec<u8>, _>("key_bytes"), 217 ), 218 None => { 219 return ( 220 StatusCode::UNAUTHORIZED, 221 Json(json!({"error": "AuthenticationFailed"})), 222 ) 223 .into_response(); 224 } 225 }; 226 227 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 228 return ( 229 StatusCode::UNAUTHORIZED, 230 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 231 ) 232 .into_response(); 233 } 234 235 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 236 .bind(&did) 237 .fetch_optional(&state.db) 238 .await; 239 240 let user_id: uuid::Uuid = match user_query { 241 Ok(Some(row)) => row.get("id"), 242 _ => { 243 return ( 244 StatusCode::INTERNAL_SERVER_ERROR, 245 Json(json!({"error": "InternalError"})), 246 ) 247 .into_response(); 248 } 249 }; 250 251 let limit = params.limit.unwrap_or(500).min(1000); 252 let cursor_str = params.cursor.unwrap_or_default(); 253 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 254 let parts: Vec<&str> = cursor_str.split('|').collect(); 255 (parts[0].to_string(), parts[1].to_string()) 256 } else { 257 (String::new(), String::new()) 258 }; 259 260 let records_query = sqlx::query( 261 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4" 262 ) 263 .bind(user_id) 264 .bind(cursor_collection) 265 .bind(cursor_rkey) 266 .bind(limit) 267 .fetch_all(&state.db) 268 .await; 269 270 let records = match records_query { 271 Ok(r) => r, 272 Err(e) => { 273 error!("DB error fetching records: {:?}", e); 274 return ( 275 StatusCode::INTERNAL_SERVER_ERROR, 276 Json(json!({"error": "InternalError"})), 277 ) 278 .into_response(); 279 } 280 }; 281 282 let mut missing_blobs = Vec::new(); 283 let mut last_cursor = None; 284 285 for row in &records { 286 let collection: String = row.get("collection"); 287 let rkey: String = row.get("rkey"); 288 let record_cid_str: String = row.get("record_cid"); 289 290 last_cursor = Some(format!("{}|{}", collection, rkey)); 291 292 let record_cid = match Cid::from_str(&record_cid_str) { 293 Ok(c) => c, 294 Err(_) => continue, 295 }; 296 297 let block_bytes = match state.block_store.get(&record_cid).await { 298 Ok(Some(b)) => b, 299 _ => continue, 300 }; 301 302 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 303 Ok(v) => v, 304 Err(_) => continue, 305 }; 306 307 let mut blobs = Vec::new(); 308 find_blobs(&record_val, &mut blobs); 309 310 for blob_cid_str in blobs { 311 let exists = sqlx::query("SELECT 1 FROM blobs WHERE cid = $1 AND created_by_user = $2") 312 .bind(&blob_cid_str) 313 .bind(user_id) 314 .fetch_optional(&state.db) 315 .await; 316 317 match exists { 318 Ok(None) => { 319 missing_blobs.push(RecordBlob { 320 cid: blob_cid_str, 321 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 322 }); 323 } 324 Err(e) => { 325 error!("DB error checking blob existence: {:?}", e); 326 } 327 _ => {} 328 } 329 } 330 } 331 332 // if we fetched fewer records than limit, we are done, so cursor is None. 333 // otherwise, cursor is the last one we saw. 334 // ...right? 335 let next_cursor = if records.len() < limit as usize { 336 None 337 } else { 338 last_cursor 339 }; 340 341 ( 342 StatusCode::OK, 343 Json(ListMissingBlobsOutput { 344 cursor: next_cursor, 345 blobs: missing_blobs, 346 }), 347 ) 348 .into_response() 349}