this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::delegation::{self, DelegationActionType}; 3use crate::state::AppState; 4use crate::util::get_max_blob_size; 5use axum::body::Bytes; 6use axum::{ 7 Json, 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use cid::Cid; 13use multihash::Multihash; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use sha2::{Digest, Sha256}; 17use tracing::{debug, error}; 18 19pub async fn upload_blob( 20 State(state): State<AppState>, 21 headers: axum::http::HeaderMap, 22 body: Bytes, 23) -> Response { 24 let token = match crate::auth::extract_bearer_token_from_header( 25 headers.get("Authorization").and_then(|h| h.to_str().ok()), 26 ) { 27 Some(t) => t, 28 None => { 29 return ( 30 StatusCode::UNAUTHORIZED, 31 Json(json!({"error": "AuthenticationRequired"})), 32 ) 33 .into_response(); 34 } 35 }; 36 37 let is_service_auth = is_service_token(&token); 38 39 let (did, _is_migration, controller_did) = if is_service_auth { 40 debug!("Verifying service token for blob upload"); 41 let verifier = ServiceTokenVerifier::new(); 42 match verifier 43 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 44 .await 45 { 46 Ok(claims) => { 47 debug!("Service token verified for DID: {}", claims.iss); 48 (claims.iss, false, None) 49 } 50 Err(e) => { 51 error!("Service token verification failed: {:?}", e); 52 return ( 53 StatusCode::UNAUTHORIZED, 54 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 55 ) 56 .into_response(); 57 } 58 } 59 } else { 60 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 61 Ok(user) => { 62 let mime_type_for_check = headers 63 .get("content-type") 64 .and_then(|h| h.to_str().ok()) 65 .unwrap_or("application/octet-stream"); 66 if let Err(e) = crate::auth::scope_check::check_blob_scope( 67 user.is_oauth, 68 user.scope.as_deref(), 69 mime_type_for_check, 70 ) { 71 return e; 72 } 73 let deactivated = sqlx::query_scalar!( 74 "SELECT deactivated_at FROM users WHERE did = $1", 75 user.did 76 ) 77 .fetch_optional(&state.db) 78 .await 79 .ok() 80 .flatten() 81 .flatten(); 82 let ctrl_did = user.controller_did.clone(); 83 (user.did, deactivated.is_some(), ctrl_did) 84 } 85 Err(_) => { 86 return ( 87 StatusCode::UNAUTHORIZED, 88 Json(json!({"error": "AuthenticationFailed"})), 89 ) 90 .into_response(); 91 } 92 } 93 }; 94 95 if crate::util::is_account_migrated(&state.db, &did) 96 .await 97 .unwrap_or(false) 98 { 99 return ( 100 StatusCode::FORBIDDEN, 101 Json(json!({ 102 "error": "AccountMigrated", 103 "message": "Account has been migrated to another PDS. Blob operations are not allowed." 104 })), 105 ) 106 .into_response(); 107 } 108 109 let max_size = get_max_blob_size(); 110 111 if body.len() > max_size { 112 return ( 113 StatusCode::PAYLOAD_TOO_LARGE, 114 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})), 115 ) 116 .into_response(); 117 } 118 let mime_type = headers 119 .get("content-type") 120 .and_then(|h| h.to_str().ok()) 121 .unwrap_or("application/octet-stream") 122 .to_string(); 123 let size = body.len() as i64; 124 let data = body.to_vec(); 125 let mut hasher = Sha256::new(); 126 hasher.update(&data); 127 let hash = hasher.finalize(); 128 let multihash = match Multihash::wrap(0x12, &hash) { 129 Ok(mh) => mh, 130 Err(e) => { 131 error!("Failed to create multihash for blob: {:?}", e); 132 return ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 135 ) 136 .into_response(); 137 } 138 }; 139 let cid = Cid::new_v1(0x55, multihash); 140 let cid_str = cid.to_string(); 141 let storage_key = format!("blobs/{}", cid_str); 142 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 143 .fetch_optional(&state.db) 144 .await; 145 let user_id = match user_query { 146 Ok(Some(row)) => row.id, 147 _ => { 148 return ( 149 StatusCode::INTERNAL_SERVER_ERROR, 150 Json(json!({"error": "InternalError"})), 151 ) 152 .into_response(); 153 } 154 }; 155 let mut tx = match state.db.begin().await { 156 Ok(tx) => tx, 157 Err(e) => { 158 error!("Failed to begin transaction: {:?}", e); 159 return ( 160 StatusCode::INTERNAL_SERVER_ERROR, 161 Json(json!({"error": "InternalError"})), 162 ) 163 .into_response(); 164 } 165 }; 166 let insert = sqlx::query!( 167 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 168 cid_str, 169 mime_type, 170 size, 171 user_id, 172 storage_key 173 ) 174 .fetch_optional(&mut *tx) 175 .await; 176 let was_inserted = match insert { 177 Ok(Some(_)) => true, 178 Ok(None) => false, 179 Err(e) => { 180 error!("Failed to insert blob record: {:?}", e); 181 return ( 182 StatusCode::INTERNAL_SERVER_ERROR, 183 Json(json!({"error": "InternalError"})), 184 ) 185 .into_response(); 186 } 187 }; 188 if was_inserted 189 && let Err(e) = state 190 .blob_store 191 .put_bytes(&storage_key, bytes::Bytes::from(data)) 192 .await 193 { 194 error!("Failed to upload blob to storage: {:?}", e); 195 return ( 196 StatusCode::INTERNAL_SERVER_ERROR, 197 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 198 ) 199 .into_response(); 200 } 201 if let Err(e) = tx.commit().await { 202 error!("Failed to commit blob transaction: {:?}", e); 203 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 204 error!( 205 "Failed to cleanup orphaned blob {}: {:?}", 206 storage_key, cleanup_err 207 ); 208 } 209 return ( 210 StatusCode::INTERNAL_SERVER_ERROR, 211 Json(json!({"error": "InternalError"})), 212 ) 213 .into_response(); 214 } 215 216 if let Some(ref controller) = controller_did { 217 let _ = delegation::log_delegation_action( 218 &state.db, 219 &did, 220 controller, 221 Some(controller), 222 DelegationActionType::BlobUpload, 223 Some(json!({ 224 "cid": cid_str, 225 "mime_type": mime_type, 226 "size": size 227 })), 228 None, 229 None, 230 ) 231 .await; 232 } 233 234 Json(json!({ 235 "blob": { 236 "$type": "blob", 237 "ref": { 238 "$link": cid_str 239 }, 240 "mimeType": mime_type, 241 "size": size 242 } 243 })) 244 .into_response() 245} 246 247#[derive(Deserialize)] 248pub struct ListMissingBlobsParams { 249 pub limit: Option<i64>, 250 pub cursor: Option<String>, 251} 252 253#[derive(Serialize)] 254#[serde(rename_all = "camelCase")] 255pub struct RecordBlob { 256 pub cid: String, 257 pub record_uri: String, 258} 259 260#[derive(Serialize)] 261pub struct ListMissingBlobsOutput { 262 #[serde(skip_serializing_if = "Option::is_none")] 263 pub cursor: Option<String>, 264 pub blobs: Vec<RecordBlob>, 265} 266 267pub async fn list_missing_blobs( 268 State(state): State<AppState>, 269 headers: axum::http::HeaderMap, 270 Query(params): Query<ListMissingBlobsParams>, 271) -> Response { 272 let token = match crate::auth::extract_bearer_token_from_header( 273 headers.get("Authorization").and_then(|h| h.to_str().ok()), 274 ) { 275 Some(t) => t, 276 None => { 277 return ( 278 StatusCode::UNAUTHORIZED, 279 Json(json!({"error": "AuthenticationRequired"})), 280 ) 281 .into_response(); 282 } 283 }; 284 let auth_user = 285 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 286 Ok(user) => user, 287 Err(_) => { 288 return ( 289 StatusCode::UNAUTHORIZED, 290 Json(json!({"error": "AuthenticationFailed"})), 291 ) 292 .into_response(); 293 } 294 }; 295 let did = auth_user.did; 296 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 297 .fetch_optional(&state.db) 298 .await; 299 let user_id = match user_query { 300 Ok(Some(row)) => row.id, 301 _ => { 302 return ( 303 StatusCode::INTERNAL_SERVER_ERROR, 304 Json(json!({"error": "InternalError"})), 305 ) 306 .into_response(); 307 } 308 }; 309 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 310 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 311 let missing_query = sqlx::query!( 312 r#" 313 SELECT rb.blob_cid, rb.record_uri 314 FROM record_blobs rb 315 LEFT JOIN blobs b ON rb.blob_cid = b.cid 316 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 317 ORDER BY rb.blob_cid 318 LIMIT $3 319 "#, 320 user_id, 321 cursor_cid, 322 limit + 1 323 ) 324 .fetch_all(&state.db) 325 .await; 326 let rows = match missing_query { 327 Ok(r) => r, 328 Err(e) => { 329 error!("DB error fetching missing blobs: {:?}", e); 330 return ( 331 StatusCode::INTERNAL_SERVER_ERROR, 332 Json(json!({"error": "InternalError"})), 333 ) 334 .into_response(); 335 } 336 }; 337 let has_more = rows.len() > limit as usize; 338 let blobs: Vec<RecordBlob> = rows 339 .into_iter() 340 .take(limit as usize) 341 .map(|row| RecordBlob { 342 cid: row.blob_cid, 343 record_uri: row.record_uri, 344 }) 345 .collect(); 346 let next_cursor = if has_more { 347 blobs.last().map(|b| b.cid.clone()) 348 } else { 349 None 350 }; 351 ( 352 StatusCode::OK, 353 Json(ListMissingBlobsOutput { 354 cursor: next_cursor, 355 blobs, 356 }), 357 ) 358 .into_response() 359}