// This repo has no description.
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::delegation::{self, DelegationActionType}; 3use crate::state::AppState; 4use axum::body::Bytes; 5use axum::{ 6 Json, 7 extract::{Query, State}, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use multihash::Multihash; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use sha2::{Digest, Sha256}; 16use tracing::{debug, error}; 17 18const MAX_BLOB_SIZE: usize = 10_000_000_000; 19const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000; 20 21pub async fn upload_blob( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 body: Bytes, 25) -> Response { 26 let token = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => { 31 return ( 32 StatusCode::UNAUTHORIZED, 33 Json(json!({"error": "AuthenticationRequired"})), 34 ) 35 .into_response(); 36 } 37 }; 38 39 let is_service_auth = is_service_token(&token); 40 41 let (did, is_migration, controller_did) = if is_service_auth { 42 debug!("Verifying service token for blob upload"); 43 let verifier = ServiceTokenVerifier::new(); 44 match verifier 45 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 46 .await 47 { 48 Ok(claims) => { 49 debug!("Service token verified for DID: {}", claims.iss); 50 (claims.iss, false, None) 51 } 52 Err(e) => { 53 error!("Service token verification failed: {:?}", e); 54 return ( 55 StatusCode::UNAUTHORIZED, 56 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 57 ) 58 .into_response(); 59 } 60 } 61 } else { 62 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 63 Ok(user) => { 64 let mime_type_for_check = headers 65 .get("content-type") 66 .and_then(|h| h.to_str().ok()) 67 .unwrap_or("application/octet-stream"); 68 if let Err(e) = 
crate::auth::scope_check::check_blob_scope( 69 user.is_oauth, 70 user.scope.as_deref(), 71 mime_type_for_check, 72 ) { 73 return e; 74 } 75 let deactivated = sqlx::query_scalar!( 76 "SELECT deactivated_at FROM users WHERE did = $1", 77 user.did 78 ) 79 .fetch_optional(&state.db) 80 .await 81 .ok() 82 .flatten() 83 .flatten(); 84 let ctrl_did = user.controller_did.clone(); 85 (user.did, deactivated.is_some(), ctrl_did) 86 } 87 Err(_) => { 88 return ( 89 StatusCode::UNAUTHORIZED, 90 Json(json!({"error": "AuthenticationFailed"})), 91 ) 92 .into_response(); 93 } 94 } 95 }; 96 97 let max_size = if is_service_auth || is_migration { 98 MAX_VIDEO_BLOB_SIZE 99 } else { 100 MAX_BLOB_SIZE 101 }; 102 103 if body.len() > max_size { 104 return ( 105 StatusCode::PAYLOAD_TOO_LARGE, 106 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})), 107 ) 108 .into_response(); 109 } 110 let mime_type = headers 111 .get("content-type") 112 .and_then(|h| h.to_str().ok()) 113 .unwrap_or("application/octet-stream") 114 .to_string(); 115 let size = body.len() as i64; 116 let data = body.to_vec(); 117 let mut hasher = Sha256::new(); 118 hasher.update(&data); 119 let hash = hasher.finalize(); 120 let multihash = match Multihash::wrap(0x12, &hash) { 121 Ok(mh) => mh, 122 Err(e) => { 123 error!("Failed to create multihash for blob: {:?}", e); 124 return ( 125 StatusCode::INTERNAL_SERVER_ERROR, 126 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 127 ) 128 .into_response(); 129 } 130 }; 131 let cid = Cid::new_v1(0x55, multihash); 132 let cid_str = cid.to_string(); 133 let storage_key = format!("blobs/{}", cid_str); 134 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 135 .fetch_optional(&state.db) 136 .await; 137 let user_id = match user_query { 138 Ok(Some(row)) => row.id, 139 _ => { 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError"})), 
143 ) 144 .into_response(); 145 } 146 }; 147 let mut tx = match state.db.begin().await { 148 Ok(tx) => tx, 149 Err(e) => { 150 error!("Failed to begin transaction: {:?}", e); 151 return ( 152 StatusCode::INTERNAL_SERVER_ERROR, 153 Json(json!({"error": "InternalError"})), 154 ) 155 .into_response(); 156 } 157 }; 158 let insert = sqlx::query!( 159 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 160 cid_str, 161 mime_type, 162 size, 163 user_id, 164 storage_key 165 ) 166 .fetch_optional(&mut *tx) 167 .await; 168 let was_inserted = match insert { 169 Ok(Some(_)) => true, 170 Ok(None) => false, 171 Err(e) => { 172 error!("Failed to insert blob record: {:?}", e); 173 return ( 174 StatusCode::INTERNAL_SERVER_ERROR, 175 Json(json!({"error": "InternalError"})), 176 ) 177 .into_response(); 178 } 179 }; 180 if was_inserted 181 && let Err(e) = state 182 .blob_store 183 .put_bytes(&storage_key, bytes::Bytes::from(data)) 184 .await 185 { 186 error!("Failed to upload blob to storage: {:?}", e); 187 return ( 188 StatusCode::INTERNAL_SERVER_ERROR, 189 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 190 ) 191 .into_response(); 192 } 193 if let Err(e) = tx.commit().await { 194 error!("Failed to commit blob transaction: {:?}", e); 195 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 196 error!( 197 "Failed to cleanup orphaned blob {}: {:?}", 198 storage_key, cleanup_err 199 ); 200 } 201 return ( 202 StatusCode::INTERNAL_SERVER_ERROR, 203 Json(json!({"error": "InternalError"})), 204 ) 205 .into_response(); 206 } 207 208 if let Some(ref controller) = controller_did { 209 let _ = delegation::log_delegation_action( 210 &state.db, 211 &did, 212 controller, 213 Some(controller), 214 DelegationActionType::BlobUpload, 215 Some(json!({ 216 "cid": cid_str, 217 "mime_type": mime_type, 218 "size": size 219 })), 220 None, 221 
None, 222 ) 223 .await; 224 } 225 226 Json(json!({ 227 "blob": { 228 "$type": "blob", 229 "ref": { 230 "$link": cid_str 231 }, 232 "mimeType": mime_type, 233 "size": size 234 } 235 })) 236 .into_response() 237} 238 239#[derive(Deserialize)] 240pub struct ListMissingBlobsParams { 241 pub limit: Option<i64>, 242 pub cursor: Option<String>, 243} 244 245#[derive(Serialize)] 246#[serde(rename_all = "camelCase")] 247pub struct RecordBlob { 248 pub cid: String, 249 pub record_uri: String, 250} 251 252#[derive(Serialize)] 253pub struct ListMissingBlobsOutput { 254 #[serde(skip_serializing_if = "Option::is_none")] 255 pub cursor: Option<String>, 256 pub blobs: Vec<RecordBlob>, 257} 258 259pub async fn list_missing_blobs( 260 State(state): State<AppState>, 261 headers: axum::http::HeaderMap, 262 Query(params): Query<ListMissingBlobsParams>, 263) -> Response { 264 let token = match crate::auth::extract_bearer_token_from_header( 265 headers.get("Authorization").and_then(|h| h.to_str().ok()), 266 ) { 267 Some(t) => t, 268 None => { 269 return ( 270 StatusCode::UNAUTHORIZED, 271 Json(json!({"error": "AuthenticationRequired"})), 272 ) 273 .into_response(); 274 } 275 }; 276 let auth_user = 277 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 278 Ok(user) => user, 279 Err(_) => { 280 return ( 281 StatusCode::UNAUTHORIZED, 282 Json(json!({"error": "AuthenticationFailed"})), 283 ) 284 .into_response(); 285 } 286 }; 287 let did = auth_user.did; 288 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 289 .fetch_optional(&state.db) 290 .await; 291 let user_id = match user_query { 292 Ok(Some(row)) => row.id, 293 _ => { 294 return ( 295 StatusCode::INTERNAL_SERVER_ERROR, 296 Json(json!({"error": "InternalError"})), 297 ) 298 .into_response(); 299 } 300 }; 301 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 302 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 303 let missing_query = sqlx::query!( 304 r#" 305 SELECT 
rb.blob_cid, rb.record_uri 306 FROM record_blobs rb 307 LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id 308 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2 309 ORDER BY rb.blob_cid 310 LIMIT $3 311 "#, 312 user_id, 313 cursor_cid, 314 limit + 1 315 ) 316 .fetch_all(&state.db) 317 .await; 318 let rows = match missing_query { 319 Ok(r) => r, 320 Err(e) => { 321 error!("DB error fetching missing blobs: {:?}", e); 322 return ( 323 StatusCode::INTERNAL_SERVER_ERROR, 324 Json(json!({"error": "InternalError"})), 325 ) 326 .into_response(); 327 } 328 }; 329 let has_more = rows.len() > limit as usize; 330 let blobs: Vec<RecordBlob> = rows 331 .into_iter() 332 .take(limit as usize) 333 .map(|row| RecordBlob { 334 cid: row.blob_cid, 335 record_uri: row.record_uri, 336 }) 337 .collect(); 338 let next_cursor = if has_more { 339 blobs.last().map(|b| b.cid.clone()) 340 } else { 341 None 342 }; 343 ( 344 StatusCode::OK, 345 Json(ListMissingBlobsOutput { 346 cursor: next_cursor, 347 blobs, 348 }), 349 ) 350 .into_response() 351}