this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use std::str::FromStr; 16use tracing::error; 17 18pub async fn upload_blob( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let auth_header = headers.get("Authorization"); 24 if auth_header.is_none() { 25 return ( 26 StatusCode::UNAUTHORIZED, 27 Json(json!({"error": "AuthenticationRequired"})), 28 ) 29 .into_response(); 30 } 31 let token = auth_header 32 .unwrap() 33 .to_str() 34 .unwrap_or("") 35 .replace("Bearer ", ""); 36 37 let session = sqlx::query!( 38 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 39 token 40 ) 41 .fetch_optional(&state.db) 42 .await 43 .unwrap_or(None); 44 45 let (did, key_bytes) = match session { 46 Some(row) => (row.did, row.key_bytes), 47 None => { 48 return ( 49 StatusCode::UNAUTHORIZED, 50 Json(json!({"error": "AuthenticationFailed"})), 51 ) 52 .into_response(); 53 } 54 }; 55 56 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 57 return ( 58 StatusCode::UNAUTHORIZED, 59 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 60 ) 61 .into_response(); 62 } 63 64 let mime_type = headers 65 .get("content-type") 66 .and_then(|h| h.to_str().ok()) 67 .unwrap_or("application/octet-stream") 68 .to_string(); 69 70 let size = body.len() as i64; 71 let data = body.to_vec(); 72 73 let mut hasher = Sha256::new(); 74 hasher.update(&data); 75 let hash = hasher.finalize(); 76 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 77 let cid = Cid::new_v1(0x55, multihash); 78 let cid_str = cid.to_string(); 79 80 let storage_key = format!("blobs/{}", cid_str); 81 82 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 83 error!("Failed to upload blob to storage: {:?}", e); 84 return ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 87 ) 88 .into_response(); 89 } 90 91 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 92 .fetch_optional(&state.db) 93 .await; 94 95 let user_id = match user_query { 96 Ok(Some(row)) => row.id, 97 _ => { 98 return ( 99 StatusCode::INTERNAL_SERVER_ERROR, 100 Json(json!({"error": "InternalError"})), 101 ) 102 .into_response(); 103 } 104 }; 105 106 let insert = sqlx::query!( 107 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING", 108 cid_str, 109 mime_type, 110 size, 111 user_id, 112 storage_key 113 ) 114 .execute(&state.db) 115 .await; 116 117 if let Err(e) = insert { 118 error!("Failed to insert blob record: {:?}", e); 119 return ( 120 StatusCode::INTERNAL_SERVER_ERROR, 121 Json(json!({"error": "InternalError"})), 122 ) 123 .into_response(); 124 } 125 126 Json(json!({ 127 "blob": { 128 "ref": { 129 "$link": cid_str 130 }, 131 "mimeType": mime_type, 132 "size": size 133 } 134 })) 135 .into_response() 136} 137 138#[derive(Deserialize)] 139pub struct ListMissingBlobsParams { 140 pub limit: Option<i64>, 141 pub cursor: Option<String>, 142} 143 144#[derive(Serialize)] 145#[serde(rename_all = "camelCase")] 146pub struct RecordBlob { 147 pub cid: String, 148 pub record_uri: String, 149} 150 151#[derive(Serialize)] 152pub struct ListMissingBlobsOutput { 153 pub cursor: Option<String>, 154 pub blobs: Vec<RecordBlob>, 155} 156 157fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 158 if let Some(obj) = val.as_object() { 159 if let Some(type_val) = obj.get("$type") { 160 if type_val == "blob" { 161 if let Some(r) = obj.get("ref") { 162 if let Some(link) = r.get("$link") { 163 if let Some(s) = link.as_str() { 164 blobs.push(s.to_string()); 165 } 166 } 167 } 168 } 169 } 170 for (_, v) in obj { 171 find_blobs(v, blobs); 172 } 173 } else if let Some(arr) = val.as_array() { 174 for v in arr { 175 find_blobs(v, blobs); 176 } 177 } 178} 179 180pub async fn list_missing_blobs( 181 State(state): State<AppState>, 182 headers: axum::http::HeaderMap, 183 Query(params): Query<ListMissingBlobsParams>, 184) -> Response { 185 let auth_header = headers.get("Authorization"); 186 if auth_header.is_none() { 187 return ( 188 StatusCode::UNAUTHORIZED, 189 Json(json!({"error": "AuthenticationRequired"})), 190 ) 191 .into_response(); 192 } 193 194 let token = auth_header 195 .unwrap() 196 .to_str() 197 .unwrap_or("") 198 .replace("Bearer ", ""); 199 200 let session = sqlx::query!( 201 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 202 token 203 ) 204 .fetch_optional(&state.db) 205 .await 206 .unwrap_or(None); 207 208 let (did, key_bytes) = match session { 209 Some(row) => (row.did, row.key_bytes), 210 None => { 211 return ( 212 StatusCode::UNAUTHORIZED, 213 Json(json!({"error": "AuthenticationFailed"})), 214 ) 215 .into_response(); 216 } 217 }; 218 219 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 220 return ( 221 StatusCode::UNAUTHORIZED, 222 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 223 ) 224 .into_response(); 225 } 226 227 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 228 .fetch_optional(&state.db) 229 .await; 230 231 let user_id = match user_query { 232 Ok(Some(row)) => row.id, 233 _ => { 234 return ( 235 StatusCode::INTERNAL_SERVER_ERROR, 236 Json(json!({"error": "InternalError"})), 237 ) 238 .into_response(); 239 } 240 }; 241 242 let limit = params.limit.unwrap_or(500).min(1000); 243 let cursor_str = params.cursor.unwrap_or_default(); 244 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 245 let parts: Vec<&str> = cursor_str.split('|').collect(); 246 (parts[0].to_string(), parts[1].to_string()) 247 } else { 248 (String::new(), String::new()) 249 }; 250 251 let records_query = sqlx::query!( 252 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 253 user_id, 254 cursor_collection, 255 cursor_rkey, 256 limit 257 ) 258 .fetch_all(&state.db) 259 .await; 260 261 let records = match records_query { 262 Ok(r) => r, 263 Err(e) => { 264 error!("DB error fetching records: {:?}", e); 265 return ( 266 StatusCode::INTERNAL_SERVER_ERROR, 267 Json(json!({"error": "InternalError"})), 268 ) 269 .into_response(); 270 } 271 }; 272 273 let mut missing_blobs = Vec::new(); 274 let mut last_cursor = None; 275 276 for row in &records { 277 let collection = &row.collection; 278 let rkey = &row.rkey; 279 let record_cid_str = &row.record_cid; 280 281 last_cursor = Some(format!("{}|{}", collection, rkey)); 282 283 let record_cid = match Cid::from_str(&record_cid_str) { 284 Ok(c) => c, 285 Err(_) => continue, 286 }; 287 288 let block_bytes = match state.block_store.get(&record_cid).await { 289 Ok(Some(b)) => b, 290 _ => continue, 291 }; 292 293 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 294 Ok(v) => v, 295 Err(_) => continue, 296 }; 297 298 let mut blobs = Vec::new(); 299 find_blobs(&record_val, &mut blobs); 300 301 for blob_cid_str in blobs { 302 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id) 303 .fetch_optional(&state.db) 304 .await; 305 306 match exists { 307 Ok(None) => { 308 missing_blobs.push(RecordBlob { 309 cid: blob_cid_str, 310 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 311 }); 312 } 313 Err(e) => { 314 error!("DB error checking blob existence: {:?}", e); 315 } 316 _ => {} 317 } 318 } 319 } 320 321 // if we fetched fewer records than limit, we are done, so cursor is None. 322 // otherwise, cursor is the last one we saw. 323 // ...right? 324 let next_cursor = if records.len() < limit as usize { 325 None 326 } else { 327 last_cursor 328 }; 329 330 ( 331 StatusCode::OK, 332 Json(ListMissingBlobsOutput { 333 cursor: next_cursor, 334 blobs: missing_blobs, 335 }), 336 ) 337 .into_response() 338}