this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::{ServiceTokenVerifier, is_service_token}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::state::AppState; 5use crate::util::get_max_blob_size; 6use axum::body::Body; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use bytes::Bytes; 14use cid::Cid; 15use futures::StreamExt; 16use multihash::Multihash; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::pin::Pin; 20use tracing::{debug, error, info}; 21 22pub async fn upload_blob( 23 State(state): State<AppState>, 24 headers: axum::http::HeaderMap, 25 body: Body, 26) -> Response { 27 let Some(token) = crate::auth::extract_bearer_token_from_header( 28 headers.get("Authorization").and_then(|h| h.to_str().ok()), 29 ) else { 30 return ApiError::AuthenticationRequired.into_response(); 31 }; 32 33 let is_service_auth = is_service_token(&token); 34 35 let (did, _is_migration, controller_did) = if is_service_auth { 36 debug!("Verifying service token for blob upload"); 37 let verifier = ServiceTokenVerifier::new(); 38 match verifier 39 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 40 .await 41 { 42 Ok(claims) => { 43 debug!("Service token verified for DID: {}", claims.iss); 44 (claims.iss, false, None) 45 } 46 Err(e) => { 47 error!("Service token verification failed: {:?}", e); 48 return ApiError::AuthenticationFailed(Some(format!( 49 "Service token verification failed: {}", 50 e 51 ))) 52 .into_response(); 53 } 54 } 55 } else { 56 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 57 Ok(user) => { 58 let mime_type_for_check = headers 59 .get("content-type") 60 .and_then(|h| h.to_str().ok()) 61 .unwrap_or("application/octet-stream"); 62 if let Err(e) = crate::auth::scope_check::check_blob_scope( 63 user.is_oauth, 64 user.scope.as_deref(), 65 mime_type_for_check, 66 ) { 67 return e; 68 } 69 let deactivated = sqlx::query_scalar!( 70 "SELECT deactivated_at FROM users WHERE did = $1", 71 &user.did 72 ) 73 .fetch_optional(&state.db) 74 .await 75 .ok() 76 .flatten() 77 .flatten(); 78 let ctrl_did = user.controller_did.map(|d| d.to_string()); 79 (user.did.to_string(), deactivated.is_some(), ctrl_did) 80 } 81 Err(_) => { 82 return ApiError::AuthenticationFailed(None).into_response(); 83 } 84 } 85 }; 86 87 if crate::util::is_account_migrated(&state.db, &did) 88 .await 89 .unwrap_or(false) 90 { 91 return ApiError::Forbidden.into_response(); 92 } 93 94 let mime_type = headers 95 .get("content-type") 96 .and_then(|h| h.to_str().ok()) 97 .unwrap_or("application/octet-stream") 98 .to_string(); 99 100 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 101 .fetch_optional(&state.db) 102 .await; 103 let user_id = match user_query { 104 Ok(Some(row)) => row.id, 105 _ => { 106 return ApiError::InternalError(None).into_response(); 107 } 108 }; 109 110 let temp_key = format!("temp/{}", uuid::Uuid::new_v4()); 111 let max_size = get_max_blob_size() as u64; 112 113 let body_stream = body.into_data_stream(); 114 let mapped_stream = 115 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string()))); 116 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> = 117 Box::pin(mapped_stream); 118 119 info!("Starting streaming blob upload to temp key: {}", temp_key); 120 121 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await { 122 Ok(result) => result, 123 Err(e) => { 124 error!("Failed to stream blob to storage: {:?}", e); 125 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 126 } 127 }; 128 129 let size = upload_result.size; 130 if size > max_size { 131 let _ = state.blob_store.delete(&temp_key).await; 132 return ApiError::InvalidRequest(format!( 133 "Blob size {} exceeds maximum of {} bytes", 134 size, max_size 135 )) 136 .into_response(); 137 } 138 139 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 140 Ok(mh) => mh, 141 Err(e) => { 142 let _ = state.blob_store.delete(&temp_key).await; 143 error!("Failed to create multihash for blob: {:?}", e); 144 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response(); 145 } 146 }; 147 let cid = Cid::new_v1(0x55, multihash); 148 let cid_str = cid.to_string(); 149 let storage_key = format!("blobs/{}", cid_str); 150 151 info!( 152 "Blob upload complete: size={}, cid={}, copying to final location", 153 size, cid_str 154 ); 155 156 let mut tx = match state.db.begin().await { 157 Ok(tx) => tx, 158 Err(e) => { 159 let _ = state.blob_store.delete(&temp_key).await; 160 error!("Failed to begin transaction: {:?}", e); 161 return ApiError::InternalError(None).into_response(); 162 } 163 }; 164 165 let insert = sqlx::query!( 166 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 167 cid_str, 168 mime_type, 169 size as i64, 170 user_id, 171 storage_key 172 ) 173 .fetch_optional(&mut *tx) 174 .await; 175 176 let was_inserted = match insert { 177 Ok(Some(_)) => true, 178 Ok(None) => false, 179 Err(e) => { 180 let _ = state.blob_store.delete(&temp_key).await; 181 error!("Failed to insert blob record: {:?}", e); 182 return ApiError::InternalError(None).into_response(); 183 } 184 }; 185 186 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await { 187 let _ = state.blob_store.delete(&temp_key).await; 188 error!("Failed to copy blob to final location: {:?}", e); 189 return ApiError::InternalError(Some("Failed to store blob".into())).into_response(); 190 } 191 192 let _ = state.blob_store.delete(&temp_key).await; 193 194 if let Err(e) = tx.commit().await { 195 error!("Failed to commit blob transaction: {:?}", e); 196 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 197 error!( 198 "Failed to cleanup orphaned blob {}: {:?}", 199 storage_key, cleanup_err 200 ); 201 } 202 return ApiError::InternalError(None).into_response(); 203 } 204 205 if let Some(ref controller) = controller_did { 206 let _ = delegation::log_delegation_action( 207 &state.db, 208 &did, 209 controller, 210 Some(controller), 211 DelegationActionType::BlobUpload, 212 Some(json!({ 213 "cid": cid_str, 214 "mime_type": mime_type, 215 "size": size 216 })), 217 None, 218 None, 219 ) 220 .await; 221 } 222 223 Json(json!({ 224 "blob": { 225 "$type": "blob", 226 "ref": { 227 "$link": cid_str 228 }, 229 "mimeType": mime_type, 230 "size": size 231 } 232 })) 233 .into_response() 234} 235 236#[derive(Deserialize)] 237pub struct ListMissingBlobsParams { 238 pub limit: Option<i64>, 239 pub cursor: Option<String>, 240} 241 242#[derive(Serialize)] 243#[serde(rename_all = "camelCase")] 244pub struct RecordBlob { 245 pub cid: String, 246 pub record_uri: String, 247} 248 249#[derive(Serialize)] 250pub struct ListMissingBlobsOutput { 251 #[serde(skip_serializing_if = "Option::is_none")] 252 pub cursor: Option<String>, 253 pub blobs: Vec<RecordBlob>, 254} 255 256pub async fn list_missing_blobs( 257 State(state): State<AppState>, 258 headers: axum::http::HeaderMap, 259 Query(params): Query<ListMissingBlobsParams>, 260) -> Response { 261 let Some(token) = crate::auth::extract_bearer_token_from_header( 262 headers.get("Authorization").and_then(|h| h.to_str().ok()), 263 ) else { 264 return ApiError::AuthenticationRequired.into_response(); 265 }; 266 let auth_user = 267 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 268 Ok(user) => user, 269 Err(_) => { 270 return ApiError::AuthenticationFailed(None).into_response(); 271 } 272 }; 273 let did = auth_user.did; 274 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str()) 275 .fetch_optional(&state.db) 276 .await; 277 let user_id = match user_query { 278 Ok(Some(row)) => row.id, 279 _ => { 280 return ApiError::InternalError(None).into_response(); 281 } 282 }; 283 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 284 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 285 let missing_query = sqlx::query!( 286 r#" 287 SELECT rb.blob_cid, rb.record_uri 288 FROM record_blobs rb 289 LEFT JOIN blobs b ON rb.blob_cid = b.cid 290 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 291 ORDER BY rb.blob_cid 292 LIMIT $3 293 "#, 294 user_id, 295 cursor_cid, 296 limit + 1 297 ) 298 .fetch_all(&state.db) 299 .await; 300 let rows = match missing_query { 301 Ok(r) => r, 302 Err(e) => { 303 error!("DB error fetching missing blobs: {:?}", e); 304 return ApiError::InternalError(None).into_response(); 305 } 306 }; 307 let has_more = rows.len() > limit as usize; 308 let blobs: Vec<RecordBlob> = rows 309 .into_iter() 310 .take(limit as usize) 311 .map(|row| RecordBlob { 312 cid: row.blob_cid, 313 record_uri: row.record_uri, 314 }) 315 .collect(); 316 let next_cursor = if has_more { 317 blobs.last().map(|b| b.cid.clone()) 318 } else { 319 None 320 }; 321 ( 322 StatusCode::OK, 323 Json(ListMissingBlobsOutput { 324 cursor: next_cursor, 325 blobs, 326 }), 327 ) 328 .into_response() 329}