this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::delegation::{self, DelegationActionType}; 3use crate::state::AppState; 4use crate::util::get_max_blob_size; 5use axum::body::Body; 6use axum::{ 7 Json, 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use bytes::Bytes; 13use cid::Cid; 14use futures::StreamExt; 15use multihash::Multihash; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::pin::Pin; 19use tracing::{debug, error, info}; 20 21pub async fn upload_blob( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 body: Body, 25) -> Response { 26 let token = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => { 31 return ( 32 StatusCode::UNAUTHORIZED, 33 Json(json!({"error": "AuthenticationRequired"})), 34 ) 35 .into_response(); 36 } 37 }; 38 39 let is_service_auth = is_service_token(&token); 40 41 let (did, _is_migration, controller_did) = if is_service_auth { 42 debug!("Verifying service token for blob upload"); 43 let verifier = ServiceTokenVerifier::new(); 44 match verifier 45 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 46 .await 47 { 48 Ok(claims) => { 49 debug!("Service token verified for DID: {}", claims.iss); 50 (claims.iss, false, None) 51 } 52 Err(e) => { 53 error!("Service token verification failed: {:?}", e); 54 return ( 55 StatusCode::UNAUTHORIZED, 56 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 57 ) 58 .into_response(); 59 } 60 } 61 } else { 62 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 63 Ok(user) => { 64 let mime_type_for_check = headers 65 .get("content-type") 66 .and_then(|h| h.to_str().ok()) 67 .unwrap_or("application/octet-stream"); 68 if let Err(e) = crate::auth::scope_check::check_blob_scope( 69 user.is_oauth, 70 user.scope.as_deref(), 71 mime_type_for_check, 72 ) { 73 return e; 74 } 75 let deactivated = sqlx::query_scalar!( 76 "SELECT deactivated_at FROM users WHERE did = $1", 77 user.did 78 ) 79 .fetch_optional(&state.db) 80 .await 81 .ok() 82 .flatten() 83 .flatten(); 84 let ctrl_did = user.controller_did.clone(); 85 (user.did, deactivated.is_some(), ctrl_did) 86 } 87 Err(_) => { 88 return ( 89 StatusCode::UNAUTHORIZED, 90 Json(json!({"error": "AuthenticationFailed"})), 91 ) 92 .into_response(); 93 } 94 } 95 }; 96 97 if crate::util::is_account_migrated(&state.db, &did) 98 .await 99 .unwrap_or(false) 100 { 101 return ( 102 StatusCode::FORBIDDEN, 103 Json(json!({ 104 "error": "AccountMigrated", 105 "message": "Account has been migrated to another PDS. Blob operations are not allowed." 106 })), 107 ) 108 .into_response(); 109 } 110 111 let mime_type = headers 112 .get("content-type") 113 .and_then(|h| h.to_str().ok()) 114 .unwrap_or("application/octet-stream") 115 .to_string(); 116 117 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 118 .fetch_optional(&state.db) 119 .await; 120 let user_id = match user_query { 121 Ok(Some(row)) => row.id, 122 _ => { 123 return ( 124 StatusCode::INTERNAL_SERVER_ERROR, 125 Json(json!({"error": "InternalError"})), 126 ) 127 .into_response(); 128 } 129 }; 130 131 let temp_key = format!("temp/{}", uuid::Uuid::new_v4()); 132 let max_size = get_max_blob_size() as u64; 133 134 let body_stream = body.into_data_stream(); 135 let mapped_stream = 136 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string()))); 137 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> = 138 Box::pin(mapped_stream); 139 140 info!("Starting streaming blob upload to temp key: {}", temp_key); 141 142 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await { 143 Ok(result) => result, 144 Err(e) => { 145 error!("Failed to stream blob to storage: {:?}", e); 146 return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 149 ) 150 .into_response(); 151 } 152 }; 153 154 let size = upload_result.size; 155 if size > max_size { 156 let _ = state.blob_store.delete(&temp_key).await; 157 return ( 158 StatusCode::PAYLOAD_TOO_LARGE, 159 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", size, max_size)})), 160 ) 161 .into_response(); 162 } 163 164 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 165 Ok(mh) => mh, 166 Err(e) => { 167 let _ = state.blob_store.delete(&temp_key).await; 168 error!("Failed to create multihash for blob: {:?}", e); 169 return ( 170 StatusCode::INTERNAL_SERVER_ERROR, 171 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 172 ) 173 .into_response(); 174 } 175 }; 176 let cid = Cid::new_v1(0x55, multihash); 177 let cid_str = cid.to_string(); 178 let storage_key = format!("blobs/{}", cid_str); 179 180 info!( 181 "Blob upload complete: size={}, cid={}, copying to final location", 182 size, cid_str 183 ); 184 185 let mut tx = match state.db.begin().await { 186 Ok(tx) => tx, 187 Err(e) => { 188 let _ = state.blob_store.delete(&temp_key).await; 189 error!("Failed to begin transaction: {:?}", e); 190 return ( 191 StatusCode::INTERNAL_SERVER_ERROR, 192 Json(json!({"error": "InternalError"})), 193 ) 194 .into_response(); 195 } 196 }; 197 198 let insert = sqlx::query!( 199 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 200 cid_str, 201 mime_type, 202 size as i64, 203 user_id, 204 storage_key 205 ) 206 .fetch_optional(&mut *tx) 207 .await; 208 209 let was_inserted = match insert { 210 Ok(Some(_)) => true, 211 Ok(None) => false, 212 Err(e) => { 213 let _ = state.blob_store.delete(&temp_key).await; 214 error!("Failed to insert blob record: {:?}", e); 215 return ( 216 StatusCode::INTERNAL_SERVER_ERROR, 217 Json(json!({"error": "InternalError"})), 218 ) 219 .into_response(); 220 } 221 }; 222 223 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await { 224 let _ = state.blob_store.delete(&temp_key).await; 225 error!("Failed to copy blob to final location: {:?}", e); 226 return ( 227 StatusCode::INTERNAL_SERVER_ERROR, 228 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 229 ) 230 .into_response(); 231 } 232 233 let _ = state.blob_store.delete(&temp_key).await; 234 235 if let Err(e) = tx.commit().await { 236 error!("Failed to commit blob transaction: {:?}", e); 237 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 238 error!( 239 "Failed to cleanup orphaned blob {}: {:?}", 240 storage_key, cleanup_err 241 ); 242 } 243 return ( 244 StatusCode::INTERNAL_SERVER_ERROR, 245 Json(json!({"error": "InternalError"})), 246 ) 247 .into_response(); 248 } 249 250 if let Some(ref controller) = controller_did { 251 let _ = delegation::log_delegation_action( 252 &state.db, 253 &did, 254 controller, 255 Some(controller), 256 DelegationActionType::BlobUpload, 257 Some(json!({ 258 "cid": cid_str, 259 "mime_type": mime_type, 260 "size": size 261 })), 262 None, 263 None, 264 ) 265 .await; 266 } 267 268 Json(json!({ 269 "blob": { 270 "$type": "blob", 271 "ref": { 272 "$link": cid_str 273 }, 274 "mimeType": mime_type, 275 "size": size 276 } 277 })) 278 .into_response() 279} 280 281#[derive(Deserialize)] 282pub struct ListMissingBlobsParams { 283 pub limit: Option<i64>, 284 pub cursor: Option<String>, 285} 286 287#[derive(Serialize)] 288#[serde(rename_all = "camelCase")] 289pub struct RecordBlob { 290 pub cid: String, 291 pub record_uri: String, 292} 293 294#[derive(Serialize)] 295pub struct ListMissingBlobsOutput { 296 #[serde(skip_serializing_if = "Option::is_none")] 297 pub cursor: Option<String>, 298 pub blobs: Vec<RecordBlob>, 299} 300 301pub async fn list_missing_blobs( 302 State(state): State<AppState>, 303 headers: axum::http::HeaderMap, 304 Query(params): Query<ListMissingBlobsParams>, 305) -> Response { 306 let token = match crate::auth::extract_bearer_token_from_header( 307 headers.get("Authorization").and_then(|h| h.to_str().ok()), 308 ) { 309 Some(t) => t, 310 None => { 311 return ( 312 StatusCode::UNAUTHORIZED, 313 Json(json!({"error": "AuthenticationRequired"})), 314 ) 315 .into_response(); 316 } 317 }; 318 let auth_user = 319 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 320 Ok(user) => user, 321 Err(_) => { 322 return ( 323 StatusCode::UNAUTHORIZED, 324 Json(json!({"error": "AuthenticationFailed"})), 325 ) 326 .into_response(); 327 } 328 }; 329 let did = auth_user.did; 330 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 331 .fetch_optional(&state.db) 332 .await; 333 let user_id = match user_query { 334 Ok(Some(row)) => row.id, 335 _ => { 336 return ( 337 StatusCode::INTERNAL_SERVER_ERROR, 338 Json(json!({"error": "InternalError"})), 339 ) 340 .into_response(); 341 } 342 }; 343 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 344 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 345 let missing_query = sqlx::query!( 346 r#" 347 SELECT rb.blob_cid, rb.record_uri 348 FROM record_blobs rb 349 LEFT JOIN blobs b ON rb.blob_cid = b.cid 350 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 351 ORDER BY rb.blob_cid 352 LIMIT $3 353 "#, 354 user_id, 355 cursor_cid, 356 limit + 1 357 ) 358 .fetch_all(&state.db) 359 .await; 360 let rows = match missing_query { 361 Ok(r) => r, 362 Err(e) => { 363 error!("DB error fetching missing blobs: {:?}", e); 364 return ( 365 StatusCode::INTERNAL_SERVER_ERROR, 366 Json(json!({"error": "InternalError"})), 367 ) 368 .into_response(); 369 } 370 }; 371 let has_more = rows.len() > limit as usize; 372 let blobs: Vec<RecordBlob> = rows 373 .into_iter() 374 .take(limit as usize) 375 .map(|row| RecordBlob { 376 cid: row.blob_cid, 377 record_uri: row.record_uri, 378 }) 379 .collect(); 380 let next_cursor = if has_more { 381 blobs.last().map(|b| b.cid.clone()) 382 } else { 383 None 384 }; 385 ( 386 StatusCode::OK, 387 Json(ListMissingBlobsOutput { 388 cursor: next_cursor, 389 blobs, 390 }), 391 ) 392 .into_response() 393}