this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::{ServiceTokenVerifier, is_service_token}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::state::AppState; 5use crate::util::get_max_blob_size; 6use axum::body::Body; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bytes::Bytes; 14use cid::Cid; 15use futures::StreamExt; 16use multihash::Multihash; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::pin::Pin; 20use tracing::{debug, error, info, warn}; 21 22fn detect_mime_type(data: &[u8], client_hint: &str) -> String { 23 if let Some(kind) = infer::get(data) { 24 let detected = kind.mime_type().to_string(); 25 if detected != client_hint { 26 debug!( 27 "MIME type detection: client sent '{}', detected '{}'", 28 client_hint, detected 29 ); 30 } 31 detected 32 } else { 33 if client_hint == "*/*" || client_hint.is_empty() { 34 warn!("Could not detect MIME type and client sent invalid hint: '{}'", client_hint); 35 "application/octet-stream".to_string() 36 } else { 37 client_hint.to_string() 38 } 39 } 40} 41 42pub async fn upload_blob( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 body: Body, 46) -> Response { 47 let Some(token) = crate::auth::extract_bearer_token_from_header( 48 headers.get("Authorization").and_then(|h| h.to_str().ok()), 49 ) else { 50 return ApiError::AuthenticationRequired.into_response(); 51 }; 52 53 let is_service_auth = is_service_token(&token); 54 55 let (did, _is_migration, controller_did) = if is_service_auth { 56 debug!("Verifying service token for blob upload"); 57 let verifier = ServiceTokenVerifier::new(); 58 match verifier 59 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 60 .await 61 { 62 Ok(claims) => { 63 debug!("Service token verified for DID: {}", claims.iss); 64 (claims.iss, false, None) 65 } 66 Err(e) => { 67 error!("Service token verification failed: {:?}", e); 68 return ApiError::AuthenticationFailed(Some(format!( 69 "Service token verification failed: {}", 70 e 71 ))) 72 .into_response(); 73 } 74 } 75 } else { 76 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 77 Ok(user) => { 78 let mime_type_for_check = headers 79 .get("content-type") 80 .and_then(|h| h.to_str().ok()) 81 .unwrap_or("application/octet-stream"); 82 if let Err(e) = crate::auth::scope_check::check_blob_scope( 83 user.is_oauth, 84 user.scope.as_deref(), 85 mime_type_for_check, 86 ) { 87 return e; 88 } 89 let deactivated = sqlx::query_scalar!( 90 "SELECT deactivated_at FROM users WHERE did = $1", 91 &user.did 92 ) 93 .fetch_optional(&state.db) 94 .await 95 .ok() 96 .flatten() 97 .flatten(); 98 let ctrl_did = user.controller_did.map(|d| d.to_string()); 99 (user.did.to_string(), deactivated.is_some(), ctrl_did) 100 } 101 Err(_) => { 102 return ApiError::AuthenticationFailed(None).into_response(); 103 } 104 } 105 }; 106 107 if crate::util::is_account_migrated(&state.db, &did) 108 .await 109 .unwrap_or(false) 110 { 111 return ApiError::Forbidden.into_response(); 112 } 113 114 let client_mime_hint = headers 115 .get("content-type") 116 .and_then(|h| h.to_str().ok()) 117 .unwrap_or("application/octet-stream"); 118 119 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 120 .fetch_optional(&state.db) 121 .await; 122 let user_id = match user_query { 123 Ok(Some(row)) => row.id, 124 _ => { 125 return ApiError::InternalError(None).into_response(); 126 } 127 }; 128 129 let temp_key = format!("temp/{}", uuid::Uuid::new_v4()); 130 let max_size = get_max_blob_size() as u64; 131 132 let body_stream = body.into_data_stream(); 133 let mapped_stream = 134 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string()))); 135 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> = 136 Box::pin(mapped_stream); 137 138 info!("Starting streaming blob upload to temp key: {}", temp_key); 139 140 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await { 141 Ok(result) => result, 142 Err(e) => { 143 error!("Failed to stream blob to storage: {:?}", e); 144 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 145 } 146 }; 147 148 let size = upload_result.size; 149 if size > max_size { 150 let _ = state.blob_store.delete(&temp_key).await; 151 return ApiError::InvalidRequest(format!( 152 "Blob size {} exceeds maximum of {} bytes", 153 size, max_size 154 )) 155 .into_response(); 156 } 157 158 let mime_type = match state.blob_store.get_head(&temp_key, 8192).await { 159 Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint), 160 Err(e) => { 161 warn!("Failed to read blob head for MIME detection: {:?}", e); 162 client_mime_hint.to_string() 163 } 164 }; 165 166 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 167 Ok(mh) => mh, 168 Err(e) => { 169 let _ = state.blob_store.delete(&temp_key).await; 170 error!("Failed to create multihash for blob: {:?}", e); 171 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response(); 172 } 173 }; 174 let cid = Cid::new_v1(0x55, multihash); 175 let cid_str = cid.to_string(); 176 let storage_key = format!("blobs/{}", cid_str); 177 178 info!( 179 "Blob upload complete: size={}, cid={}, copying to final location", 180 size, cid_str 181 ); 182 183 let mut tx = match state.db.begin().await { 184 Ok(tx) => tx, 185 Err(e) => { 186 let _ = state.blob_store.delete(&temp_key).await; 187 error!("Failed to begin transaction: {:?}", e); 188 return ApiError::InternalError(None).into_response(); 189 } 190 }; 191 192 let insert = sqlx::query!( 193 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 194 cid_str, 195 mime_type, 196 size as i64, 197 user_id, 198 storage_key 199 ) 200 .fetch_optional(&mut *tx) 201 .await; 202 203 let was_inserted = match insert { 204 Ok(Some(_)) => true, 205 Ok(None) => false, 206 Err(e) => { 207 let _ = state.blob_store.delete(&temp_key).await; 208 error!("Failed to insert blob record: {:?}", e); 209 return ApiError::InternalError(None).into_response(); 210 } 211 }; 212 213 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await { 214 let _ = state.blob_store.delete(&temp_key).await; 215 error!("Failed to copy blob to final location: {:?}", e); 216 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 217 } 218 219 let _ = state.blob_store.delete(&temp_key).await; 220 221 if let Err(e) = tx.commit().await { 222 error!("Failed to commit blob transaction: {:?}", e); 223 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 224 error!( 225 "Failed to cleanup orphaned blob {}: {:?}", 226 storage_key, cleanup_err 227 ); 228 } 229 return ApiError::InternalError(None).into_response(); 230 } 231 232 if let Some(ref controller) = controller_did { 233 let _ = delegation::log_delegation_action( 234 &state.db, 235 &did, 236 controller, 237 Some(controller), 238 DelegationActionType::BlobUpload, 239 Some(json!({ 240 "cid": cid_str, 241 "mime_type": mime_type, 242 "size": size 243 })), 244 None, 245 None, 246 ) 247 .await; 248 } 249 250 Json(json!({ 251 "blob": { 252 "$type": "blob", 253 "ref": { 254 "$link": cid_str 255 }, 256 "mimeType": mime_type, 257 "size": size 258 } 259 })) 260 .into_response() 261} 262 263#[derive(Deserialize)] 264pub struct ListMissingBlobsParams { 265 pub limit: Option<i64>, 266 pub cursor: Option<String>, 267} 268 269#[derive(Serialize)] 270#[serde(rename_all = "camelCase")] 271pub struct RecordBlob { 272 pub cid: String, 273 pub record_uri: String, 274} 275 276#[derive(Serialize)] 277pub struct ListMissingBlobsOutput { 278 #[serde(skip_serializing_if = "Option::is_none")] 279 pub cursor: Option<String>, 280 pub blobs: Vec<RecordBlob>, 281} 282 283pub async fn list_missing_blobs( 284 State(state): State<AppState>, 285 headers: axum::http::HeaderMap, 286 Query(params): Query<ListMissingBlobsParams>, 287) -> Response { 288 let Some(token) = crate::auth::extract_bearer_token_from_header( 289 headers.get("Authorization").and_then(|h| h.to_str().ok()), 290 ) else { 291 return ApiError::AuthenticationRequired.into_response(); 292 }; 293 let auth_user = 294 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 295 Ok(user) => user, 296 Err(_) => { 297 return ApiError::AuthenticationFailed(None).into_response(); 298 } 299 }; 300 let did = auth_user.did; 301 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str()) 302 .fetch_optional(&state.db) 303 .await; 304 let user_id = match user_query { 305 Ok(Some(row)) => row.id, 306 _ => { 307 return ApiError::InternalError(None).into_response(); 308 } 309 }; 310 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 311 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 312 let missing_query = sqlx::query!( 313 r#" 314 SELECT rb.blob_cid, rb.record_uri 315 FROM record_blobs rb 316 LEFT JOIN blobs b ON rb.blob_cid = b.cid 317 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 318 ORDER BY rb.blob_cid 319 LIMIT $3 320 "#, 321 user_id, 322 cursor_cid, 323 limit + 1 324 ) 325 .fetch_all(&state.db) 326 .await; 327 let rows = match missing_query { 328 Ok(r) => r, 329 Err(e) => { 330 error!("DB error fetching missing blobs: {:?}", e); 331 return ApiError::InternalError(None).into_response(); 332 } 333 }; 334 let has_more = rows.len() > limit as usize; 335 let blobs: Vec<RecordBlob> = rows 336 .into_iter() 337 .take(limit as usize) 338 .map(|row| RecordBlob { 339 cid: row.blob_cid, 340 record_uri: row.record_uri, 341 }) 342 .collect(); 343 let next_cursor = if has_more { 344 blobs.last().map(|b| b.cid.clone()) 345 } else { 346 None 347 }; 348 ( 349 StatusCode::OK, 350 Json(ListMissingBlobsOutput { 351 cursor: next_cursor, 352 blobs, 353 }), 354 ) 355 .into_response() 356}