this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::{ServiceTokenVerifier, is_service_token}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::state::AppState; 5use crate::util::get_max_blob_size; 6use axum::body::Body; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bytes::Bytes; 14use cid::Cid; 15use futures::StreamExt; 16use multihash::Multihash; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::pin::Pin; 20use tracing::{debug, error, info, warn}; 21 22fn detect_mime_type(data: &[u8], client_hint: &str) -> String { 23 if let Some(kind) = infer::get(data) { 24 let detected = kind.mime_type().to_string(); 25 if detected != client_hint { 26 debug!( 27 "MIME type detection: client sent '{}', detected '{}'", 28 client_hint, detected 29 ); 30 } 31 detected 32 } else if client_hint == "*/*" || client_hint.is_empty() { 33 warn!( 34 "Could not detect MIME type and client sent invalid hint: '{}'", 35 client_hint 36 ); 37 "application/octet-stream".to_string() 38 } else { 39 client_hint.to_string() 40 } 41} 42 43pub async fn upload_blob( 44 State(state): State<AppState>, 45 headers: axum::http::HeaderMap, 46 body: Body, 47) -> Response { 48 let Some(token) = crate::auth::extract_bearer_token_from_header( 49 headers.get("Authorization").and_then(|h| h.to_str().ok()), 50 ) else { 51 return ApiError::AuthenticationRequired.into_response(); 52 }; 53 54 let is_service_auth = is_service_token(&token); 55 56 let (did, _is_migration, controller_did) = if is_service_auth { 57 debug!("Verifying service token for blob upload"); 58 let verifier = ServiceTokenVerifier::new(); 59 match verifier 60 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 61 .await 62 { 63 Ok(claims) => { 64 debug!("Service token verified for DID: {}", claims.iss); 65 (claims.iss, false, None) 66 } 67 Err(e) => { 68 error!("Service token verification failed: {:?}", e); 69 return ApiError::AuthenticationFailed(Some(format!( 70 "Service token verification failed: {}", 71 e 72 ))) 73 .into_response(); 74 } 75 } 76 } else { 77 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 78 Ok(user) => { 79 let mime_type_for_check = headers 80 .get("content-type") 81 .and_then(|h| h.to_str().ok()) 82 .unwrap_or("application/octet-stream"); 83 if let Err(e) = crate::auth::scope_check::check_blob_scope( 84 user.is_oauth, 85 user.scope.as_deref(), 86 mime_type_for_check, 87 ) { 88 return e; 89 } 90 let deactivated = sqlx::query_scalar!( 91 "SELECT deactivated_at FROM users WHERE did = $1", 92 &user.did 93 ) 94 .fetch_optional(&state.db) 95 .await 96 .ok() 97 .flatten() 98 .flatten(); 99 let ctrl_did = user.controller_did.map(|d| d.to_string()); 100 (user.did.to_string(), deactivated.is_some(), ctrl_did) 101 } 102 Err(_) => { 103 return ApiError::AuthenticationFailed(None).into_response(); 104 } 105 } 106 }; 107 108 if crate::util::is_account_migrated(&state.db, &did) 109 .await 110 .unwrap_or(false) 111 { 112 return ApiError::Forbidden.into_response(); 113 } 114 115 let client_mime_hint = headers 116 .get("content-type") 117 .and_then(|h| h.to_str().ok()) 118 .unwrap_or("application/octet-stream"); 119 120 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 121 .fetch_optional(&state.db) 122 .await; 123 let user_id = match user_query { 124 Ok(Some(row)) => row.id, 125 _ => { 126 return ApiError::InternalError(None).into_response(); 127 } 128 }; 129 130 let temp_key = format!("temp/{}", uuid::Uuid::new_v4()); 131 let max_size = get_max_blob_size() as u64; 132 133 let body_stream = body.into_data_stream(); 134 let mapped_stream = 135 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string()))); 136 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> = 137 Box::pin(mapped_stream); 138 139 info!("Starting streaming blob upload to temp key: {}", temp_key); 140 141 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await { 142 Ok(result) => result, 143 Err(e) => { 144 error!("Failed to stream blob to storage: {:?}", e); 145 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 146 } 147 }; 148 149 let size = upload_result.size; 150 if size > max_size { 151 let _ = state.blob_store.delete(&temp_key).await; 152 return ApiError::InvalidRequest(format!( 153 "Blob size {} exceeds maximum of {} bytes", 154 size, max_size 155 )) 156 .into_response(); 157 } 158 159 let mime_type = match state.blob_store.get_head(&temp_key, 8192).await { 160 Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint), 161 Err(e) => { 162 warn!("Failed to read blob head for MIME detection: {:?}", e); 163 client_mime_hint.to_string() 164 } 165 }; 166 167 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 168 Ok(mh) => mh, 169 Err(e) => { 170 let _ = state.blob_store.delete(&temp_key).await; 171 error!("Failed to create multihash for blob: {:?}", e); 172 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response(); 173 } 174 }; 175 let cid = Cid::new_v1(0x55, multihash); 176 let cid_str = cid.to_string(); 177 let storage_key = format!("blobs/{}", cid_str); 178 179 info!( 180 "Blob upload complete: size={}, cid={}, copying to final location", 181 size, cid_str 182 ); 183 184 let mut tx = match state.db.begin().await { 185 Ok(tx) => tx, 186 Err(e) => { 187 let _ = state.blob_store.delete(&temp_key).await; 188 error!("Failed to begin transaction: {:?}", e); 189 return ApiError::InternalError(None).into_response(); 190 } 191 }; 192 193 let insert = sqlx::query!( 194 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 195 cid_str, 196 mime_type, 197 size as i64, 198 user_id, 199 storage_key 200 ) 201 .fetch_optional(&mut *tx) 202 .await; 203 204 let was_inserted = match insert { 205 Ok(Some(_)) => true, 206 Ok(None) => false, 207 Err(e) => { 208 let _ = state.blob_store.delete(&temp_key).await; 209 error!("Failed to insert blob record: {:?}", e); 210 return ApiError::InternalError(None).into_response(); 211 } 212 }; 213 214 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await { 215 let _ = state.blob_store.delete(&temp_key).await; 216 error!("Failed to copy blob to final location: {:?}", e); 217 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 218 } 219 220 let _ = state.blob_store.delete(&temp_key).await; 221 222 if let Err(e) = tx.commit().await { 223 error!("Failed to commit blob transaction: {:?}", e); 224 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 225 error!( 226 "Failed to cleanup orphaned blob {}: {:?}", 227 storage_key, cleanup_err 228 ); 229 } 230 return ApiError::InternalError(None).into_response(); 231 } 232 233 if let Some(ref controller) = controller_did { 234 let _ = delegation::log_delegation_action( 235 &state.db, 236 &did, 237 controller, 238 Some(controller), 239 DelegationActionType::BlobUpload, 240 Some(json!({ 241 "cid": cid_str, 242 "mime_type": mime_type, 243 "size": size 244 })), 245 None, 246 None, 247 ) 248 .await; 249 } 250 251 Json(json!({ 252 "blob": { 253 "$type": "blob", 254 "ref": { 255 "$link": cid_str 256 }, 257 "mimeType": mime_type, 258 "size": size 259 } 260 })) 261 .into_response() 262} 263 264#[derive(Deserialize)] 265pub struct ListMissingBlobsParams { 266 pub limit: Option<i64>, 267 pub cursor: Option<String>, 268} 269 270#[derive(Serialize)] 271#[serde(rename_all = "camelCase")] 272pub struct RecordBlob { 273 pub cid: String, 274 pub record_uri: String, 275} 276 277#[derive(Serialize)] 278pub struct ListMissingBlobsOutput { 279 #[serde(skip_serializing_if = "Option::is_none")] 280 pub cursor: Option<String>, 281 pub blobs: Vec<RecordBlob>, 282} 283 284pub async fn list_missing_blobs( 285 State(state): State<AppState>, 286 headers: axum::http::HeaderMap, 287 Query(params): Query<ListMissingBlobsParams>, 288) -> Response { 289 let Some(token) = crate::auth::extract_bearer_token_from_header( 290 headers.get("Authorization").and_then(|h| h.to_str().ok()), 291 ) else { 292 return ApiError::AuthenticationRequired.into_response(); 293 }; 294 let auth_user = 295 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 296 Ok(user) => user, 297 Err(_) => { 298 return ApiError::AuthenticationFailed(None).into_response(); 299 } 300 }; 301 let did = auth_user.did; 302 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str()) 303 .fetch_optional(&state.db) 304 .await; 305 let user_id = match user_query { 306 Ok(Some(row)) => row.id, 307 _ => { 308 return ApiError::InternalError(None).into_response(); 309 } 310 }; 311 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 312 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 313 let missing_query = sqlx::query!( 314 r#" 315 SELECT rb.blob_cid, rb.record_uri 316 FROM record_blobs rb 317 LEFT JOIN blobs b ON rb.blob_cid = b.cid 318 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 319 ORDER BY rb.blob_cid 320 LIMIT $3 321 "#, 322 user_id, 323 cursor_cid, 324 limit + 1 325 ) 326 .fetch_all(&state.db) 327 .await; 328 let rows = match missing_query { 329 Ok(r) => r, 330 Err(e) => { 331 error!("DB error fetching missing blobs: {:?}", e); 332 return ApiError::InternalError(None).into_response(); 333 } 334 }; 335 let has_more = rows.len() > limit as usize; 336 let blobs: Vec<RecordBlob> = rows 337 .into_iter() 338 .take(limit as usize) 339 .map(|row| RecordBlob { 340 cid: row.blob_cid, 341 record_uri: row.record_uri, 342 }) 343 .collect(); 344 let next_cursor = if has_more { 345 blobs.last().map(|b| b.cid.clone()) 346 } else { 347 None 348 }; 349 ( 350 StatusCode::OK, 351 Json(ListMissingBlobsOutput { 352 cursor: next_cursor, 353 blobs, 354 }), 355 ) 356 .into_response() 357}