// This repo has no description.
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::state::AppState; 3use axum::body::Bytes; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use cid::Cid; 11use jacquard_repo::storage::BlockStore; 12use multihash::Multihash; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use sha2::{Digest, Sha256}; 16use std::str::FromStr; 17use tracing::{debug, error}; 18 19const MAX_BLOB_SIZE: usize = 1_000_000; 20const MAX_VIDEO_BLOB_SIZE: usize = 100_000_000; 21 22pub async fn upload_blob( 23 State(state): State<AppState>, 24 headers: axum::http::HeaderMap, 25 body: Bytes, 26) -> Response { 27 let token = match crate::auth::extract_bearer_token_from_header( 28 headers.get("Authorization").and_then(|h| h.to_str().ok()), 29 ) { 30 Some(t) => t, 31 None => { 32 return ( 33 StatusCode::UNAUTHORIZED, 34 Json(json!({"error": "AuthenticationRequired"})), 35 ) 36 .into_response(); 37 } 38 }; 39 40 let is_service_auth = is_service_token(&token); 41 42 let (did, is_migration) = if is_service_auth { 43 debug!("Verifying service token for blob upload"); 44 let verifier = ServiceTokenVerifier::new(); 45 match verifier 46 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 47 .await 48 { 49 Ok(claims) => { 50 debug!("Service token verified for DID: {}", claims.iss); 51 (claims.iss, false) 52 } 53 Err(e) => { 54 error!("Service token verification failed: {:?}", e); 55 return ( 56 StatusCode::UNAUTHORIZED, 57 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 58 ) 59 .into_response(); 60 } 61 } 62 } else { 63 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 64 Ok(user) => { 65 let deactivated = sqlx::query_scalar!( 66 "SELECT deactivated_at FROM users WHERE did = $1", 67 user.did 68 ) 69 .fetch_optional(&state.db) 70 .await 71 .ok() 72 .flatten() 73 .flatten(); 74 (user.did, 
deactivated.is_some()) 75 } 76 Err(_) => { 77 return ( 78 StatusCode::UNAUTHORIZED, 79 Json(json!({"error": "AuthenticationFailed"})), 80 ) 81 .into_response(); 82 } 83 } 84 }; 85 86 let max_size = if is_service_auth || is_migration { 87 MAX_VIDEO_BLOB_SIZE 88 } else { 89 MAX_BLOB_SIZE 90 }; 91 92 if body.len() > max_size { 93 return ( 94 StatusCode::PAYLOAD_TOO_LARGE, 95 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})), 96 ) 97 .into_response(); 98 } 99 let mime_type = headers 100 .get("content-type") 101 .and_then(|h| h.to_str().ok()) 102 .unwrap_or("application/octet-stream") 103 .to_string(); 104 let size = body.len() as i64; 105 let data = body.to_vec(); 106 let mut hasher = Sha256::new(); 107 hasher.update(&data); 108 let hash = hasher.finalize(); 109 let multihash = match Multihash::wrap(0x12, &hash) { 110 Ok(mh) => mh, 111 Err(e) => { 112 error!("Failed to create multihash for blob: {:?}", e); 113 return ( 114 StatusCode::INTERNAL_SERVER_ERROR, 115 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 116 ) 117 .into_response(); 118 } 119 }; 120 let cid = Cid::new_v1(0x55, multihash); 121 let cid_str = cid.to_string(); 122 let storage_key = format!("blobs/{}", cid_str); 123 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 124 .fetch_optional(&state.db) 125 .await; 126 let user_id = match user_query { 127 Ok(Some(row)) => row.id, 128 _ => { 129 return ( 130 StatusCode::INTERNAL_SERVER_ERROR, 131 Json(json!({"error": "InternalError"})), 132 ) 133 .into_response(); 134 } 135 }; 136 let mut tx = match state.db.begin().await { 137 Ok(tx) => tx, 138 Err(e) => { 139 error!("Failed to begin transaction: {:?}", e); 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError"})), 143 ) 144 .into_response(); 145 } 146 }; 147 let insert = sqlx::query!( 148 "INSERT INTO blobs (cid, mime_type, size_bytes, 
created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 149 cid_str, 150 mime_type, 151 size, 152 user_id, 153 storage_key 154 ) 155 .fetch_optional(&mut *tx) 156 .await; 157 let was_inserted = match insert { 158 Ok(Some(_)) => true, 159 Ok(None) => false, 160 Err(e) => { 161 error!("Failed to insert blob record: {:?}", e); 162 return ( 163 StatusCode::INTERNAL_SERVER_ERROR, 164 Json(json!({"error": "InternalError"})), 165 ) 166 .into_response(); 167 } 168 }; 169 if was_inserted 170 && let Err(e) = state 171 .blob_store 172 .put_bytes(&storage_key, bytes::Bytes::from(data)) 173 .await 174 { 175 error!("Failed to upload blob to storage: {:?}", e); 176 return ( 177 StatusCode::INTERNAL_SERVER_ERROR, 178 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 179 ) 180 .into_response(); 181 } 182 if let Err(e) = tx.commit().await { 183 error!("Failed to commit blob transaction: {:?}", e); 184 if was_inserted 185 && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 186 error!( 187 "Failed to cleanup orphaned blob {}: {:?}", 188 storage_key, cleanup_err 189 ); 190 } 191 return ( 192 StatusCode::INTERNAL_SERVER_ERROR, 193 Json(json!({"error": "InternalError"})), 194 ) 195 .into_response(); 196 } 197 Json(json!({ 198 "blob": { 199 "$type": "blob", 200 "ref": { 201 "$link": cid_str 202 }, 203 "mimeType": mime_type, 204 "size": size 205 } 206 })) 207 .into_response() 208} 209 210#[derive(Deserialize)] 211pub struct ListMissingBlobsParams { 212 pub limit: Option<i64>, 213 pub cursor: Option<String>, 214} 215 216#[derive(Serialize)] 217#[serde(rename_all = "camelCase")] 218pub struct RecordBlob { 219 pub cid: String, 220 pub record_uri: String, 221} 222 223#[derive(Serialize)] 224pub struct ListMissingBlobsOutput { 225 #[serde(skip_serializing_if = "Option::is_none")] 226 pub cursor: Option<String>, 227 pub blobs: Vec<RecordBlob>, 228} 229 230fn find_blobs(val: &serde_json::Value, blobs: 
&mut Vec<String>) { 231 if let Some(obj) = val.as_object() { 232 if let Some(type_val) = obj.get("$type") 233 && type_val == "blob" 234 && let Some(r) = obj.get("ref") 235 && let Some(link) = r.get("$link") 236 && let Some(s) = link.as_str() { 237 blobs.push(s.to_string()); 238 } 239 for (_, v) in obj { 240 find_blobs(v, blobs); 241 } 242 } else if let Some(arr) = val.as_array() { 243 for v in arr { 244 find_blobs(v, blobs); 245 } 246 } 247} 248 249pub async fn list_missing_blobs( 250 State(state): State<AppState>, 251 headers: axum::http::HeaderMap, 252 Query(params): Query<ListMissingBlobsParams>, 253) -> Response { 254 let token = match crate::auth::extract_bearer_token_from_header( 255 headers.get("Authorization").and_then(|h| h.to_str().ok()), 256 ) { 257 Some(t) => t, 258 None => { 259 return ( 260 StatusCode::UNAUTHORIZED, 261 Json(json!({"error": "AuthenticationRequired"})), 262 ) 263 .into_response(); 264 } 265 }; 266 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 267 Ok(user) => user, 268 Err(_) => { 269 return ( 270 StatusCode::UNAUTHORIZED, 271 Json(json!({"error": "AuthenticationFailed"})), 272 ) 273 .into_response(); 274 } 275 }; 276 let did = auth_user.did; 277 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 278 .fetch_optional(&state.db) 279 .await; 280 let user_id = match user_query { 281 Ok(Some(row)) => row.id, 282 _ => { 283 return ( 284 StatusCode::INTERNAL_SERVER_ERROR, 285 Json(json!({"error": "InternalError"})), 286 ) 287 .into_response(); 288 } 289 }; 290 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 291 let cursor_str = params.cursor.unwrap_or_default(); 292 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 293 let parts: Vec<&str> = cursor_str.split('|').collect(); 294 (parts[0].to_string(), parts[1].to_string()) 295 } else { 296 (String::new(), String::new()) 297 }; 298 let records_query = sqlx::query!( 299 "SELECT collection, rkey, record_cid 
FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 300 user_id, 301 cursor_collection, 302 cursor_rkey, 303 limit 304 ) 305 .fetch_all(&state.db) 306 .await; 307 let records = match records_query { 308 Ok(r) => r, 309 Err(e) => { 310 error!("DB error fetching records: {:?}", e); 311 return ( 312 StatusCode::INTERNAL_SERVER_ERROR, 313 Json(json!({"error": "InternalError"})), 314 ) 315 .into_response(); 316 } 317 }; 318 let mut missing_blobs = Vec::new(); 319 let mut last_cursor = None; 320 for row in &records { 321 let collection = &row.collection; 322 let rkey = &row.rkey; 323 let record_cid_str = &row.record_cid; 324 last_cursor = Some(format!("{}|{}", collection, rkey)); 325 let record_cid = match Cid::from_str(record_cid_str) { 326 Ok(c) => c, 327 Err(_) => continue, 328 }; 329 let block_bytes = match state.block_store.get(&record_cid).await { 330 Ok(Some(b)) => b, 331 _ => continue, 332 }; 333 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 334 Ok(v) => v, 335 Err(_) => continue, 336 }; 337 let mut blobs = Vec::new(); 338 find_blobs(&record_val, &mut blobs); 339 for blob_cid_str in blobs { 340 let exists = sqlx::query!( 341 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", 342 blob_cid_str, 343 user_id 344 ) 345 .fetch_optional(&state.db) 346 .await; 347 match exists { 348 Ok(None) => { 349 missing_blobs.push(RecordBlob { 350 cid: blob_cid_str, 351 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 352 }); 353 } 354 Err(e) => { 355 error!("DB error checking blob existence: {:?}", e); 356 } 357 _ => {} 358 } 359 } 360 } 361 // if we fetched fewer records than limit, we are done, so cursor is None. 362 // otherwise, cursor is the last one we saw. 363 // ...right? 
364 let next_cursor = if records.len() < limit as usize { 365 None 366 } else { 367 last_cursor 368 }; 369 ( 370 StatusCode::OK, 371 Json(ListMissingBlobsOutput { 372 cursor: next_cursor, 373 blobs: missing_blobs, 374 }), 375 ) 376 .into_response() 377}