this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::state::AppState; 3use axum::body::Bytes; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use cid::Cid; 11use jacquard_repo::storage::BlockStore; 12use multihash::Multihash; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use sha2::{Digest, Sha256}; 16use std::str::FromStr; 17use tracing::{debug, error}; 18 19const MAX_BLOB_SIZE: usize = 10_000_000_000; 20const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000; 21 22pub async fn upload_blob( 23 State(state): State<AppState>, 24 headers: axum::http::HeaderMap, 25 body: Bytes, 26) -> Response { 27 let token = match crate::auth::extract_bearer_token_from_header( 28 headers.get("Authorization").and_then(|h| h.to_str().ok()), 29 ) { 30 Some(t) => t, 31 None => { 32 return ( 33 StatusCode::UNAUTHORIZED, 34 Json(json!({"error": "AuthenticationRequired"})), 35 ) 36 .into_response(); 37 } 38 }; 39 40 let is_service_auth = is_service_token(&token); 41 42 let (did, is_migration) = if is_service_auth { 43 debug!("Verifying service token for blob upload"); 44 let verifier = ServiceTokenVerifier::new(); 45 match verifier 46 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 47 .await 48 { 49 Ok(claims) => { 50 debug!("Service token verified for DID: {}", claims.iss); 51 (claims.iss, false) 52 } 53 Err(e) => { 54 error!("Service token verification failed: {:?}", e); 55 return ( 56 StatusCode::UNAUTHORIZED, 57 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 58 ) 59 .into_response(); 60 } 61 } 62 } else { 63 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 64 Ok(user) => { 65 let mime_type_for_check = headers 66 .get("content-type") 67 .and_then(|h| h.to_str().ok()) 68 .unwrap_or("application/octet-stream"); 69 if let Err(e) = crate::auth::scope_check::check_blob_scope( 70 user.is_oauth, 71 user.scope.as_deref(), 72 mime_type_for_check, 73 ) { 74 return e; 75 } 76 let deactivated = sqlx::query_scalar!( 77 "SELECT deactivated_at FROM users WHERE did = $1", 78 user.did 79 ) 80 .fetch_optional(&state.db) 81 .await 82 .ok() 83 .flatten() 84 .flatten(); 85 (user.did, deactivated.is_some()) 86 } 87 Err(_) => { 88 return ( 89 StatusCode::UNAUTHORIZED, 90 Json(json!({"error": "AuthenticationFailed"})), 91 ) 92 .into_response(); 93 } 94 } 95 }; 96 97 let max_size = if is_service_auth || is_migration { 98 MAX_VIDEO_BLOB_SIZE 99 } else { 100 MAX_BLOB_SIZE 101 }; 102 103 if body.len() > max_size { 104 return ( 105 StatusCode::PAYLOAD_TOO_LARGE, 106 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})), 107 ) 108 .into_response(); 109 } 110 let mime_type = headers 111 .get("content-type") 112 .and_then(|h| h.to_str().ok()) 113 .unwrap_or("application/octet-stream") 114 .to_string(); 115 let size = body.len() as i64; 116 let data = body.to_vec(); 117 let mut hasher = Sha256::new(); 118 hasher.update(&data); 119 let hash = hasher.finalize(); 120 let multihash = match Multihash::wrap(0x12, &hash) { 121 Ok(mh) => mh, 122 Err(e) => { 123 error!("Failed to create multihash for blob: {:?}", e); 124 return ( 125 StatusCode::INTERNAL_SERVER_ERROR, 126 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 127 ) 128 .into_response(); 129 } 130 }; 131 let cid = Cid::new_v1(0x55, multihash); 132 let cid_str = cid.to_string(); 133 let storage_key = format!("blobs/{}", cid_str); 134 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 135 .fetch_optional(&state.db) 136 .await; 137 let user_id = match user_query { 138 Ok(Some(row)) => row.id, 139 _ => { 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError"})), 143 ) 144 .into_response(); 145 } 146 }; 147 let mut tx = match state.db.begin().await { 148 Ok(tx) => tx, 149 Err(e) => { 150 error!("Failed to begin transaction: {:?}", e); 151 return ( 152 StatusCode::INTERNAL_SERVER_ERROR, 153 Json(json!({"error": "InternalError"})), 154 ) 155 .into_response(); 156 } 157 }; 158 let insert = sqlx::query!( 159 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 160 cid_str, 161 mime_type, 162 size, 163 user_id, 164 storage_key 165 ) 166 .fetch_optional(&mut *tx) 167 .await; 168 let was_inserted = match insert { 169 Ok(Some(_)) => true, 170 Ok(None) => false, 171 Err(e) => { 172 error!("Failed to insert blob record: {:?}", e); 173 return ( 174 StatusCode::INTERNAL_SERVER_ERROR, 175 Json(json!({"error": "InternalError"})), 176 ) 177 .into_response(); 178 } 179 }; 180 if was_inserted 181 && let Err(e) = state 182 .blob_store 183 .put_bytes(&storage_key, bytes::Bytes::from(data)) 184 .await 185 { 186 error!("Failed to upload blob to storage: {:?}", e); 187 return ( 188 StatusCode::INTERNAL_SERVER_ERROR, 189 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 190 ) 191 .into_response(); 192 } 193 if let Err(e) = tx.commit().await { 194 error!("Failed to commit blob transaction: {:?}", e); 195 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 196 error!( 197 "Failed to cleanup orphaned blob {}: {:?}", 198 storage_key, cleanup_err 199 ); 200 } 201 return ( 202 StatusCode::INTERNAL_SERVER_ERROR, 203 Json(json!({"error": "InternalError"})), 204 ) 205 .into_response(); 206 } 207 Json(json!({ 208 "blob": { 209 "$type": "blob", 210 "ref": { 211 "$link": cid_str 212 }, 213 "mimeType": mime_type, 214 "size": size 215 } 216 })) 217 .into_response() 218} 219 220#[derive(Deserialize)] 221pub struct ListMissingBlobsParams { 222 pub limit: Option<i64>, 223 pub cursor: Option<String>, 224} 225 226#[derive(Serialize)] 227#[serde(rename_all = "camelCase")] 228pub struct RecordBlob { 229 pub cid: String, 230 pub record_uri: String, 231} 232 233#[derive(Serialize)] 234pub struct ListMissingBlobsOutput { 235 #[serde(skip_serializing_if = "Option::is_none")] 236 pub cursor: Option<String>, 237 pub blobs: Vec<RecordBlob>, 238} 239 240fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 241 if let Some(obj) = val.as_object() { 242 if let Some(type_val) = obj.get("$type") 243 && type_val == "blob" 244 && let Some(r) = obj.get("ref") 245 && let Some(link) = r.get("$link") 246 && let Some(s) = link.as_str() 247 { 248 blobs.push(s.to_string()); 249 } 250 for (_, v) in obj { 251 find_blobs(v, blobs); 252 } 253 } else if let Some(arr) = val.as_array() { 254 for v in arr { 255 find_blobs(v, blobs); 256 } 257 } 258} 259 260pub async fn list_missing_blobs( 261 State(state): State<AppState>, 262 headers: axum::http::HeaderMap, 263 Query(params): Query<ListMissingBlobsParams>, 264) -> Response { 265 let token = match crate::auth::extract_bearer_token_from_header( 266 headers.get("Authorization").and_then(|h| h.to_str().ok()), 267 ) { 268 Some(t) => t, 269 None => { 270 return ( 271 StatusCode::UNAUTHORIZED, 272 Json(json!({"error": "AuthenticationRequired"})), 273 ) 274 .into_response(); 275 } 276 }; 277 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 278 Ok(user) => user, 279 Err(_) => { 280 return ( 281 StatusCode::UNAUTHORIZED, 282 Json(json!({"error": "AuthenticationFailed"})), 283 ) 284 .into_response(); 285 } 286 }; 287 let did = auth_user.did; 288 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 289 .fetch_optional(&state.db) 290 .await; 291 let user_id = match user_query { 292 Ok(Some(row)) => row.id, 293 _ => { 294 return ( 295 StatusCode::INTERNAL_SERVER_ERROR, 296 Json(json!({"error": "InternalError"})), 297 ) 298 .into_response(); 299 } 300 }; 301 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 302 let cursor_str = params.cursor.unwrap_or_default(); 303 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 304 let parts: Vec<&str> = cursor_str.split('|').collect(); 305 (parts[0].to_string(), parts[1].to_string()) 306 } else { 307 (String::new(), String::new()) 308 }; 309 let records_query = sqlx::query!( 310 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 311 user_id, 312 cursor_collection, 313 cursor_rkey, 314 limit 315 ) 316 .fetch_all(&state.db) 317 .await; 318 let records = match records_query { 319 Ok(r) => r, 320 Err(e) => { 321 error!("DB error fetching records: {:?}", e); 322 return ( 323 StatusCode::INTERNAL_SERVER_ERROR, 324 Json(json!({"error": "InternalError"})), 325 ) 326 .into_response(); 327 } 328 }; 329 let mut missing_blobs = Vec::new(); 330 let mut last_cursor = None; 331 for row in &records { 332 let collection = &row.collection; 333 let rkey = &row.rkey; 334 let record_cid_str = &row.record_cid; 335 last_cursor = Some(format!("{}|{}", collection, rkey)); 336 let record_cid = match Cid::from_str(record_cid_str) { 337 Ok(c) => c, 338 Err(_) => continue, 339 }; 340 let block_bytes = match state.block_store.get(&record_cid).await { 341 Ok(Some(b)) => b, 342 _ => continue, 343 }; 344 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 345 Ok(v) => v, 346 Err(_) => continue, 347 }; 348 let mut blobs = Vec::new(); 349 find_blobs(&record_val, &mut blobs); 350 for blob_cid_str in blobs { 351 let exists = sqlx::query!( 352 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", 353 blob_cid_str, 354 user_id 355 ) 356 .fetch_optional(&state.db) 357 .await; 358 match exists { 359 Ok(None) => { 360 missing_blobs.push(RecordBlob { 361 cid: blob_cid_str, 362 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 363 }); 364 } 365 Err(e) => { 366 error!("DB error checking blob existence: {:?}", e); 367 } 368 _ => {} 369 } 370 } 371 } 372 // if we fetched fewer records than limit, we are done, so cursor is None. 373 // otherwise, cursor is the last one we saw. 374 // ...right? 375 let next_cursor = if records.len() < limit as usize { 376 None 377 } else { 378 last_cursor 379 }; 380 ( 381 StatusCode::OK, 382 Json(ListMissingBlobsOutput { 383 cursor: next_cursor, 384 blobs: missing_blobs, 385 }), 386 ) 387 .into_response() 388}