this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token}; 2use crate::delegation::{self, DelegationActionType}; 3use crate::state::AppState; 4use crate::sync::import::find_blob_refs_ipld; 5use axum::body::Bytes; 6use axum::{ 7 Json, 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use cid::Cid; 13use ipld_core::ipld::Ipld; 14use jacquard_repo::storage::BlockStore; 15use multihash::Multihash; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sha2::{Digest, Sha256}; 19use std::str::FromStr; 20use tracing::{debug, error, warn}; 21 22const MAX_BLOB_SIZE: usize = 10_000_000_000; 23const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000; 24 25pub async fn upload_blob( 26 State(state): State<AppState>, 27 headers: axum::http::HeaderMap, 28 body: Bytes, 29) -> Response { 30 let token = match crate::auth::extract_bearer_token_from_header( 31 headers.get("Authorization").and_then(|h| h.to_str().ok()), 32 ) { 33 Some(t) => t, 34 None => { 35 return ( 36 StatusCode::UNAUTHORIZED, 37 Json(json!({"error": "AuthenticationRequired"})), 38 ) 39 .into_response(); 40 } 41 }; 42 43 let is_service_auth = is_service_token(&token); 44 45 let (did, is_migration, controller_did) = if is_service_auth { 46 debug!("Verifying service token for blob upload"); 47 let verifier = ServiceTokenVerifier::new(); 48 match verifier 49 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob")) 50 .await 51 { 52 Ok(claims) => { 53 debug!("Service token verified for DID: {}", claims.iss); 54 (claims.iss, false, None) 55 } 56 Err(e) => { 57 error!("Service token verification failed: {:?}", e); 58 return ( 59 StatusCode::UNAUTHORIZED, 60 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})), 61 ) 62 .into_response(); 63 } 64 } 65 } else { 66 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 67 Ok(user) => { 68 let mime_type_for_check = headers 69 .get("content-type") 70 .and_then(|h| h.to_str().ok()) 71 .unwrap_or("application/octet-stream"); 72 if let Err(e) = crate::auth::scope_check::check_blob_scope( 73 user.is_oauth, 74 user.scope.as_deref(), 75 mime_type_for_check, 76 ) { 77 return e; 78 } 79 let deactivated = sqlx::query_scalar!( 80 "SELECT deactivated_at FROM users WHERE did = $1", 81 user.did 82 ) 83 .fetch_optional(&state.db) 84 .await 85 .ok() 86 .flatten() 87 .flatten(); 88 let ctrl_did = user.controller_did.clone(); 89 (user.did, deactivated.is_some(), ctrl_did) 90 } 91 Err(_) => { 92 return ( 93 StatusCode::UNAUTHORIZED, 94 Json(json!({"error": "AuthenticationFailed"})), 95 ) 96 .into_response(); 97 } 98 } 99 }; 100 101 let max_size = if is_service_auth || is_migration { 102 MAX_VIDEO_BLOB_SIZE 103 } else { 104 MAX_BLOB_SIZE 105 }; 106 107 if body.len() > max_size { 108 return ( 109 StatusCode::PAYLOAD_TOO_LARGE, 110 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})), 111 ) 112 .into_response(); 113 } 114 let mime_type = headers 115 .get("content-type") 116 .and_then(|h| h.to_str().ok()) 117 .unwrap_or("application/octet-stream") 118 .to_string(); 119 let size = body.len() as i64; 120 let data = body.to_vec(); 121 let mut hasher = Sha256::new(); 122 hasher.update(&data); 123 let hash = hasher.finalize(); 124 let multihash = match Multihash::wrap(0x12, &hash) { 125 Ok(mh) => mh, 126 Err(e) => { 127 error!("Failed to create multihash for blob: {:?}", e); 128 return ( 129 StatusCode::INTERNAL_SERVER_ERROR, 130 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})), 131 ) 132 .into_response(); 133 } 134 }; 135 let cid = Cid::new_v1(0x55, multihash); 136 let cid_str = cid.to_string(); 137 let storage_key = format!("blobs/{}", cid_str); 138 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 139 .fetch_optional(&state.db) 140 .await; 141 let user_id = match user_query { 142 Ok(Some(row)) => row.id, 143 _ => { 144 return ( 145 StatusCode::INTERNAL_SERVER_ERROR, 146 Json(json!({"error": "InternalError"})), 147 ) 148 .into_response(); 149 } 150 }; 151 let mut tx = match state.db.begin().await { 152 Ok(tx) => tx, 153 Err(e) => { 154 error!("Failed to begin transaction: {:?}", e); 155 return ( 156 StatusCode::INTERNAL_SERVER_ERROR, 157 Json(json!({"error": "InternalError"})), 158 ) 159 .into_response(); 160 } 161 }; 162 let insert = sqlx::query!( 163 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid", 164 cid_str, 165 mime_type, 166 size, 167 user_id, 168 storage_key 169 ) 170 .fetch_optional(&mut *tx) 171 .await; 172 let was_inserted = match insert { 173 Ok(Some(_)) => true, 174 Ok(None) => false, 175 Err(e) => { 176 error!("Failed to insert blob record: {:?}", e); 177 return ( 178 StatusCode::INTERNAL_SERVER_ERROR, 179 Json(json!({"error": "InternalError"})), 180 ) 181 .into_response(); 182 } 183 }; 184 if was_inserted 185 && let Err(e) = state 186 .blob_store 187 .put_bytes(&storage_key, bytes::Bytes::from(data)) 188 .await 189 { 190 error!("Failed to upload blob to storage: {:?}", e); 191 return ( 192 StatusCode::INTERNAL_SERVER_ERROR, 193 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 194 ) 195 .into_response(); 196 } 197 if let Err(e) = tx.commit().await { 198 error!("Failed to commit blob transaction: {:?}", e); 199 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await { 200 error!( 201 "Failed to cleanup orphaned blob {}: {:?}", 202 storage_key, cleanup_err 203 ); 204 } 205 return ( 206 StatusCode::INTERNAL_SERVER_ERROR, 207 Json(json!({"error": "InternalError"})), 208 ) 209 .into_response(); 210 } 211 212 if let Some(ref controller) = controller_did { 213 let _ = delegation::log_delegation_action( 214 &state.db, 215 &did, 216 controller, 217 Some(controller), 218 DelegationActionType::BlobUpload, 219 Some(json!({ 220 "cid": cid_str, 221 "mime_type": mime_type, 222 "size": size 223 })), 224 None, 225 None, 226 ) 227 .await; 228 } 229 230 Json(json!({ 231 "blob": { 232 "$type": "blob", 233 "ref": { 234 "$link": cid_str 235 }, 236 "mimeType": mime_type, 237 "size": size 238 } 239 })) 240 .into_response() 241} 242 243#[derive(Deserialize)] 244pub struct ListMissingBlobsParams { 245 pub limit: Option<i64>, 246 pub cursor: Option<String>, 247} 248 249#[derive(Serialize)] 250#[serde(rename_all = "camelCase")] 251pub struct RecordBlob { 252 pub cid: String, 253 pub record_uri: String, 254} 255 256#[derive(Serialize)] 257pub struct ListMissingBlobsOutput { 258 #[serde(skip_serializing_if = "Option::is_none")] 259 pub cursor: Option<String>, 260 pub blobs: Vec<RecordBlob>, 261} 262 263pub async fn list_missing_blobs( 264 State(state): State<AppState>, 265 headers: axum::http::HeaderMap, 266 Query(params): Query<ListMissingBlobsParams>, 267) -> Response { 268 let token = match crate::auth::extract_bearer_token_from_header( 269 headers.get("Authorization").and_then(|h| h.to_str().ok()), 270 ) { 271 Some(t) => t, 272 None => { 273 return ( 274 StatusCode::UNAUTHORIZED, 275 Json(json!({"error": "AuthenticationRequired"})), 276 ) 277 .into_response(); 278 } 279 }; 280 let auth_user = 281 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 282 Ok(user) => user, 283 Err(_) => { 284 return ( 285 StatusCode::UNAUTHORIZED, 286 Json(json!({"error": "AuthenticationFailed"})), 287 ) 288 .into_response(); 289 } 290 }; 291 let did = auth_user.did; 292 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 293 .fetch_optional(&state.db) 294 .await; 295 let user_id = match user_query { 296 Ok(Some(row)) => row.id, 297 _ => { 298 return ( 299 StatusCode::INTERNAL_SERVER_ERROR, 300 Json(json!({"error": "InternalError"})), 301 ) 302 .into_response(); 303 } 304 }; 305 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 306 let cursor_str = params.cursor.unwrap_or_default(); 307 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 308 let parts: Vec<&str> = cursor_str.split('|').collect(); 309 (parts[0].to_string(), parts[1].to_string()) 310 } else { 311 (String::new(), String::new()) 312 }; 313 let records_query = sqlx::query!( 314 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 315 user_id, 316 cursor_collection, 317 cursor_rkey, 318 limit 319 ) 320 .fetch_all(&state.db) 321 .await; 322 let records = match records_query { 323 Ok(r) => r, 324 Err(e) => { 325 error!("DB error fetching records: {:?}", e); 326 return ( 327 StatusCode::INTERNAL_SERVER_ERROR, 328 Json(json!({"error": "InternalError"})), 329 ) 330 .into_response(); 331 } 332 }; 333 let mut missing_blobs = Vec::new(); 334 let mut last_cursor = None; 335 for row in &records { 336 let collection = &row.collection; 337 let rkey = &row.rkey; 338 let record_cid_str = &row.record_cid; 339 last_cursor = Some(format!("{}|{}", collection, rkey)); 340 let record_cid = match Cid::from_str(record_cid_str) { 341 Ok(c) => c, 342 Err(_) => continue, 343 }; 344 let block_bytes = match state.block_store.get(&record_cid).await { 345 Ok(Some(b)) => b, 346 _ => continue, 347 }; 348 let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) { 349 Ok(v) => v, 350 Err(e) => { 351 warn!("Failed to parse record {} as IPLD: {:?}", record_cid_str, e); 352 continue; 353 } 354 }; 355 let blob_refs = find_blob_refs_ipld(&record_ipld, 0); 356 for blob_ref in blob_refs { 357 let blob_cid_str = blob_ref.cid; 358 let exists = sqlx::query!( 359 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", 360 blob_cid_str, 361 user_id 362 ) 363 .fetch_optional(&state.db) 364 .await; 365 match exists { 366 Ok(None) => { 367 missing_blobs.push(RecordBlob { 368 cid: blob_cid_str, 369 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 370 }); 371 } 372 Err(e) => { 373 error!("DB error checking blob existence: {:?}", e); 374 } 375 _ => {} 376 } 377 } 378 } 379 // if we fetched fewer records than limit, we are done, so cursor is None. 380 // otherwise, cursor is the last one we saw. 381 // ...right? 382 let next_cursor = if records.len() < limit as usize { 383 None 384 } else { 385 last_cursor 386 }; 387 ( 388 StatusCode::OK, 389 Json(ListMissingBlobsOutput { 390 cursor: next_cursor, 391 blobs: missing_blobs, 392 }), 393 ) 394 .into_response() 395}