this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::{ServiceTokenVerifier, is_service_token};
3use crate::delegation::{self, DelegationActionType};
4use crate::state::AppState;
5use crate::util::get_max_blob_size;
6use axum::body::Body;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bytes::Bytes;
14use cid::Cid;
15use futures::StreamExt;
16use multihash::Multihash;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::pin::Pin;
20use tracing::{debug, error, info, warn};
21
22fn detect_mime_type(data: &[u8], client_hint: &str) -> String {
23 if let Some(kind) = infer::get(data) {
24 let detected = kind.mime_type().to_string();
25 if detected != client_hint {
26 debug!(
27 "MIME type detection: client sent '{}', detected '{}'",
28 client_hint, detected
29 );
30 }
31 detected
32 } else {
33 if client_hint == "*/*" || client_hint.is_empty() {
34 warn!("Could not detect MIME type and client sent invalid hint: '{}'", client_hint);
35 "application/octet-stream".to_string()
36 } else {
37 client_hint.to_string()
38 }
39 }
40}
41
42pub async fn upload_blob(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 body: Body,
46) -> Response {
47 let Some(token) = crate::auth::extract_bearer_token_from_header(
48 headers.get("Authorization").and_then(|h| h.to_str().ok()),
49 ) else {
50 return ApiError::AuthenticationRequired.into_response();
51 };
52
53 let is_service_auth = is_service_token(&token);
54
55 let (did, _is_migration, controller_did) = if is_service_auth {
56 debug!("Verifying service token for blob upload");
57 let verifier = ServiceTokenVerifier::new();
58 match verifier
59 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
60 .await
61 {
62 Ok(claims) => {
63 debug!("Service token verified for DID: {}", claims.iss);
64 (claims.iss, false, None)
65 }
66 Err(e) => {
67 error!("Service token verification failed: {:?}", e);
68 return ApiError::AuthenticationFailed(Some(format!(
69 "Service token verification failed: {}",
70 e
71 )))
72 .into_response();
73 }
74 }
75 } else {
76 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
77 Ok(user) => {
78 let mime_type_for_check = headers
79 .get("content-type")
80 .and_then(|h| h.to_str().ok())
81 .unwrap_or("application/octet-stream");
82 if let Err(e) = crate::auth::scope_check::check_blob_scope(
83 user.is_oauth,
84 user.scope.as_deref(),
85 mime_type_for_check,
86 ) {
87 return e;
88 }
89 let deactivated = sqlx::query_scalar!(
90 "SELECT deactivated_at FROM users WHERE did = $1",
91 &user.did
92 )
93 .fetch_optional(&state.db)
94 .await
95 .ok()
96 .flatten()
97 .flatten();
98 let ctrl_did = user.controller_did.map(|d| d.to_string());
99 (user.did.to_string(), deactivated.is_some(), ctrl_did)
100 }
101 Err(_) => {
102 return ApiError::AuthenticationFailed(None).into_response();
103 }
104 }
105 };
106
107 if crate::util::is_account_migrated(&state.db, &did)
108 .await
109 .unwrap_or(false)
110 {
111 return ApiError::Forbidden.into_response();
112 }
113
114 let client_mime_hint = headers
115 .get("content-type")
116 .and_then(|h| h.to_str().ok())
117 .unwrap_or("application/octet-stream");
118
119 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
120 .fetch_optional(&state.db)
121 .await;
122 let user_id = match user_query {
123 Ok(Some(row)) => row.id,
124 _ => {
125 return ApiError::InternalError(None).into_response();
126 }
127 };
128
129 let temp_key = format!("temp/{}", uuid::Uuid::new_v4());
130 let max_size = get_max_blob_size() as u64;
131
132 let body_stream = body.into_data_stream();
133 let mapped_stream =
134 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string())));
135 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> =
136 Box::pin(mapped_stream);
137
138 info!("Starting streaming blob upload to temp key: {}", temp_key);
139
140 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await {
141 Ok(result) => result,
142 Err(e) => {
143 error!("Failed to stream blob to storage: {:?}", e);
144 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
145 }
146 };
147
148 let size = upload_result.size;
149 if size > max_size {
150 let _ = state.blob_store.delete(&temp_key).await;
151 return ApiError::InvalidRequest(format!(
152 "Blob size {} exceeds maximum of {} bytes",
153 size, max_size
154 ))
155 .into_response();
156 }
157
158 let mime_type = match state.blob_store.get_head(&temp_key, 8192).await {
159 Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint),
160 Err(e) => {
161 warn!("Failed to read blob head for MIME detection: {:?}", e);
162 client_mime_hint.to_string()
163 }
164 };
165
166 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) {
167 Ok(mh) => mh,
168 Err(e) => {
169 let _ = state.blob_store.delete(&temp_key).await;
170 error!("Failed to create multihash for blob: {:?}", e);
171 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response();
172 }
173 };
174 let cid = Cid::new_v1(0x55, multihash);
175 let cid_str = cid.to_string();
176 let storage_key = format!("blobs/{}", cid_str);
177
178 info!(
179 "Blob upload complete: size={}, cid={}, copying to final location",
180 size, cid_str
181 );
182
183 let mut tx = match state.db.begin().await {
184 Ok(tx) => tx,
185 Err(e) => {
186 let _ = state.blob_store.delete(&temp_key).await;
187 error!("Failed to begin transaction: {:?}", e);
188 return ApiError::InternalError(None).into_response();
189 }
190 };
191
192 let insert = sqlx::query!(
193 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
194 cid_str,
195 mime_type,
196 size as i64,
197 user_id,
198 storage_key
199 )
200 .fetch_optional(&mut *tx)
201 .await;
202
203 let was_inserted = match insert {
204 Ok(Some(_)) => true,
205 Ok(None) => false,
206 Err(e) => {
207 let _ = state.blob_store.delete(&temp_key).await;
208 error!("Failed to insert blob record: {:?}", e);
209 return ApiError::InternalError(None).into_response();
210 }
211 };
212
213 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await {
214 let _ = state.blob_store.delete(&temp_key).await;
215 error!("Failed to copy blob to final location: {:?}", e);
216 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
217 }
218
219 let _ = state.blob_store.delete(&temp_key).await;
220
221 if let Err(e) = tx.commit().await {
222 error!("Failed to commit blob transaction: {:?}", e);
223 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
224 error!(
225 "Failed to cleanup orphaned blob {}: {:?}",
226 storage_key, cleanup_err
227 );
228 }
229 return ApiError::InternalError(None).into_response();
230 }
231
232 if let Some(ref controller) = controller_did {
233 let _ = delegation::log_delegation_action(
234 &state.db,
235 &did,
236 controller,
237 Some(controller),
238 DelegationActionType::BlobUpload,
239 Some(json!({
240 "cid": cid_str,
241 "mime_type": mime_type,
242 "size": size
243 })),
244 None,
245 None,
246 )
247 .await;
248 }
249
250 Json(json!({
251 "blob": {
252 "$type": "blob",
253 "ref": {
254 "$link": cid_str
255 },
256 "mimeType": mime_type,
257 "size": size
258 }
259 }))
260 .into_response()
261}
262
263#[derive(Deserialize)]
264pub struct ListMissingBlobsParams {
265 pub limit: Option<i64>,
266 pub cursor: Option<String>,
267}
268
269#[derive(Serialize)]
270#[serde(rename_all = "camelCase")]
271pub struct RecordBlob {
272 pub cid: String,
273 pub record_uri: String,
274}
275
276#[derive(Serialize)]
277pub struct ListMissingBlobsOutput {
278 #[serde(skip_serializing_if = "Option::is_none")]
279 pub cursor: Option<String>,
280 pub blobs: Vec<RecordBlob>,
281}
282
283pub async fn list_missing_blobs(
284 State(state): State<AppState>,
285 headers: axum::http::HeaderMap,
286 Query(params): Query<ListMissingBlobsParams>,
287) -> Response {
288 let Some(token) = crate::auth::extract_bearer_token_from_header(
289 headers.get("Authorization").and_then(|h| h.to_str().ok()),
290 ) else {
291 return ApiError::AuthenticationRequired.into_response();
292 };
293 let auth_user =
294 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
295 Ok(user) => user,
296 Err(_) => {
297 return ApiError::AuthenticationFailed(None).into_response();
298 }
299 };
300 let did = auth_user.did;
301 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str())
302 .fetch_optional(&state.db)
303 .await;
304 let user_id = match user_query {
305 Ok(Some(row)) => row.id,
306 _ => {
307 return ApiError::InternalError(None).into_response();
308 }
309 };
310 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
311 let cursor_cid = params.cursor.as_deref().unwrap_or("");
312 let missing_query = sqlx::query!(
313 r#"
314 SELECT rb.blob_cid, rb.record_uri
315 FROM record_blobs rb
316 LEFT JOIN blobs b ON rb.blob_cid = b.cid
317 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
318 ORDER BY rb.blob_cid
319 LIMIT $3
320 "#,
321 user_id,
322 cursor_cid,
323 limit + 1
324 )
325 .fetch_all(&state.db)
326 .await;
327 let rows = match missing_query {
328 Ok(r) => r,
329 Err(e) => {
330 error!("DB error fetching missing blobs: {:?}", e);
331 return ApiError::InternalError(None).into_response();
332 }
333 };
334 let has_more = rows.len() > limit as usize;
335 let blobs: Vec<RecordBlob> = rows
336 .into_iter()
337 .take(limit as usize)
338 .map(|row| RecordBlob {
339 cid: row.blob_cid,
340 record_uri: row.record_uri,
341 })
342 .collect();
343 let next_cursor = if has_more {
344 blobs.last().map(|b| b.cid.clone())
345 } else {
346 None
347 };
348 (
349 StatusCode::OK,
350 Json(ListMissingBlobsOutput {
351 cursor: next_cursor,
352 blobs,
353 }),
354 )
355 .into_response()
356}