this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::delegation::{self, DelegationActionType};
3use crate::state::AppState;
4use axum::body::Bytes;
5use axum::{
6 Json,
7 extract::{Query, State},
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use cid::Cid;
12use jacquard_repo::storage::BlockStore;
13use multihash::Multihash;
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use sha2::{Digest, Sha256};
17use std::str::FromStr;
18use tracing::{debug, error};
19
/// Largest request body, in bytes, that `upload_blob` accepts from an ordinary
/// (non-service, non-deactivated) session: 10 GB, decimal.
const MAX_BLOB_SIZE: usize = 10 * 1_000_000_000;
/// Largest request body, in bytes, that `upload_blob` accepts when the caller
/// used a service token or the account is deactivated. Currently the same
/// 10 GB as [`MAX_BLOB_SIZE`].
const MAX_VIDEO_BLOB_SIZE: usize = 10 * 1_000_000_000;
22
23pub async fn upload_blob(
24 State(state): State<AppState>,
25 headers: axum::http::HeaderMap,
26 body: Bytes,
27) -> Response {
28 let token = match crate::auth::extract_bearer_token_from_header(
29 headers.get("Authorization").and_then(|h| h.to_str().ok()),
30 ) {
31 Some(t) => t,
32 None => {
33 return (
34 StatusCode::UNAUTHORIZED,
35 Json(json!({"error": "AuthenticationRequired"})),
36 )
37 .into_response();
38 }
39 };
40
41 let is_service_auth = is_service_token(&token);
42
43 let (did, is_migration, controller_did) = if is_service_auth {
44 debug!("Verifying service token for blob upload");
45 let verifier = ServiceTokenVerifier::new();
46 match verifier
47 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
48 .await
49 {
50 Ok(claims) => {
51 debug!("Service token verified for DID: {}", claims.iss);
52 (claims.iss, false, None)
53 }
54 Err(e) => {
55 error!("Service token verification failed: {:?}", e);
56 return (
57 StatusCode::UNAUTHORIZED,
58 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
59 )
60 .into_response();
61 }
62 }
63 } else {
64 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
65 Ok(user) => {
66 let mime_type_for_check = headers
67 .get("content-type")
68 .and_then(|h| h.to_str().ok())
69 .unwrap_or("application/octet-stream");
70 if let Err(e) = crate::auth::scope_check::check_blob_scope(
71 user.is_oauth,
72 user.scope.as_deref(),
73 mime_type_for_check,
74 ) {
75 return e;
76 }
77 let deactivated = sqlx::query_scalar!(
78 "SELECT deactivated_at FROM users WHERE did = $1",
79 user.did
80 )
81 .fetch_optional(&state.db)
82 .await
83 .ok()
84 .flatten()
85 .flatten();
86 let ctrl_did = user.controller_did.clone();
87 (user.did, deactivated.is_some(), ctrl_did)
88 }
89 Err(_) => {
90 return (
91 StatusCode::UNAUTHORIZED,
92 Json(json!({"error": "AuthenticationFailed"})),
93 )
94 .into_response();
95 }
96 }
97 };
98
99 let max_size = if is_service_auth || is_migration {
100 MAX_VIDEO_BLOB_SIZE
101 } else {
102 MAX_BLOB_SIZE
103 };
104
105 if body.len() > max_size {
106 return (
107 StatusCode::PAYLOAD_TOO_LARGE,
108 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
109 )
110 .into_response();
111 }
112 let mime_type = headers
113 .get("content-type")
114 .and_then(|h| h.to_str().ok())
115 .unwrap_or("application/octet-stream")
116 .to_string();
117 let size = body.len() as i64;
118 let data = body.to_vec();
119 let mut hasher = Sha256::new();
120 hasher.update(&data);
121 let hash = hasher.finalize();
122 let multihash = match Multihash::wrap(0x12, &hash) {
123 Ok(mh) => mh,
124 Err(e) => {
125 error!("Failed to create multihash for blob: {:?}", e);
126 return (
127 StatusCode::INTERNAL_SERVER_ERROR,
128 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
129 )
130 .into_response();
131 }
132 };
133 let cid = Cid::new_v1(0x55, multihash);
134 let cid_str = cid.to_string();
135 let storage_key = format!("blobs/{}", cid_str);
136 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
137 .fetch_optional(&state.db)
138 .await;
139 let user_id = match user_query {
140 Ok(Some(row)) => row.id,
141 _ => {
142 return (
143 StatusCode::INTERNAL_SERVER_ERROR,
144 Json(json!({"error": "InternalError"})),
145 )
146 .into_response();
147 }
148 };
149 let mut tx = match state.db.begin().await {
150 Ok(tx) => tx,
151 Err(e) => {
152 error!("Failed to begin transaction: {:?}", e);
153 return (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError"})),
156 )
157 .into_response();
158 }
159 };
160 let insert = sqlx::query!(
161 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
162 cid_str,
163 mime_type,
164 size,
165 user_id,
166 storage_key
167 )
168 .fetch_optional(&mut *tx)
169 .await;
170 let was_inserted = match insert {
171 Ok(Some(_)) => true,
172 Ok(None) => false,
173 Err(e) => {
174 error!("Failed to insert blob record: {:?}", e);
175 return (
176 StatusCode::INTERNAL_SERVER_ERROR,
177 Json(json!({"error": "InternalError"})),
178 )
179 .into_response();
180 }
181 };
182 if was_inserted
183 && let Err(e) = state
184 .blob_store
185 .put_bytes(&storage_key, bytes::Bytes::from(data))
186 .await
187 {
188 error!("Failed to upload blob to storage: {:?}", e);
189 return (
190 StatusCode::INTERNAL_SERVER_ERROR,
191 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
192 )
193 .into_response();
194 }
195 if let Err(e) = tx.commit().await {
196 error!("Failed to commit blob transaction: {:?}", e);
197 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
198 error!(
199 "Failed to cleanup orphaned blob {}: {:?}",
200 storage_key, cleanup_err
201 );
202 }
203 return (
204 StatusCode::INTERNAL_SERVER_ERROR,
205 Json(json!({"error": "InternalError"})),
206 )
207 .into_response();
208 }
209
210 if let Some(ref controller) = controller_did {
211 let _ = delegation::log_delegation_action(
212 &state.db,
213 &did,
214 controller,
215 Some(controller),
216 DelegationActionType::BlobUpload,
217 Some(json!({
218 "cid": cid_str,
219 "mime_type": mime_type,
220 "size": size
221 })),
222 None,
223 None,
224 )
225 .await;
226 }
227
228 Json(json!({
229 "blob": {
230 "$type": "blob",
231 "ref": {
232 "$link": cid_str
233 },
234 "mimeType": mime_type,
235 "size": size
236 }
237 }))
238 .into_response()
239}
240
241#[derive(Deserialize)]
242pub struct ListMissingBlobsParams {
243 pub limit: Option<i64>,
244 pub cursor: Option<String>,
245}
246
247#[derive(Serialize)]
248#[serde(rename_all = "camelCase")]
249pub struct RecordBlob {
250 pub cid: String,
251 pub record_uri: String,
252}
253
254#[derive(Serialize)]
255pub struct ListMissingBlobsOutput {
256 #[serde(skip_serializing_if = "Option::is_none")]
257 pub cursor: Option<String>,
258 pub blobs: Vec<RecordBlob>,
259}
260
261fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
262 if let Some(obj) = val.as_object() {
263 if let Some(type_val) = obj.get("$type")
264 && type_val == "blob"
265 && let Some(r) = obj.get("ref")
266 && let Some(link) = r.get("$link")
267 && let Some(s) = link.as_str()
268 {
269 blobs.push(s.to_string());
270 }
271 for (_, v) in obj {
272 find_blobs(v, blobs);
273 }
274 } else if let Some(arr) = val.as_array() {
275 for v in arr {
276 find_blobs(v, blobs);
277 }
278 }
279}
280
281pub async fn list_missing_blobs(
282 State(state): State<AppState>,
283 headers: axum::http::HeaderMap,
284 Query(params): Query<ListMissingBlobsParams>,
285) -> Response {
286 let token = match crate::auth::extract_bearer_token_from_header(
287 headers.get("Authorization").and_then(|h| h.to_str().ok()),
288 ) {
289 Some(t) => t,
290 None => {
291 return (
292 StatusCode::UNAUTHORIZED,
293 Json(json!({"error": "AuthenticationRequired"})),
294 )
295 .into_response();
296 }
297 };
298 let auth_user = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
299 Ok(user) => user,
300 Err(_) => {
301 return (
302 StatusCode::UNAUTHORIZED,
303 Json(json!({"error": "AuthenticationFailed"})),
304 )
305 .into_response();
306 }
307 };
308 let did = auth_user.did;
309 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
310 .fetch_optional(&state.db)
311 .await;
312 let user_id = match user_query {
313 Ok(Some(row)) => row.id,
314 _ => {
315 return (
316 StatusCode::INTERNAL_SERVER_ERROR,
317 Json(json!({"error": "InternalError"})),
318 )
319 .into_response();
320 }
321 };
322 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
323 let cursor_str = params.cursor.unwrap_or_default();
324 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
325 let parts: Vec<&str> = cursor_str.split('|').collect();
326 (parts[0].to_string(), parts[1].to_string())
327 } else {
328 (String::new(), String::new())
329 };
330 let records_query = sqlx::query!(
331 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
332 user_id,
333 cursor_collection,
334 cursor_rkey,
335 limit
336 )
337 .fetch_all(&state.db)
338 .await;
339 let records = match records_query {
340 Ok(r) => r,
341 Err(e) => {
342 error!("DB error fetching records: {:?}", e);
343 return (
344 StatusCode::INTERNAL_SERVER_ERROR,
345 Json(json!({"error": "InternalError"})),
346 )
347 .into_response();
348 }
349 };
350 let mut missing_blobs = Vec::new();
351 let mut last_cursor = None;
352 for row in &records {
353 let collection = &row.collection;
354 let rkey = &row.rkey;
355 let record_cid_str = &row.record_cid;
356 last_cursor = Some(format!("{}|{}", collection, rkey));
357 let record_cid = match Cid::from_str(record_cid_str) {
358 Ok(c) => c,
359 Err(_) => continue,
360 };
361 let block_bytes = match state.block_store.get(&record_cid).await {
362 Ok(Some(b)) => b,
363 _ => continue,
364 };
365 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
366 Ok(v) => v,
367 Err(_) => continue,
368 };
369 let mut blobs = Vec::new();
370 find_blobs(&record_val, &mut blobs);
371 for blob_cid_str in blobs {
372 let exists = sqlx::query!(
373 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
374 blob_cid_str,
375 user_id
376 )
377 .fetch_optional(&state.db)
378 .await;
379 match exists {
380 Ok(None) => {
381 missing_blobs.push(RecordBlob {
382 cid: blob_cid_str,
383 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
384 });
385 }
386 Err(e) => {
387 error!("DB error checking blob existence: {:?}", e);
388 }
389 _ => {}
390 }
391 }
392 }
393 // if we fetched fewer records than limit, we are done, so cursor is None.
394 // otherwise, cursor is the last one we saw.
395 // ...right?
396 let next_cursor = if records.len() < limit as usize {
397 None
398 } else {
399 last_cursor
400 };
401 (
402 StatusCode::OK,
403 Json(ListMissingBlobsOutput {
404 cursor: next_cursor,
405 blobs: missing_blobs,
406 }),
407 )
408 .into_response()
409}