this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::delegation::{self, DelegationActionType};
3use crate::state::AppState;
4use crate::sync::import::find_blob_refs_ipld;
5use axum::body::Bytes;
6use axum::{
7 Json,
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11};
12use cid::Cid;
13use ipld_core::ipld::Ipld;
14use jacquard_repo::storage::BlockStore;
15use multihash::Multihash;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sha2::{Digest, Sha256};
19use std::str::FromStr;
20use tracing::{debug, error, warn};
21
/// Largest body, in bytes, accepted by `upload_blob` from a regular
/// (non-service) caller whose account is not deactivated.
///
/// NOTE(review): this currently equals `MAX_VIDEO_BLOB_SIZE`, so the two size
/// tiers in `upload_blob` behave identically. The value also does not fit in a
/// 32-bit `usize`. Confirm the intended limits.
const MAX_BLOB_SIZE: usize = 10 * 1_000_000_000;
/// Largest body, in bytes, accepted by `upload_blob` from service-token callers
/// and from accounts that have `deactivated_at` set.
const MAX_VIDEO_BLOB_SIZE: usize = 10 * 1_000_000_000;
24
25pub async fn upload_blob(
26 State(state): State<AppState>,
27 headers: axum::http::HeaderMap,
28 body: Bytes,
29) -> Response {
30 let token = match crate::auth::extract_bearer_token_from_header(
31 headers.get("Authorization").and_then(|h| h.to_str().ok()),
32 ) {
33 Some(t) => t,
34 None => {
35 return (
36 StatusCode::UNAUTHORIZED,
37 Json(json!({"error": "AuthenticationRequired"})),
38 )
39 .into_response();
40 }
41 };
42
43 let is_service_auth = is_service_token(&token);
44
45 let (did, is_migration, controller_did) = if is_service_auth {
46 debug!("Verifying service token for blob upload");
47 let verifier = ServiceTokenVerifier::new();
48 match verifier
49 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
50 .await
51 {
52 Ok(claims) => {
53 debug!("Service token verified for DID: {}", claims.iss);
54 (claims.iss, false, None)
55 }
56 Err(e) => {
57 error!("Service token verification failed: {:?}", e);
58 return (
59 StatusCode::UNAUTHORIZED,
60 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
61 )
62 .into_response();
63 }
64 }
65 } else {
66 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
67 Ok(user) => {
68 let mime_type_for_check = headers
69 .get("content-type")
70 .and_then(|h| h.to_str().ok())
71 .unwrap_or("application/octet-stream");
72 if let Err(e) = crate::auth::scope_check::check_blob_scope(
73 user.is_oauth,
74 user.scope.as_deref(),
75 mime_type_for_check,
76 ) {
77 return e;
78 }
79 let deactivated = sqlx::query_scalar!(
80 "SELECT deactivated_at FROM users WHERE did = $1",
81 user.did
82 )
83 .fetch_optional(&state.db)
84 .await
85 .ok()
86 .flatten()
87 .flatten();
88 let ctrl_did = user.controller_did.clone();
89 (user.did, deactivated.is_some(), ctrl_did)
90 }
91 Err(_) => {
92 return (
93 StatusCode::UNAUTHORIZED,
94 Json(json!({"error": "AuthenticationFailed"})),
95 )
96 .into_response();
97 }
98 }
99 };
100
101 let max_size = if is_service_auth || is_migration {
102 MAX_VIDEO_BLOB_SIZE
103 } else {
104 MAX_BLOB_SIZE
105 };
106
107 if body.len() > max_size {
108 return (
109 StatusCode::PAYLOAD_TOO_LARGE,
110 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
111 )
112 .into_response();
113 }
114 let mime_type = headers
115 .get("content-type")
116 .and_then(|h| h.to_str().ok())
117 .unwrap_or("application/octet-stream")
118 .to_string();
119 let size = body.len() as i64;
120 let data = body.to_vec();
121 let mut hasher = Sha256::new();
122 hasher.update(&data);
123 let hash = hasher.finalize();
124 let multihash = match Multihash::wrap(0x12, &hash) {
125 Ok(mh) => mh,
126 Err(e) => {
127 error!("Failed to create multihash for blob: {:?}", e);
128 return (
129 StatusCode::INTERNAL_SERVER_ERROR,
130 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
131 )
132 .into_response();
133 }
134 };
135 let cid = Cid::new_v1(0x55, multihash);
136 let cid_str = cid.to_string();
137 let storage_key = format!("blobs/{}", cid_str);
138 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
139 .fetch_optional(&state.db)
140 .await;
141 let user_id = match user_query {
142 Ok(Some(row)) => row.id,
143 _ => {
144 return (
145 StatusCode::INTERNAL_SERVER_ERROR,
146 Json(json!({"error": "InternalError"})),
147 )
148 .into_response();
149 }
150 };
151 let mut tx = match state.db.begin().await {
152 Ok(tx) => tx,
153 Err(e) => {
154 error!("Failed to begin transaction: {:?}", e);
155 return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(json!({"error": "InternalError"})),
158 )
159 .into_response();
160 }
161 };
162 let insert = sqlx::query!(
163 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
164 cid_str,
165 mime_type,
166 size,
167 user_id,
168 storage_key
169 )
170 .fetch_optional(&mut *tx)
171 .await;
172 let was_inserted = match insert {
173 Ok(Some(_)) => true,
174 Ok(None) => false,
175 Err(e) => {
176 error!("Failed to insert blob record: {:?}", e);
177 return (
178 StatusCode::INTERNAL_SERVER_ERROR,
179 Json(json!({"error": "InternalError"})),
180 )
181 .into_response();
182 }
183 };
184 if was_inserted
185 && let Err(e) = state
186 .blob_store
187 .put_bytes(&storage_key, bytes::Bytes::from(data))
188 .await
189 {
190 error!("Failed to upload blob to storage: {:?}", e);
191 return (
192 StatusCode::INTERNAL_SERVER_ERROR,
193 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
194 )
195 .into_response();
196 }
197 if let Err(e) = tx.commit().await {
198 error!("Failed to commit blob transaction: {:?}", e);
199 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
200 error!(
201 "Failed to cleanup orphaned blob {}: {:?}",
202 storage_key, cleanup_err
203 );
204 }
205 return (
206 StatusCode::INTERNAL_SERVER_ERROR,
207 Json(json!({"error": "InternalError"})),
208 )
209 .into_response();
210 }
211
212 if let Some(ref controller) = controller_did {
213 let _ = delegation::log_delegation_action(
214 &state.db,
215 &did,
216 controller,
217 Some(controller),
218 DelegationActionType::BlobUpload,
219 Some(json!({
220 "cid": cid_str,
221 "mime_type": mime_type,
222 "size": size
223 })),
224 None,
225 None,
226 )
227 .await;
228 }
229
230 Json(json!({
231 "blob": {
232 "$type": "blob",
233 "ref": {
234 "$link": cid_str
235 },
236 "mimeType": mime_type,
237 "size": size
238 }
239 }))
240 .into_response()
241}
242
243#[derive(Deserialize)]
244pub struct ListMissingBlobsParams {
245 pub limit: Option<i64>,
246 pub cursor: Option<String>,
247}
248
249#[derive(Serialize)]
250#[serde(rename_all = "camelCase")]
251pub struct RecordBlob {
252 pub cid: String,
253 pub record_uri: String,
254}
255
256#[derive(Serialize)]
257pub struct ListMissingBlobsOutput {
258 #[serde(skip_serializing_if = "Option::is_none")]
259 pub cursor: Option<String>,
260 pub blobs: Vec<RecordBlob>,
261}
262
263pub async fn list_missing_blobs(
264 State(state): State<AppState>,
265 headers: axum::http::HeaderMap,
266 Query(params): Query<ListMissingBlobsParams>,
267) -> Response {
268 let token = match crate::auth::extract_bearer_token_from_header(
269 headers.get("Authorization").and_then(|h| h.to_str().ok()),
270 ) {
271 Some(t) => t,
272 None => {
273 return (
274 StatusCode::UNAUTHORIZED,
275 Json(json!({"error": "AuthenticationRequired"})),
276 )
277 .into_response();
278 }
279 };
280 let auth_user =
281 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
282 Ok(user) => user,
283 Err(_) => {
284 return (
285 StatusCode::UNAUTHORIZED,
286 Json(json!({"error": "AuthenticationFailed"})),
287 )
288 .into_response();
289 }
290 };
291 let did = auth_user.did;
292 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
293 .fetch_optional(&state.db)
294 .await;
295 let user_id = match user_query {
296 Ok(Some(row)) => row.id,
297 _ => {
298 return (
299 StatusCode::INTERNAL_SERVER_ERROR,
300 Json(json!({"error": "InternalError"})),
301 )
302 .into_response();
303 }
304 };
305 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
306 let cursor_str = params.cursor.unwrap_or_default();
307 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
308 let parts: Vec<&str> = cursor_str.split('|').collect();
309 (parts[0].to_string(), parts[1].to_string())
310 } else {
311 (String::new(), String::new())
312 };
313 let records_query = sqlx::query!(
314 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
315 user_id,
316 cursor_collection,
317 cursor_rkey,
318 limit
319 )
320 .fetch_all(&state.db)
321 .await;
322 let records = match records_query {
323 Ok(r) => r,
324 Err(e) => {
325 error!("DB error fetching records: {:?}", e);
326 return (
327 StatusCode::INTERNAL_SERVER_ERROR,
328 Json(json!({"error": "InternalError"})),
329 )
330 .into_response();
331 }
332 };
333 let mut missing_blobs = Vec::new();
334 let mut last_cursor = None;
335 for row in &records {
336 let collection = &row.collection;
337 let rkey = &row.rkey;
338 let record_cid_str = &row.record_cid;
339 last_cursor = Some(format!("{}|{}", collection, rkey));
340 let record_cid = match Cid::from_str(record_cid_str) {
341 Ok(c) => c,
342 Err(_) => continue,
343 };
344 let block_bytes = match state.block_store.get(&record_cid).await {
345 Ok(Some(b)) => b,
346 _ => continue,
347 };
348 let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
349 Ok(v) => v,
350 Err(e) => {
351 warn!("Failed to parse record {} as IPLD: {:?}", record_cid_str, e);
352 continue;
353 }
354 };
355 let blob_refs = find_blob_refs_ipld(&record_ipld, 0);
356 for blob_ref in blob_refs {
357 let blob_cid_str = blob_ref.cid;
358 let exists = sqlx::query!(
359 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
360 blob_cid_str,
361 user_id
362 )
363 .fetch_optional(&state.db)
364 .await;
365 match exists {
366 Ok(None) => {
367 missing_blobs.push(RecordBlob {
368 cid: blob_cid_str,
369 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
370 });
371 }
372 Err(e) => {
373 error!("DB error checking blob existence: {:?}", e);
374 }
375 _ => {}
376 }
377 }
378 }
379 // if we fetched fewer records than limit, we are done, so cursor is None.
380 // otherwise, cursor is the last one we saw.
381 // ...right?
382 let next_cursor = if records.len() < limit as usize {
383 None
384 } else {
385 last_cursor
386 };
387 (
388 StatusCode::OK,
389 Json(ListMissingBlobsOutput {
390 cursor: next_cursor,
391 blobs: missing_blobs,
392 }),
393 )
394 .into_response()
395}