this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::delegation::{self, DelegationActionType};
3use crate::state::AppState;
4use crate::util::get_max_blob_size;
5use axum::body::Body;
6use axum::{
7 Json,
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11};
12use bytes::Bytes;
13use cid::Cid;
14use futures::StreamExt;
15use multihash::Multihash;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::pin::Pin;
19use tracing::{debug, error, info};
20
21pub async fn upload_blob(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 body: Body,
25) -> Response {
26 let token = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => {
31 return (
32 StatusCode::UNAUTHORIZED,
33 Json(json!({"error": "AuthenticationRequired"})),
34 )
35 .into_response();
36 }
37 };
38
39 let is_service_auth = is_service_token(&token);
40
41 let (did, _is_migration, controller_did) = if is_service_auth {
42 debug!("Verifying service token for blob upload");
43 let verifier = ServiceTokenVerifier::new();
44 match verifier
45 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
46 .await
47 {
48 Ok(claims) => {
49 debug!("Service token verified for DID: {}", claims.iss);
50 (claims.iss, false, None)
51 }
52 Err(e) => {
53 error!("Service token verification failed: {:?}", e);
54 return (
55 StatusCode::UNAUTHORIZED,
56 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
57 )
58 .into_response();
59 }
60 }
61 } else {
62 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
63 Ok(user) => {
64 let mime_type_for_check = headers
65 .get("content-type")
66 .and_then(|h| h.to_str().ok())
67 .unwrap_or("application/octet-stream");
68 if let Err(e) = crate::auth::scope_check::check_blob_scope(
69 user.is_oauth,
70 user.scope.as_deref(),
71 mime_type_for_check,
72 ) {
73 return e;
74 }
75 let deactivated = sqlx::query_scalar!(
76 "SELECT deactivated_at FROM users WHERE did = $1",
77 user.did
78 )
79 .fetch_optional(&state.db)
80 .await
81 .ok()
82 .flatten()
83 .flatten();
84 let ctrl_did = user.controller_did.clone();
85 (user.did, deactivated.is_some(), ctrl_did)
86 }
87 Err(_) => {
88 return (
89 StatusCode::UNAUTHORIZED,
90 Json(json!({"error": "AuthenticationFailed"})),
91 )
92 .into_response();
93 }
94 }
95 };
96
97 if crate::util::is_account_migrated(&state.db, &did)
98 .await
99 .unwrap_or(false)
100 {
101 return (
102 StatusCode::FORBIDDEN,
103 Json(json!({
104 "error": "AccountMigrated",
105 "message": "Account has been migrated to another PDS. Blob operations are not allowed."
106 })),
107 )
108 .into_response();
109 }
110
111 let mime_type = headers
112 .get("content-type")
113 .and_then(|h| h.to_str().ok())
114 .unwrap_or("application/octet-stream")
115 .to_string();
116
117 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
118 .fetch_optional(&state.db)
119 .await;
120 let user_id = match user_query {
121 Ok(Some(row)) => row.id,
122 _ => {
123 return (
124 StatusCode::INTERNAL_SERVER_ERROR,
125 Json(json!({"error": "InternalError"})),
126 )
127 .into_response();
128 }
129 };
130
131 let temp_key = format!("temp/{}", uuid::Uuid::new_v4());
132 let max_size = get_max_blob_size() as u64;
133
134 let body_stream = body.into_data_stream();
135 let mapped_stream =
136 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string())));
137 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> =
138 Box::pin(mapped_stream);
139
140 info!("Starting streaming blob upload to temp key: {}", temp_key);
141
142 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await {
143 Ok(result) => result,
144 Err(e) => {
145 error!("Failed to stream blob to storage: {:?}", e);
146 return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
149 )
150 .into_response();
151 }
152 };
153
154 let size = upload_result.size;
155 if size > max_size {
156 let _ = state.blob_store.delete(&temp_key).await;
157 return (
158 StatusCode::PAYLOAD_TOO_LARGE,
159 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", size, max_size)})),
160 )
161 .into_response();
162 }
163
164 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) {
165 Ok(mh) => mh,
166 Err(e) => {
167 let _ = state.blob_store.delete(&temp_key).await;
168 error!("Failed to create multihash for blob: {:?}", e);
169 return (
170 StatusCode::INTERNAL_SERVER_ERROR,
171 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
172 )
173 .into_response();
174 }
175 };
176 let cid = Cid::new_v1(0x55, multihash);
177 let cid_str = cid.to_string();
178 let storage_key = format!("blobs/{}", cid_str);
179
180 info!(
181 "Blob upload complete: size={}, cid={}, copying to final location",
182 size, cid_str
183 );
184
185 let mut tx = match state.db.begin().await {
186 Ok(tx) => tx,
187 Err(e) => {
188 let _ = state.blob_store.delete(&temp_key).await;
189 error!("Failed to begin transaction: {:?}", e);
190 return (
191 StatusCode::INTERNAL_SERVER_ERROR,
192 Json(json!({"error": "InternalError"})),
193 )
194 .into_response();
195 }
196 };
197
198 let insert = sqlx::query!(
199 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
200 cid_str,
201 mime_type,
202 size as i64,
203 user_id,
204 storage_key
205 )
206 .fetch_optional(&mut *tx)
207 .await;
208
209 let was_inserted = match insert {
210 Ok(Some(_)) => true,
211 Ok(None) => false,
212 Err(e) => {
213 let _ = state.blob_store.delete(&temp_key).await;
214 error!("Failed to insert blob record: {:?}", e);
215 return (
216 StatusCode::INTERNAL_SERVER_ERROR,
217 Json(json!({"error": "InternalError"})),
218 )
219 .into_response();
220 }
221 };
222
223 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await {
224 let _ = state.blob_store.delete(&temp_key).await;
225 error!("Failed to copy blob to final location: {:?}", e);
226 return (
227 StatusCode::INTERNAL_SERVER_ERROR,
228 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
229 )
230 .into_response();
231 }
232
233 let _ = state.blob_store.delete(&temp_key).await;
234
235 if let Err(e) = tx.commit().await {
236 error!("Failed to commit blob transaction: {:?}", e);
237 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
238 error!(
239 "Failed to cleanup orphaned blob {}: {:?}",
240 storage_key, cleanup_err
241 );
242 }
243 return (
244 StatusCode::INTERNAL_SERVER_ERROR,
245 Json(json!({"error": "InternalError"})),
246 )
247 .into_response();
248 }
249
250 if let Some(ref controller) = controller_did {
251 let _ = delegation::log_delegation_action(
252 &state.db,
253 &did,
254 controller,
255 Some(controller),
256 DelegationActionType::BlobUpload,
257 Some(json!({
258 "cid": cid_str,
259 "mime_type": mime_type,
260 "size": size
261 })),
262 None,
263 None,
264 )
265 .await;
266 }
267
268 Json(json!({
269 "blob": {
270 "$type": "blob",
271 "ref": {
272 "$link": cid_str
273 },
274 "mimeType": mime_type,
275 "size": size
276 }
277 }))
278 .into_response()
279}
280
281#[derive(Deserialize)]
282pub struct ListMissingBlobsParams {
283 pub limit: Option<i64>,
284 pub cursor: Option<String>,
285}
286
287#[derive(Serialize)]
288#[serde(rename_all = "camelCase")]
289pub struct RecordBlob {
290 pub cid: String,
291 pub record_uri: String,
292}
293
294#[derive(Serialize)]
295pub struct ListMissingBlobsOutput {
296 #[serde(skip_serializing_if = "Option::is_none")]
297 pub cursor: Option<String>,
298 pub blobs: Vec<RecordBlob>,
299}
300
301pub async fn list_missing_blobs(
302 State(state): State<AppState>,
303 headers: axum::http::HeaderMap,
304 Query(params): Query<ListMissingBlobsParams>,
305) -> Response {
306 let token = match crate::auth::extract_bearer_token_from_header(
307 headers.get("Authorization").and_then(|h| h.to_str().ok()),
308 ) {
309 Some(t) => t,
310 None => {
311 return (
312 StatusCode::UNAUTHORIZED,
313 Json(json!({"error": "AuthenticationRequired"})),
314 )
315 .into_response();
316 }
317 };
318 let auth_user =
319 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
320 Ok(user) => user,
321 Err(_) => {
322 return (
323 StatusCode::UNAUTHORIZED,
324 Json(json!({"error": "AuthenticationFailed"})),
325 )
326 .into_response();
327 }
328 };
329 let did = auth_user.did;
330 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
331 .fetch_optional(&state.db)
332 .await;
333 let user_id = match user_query {
334 Ok(Some(row)) => row.id,
335 _ => {
336 return (
337 StatusCode::INTERNAL_SERVER_ERROR,
338 Json(json!({"error": "InternalError"})),
339 )
340 .into_response();
341 }
342 };
343 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
344 let cursor_cid = params.cursor.as_deref().unwrap_or("");
345 let missing_query = sqlx::query!(
346 r#"
347 SELECT rb.blob_cid, rb.record_uri
348 FROM record_blobs rb
349 LEFT JOIN blobs b ON rb.blob_cid = b.cid
350 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
351 ORDER BY rb.blob_cid
352 LIMIT $3
353 "#,
354 user_id,
355 cursor_cid,
356 limit + 1
357 )
358 .fetch_all(&state.db)
359 .await;
360 let rows = match missing_query {
361 Ok(r) => r,
362 Err(e) => {
363 error!("DB error fetching missing blobs: {:?}", e);
364 return (
365 StatusCode::INTERNAL_SERVER_ERROR,
366 Json(json!({"error": "InternalError"})),
367 )
368 .into_response();
369 }
370 };
371 let has_more = rows.len() > limit as usize;
372 let blobs: Vec<RecordBlob> = rows
373 .into_iter()
374 .take(limit as usize)
375 .map(|row| RecordBlob {
376 cid: row.blob_cid,
377 record_uri: row.record_uri,
378 })
379 .collect();
380 let next_cursor = if has_more {
381 blobs.last().map(|b| b.cid.clone())
382 } else {
383 None
384 };
385 (
386 StatusCode::OK,
387 Json(ListMissingBlobsOutput {
388 cursor: next_cursor,
389 blobs,
390 }),
391 )
392 .into_response()
393}