this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::{BearerAuthAllowDeactivated, ServiceTokenVerifier, is_service_token};
3use crate::delegation::{self, DelegationActionType};
4use crate::state::AppState;
5use crate::util::get_max_blob_size;
6use axum::body::Body;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bytes::Bytes;
14use cid::Cid;
15use futures::StreamExt;
16use multihash::Multihash;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::pin::Pin;
20use tracing::{debug, error, info, warn};
21
22fn detect_mime_type(data: &[u8], client_hint: &str) -> String {
23 if let Some(kind) = infer::get(data) {
24 let detected = kind.mime_type().to_string();
25 if detected != client_hint {
26 debug!(
27 "MIME type detection: client sent '{}', detected '{}'",
28 client_hint, detected
29 );
30 }
31 detected
32 } else if client_hint == "*/*" || client_hint.is_empty() {
33 warn!(
34 "Could not detect MIME type and client sent invalid hint: '{}'",
35 client_hint
36 );
37 "application/octet-stream".to_string()
38 } else {
39 client_hint.to_string()
40 }
41}
42
43pub async fn upload_blob(
44 State(state): State<AppState>,
45 headers: axum::http::HeaderMap,
46 body: Body,
47) -> Response {
48 let extracted = match crate::auth::extract_auth_token_from_header(
49 headers.get("Authorization").and_then(|h| h.to_str().ok()),
50 ) {
51 Some(t) => t,
52 None => return ApiError::AuthenticationRequired.into_response(),
53 };
54 let token = extracted.token;
55
56 let is_service_auth = is_service_token(&token);
57
58 let (did, _is_migration, controller_did) = if is_service_auth {
59 debug!("Verifying service token for blob upload");
60 let verifier = ServiceTokenVerifier::new();
61 match verifier
62 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
63 .await
64 {
65 Ok(claims) => {
66 debug!("Service token verified for DID: {}", claims.iss);
67 (claims.iss, false, None)
68 }
69 Err(e) => {
70 error!("Service token verification failed: {:?}", e);
71 return ApiError::AuthenticationFailed(Some(format!(
72 "Service token verification failed: {}",
73 e
74 )))
75 .into_response();
76 }
77 }
78 } else {
79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
80 let http_uri = format!(
81 "https://{}/xrpc/com.atproto.repo.uploadBlob",
82 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
83 );
84 match crate::auth::validate_token_with_dpop(
85 &state.db,
86 &token,
87 extracted.is_dpop,
88 dpop_proof,
89 "POST",
90 &http_uri,
91 true,
92 false,
93 )
94 .await
95 {
96 Ok(user) => {
97 let mime_type_for_check = headers
98 .get("content-type")
99 .and_then(|h| h.to_str().ok())
100 .unwrap_or("application/octet-stream");
101 if let Err(e) = crate::auth::scope_check::check_blob_scope(
102 user.is_oauth,
103 user.scope.as_deref(),
104 mime_type_for_check,
105 ) {
106 return e;
107 }
108 let deactivated = sqlx::query_scalar!(
109 "SELECT deactivated_at FROM users WHERE did = $1",
110 &user.did
111 )
112 .fetch_optional(&state.db)
113 .await
114 .ok()
115 .flatten()
116 .flatten();
117 let ctrl_did = user.controller_did.map(|d| d.to_string());
118 (user.did.to_string(), deactivated.is_some(), ctrl_did)
119 }
120 Err(_) => {
121 return ApiError::AuthenticationFailed(None).into_response();
122 }
123 }
124 };
125
126 if crate::util::is_account_migrated(&state.db, &did)
127 .await
128 .unwrap_or(false)
129 {
130 return ApiError::Forbidden.into_response();
131 }
132
133 let client_mime_hint = headers
134 .get("content-type")
135 .and_then(|h| h.to_str().ok())
136 .unwrap_or("application/octet-stream");
137
138 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
139 .fetch_optional(&state.db)
140 .await;
141 let user_id = match user_query {
142 Ok(Some(row)) => row.id,
143 _ => {
144 return ApiError::InternalError(None).into_response();
145 }
146 };
147
148 let temp_key = format!("temp/{}", uuid::Uuid::new_v4());
149 let max_size = get_max_blob_size() as u64;
150
151 let body_stream = body.into_data_stream();
152 let mapped_stream =
153 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string())));
154 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> =
155 Box::pin(mapped_stream);
156
157 info!("Starting streaming blob upload to temp key: {}", temp_key);
158
159 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await {
160 Ok(result) => result,
161 Err(e) => {
162 error!("Failed to stream blob to storage: {:?}", e);
163 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
164 }
165 };
166
167 let size = upload_result.size;
168 if size > max_size {
169 let _ = state.blob_store.delete(&temp_key).await;
170 return ApiError::InvalidRequest(format!(
171 "Blob size {} exceeds maximum of {} bytes",
172 size, max_size
173 ))
174 .into_response();
175 }
176
177 let mime_type = match state.blob_store.get_head(&temp_key, 8192).await {
178 Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint),
179 Err(e) => {
180 warn!("Failed to read blob head for MIME detection: {:?}", e);
181 client_mime_hint.to_string()
182 }
183 };
184
185 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) {
186 Ok(mh) => mh,
187 Err(e) => {
188 let _ = state.blob_store.delete(&temp_key).await;
189 error!("Failed to create multihash for blob: {:?}", e);
190 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response();
191 }
192 };
193 let cid = Cid::new_v1(0x55, multihash);
194 let cid_str = cid.to_string();
195 let storage_key = format!("blobs/{}", cid_str);
196
197 info!(
198 "Blob upload complete: size={}, cid={}, copying to final location",
199 size, cid_str
200 );
201
202 let mut tx = match state.db.begin().await {
203 Ok(tx) => tx,
204 Err(e) => {
205 let _ = state.blob_store.delete(&temp_key).await;
206 error!("Failed to begin transaction: {:?}", e);
207 return ApiError::InternalError(None).into_response();
208 }
209 };
210
211 let insert = sqlx::query!(
212 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
213 cid_str,
214 mime_type,
215 size as i64,
216 user_id,
217 storage_key
218 )
219 .fetch_optional(&mut *tx)
220 .await;
221
222 let was_inserted = match insert {
223 Ok(Some(_)) => true,
224 Ok(None) => false,
225 Err(e) => {
226 let _ = state.blob_store.delete(&temp_key).await;
227 error!("Failed to insert blob record: {:?}", e);
228 return ApiError::InternalError(None).into_response();
229 }
230 };
231
232 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await {
233 let _ = state.blob_store.delete(&temp_key).await;
234 error!("Failed to copy blob to final location: {:?}", e);
235 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
236 }
237
238 let _ = state.blob_store.delete(&temp_key).await;
239
240 if let Err(e) = tx.commit().await {
241 error!("Failed to commit blob transaction: {:?}", e);
242 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
243 error!(
244 "Failed to cleanup orphaned blob {}: {:?}",
245 storage_key, cleanup_err
246 );
247 }
248 return ApiError::InternalError(None).into_response();
249 }
250
251 if let Some(ref controller) = controller_did {
252 let _ = delegation::log_delegation_action(
253 &state.db,
254 &did,
255 controller,
256 Some(controller),
257 DelegationActionType::BlobUpload,
258 Some(json!({
259 "cid": cid_str,
260 "mime_type": mime_type,
261 "size": size
262 })),
263 None,
264 None,
265 )
266 .await;
267 }
268
269 Json(json!({
270 "blob": {
271 "$type": "blob",
272 "ref": {
273 "$link": cid_str
274 },
275 "mimeType": mime_type,
276 "size": size
277 }
278 }))
279 .into_response()
280}
281
282#[derive(Deserialize)]
283pub struct ListMissingBlobsParams {
284 pub limit: Option<i64>,
285 pub cursor: Option<String>,
286}
287
288#[derive(Serialize)]
289#[serde(rename_all = "camelCase")]
290pub struct RecordBlob {
291 pub cid: String,
292 pub record_uri: String,
293}
294
295#[derive(Serialize)]
296pub struct ListMissingBlobsOutput {
297 #[serde(skip_serializing_if = "Option::is_none")]
298 pub cursor: Option<String>,
299 pub blobs: Vec<RecordBlob>,
300}
301
302pub async fn list_missing_blobs(
303 State(state): State<AppState>,
304 auth: BearerAuthAllowDeactivated,
305 Query(params): Query<ListMissingBlobsParams>,
306) -> Response {
307 let auth_user = auth.0;
308 let did = auth_user.did;
309 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str())
310 .fetch_optional(&state.db)
311 .await;
312 let user_id = match user_query {
313 Ok(Some(row)) => row.id,
314 _ => {
315 return ApiError::InternalError(None).into_response();
316 }
317 };
318 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
319 let cursor_cid = params.cursor.as_deref().unwrap_or("");
320 let missing_query = sqlx::query!(
321 r#"
322 SELECT rb.blob_cid, rb.record_uri
323 FROM record_blobs rb
324 LEFT JOIN blobs b ON rb.blob_cid = b.cid
325 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
326 ORDER BY rb.blob_cid
327 LIMIT $3
328 "#,
329 user_id,
330 cursor_cid,
331 limit + 1
332 )
333 .fetch_all(&state.db)
334 .await;
335 let rows = match missing_query {
336 Ok(r) => r,
337 Err(e) => {
338 error!("DB error fetching missing blobs: {:?}", e);
339 return ApiError::InternalError(None).into_response();
340 }
341 };
342 let has_more = rows.len() > limit as usize;
343 let blobs: Vec<RecordBlob> = rows
344 .into_iter()
345 .take(limit as usize)
346 .map(|row| RecordBlob {
347 cid: row.blob_cid,
348 record_uri: row.record_uri,
349 })
350 .collect();
351 let next_cursor = if has_more {
352 blobs.last().map(|b| b.cid.clone())
353 } else {
354 None
355 };
356 (
357 StatusCode::OK,
358 Json(ListMissingBlobsOutput {
359 cursor: next_cursor,
360 blobs,
361 }),
362 )
363 .into_response()
364}