this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::delegation::{self, DelegationActionType};
3use crate::state::AppState;
4use crate::util::get_max_blob_size;
5use axum::body::Bytes;
6use axum::{
7 Json,
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11};
12use cid::Cid;
13use multihash::Multihash;
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use sha2::{Digest, Sha256};
17use tracing::{debug, error};
18
19pub async fn upload_blob(
20 State(state): State<AppState>,
21 headers: axum::http::HeaderMap,
22 body: Bytes,
23) -> Response {
24 let token = match crate::auth::extract_bearer_token_from_header(
25 headers.get("Authorization").and_then(|h| h.to_str().ok()),
26 ) {
27 Some(t) => t,
28 None => {
29 return (
30 StatusCode::UNAUTHORIZED,
31 Json(json!({"error": "AuthenticationRequired"})),
32 )
33 .into_response();
34 }
35 };
36
37 let is_service_auth = is_service_token(&token);
38
39 let (did, _is_migration, controller_did) = if is_service_auth {
40 debug!("Verifying service token for blob upload");
41 let verifier = ServiceTokenVerifier::new();
42 match verifier
43 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
44 .await
45 {
46 Ok(claims) => {
47 debug!("Service token verified for DID: {}", claims.iss);
48 (claims.iss, false, None)
49 }
50 Err(e) => {
51 error!("Service token verification failed: {:?}", e);
52 return (
53 StatusCode::UNAUTHORIZED,
54 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
55 )
56 .into_response();
57 }
58 }
59 } else {
60 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
61 Ok(user) => {
62 let mime_type_for_check = headers
63 .get("content-type")
64 .and_then(|h| h.to_str().ok())
65 .unwrap_or("application/octet-stream");
66 if let Err(e) = crate::auth::scope_check::check_blob_scope(
67 user.is_oauth,
68 user.scope.as_deref(),
69 mime_type_for_check,
70 ) {
71 return e;
72 }
73 let deactivated = sqlx::query_scalar!(
74 "SELECT deactivated_at FROM users WHERE did = $1",
75 user.did
76 )
77 .fetch_optional(&state.db)
78 .await
79 .ok()
80 .flatten()
81 .flatten();
82 let ctrl_did = user.controller_did.clone();
83 (user.did, deactivated.is_some(), ctrl_did)
84 }
85 Err(_) => {
86 return (
87 StatusCode::UNAUTHORIZED,
88 Json(json!({"error": "AuthenticationFailed"})),
89 )
90 .into_response();
91 }
92 }
93 };
94
95 let max_size = get_max_blob_size();
96
97 if body.len() > max_size {
98 return (
99 StatusCode::PAYLOAD_TOO_LARGE,
100 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
101 )
102 .into_response();
103 }
104 let mime_type = headers
105 .get("content-type")
106 .and_then(|h| h.to_str().ok())
107 .unwrap_or("application/octet-stream")
108 .to_string();
109 let size = body.len() as i64;
110 let data = body.to_vec();
111 let mut hasher = Sha256::new();
112 hasher.update(&data);
113 let hash = hasher.finalize();
114 let multihash = match Multihash::wrap(0x12, &hash) {
115 Ok(mh) => mh,
116 Err(e) => {
117 error!("Failed to create multihash for blob: {:?}", e);
118 return (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
121 )
122 .into_response();
123 }
124 };
125 let cid = Cid::new_v1(0x55, multihash);
126 let cid_str = cid.to_string();
127 let storage_key = format!("blobs/{}", cid_str);
128 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
129 .fetch_optional(&state.db)
130 .await;
131 let user_id = match user_query {
132 Ok(Some(row)) => row.id,
133 _ => {
134 return (
135 StatusCode::INTERNAL_SERVER_ERROR,
136 Json(json!({"error": "InternalError"})),
137 )
138 .into_response();
139 }
140 };
141 let mut tx = match state.db.begin().await {
142 Ok(tx) => tx,
143 Err(e) => {
144 error!("Failed to begin transaction: {:?}", e);
145 return (
146 StatusCode::INTERNAL_SERVER_ERROR,
147 Json(json!({"error": "InternalError"})),
148 )
149 .into_response();
150 }
151 };
152 let insert = sqlx::query!(
153 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
154 cid_str,
155 mime_type,
156 size,
157 user_id,
158 storage_key
159 )
160 .fetch_optional(&mut *tx)
161 .await;
162 let was_inserted = match insert {
163 Ok(Some(_)) => true,
164 Ok(None) => false,
165 Err(e) => {
166 error!("Failed to insert blob record: {:?}", e);
167 return (
168 StatusCode::INTERNAL_SERVER_ERROR,
169 Json(json!({"error": "InternalError"})),
170 )
171 .into_response();
172 }
173 };
174 if was_inserted
175 && let Err(e) = state
176 .blob_store
177 .put_bytes(&storage_key, bytes::Bytes::from(data))
178 .await
179 {
180 error!("Failed to upload blob to storage: {:?}", e);
181 return (
182 StatusCode::INTERNAL_SERVER_ERROR,
183 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
184 )
185 .into_response();
186 }
187 if let Err(e) = tx.commit().await {
188 error!("Failed to commit blob transaction: {:?}", e);
189 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
190 error!(
191 "Failed to cleanup orphaned blob {}: {:?}",
192 storage_key, cleanup_err
193 );
194 }
195 return (
196 StatusCode::INTERNAL_SERVER_ERROR,
197 Json(json!({"error": "InternalError"})),
198 )
199 .into_response();
200 }
201
202 if let Some(ref controller) = controller_did {
203 let _ = delegation::log_delegation_action(
204 &state.db,
205 &did,
206 controller,
207 Some(controller),
208 DelegationActionType::BlobUpload,
209 Some(json!({
210 "cid": cid_str,
211 "mime_type": mime_type,
212 "size": size
213 })),
214 None,
215 None,
216 )
217 .await;
218 }
219
220 Json(json!({
221 "blob": {
222 "$type": "blob",
223 "ref": {
224 "$link": cid_str
225 },
226 "mimeType": mime_type,
227 "size": size
228 }
229 }))
230 .into_response()
231}
232
233#[derive(Deserialize)]
234pub struct ListMissingBlobsParams {
235 pub limit: Option<i64>,
236 pub cursor: Option<String>,
237}
238
239#[derive(Serialize)]
240#[serde(rename_all = "camelCase")]
241pub struct RecordBlob {
242 pub cid: String,
243 pub record_uri: String,
244}
245
246#[derive(Serialize)]
247pub struct ListMissingBlobsOutput {
248 #[serde(skip_serializing_if = "Option::is_none")]
249 pub cursor: Option<String>,
250 pub blobs: Vec<RecordBlob>,
251}
252
253pub async fn list_missing_blobs(
254 State(state): State<AppState>,
255 headers: axum::http::HeaderMap,
256 Query(params): Query<ListMissingBlobsParams>,
257) -> Response {
258 let token = match crate::auth::extract_bearer_token_from_header(
259 headers.get("Authorization").and_then(|h| h.to_str().ok()),
260 ) {
261 Some(t) => t,
262 None => {
263 return (
264 StatusCode::UNAUTHORIZED,
265 Json(json!({"error": "AuthenticationRequired"})),
266 )
267 .into_response();
268 }
269 };
270 let auth_user =
271 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
272 Ok(user) => user,
273 Err(_) => {
274 return (
275 StatusCode::UNAUTHORIZED,
276 Json(json!({"error": "AuthenticationFailed"})),
277 )
278 .into_response();
279 }
280 };
281 let did = auth_user.did;
282 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
283 .fetch_optional(&state.db)
284 .await;
285 let user_id = match user_query {
286 Ok(Some(row)) => row.id,
287 _ => {
288 return (
289 StatusCode::INTERNAL_SERVER_ERROR,
290 Json(json!({"error": "InternalError"})),
291 )
292 .into_response();
293 }
294 };
295 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
296 let cursor_cid = params.cursor.as_deref().unwrap_or("");
297 let missing_query = sqlx::query!(
298 r#"
299 SELECT rb.blob_cid, rb.record_uri
300 FROM record_blobs rb
301 LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id
302 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
303 ORDER BY rb.blob_cid
304 LIMIT $3
305 "#,
306 user_id,
307 cursor_cid,
308 limit + 1
309 )
310 .fetch_all(&state.db)
311 .await;
312 let rows = match missing_query {
313 Ok(r) => r,
314 Err(e) => {
315 error!("DB error fetching missing blobs: {:?}", e);
316 return (
317 StatusCode::INTERNAL_SERVER_ERROR,
318 Json(json!({"error": "InternalError"})),
319 )
320 .into_response();
321 }
322 };
323 let has_more = rows.len() > limit as usize;
324 let blobs: Vec<RecordBlob> = rows
325 .into_iter()
326 .take(limit as usize)
327 .map(|row| RecordBlob {
328 cid: row.blob_cid,
329 record_uri: row.record_uri,
330 })
331 .collect();
332 let next_cursor = if has_more {
333 blobs.last().map(|b| b.cid.clone())
334 } else {
335 None
336 };
337 (
338 StatusCode::OK,
339 Json(ListMissingBlobsOutput {
340 cursor: next_cursor,
341 blobs,
342 }),
343 )
344 .into_response()
345}