// This repo has no description.
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::delegation::{self, DelegationActionType}; 3use crate::state::AppState; 4use axum::body::Bytes; 5use axum::{ 6 Json, 7 extract::{Query, State}, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use jacquard_repo::storage::BlockStore; 13use multihash::Multihash; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use sha2::{Digest, Sha256}; 17use std::str::FromStr; 18use tracing::{debug, error}; 19 20const MAX_BLOB_SIZE: usize = 10_000_000_000; 21const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000; 22 23pub async fn upload_blob( 24 State(state): State<AppState>, 25 headers: axum::http::HeaderMap, 26 body: Bytes, 27) -> Response { 28 let token = match crate::auth::extract_bearer_token_from_header( 29 headers.get("Authorization").and_then(|h| h.to_str().ok()), 30 ) { 31 Some(t) => t, 32 None => { 33 return ( 34 StatusCode::UNAUTHORIZED, 35 Json(json!({"error": "AuthenticationRequired"})), 36 ) 37 .into_response(); 38 } 39 }; 40 41 let is_service_auth = is_service_token(&token); 42 43 let (did, is_migration, controller_did) = if is_service_auth { 44 debug!("Verifying service token for blob upload"); 45 let verifier = ServiceTokenVerifier::new(); 46 match verifier 47 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 48 .await 49 { 50 Ok(claims) => { 51 debug!("Service token verified for DID: {}", claims.iss); 52 (claims.iss, false, None) 53 } 54 Err(e) => { 55 error!("Service token verification failed: {:?}", e); 56 return ( 57 StatusCode::UNAUTHORIZED, 58 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 59 ) 60 .into_response(); 61 } 62 } 63 } else { 64 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 65 Ok(user) => { 66 let mime_type_for_check = headers 67 .get("content-type") 68 .and_then(|h| h.to_str().ok()) 69 
.unwrap_or("application/octet-stream"); 70 if let Err(e) = crate::auth::scope_check::check_blob_scope( 71 user.is_oauth, 72 user.scope.as_deref(), 73 mime_type_for_check, 74 ) { 75 return e; 76 } 77 let deactivated = sqlx::query_scalar!( 78 "SELECT deactivated_at FROM users WHERE did = $1", 79 user.did 80 ) 81 .fetch_optional(&state.db) 82 .await 83 .ok() 84 .flatten() 85 .flatten(); 86 let ctrl_did = user.controller_did.clone(); 87 (user.did, deactivated.is_some(), ctrl_did) 88 } 89 Err(_) => { 90 return ( 91 StatusCode::UNAUTHORIZED, 92 Json(json!({"error": "AuthenticationFailed"})), 93 ) 94 .into_response(); 95 } 96 } 97 }; 98 99 let max_size = if is_service_auth || is_migration { 100 MAX_VIDEO_BLOB_SIZE 101 } else { 102 MAX_BLOB_SIZE 103 }; 104 105 if body.len() > max_size { 106 return ( 107 StatusCode::PAYLOAD_TOO_LARGE, 108 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})), 109 ) 110 .into_response(); 111 } 112 let mime_type = headers 113 .get("content-type") 114 .and_then(|h| h.to_str().ok()) 115 .unwrap_or("application/octet-stream") 116 .to_string(); 117 let size = body.len() as i64; 118 let data = body.to_vec(); 119 let mut hasher = Sha256::new(); 120 hasher.update(&data); 121 let hash = hasher.finalize(); 122 let multihash = match Multihash::wrap(0x12, &hash) { 123 Ok(mh) => mh, 124 Err(e) => { 125 error!("Failed to create multihash for blob: {:?}", e); 126 return ( 127 StatusCode::INTERNAL_SERVER_ERROR, 128 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 129 ) 130 .into_response(); 131 } 132 }; 133 let cid = Cid::new_v1(0x55, multihash); 134 let cid_str = cid.to_string(); 135 let storage_key = format!("blobs/{}", cid_str); 136 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 137 .fetch_optional(&state.db) 138 .await; 139 let user_id = match user_query { 140 Ok(Some(row)) => row.id, 141 _ => { 142 return ( 143 
StatusCode::INTERNAL_SERVER_ERROR, 144 Json(json!({"error": "InternalError"})), 145 ) 146 .into_response(); 147 } 148 }; 149 let mut tx = match state.db.begin().await { 150 Ok(tx) => tx, 151 Err(e) => { 152 error!("Failed to begin transaction: {:?}", e); 153 return ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError"})), 156 ) 157 .into_response(); 158 } 159 }; 160 let insert = sqlx::query!( 161 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 162 cid_str, 163 mime_type, 164 size, 165 user_id, 166 storage_key 167 ) 168 .fetch_optional(&mut *tx) 169 .await; 170 let was_inserted = match insert { 171 Ok(Some(_)) => true, 172 Ok(None) => false, 173 Err(e) => { 174 error!("Failed to insert blob record: {:?}", e); 175 return ( 176 StatusCode::INTERNAL_SERVER_ERROR, 177 Json(json!({"error": "InternalError"})), 178 ) 179 .into_response(); 180 } 181 }; 182 if was_inserted 183 && let Err(e) = state 184 .blob_store 185 .put_bytes(&storage_key, bytes::Bytes::from(data)) 186 .await 187 { 188 error!("Failed to upload blob to storage: {:?}", e); 189 return ( 190 StatusCode::INTERNAL_SERVER_ERROR, 191 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 192 ) 193 .into_response(); 194 } 195 if let Err(e) = tx.commit().await { 196 error!("Failed to commit blob transaction: {:?}", e); 197 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 198 error!( 199 "Failed to cleanup orphaned blob {}: {:?}", 200 storage_key, cleanup_err 201 ); 202 } 203 return ( 204 StatusCode::INTERNAL_SERVER_ERROR, 205 Json(json!({"error": "InternalError"})), 206 ) 207 .into_response(); 208 } 209 210 if let Some(ref controller) = controller_did { 211 let _ = delegation::log_delegation_action( 212 &state.db, 213 &did, 214 controller, 215 Some(controller), 216 DelegationActionType::BlobUpload, 217 Some(json!({ 218 "cid": 
cid_str, 219 "mime_type": mime_type, 220 "size": size 221 })), 222 None, 223 None, 224 ) 225 .await; 226 } 227 228 Json(json!({ 229 "blob": { 230 "$type": "blob", 231 "ref": { 232 "$link": cid_str 233 }, 234 "mimeType": mime_type, 235 "size": size 236 } 237 })) 238 .into_response() 239} 240 241#[derive(Deserialize)] 242pub struct ListMissingBlobsParams { 243 pub limit: Option<i64>, 244 pub cursor: Option<String>, 245} 246 247#[derive(Serialize)] 248#[serde(rename_all = "camelCase")] 249pub struct RecordBlob { 250 pub cid: String, 251 pub record_uri: String, 252} 253 254#[derive(Serialize)] 255pub struct ListMissingBlobsOutput { 256 #[serde(skip_serializing_if = "Option::is_none")] 257 pub cursor: Option<String>, 258 pub blobs: Vec<RecordBlob>, 259} 260 261fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 262 if let Some(obj) = val.as_object() { 263 if let Some(type_val) = obj.get("$type") 264 && type_val == "blob" 265 && let Some(r) = obj.get("ref") 266 && let Some(link) = r.get("$link") 267 && let Some(s) = link.as_str() 268 { 269 blobs.push(s.to_string()); 270 } 271 for (_, v) in obj { 272 find_blobs(v, blobs); 273 } 274 } else if let Some(arr) = val.as_array() { 275 for v in arr { 276 find_blobs(v, blobs); 277 } 278 } 279} 280 281pub async fn list_missing_blobs( 282 State(state): State<AppState>, 283 headers: axum::http::HeaderMap, 284 Query(params): Query<ListMissingBlobsParams>, 285) -> Response { 286 let token = match crate::auth::extract_bearer_token_from_header( 287 headers.get("Authorization").and_then(|h| h.to_str().ok()), 288 ) { 289 Some(t) => t, 290 None => { 291 return ( 292 StatusCode::UNAUTHORIZED, 293 Json(json!({"error": "AuthenticationRequired"})), 294 ) 295 .into_response(); 296 } 297 }; 298 let auth_user = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 299 Ok(user) => user, 300 Err(_) => { 301 return ( 302 StatusCode::UNAUTHORIZED, 303 Json(json!({"error": "AuthenticationFailed"})), 304 ) 
305 .into_response(); 306 } 307 }; 308 let did = auth_user.did; 309 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 310 .fetch_optional(&state.db) 311 .await; 312 let user_id = match user_query { 313 Ok(Some(row)) => row.id, 314 _ => { 315 return ( 316 StatusCode::INTERNAL_SERVER_ERROR, 317 Json(json!({"error": "InternalError"})), 318 ) 319 .into_response(); 320 } 321 }; 322 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 323 let cursor_str = params.cursor.unwrap_or_default(); 324 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 325 let parts: Vec<&str> = cursor_str.split('|').collect(); 326 (parts[0].to_string(), parts[1].to_string()) 327 } else { 328 (String::new(), String::new()) 329 }; 330 let records_query = sqlx::query!( 331 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 332 user_id, 333 cursor_collection, 334 cursor_rkey, 335 limit 336 ) 337 .fetch_all(&state.db) 338 .await; 339 let records = match records_query { 340 Ok(r) => r, 341 Err(e) => { 342 error!("DB error fetching records: {:?}", e); 343 return ( 344 StatusCode::INTERNAL_SERVER_ERROR, 345 Json(json!({"error": "InternalError"})), 346 ) 347 .into_response(); 348 } 349 }; 350 let mut missing_blobs = Vec::new(); 351 let mut last_cursor = None; 352 for row in &records { 353 let collection = &row.collection; 354 let rkey = &row.rkey; 355 let record_cid_str = &row.record_cid; 356 last_cursor = Some(format!("{}|{}", collection, rkey)); 357 let record_cid = match Cid::from_str(record_cid_str) { 358 Ok(c) => c, 359 Err(_) => continue, 360 }; 361 let block_bytes = match state.block_store.get(&record_cid).await { 362 Ok(Some(b)) => b, 363 _ => continue, 364 }; 365 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 366 Ok(v) => v, 367 Err(_) => continue, 368 }; 369 let mut blobs = Vec::new(); 370 find_blobs(&record_val, &mut 
blobs); 371 for blob_cid_str in blobs { 372 let exists = sqlx::query!( 373 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", 374 blob_cid_str, 375 user_id 376 ) 377 .fetch_optional(&state.db) 378 .await; 379 match exists { 380 Ok(None) => { 381 missing_blobs.push(RecordBlob { 382 cid: blob_cid_str, 383 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 384 }); 385 } 386 Err(e) => { 387 error!("DB error checking blob existence: {:?}", e); 388 } 389 _ => {} 390 } 391 } 392 } 393 // if we fetched fewer records than limit, we are done, so cursor is None. 394 // otherwise, cursor is the last one we saw. 395 // ...right? 396 let next_cursor = if records.len() < limit as usize { 397 None 398 } else { 399 last_cursor 400 }; 401 ( 402 StatusCode::OK, 403 Json(ListMissingBlobsOutput { 404 cursor: next_cursor, 405 blobs: missing_blobs, 406 }), 407 ) 408 .into_response() 409}