this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::{ServiceTokenVerifier, is_service_token};
3use crate::delegation::{self, DelegationActionType};
4use crate::state::AppState;
5use crate::util::get_max_blob_size;
6use axum::body::Body;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bytes::Bytes;
14use cid::Cid;
15use futures::StreamExt;
16use multihash::Multihash;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::pin::Pin;
20use tracing::{debug, error, info};
21
22pub async fn upload_blob(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Body,
26) -> Response {
27 let Some(token) = crate::auth::extract_bearer_token_from_header(
28 headers.get("Authorization").and_then(|h| h.to_str().ok()),
29 ) else {
30 return ApiError::AuthenticationRequired.into_response();
31 };
32
33 let is_service_auth = is_service_token(&token);
34
35 let (did, _is_migration, controller_did) = if is_service_auth {
36 debug!("Verifying service token for blob upload");
37 let verifier = ServiceTokenVerifier::new();
38 match verifier
39 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
40 .await
41 {
42 Ok(claims) => {
43 debug!("Service token verified for DID: {}", claims.iss);
44 (claims.iss, false, None)
45 }
46 Err(e) => {
47 error!("Service token verification failed: {:?}", e);
48 return ApiError::AuthenticationFailed(Some(format!(
49 "Service token verification failed: {}",
50 e
51 )))
52 .into_response();
53 }
54 }
55 } else {
56 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
57 Ok(user) => {
58 let mime_type_for_check = headers
59 .get("content-type")
60 .and_then(|h| h.to_str().ok())
61 .unwrap_or("application/octet-stream");
62 if let Err(e) = crate::auth::scope_check::check_blob_scope(
63 user.is_oauth,
64 user.scope.as_deref(),
65 mime_type_for_check,
66 ) {
67 return e;
68 }
69 let deactivated = sqlx::query_scalar!(
70 "SELECT deactivated_at FROM users WHERE did = $1",
71 &user.did
72 )
73 .fetch_optional(&state.db)
74 .await
75 .ok()
76 .flatten()
77 .flatten();
78 let ctrl_did = user.controller_did.map(|d| d.to_string());
79 (user.did.to_string(), deactivated.is_some(), ctrl_did)
80 }
81 Err(_) => {
82 return ApiError::AuthenticationFailed(None).into_response();
83 }
84 }
85 };
86
87 if crate::util::is_account_migrated(&state.db, &did)
88 .await
89 .unwrap_or(false)
90 {
91 return ApiError::Forbidden.into_response();
92 }
93
94 let mime_type = headers
95 .get("content-type")
96 .and_then(|h| h.to_str().ok())
97 .unwrap_or("application/octet-stream")
98 .to_string();
99
100 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
101 .fetch_optional(&state.db)
102 .await;
103 let user_id = match user_query {
104 Ok(Some(row)) => row.id,
105 _ => {
106 return ApiError::InternalError(None).into_response();
107 }
108 };
109
110 let temp_key = format!("temp/{}", uuid::Uuid::new_v4());
111 let max_size = get_max_blob_size() as u64;
112
113 let body_stream = body.into_data_stream();
114 let mapped_stream =
115 body_stream.map(|result| result.map_err(|e| std::io::Error::other(e.to_string())));
116 let pinned_stream: Pin<Box<dyn futures::Stream<Item = Result<Bytes, std::io::Error>> + Send>> =
117 Box::pin(mapped_stream);
118
119 info!("Starting streaming blob upload to temp key: {}", temp_key);
120
121 let upload_result = match state.blob_store.put_stream(&temp_key, pinned_stream).await {
122 Ok(result) => result,
123 Err(e) => {
124 error!("Failed to stream blob to storage: {:?}", e);
125 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
126 }
127 };
128
129 let size = upload_result.size;
130 if size > max_size {
131 let _ = state.blob_store.delete(&temp_key).await;
132 return ApiError::InvalidRequest(format!(
133 "Blob size {} exceeds maximum of {} bytes",
134 size, max_size
135 ))
136 .into_response();
137 }
138
139 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) {
140 Ok(mh) => mh,
141 Err(e) => {
142 let _ = state.blob_store.delete(&temp_key).await;
143 error!("Failed to create multihash for blob: {:?}", e);
144 return ApiError::InternalError(Some("Failed to hash blob".into())).into_response();
145 }
146 };
147 let cid = Cid::new_v1(0x55, multihash);
148 let cid_str = cid.to_string();
149 let storage_key = format!("blobs/{}", cid_str);
150
151 info!(
152 "Blob upload complete: size={}, cid={}, copying to final location",
153 size, cid_str
154 );
155
156 let mut tx = match state.db.begin().await {
157 Ok(tx) => tx,
158 Err(e) => {
159 let _ = state.blob_store.delete(&temp_key).await;
160 error!("Failed to begin transaction: {:?}", e);
161 return ApiError::InternalError(None).into_response();
162 }
163 };
164
165 let insert = sqlx::query!(
166 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
167 cid_str,
168 mime_type,
169 size as i64,
170 user_id,
171 storage_key
172 )
173 .fetch_optional(&mut *tx)
174 .await;
175
176 let was_inserted = match insert {
177 Ok(Some(_)) => true,
178 Ok(None) => false,
179 Err(e) => {
180 let _ = state.blob_store.delete(&temp_key).await;
181 error!("Failed to insert blob record: {:?}", e);
182 return ApiError::InternalError(None).into_response();
183 }
184 };
185
186 if was_inserted && let Err(e) = state.blob_store.copy(&temp_key, &storage_key).await {
187 let _ = state.blob_store.delete(&temp_key).await;
188 error!("Failed to copy blob to final location: {:?}", e);
189 return ApiError::InternalError(Some("Failed to store blob".into())).into_response();
190 }
191
192 let _ = state.blob_store.delete(&temp_key).await;
193
194 if let Err(e) = tx.commit().await {
195 error!("Failed to commit blob transaction: {:?}", e);
196 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
197 error!(
198 "Failed to cleanup orphaned blob {}: {:?}",
199 storage_key, cleanup_err
200 );
201 }
202 return ApiError::InternalError(None).into_response();
203 }
204
205 if let Some(ref controller) = controller_did {
206 let _ = delegation::log_delegation_action(
207 &state.db,
208 &did,
209 controller,
210 Some(controller),
211 DelegationActionType::BlobUpload,
212 Some(json!({
213 "cid": cid_str,
214 "mime_type": mime_type,
215 "size": size
216 })),
217 None,
218 None,
219 )
220 .await;
221 }
222
223 Json(json!({
224 "blob": {
225 "$type": "blob",
226 "ref": {
227 "$link": cid_str
228 },
229 "mimeType": mime_type,
230 "size": size
231 }
232 }))
233 .into_response()
234}
235
236#[derive(Deserialize)]
237pub struct ListMissingBlobsParams {
238 pub limit: Option<i64>,
239 pub cursor: Option<String>,
240}
241
242#[derive(Serialize)]
243#[serde(rename_all = "camelCase")]
244pub struct RecordBlob {
245 pub cid: String,
246 pub record_uri: String,
247}
248
249#[derive(Serialize)]
250pub struct ListMissingBlobsOutput {
251 #[serde(skip_serializing_if = "Option::is_none")]
252 pub cursor: Option<String>,
253 pub blobs: Vec<RecordBlob>,
254}
255
256pub async fn list_missing_blobs(
257 State(state): State<AppState>,
258 headers: axum::http::HeaderMap,
259 Query(params): Query<ListMissingBlobsParams>,
260) -> Response {
261 let Some(token) = crate::auth::extract_bearer_token_from_header(
262 headers.get("Authorization").and_then(|h| h.to_str().ok()),
263 ) else {
264 return ApiError::AuthenticationRequired.into_response();
265 };
266 let auth_user =
267 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
268 Ok(user) => user,
269 Err(_) => {
270 return ApiError::AuthenticationFailed(None).into_response();
271 }
272 };
273 let did = auth_user.did;
274 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did.as_str())
275 .fetch_optional(&state.db)
276 .await;
277 let user_id = match user_query {
278 Ok(Some(row)) => row.id,
279 _ => {
280 return ApiError::InternalError(None).into_response();
281 }
282 };
283 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
284 let cursor_cid = params.cursor.as_deref().unwrap_or("");
285 let missing_query = sqlx::query!(
286 r#"
287 SELECT rb.blob_cid, rb.record_uri
288 FROM record_blobs rb
289 LEFT JOIN blobs b ON rb.blob_cid = b.cid
290 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
291 ORDER BY rb.blob_cid
292 LIMIT $3
293 "#,
294 user_id,
295 cursor_cid,
296 limit + 1
297 )
298 .fetch_all(&state.db)
299 .await;
300 let rows = match missing_query {
301 Ok(r) => r,
302 Err(e) => {
303 error!("DB error fetching missing blobs: {:?}", e);
304 return ApiError::InternalError(None).into_response();
305 }
306 };
307 let has_more = rows.len() > limit as usize;
308 let blobs: Vec<RecordBlob> = rows
309 .into_iter()
310 .take(limit as usize)
311 .map(|row| RecordBlob {
312 cid: row.blob_cid,
313 record_uri: row.record_uri,
314 })
315 .collect();
316 let next_cursor = if has_more {
317 blobs.last().map(|b| b.cid.clone())
318 } else {
319 None
320 };
321 (
322 StatusCode::OK,
323 Json(ListMissingBlobsOutput {
324 cursor: next_cursor,
325 blobs,
326 }),
327 )
328 .into_response()
329}