this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::{ServiceTokenVerifier, is_service_token};
3use crate::delegation::{self, DelegationActionType};
4use crate::state::AppState;
5use crate::util::get_max_blob_size;
6use axum::body::Body;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bytes::Bytes;
14use cid::Cid;
15use futures::StreamExt;
16use multihash::Multihash;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::pin::Pin;
20use tracing::{debug, error, info, warn};
21
22fn detect_mime_type(data: &[u8], client_hint: &str) -> String {
23 if let Some(kind) = infer::get(data) {
24 let detected = kind.mime_type().to_string();
25 if detected != client_hint {
26 debug!(
27 "MIME type detection: client sent '{}', detected '{}'",
28 client_hint, detected
29 );
30 }
31 detected
32 } else if client_hint == "*/*" || client_hint.is_empty() {
33 warn!(
34 "Could not detect MIME type and client sent invalid hint: '{}'",
35 client_hint
36 );
37 "application/octet-stream".to_string()
38 } else {
39 client_hint.to_string()
40 }
41}
42
43pub async fn upload_blob(
44 State(state): State<AppState>,
45 headers: axum::http::HeaderMap,
46 body: Body,
47) -> Response {
48 let Some(token) = crate::auth::extract_bearer_token_from_header(
49 headers.get("Authorization").and_then(|h| h.to_str().ok()),
50 ) else {
51 return ApiError::AuthenticationRequired.into_response();
52 };
53
54 let is_service_auth = is_service_token(&token);
55
56 let (did, _is_migration, controller_did) = if is_service_auth {
57 debug!("Verifying service token for blob upload");
58 let verifier = ServiceTokenVerifier::new();
59 match verifier
60 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
61 .await
62 {
63 Ok(claims) => {
64 debug!("Service token verified for DID: {}", claims.iss);
65 (claims.iss, false, None)
66 }
67 Err(e) => {
68 error!("Service token verification failed: {:?}", e);
69 return ApiError::AuthenticationFailed(Some(format!(
70 "Service token verification failed: {}",
71 e
72 )))
73 .into_response();
74 }
75 }
76 } else {
77 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
78 Ok(user) => {
79 let mime_type_for_check = headers
80 .get("content-type")
81 .and_then(|h| h.to_str().ok())
82 .unwrap_or("application/octet-stream");
83 if let Err(e) = crate::auth::scope_check::check_blob_scope(
84 user.is_oauth,
85 user.scope.as_deref(),
86 mime_type_for_check,
87 ) {
88 return e;
89 }
90 let deactivated = sqlx::query_scalar!(
91 "SELECT deactivated_at FROM users WHERE did = $1",
92 &user.did
93 )
94 .fetch_optional(&state.db)
95 .await
96 .ok()
97 .flatten()
98 .flatten();
99 let ctrl_did = user.controller_did.map(|d| d.to_string());
100 (user.did.to_string(), deactivated.is_some(), ctrl_did)
101 }
102 Err(_) => {
103 return ApiError::AuthenticationFailed(None).into_response();
104 }
105 }
106 };
107
108 if crate::util::is_account_migrated(&state.db, &did)
109 .await
110 .unwrap_or(false)
111 {
112 return ApiError::Forbidden.into_response();
113 }
114
115 let client_mime_hint = headers
116 .get("content-type")
117 .and_then(|h| h.to_str().ok())
118 .unwrap_or("application/octet-stream");
119
120 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
121 .fetch_optional(&state.db)
122 .await;
123 let user_id = match user_query {
124 Ok(Some(row)) => row.id,
125 _ => {
126 return ApiError::InternalError(None).into_response();
127 }
128 };
129
130 let temp_key = format!("temp/{}", uuid::Uuid::new_v4());
131 let max_size = get_max_blob_size() as u64;
132
133 let body_stream = body.into_data_stream();
134 let mapped_stream =
135 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string())));
136 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> =
137 Box::pin(mapped_stream);
138
139 info!("Starting streaming blob upload to temp key: {}", temp_key);
140
141 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await {
142 Ok(result) => result,
143 Err(e) => {
144 error!("Failed to stream blob to storage: {:?}", e);
145 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
146 }
147 };
148
149 let size = upload_result.size;
150 if size > max_size {
151 let _ = state.blob_store.delete(&temp_key).await;
152 return ApiError::InvalidRequest(format!(
153 "Blob size {} exceeds maximum of {} bytes",
154 size, max_size
155 ))
156 .into_response();
157 }
158
159 let mime_type = match state.blob_store.get_head(&temp_key, 8192).await {
160 Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint),
161 Err(e) => {
162 warn!("Failed to read blob head for MIME detection: {:?}", e);
163 client_mime_hint.to_string()
164 }
165 };
166
167 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) {
168 Ok(mh) => mh,
169 Err(e) => {
170 let _ = state.blob_store.delete(&temp_key).await;
171 error!("Failed to create multihash for blob: {:?}", e);
172 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response();
173 }
174 };
175 let cid = Cid::new_v1(0x55, multihash);
176 let cid_str = cid.to_string();
177 let storage_key = format!("blobs/{}", cid_str);
178
179 info!(
180 "Blob upload complete: size={}, cid={}, copying to final location",
181 size, cid_str
182 );
183
184 let mut tx = match state.db.begin().await {
185 Ok(tx) => tx,
186 Err(e) => {
187 let _ = state.blob_store.delete(&temp_key).await;
188 error!("Failed to begin transaction: {:?}", e);
189 return ApiError::InternalError(None).into_response();
190 }
191 };
192
193 let insert = sqlx::query!(
194 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
195 cid_str,
196 mime_type,
197 size as i64,
198 user_id,
199 storage_key
200 )
201 .fetch_optional(&mut *tx)
202 .await;
203
204 let was_inserted = match insert {
205 Ok(Some(_)) => true,
206 Ok(None) => false,
207 Err(e) => {
208 let _ = state.blob_store.delete(&temp_key).await;
209 error!("Failed to insert blob record: {:?}", e);
210 return ApiError::InternalError(None).into_response();
211 }
212 };
213
214 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await {
215 let _ = state.blob_store.delete(&temp_key).await;
216 error!("Failed to copy blob to final location: {:?}", e);
217 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
218 }
219
220 let _ = state.blob_store.delete(&temp_key).await;
221
222 if let Err(e) = tx.commit().await {
223 error!("Failed to commit blob transaction: {:?}", e);
224 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
225 error!(
226 "Failed to cleanup orphaned blob {}: {:?}",
227 storage_key, cleanup_err
228 );
229 }
230 return ApiError::InternalError(None).into_response();
231 }
232
233 if let Some(ref controller) = controller_did {
234 let _ = delegation::log_delegation_action(
235 &state.db,
236 &did,
237 controller,
238 Some(controller),
239 DelegationActionType::BlobUpload,
240 Some(json!({
241 "cid": cid_str,
242 "mime_type": mime_type,
243 "size": size
244 })),
245 None,
246 None,
247 )
248 .await;
249 }
250
251 Json(json!({
252 "blob": {
253 "$type": "blob",
254 "ref": {
255 "$link": cid_str
256 },
257 "mimeType": mime_type,
258 "size": size
259 }
260 }))
261 .into_response()
262}
263
/// Query-string parameters accepted by `list_missing_blobs`.
#[derive(Deserialize)]
pub struct ListMissingBlobsParams {
    /// Maximum number of entries to return. The handler defaults this to 500 when absent
    /// and clamps it to the range 1..=1000.
    pub limit: Option<i64>,
    /// Pagination cursor: only blob CIDs that sort strictly after this value are returned.
    /// The handler treats a missing cursor as the empty string.
    pub cursor: Option<String>,
}
269
/// One entry of the missing-blob listing: a blob CID together with a record URI.
///
/// Serialized with camelCase keys, i.e. `cid` and `recordUri`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RecordBlob {
    /// CID of the blob, as stored in `record_blobs.blob_cid`.
    pub cid: String,
    /// Record URI, as stored in `record_blobs.record_uri`.
    pub record_uri: String,
}
276
/// JSON response body produced by `list_missing_blobs`.
#[derive(Serialize)]
pub struct ListMissingBlobsOutput {
    /// Cursor for the next page; left out of the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
    /// The entries of this page.
    pub blobs: Vec<RecordBlob>,
}
283
284pub async fn list_missing_blobs(
285 State(state): State<AppState>,
286 headers: axum::http::HeaderMap,
287 Query(params): Query<ListMissingBlobsParams>,
288) -> Response {
289 let Some(token) = crate::auth::extract_bearer_token_from_header(
290 headers.get("Authorization").and_then(|h| h.to_str().ok()),
291 ) else {
292 return ApiError::AuthenticationRequired.into_response();
293 };
294 let auth_user =
295 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
296 Ok(user) => user,
297 Err(_) => {
298 return ApiError::AuthenticationFailed(None).into_response();
299 }
300 };
301 let did = auth_user.did;
302 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str())
303 .fetch_optional(&state.db)
304 .await;
305 let user_id = match user_query {
306 Ok(Some(row)) => row.id,
307 _ => {
308 return ApiError::InternalError(None).into_response();
309 }
310 };
311 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
312 let cursor_cid = params.cursor.as_deref().unwrap_or("");
313 let missing_query = sqlx::query!(
314 r#"
315 SELECT rb.blob_cid, rb.record_uri
316 FROM record_blobs rb
317 LEFT JOIN blobs b ON rb.blob_cid = b.cid
318 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
319 ORDER BY rb.blob_cid
320 LIMIT $3
321 "#,
322 user_id,
323 cursor_cid,
324 limit + 1
325 )
326 .fetch_all(&state.db)
327 .await;
328 let rows = match missing_query {
329 Ok(r) => r,
330 Err(e) => {
331 error!("DB error fetching missing blobs: {:?}", e);
332 return ApiError::InternalError(None).into_response();
333 }
334 };
335 let has_more = rows.len() > limit as usize;
336 let blobs: Vec<RecordBlob> = rows
337 .into_iter()
338 .take(limit as usize)
339 .map(|row| RecordBlob {
340 cid: row.blob_cid,
341 record_uri: row.record_uri,
342 })
343 .collect();
344 let next_cursor = if has_more {
345 blobs.last().map(|b| b.cid.clone())
346 } else {
347 None
348 };
349 (
350 StatusCode::OK,
351 Json(ListMissingBlobsOutput {
352 cursor: next_cursor,
353 blobs,
354 }),
355 )
356 .into_response()
357}