this repo has no description
at main 11 kB view raw
1use crate::api::error::ApiError; 2use crate::auth::{BearerAuthAllowDeactivated, ServiceTokenVerifier, is_service_token}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::state::AppState; 5use crate::util::get_max_blob_size; 6use axum::body::Body; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bytes::Bytes; 14use cid::Cid; 15use futures::StreamExt; 16use multihash::Multihash; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::pin::Pin; 20use tracing::{debug, error, info, warn}; 21 22fn detect_mime_type(data: &[u8], client_hint: &str) -> String { 23 if let Some(kind) = infer::get(data) { 24 let detected = kind.mime_type().to_string(); 25 if detected != client_hint { 26 debug!( 27 "MIME type detection: client sent '{}', detected '{}'", 28 client_hint, detected 29 ); 30 } 31 detected 32 } else if client_hint == "*/*" || client_hint.is_empty() { 33 warn!( 34 "Could not detect MIME type and client sent invalid hint: '{}'", 35 client_hint 36 ); 37 "application/octet-stream".to_string() 38 } else { 39 client_hint.to_string() 40 } 41} 42 43pub async fn upload_blob( 44 State(state): State<AppState>, 45 headers: axum::http::HeaderMap, 46 body: Body, 47) -> Response { 48 let extracted = match crate::auth::extract_auth_token_from_header( 49 headers.get("Authorization").and_then(|h| h.to_str().ok()), 50 ) { 51 Some(t) => t, 52 None => return ApiError::AuthenticationRequired.into_response(), 53 }; 54 let token = extracted.token; 55 56 let is_service_auth = is_service_token(&token); 57 58 let (did, _is_migration, controller_did) = if is_service_auth { 59 debug!("Verifying service token for blob upload"); 60 let verifier = ServiceTokenVerifier::new(); 61 match verifier 62 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 63 .await 64 { 65 Ok(claims) => { 66 debug!("Service token verified for DID: {}", claims.iss); 67 (claims.iss, false, None) 68 } 69 Err(e) => { 70 error!("Service token verification failed: {:?}", e); 71 return ApiError::AuthenticationFailed(Some(format!( 72 "Service token verification failed: {}", 73 e 74 ))) 75 .into_response(); 76 } 77 } 78 } else { 79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 80 let http_uri = format!( 81 "https://{}/xrpc/com.atproto.repo.uploadBlob", 82 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()) 83 ); 84 match crate::auth::validate_token_with_dpop( 85 &state.db, 86 &token, 87 extracted.is_dpop, 88 dpop_proof, 89 "POST", 90 &http_uri, 91 true, 92 false, 93 ) 94 .await 95 { 96 Ok(user) => { 97 let mime_type_for_check = headers 98 .get("content-type") 99 .and_then(|h| h.to_str().ok()) 100 .unwrap_or("application/octet-stream"); 101 if let Err(e) = crate::auth::scope_check::check_blob_scope( 102 user.is_oauth, 103 user.scope.as_deref(), 104 mime_type_for_check, 105 ) { 106 return e; 107 } 108 let deactivated = sqlx::query_scalar!( 109 "SELECT deactivated_at FROM users WHERE did = $1", 110 &user.did 111 ) 112 .fetch_optional(&state.db) 113 .await 114 .ok() 115 .flatten() 116 .flatten(); 117 let ctrl_did = user.controller_did.map(|d| d.to_string()); 118 (user.did.to_string(), deactivated.is_some(), ctrl_did) 119 } 120 Err(_) => { 121 return ApiError::AuthenticationFailed(None).into_response(); 122 } 123 } 124 }; 125 126 if crate::util::is_account_migrated(&state.db, &did) 127 .await 128 .unwrap_or(false) 129 { 130 return ApiError::Forbidden.into_response(); 131 } 132 133 let client_mime_hint = headers 134 .get("content-type") 135 .and_then(|h| h.to_str().ok()) 136 .unwrap_or("application/octet-stream"); 137 138 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 139 .fetch_optional(&state.db) 140 .await; 141 let user_id = match user_query { 142 Ok(Some(row)) => row.id, 143 _ => { 144 return ApiError::InternalError(None).into_response(); 145 } 146 }; 147 148 let temp_key = format!("temp/{}", uuid::Uuid::new_v4()); 149 let max_size = get_max_blob_size() as u64; 150 151 let body_stream = body.into_data_stream(); 152 let mapped_stream = 153 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string()))); 154 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> = 155 Box::pin(mapped_stream); 156 157 info!("Starting streaming blob upload to temp key: {}", temp_key); 158 159 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await { 160 Ok(result) => result, 161 Err(e) => { 162 error!("Failed to stream blob to storage: {:?}", e); 163 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 164 } 165 }; 166 167 let size = upload_result.size; 168 if size > max_size { 169 let _ = state.blob_store.delete(&temp_key).await; 170 return ApiError::InvalidRequest(format!( 171 "Blob size {} exceeds maximum of {} bytes", 172 size, max_size 173 )) 174 .into_response(); 175 } 176 177 let mime_type = match state.blob_store.get_head(&temp_key, 8192).await { 178 Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint), 179 Err(e) => { 180 warn!("Failed to read blob head for MIME detection: {:?}", e); 181 client_mime_hint.to_string() 182 } 183 }; 184 185 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 186 Ok(mh) => mh, 187 Err(e) => { 188 let _ = state.blob_store.delete(&temp_key).await; 189 error!("Failed to create multihash for blob: {:?}", e); 190 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response(); 191 } 192 }; 193 let cid = Cid::new_v1(0x55, multihash); 194 let cid_str = cid.to_string(); 195 let storage_key = format!("blobs/{}", cid_str); 196 197 info!( 198 "Blob upload complete: size={}, cid={}, copying to final location", 199 size, cid_str 200 ); 201 202 let mut tx = match state.db.begin().await { 203 Ok(tx) => tx, 204 Err(e) => { 205 let _ = state.blob_store.delete(&temp_key).await; 206 error!("Failed to begin transaction: {:?}", e); 207 return ApiError::InternalError(None).into_response(); 208 } 209 }; 210 211 let insert = sqlx::query!( 212 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 213 cid_str, 214 mime_type, 215 size as i64, 216 user_id, 217 storage_key 218 ) 219 .fetch_optional(&mut *tx) 220 .await; 221 222 let was_inserted = match insert { 223 Ok(Some(_)) => true, 224 Ok(None) => false, 225 Err(e) => { 226 let _ = state.blob_store.delete(&temp_key).await; 227 error!("Failed to insert blob record: {:?}", e); 228 return ApiError::InternalError(None).into_response(); 229 } 230 }; 231 232 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await { 233 let _ = state.blob_store.delete(&temp_key).await; 234 error!("Failed to copy blob to final location: {:?}", e); 235 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 236 } 237 238 let _ = state.blob_store.delete(&temp_key).await; 239 240 if let Err(e) = tx.commit().await { 241 error!("Failed to commit blob transaction: {:?}", e); 242 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 243 error!( 244 "Failed to cleanup orphaned blob {}: {:?}", 245 storage_key, cleanup_err 246 ); 247 } 248 return ApiError::InternalError(None).into_response(); 249 } 250 251 if let Some(ref controller) = controller_did { 252 let _ = delegation::log_delegation_action( 253 &state.db, 254 &did, 255 controller, 256 Some(controller), 257 DelegationActionType::BlobUpload, 258 Some(json!({ 259 "cid": cid_str, 260 "mime_type": mime_type, 261 "size": size 262 })), 263 None, 264 None, 265 ) 266 .await; 267 } 268 269 Json(json!({ 270 "blob": { 271 "$type": "blob", 272 "ref": { 273 "$link": cid_str 274 }, 275 "mimeType": mime_type, 276 "size": size 277 } 278 })) 279 .into_response() 280} 281 282#[derive(Deserialize)] 283pub struct ListMissingBlobsParams { 284 pub limit: Option<i64>, 285 pub cursor: Option<String>, 286} 287 288#[derive(Serialize)] 289#[serde(rename_all = "camelCase")] 290pub struct RecordBlob { 291 pub cid: String, 292 pub record_uri: String, 293} 294 295#[derive(Serialize)] 296pub struct ListMissingBlobsOutput { 297 #[serde(skip_serializing_if = "Option::is_none")] 298 pub cursor: Option<String>, 299 pub blobs: Vec<RecordBlob>, 300} 301 302pub async fn list_missing_blobs( 303 State(state): State<AppState>, 304 auth: BearerAuthAllowDeactivated, 305 Query(params): Query<ListMissingBlobsParams>, 306) -> Response { 307 let auth_user = auth.0; 308 let did = auth_user.did; 309 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str()) 310 .fetch_optional(&state.db) 311 .await; 312 let user_id = match user_query { 313 Ok(Some(row)) => row.id, 314 _ => { 315 return ApiError::InternalError(None).into_response(); 316 } 317 }; 318 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 319 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 320 let missing_query = sqlx::query!( 321 r#" 322 SELECT rb.blob_cid, rb.record_uri 323 FROM record_blobs rb 324 LEFT JOIN blobs b ON rb.blob_cid = b.cid 325 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 326 ORDER BY rb.blob_cid 327 LIMIT $3 328 "#, 329 user_id, 330 cursor_cid, 331 limit + 1 332 ) 333 .fetch_all(&state.db) 334 .await; 335 let rows = match missing_query { 336 Ok(r) => r, 337 Err(e) => { 338 error!("DB error fetching missing blobs: {:?}", e); 339 return ApiError::InternalError(None).into_response(); 340 } 341 }; 342 let has_more = rows.len() > limit as usize; 343 let blobs: Vec<RecordBlob> = rows 344 .into_iter() 345 .take(limit as usize) 346 .map(|row| RecordBlob { 347 cid: row.blob_cid, 348 record_uri: row.record_uri, 349 }) 350 .collect(); 351 let next_cursor = if has_more { 352 blobs.last().map(|b| b.cid.clone()) 353 } else { 354 None 355 }; 356 ( 357 StatusCode::OK, 358 Json(ListMissingBlobsOutput { 359 cursor: next_cursor, 360 blobs, 361 }), 362 ) 363 .into_response() 364}