this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::state::AppState;
3use axum::body::Bytes;
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use jacquard_repo::storage::BlockStore;
12use multihash::Multihash;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use sha2::{Digest, Sha256};
16use std::str::FromStr;
17use tracing::{debug, error};
18
/// Upper bound, in bytes, on the request body accepted by `upload_blob` for
/// ordinary session-authenticated uploads (10 GB, decimal).
const MAX_BLOB_SIZE: usize = 10 * 1_000_000_000;
/// Upper bound, in bytes, applied by `upload_blob` when the caller uses a
/// service token or the account is deactivated (10 GB, decimal). Currently the
/// same value as `MAX_BLOB_SIZE`.
const MAX_VIDEO_BLOB_SIZE: usize = 10 * 1_000_000_000;
21
22pub async fn upload_blob(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let token = match crate::auth::extract_bearer_token_from_header(
28 headers.get("Authorization").and_then(|h| h.to_str().ok()),
29 ) {
30 Some(t) => t,
31 None => {
32 return (
33 StatusCode::UNAUTHORIZED,
34 Json(json!({"error": "AuthenticationRequired"})),
35 )
36 .into_response();
37 }
38 };
39
40 let is_service_auth = is_service_token(&token);
41
42 let (did, is_migration) = if is_service_auth {
43 debug!("Verifying service token for blob upload");
44 let verifier = ServiceTokenVerifier::new();
45 match verifier
46 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
47 .await
48 {
49 Ok(claims) => {
50 debug!("Service token verified for DID: {}", claims.iss);
51 (claims.iss, false)
52 }
53 Err(e) => {
54 error!("Service token verification failed: {:?}", e);
55 return (
56 StatusCode::UNAUTHORIZED,
57 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
58 )
59 .into_response();
60 }
61 }
62 } else {
63 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
64 Ok(user) => {
65 let mime_type_for_check = headers
66 .get("content-type")
67 .and_then(|h| h.to_str().ok())
68 .unwrap_or("application/octet-stream");
69 if let Err(e) = crate::auth::scope_check::check_blob_scope(
70 user.is_oauth,
71 user.scope.as_deref(),
72 mime_type_for_check,
73 ) {
74 return e;
75 }
76 let deactivated = sqlx::query_scalar!(
77 "SELECT deactivated_at FROM users WHERE did = $1",
78 user.did
79 )
80 .fetch_optional(&state.db)
81 .await
82 .ok()
83 .flatten()
84 .flatten();
85 (user.did, deactivated.is_some())
86 }
87 Err(_) => {
88 return (
89 StatusCode::UNAUTHORIZED,
90 Json(json!({"error": "AuthenticationFailed"})),
91 )
92 .into_response();
93 }
94 }
95 };
96
97 let max_size = if is_service_auth || is_migration {
98 MAX_VIDEO_BLOB_SIZE
99 } else {
100 MAX_BLOB_SIZE
101 };
102
103 if body.len() > max_size {
104 return (
105 StatusCode::PAYLOAD_TOO_LARGE,
106 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
107 )
108 .into_response();
109 }
110 let mime_type = headers
111 .get("content-type")
112 .and_then(|h| h.to_str().ok())
113 .unwrap_or("application/octet-stream")
114 .to_string();
115 let size = body.len() as i64;
116 let data = body.to_vec();
117 let mut hasher = Sha256::new();
118 hasher.update(&data);
119 let hash = hasher.finalize();
120 let multihash = match Multihash::wrap(0x12, &hash) {
121 Ok(mh) => mh,
122 Err(e) => {
123 error!("Failed to create multihash for blob: {:?}", e);
124 return (
125 StatusCode::INTERNAL_SERVER_ERROR,
126 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
127 )
128 .into_response();
129 }
130 };
131 let cid = Cid::new_v1(0x55, multihash);
132 let cid_str = cid.to_string();
133 let storage_key = format!("blobs/{}", cid_str);
134 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
135 .fetch_optional(&state.db)
136 .await;
137 let user_id = match user_query {
138 Ok(Some(row)) => row.id,
139 _ => {
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError"})),
143 )
144 .into_response();
145 }
146 };
147 let mut tx = match state.db.begin().await {
148 Ok(tx) => tx,
149 Err(e) => {
150 error!("Failed to begin transaction: {:?}", e);
151 return (
152 StatusCode::INTERNAL_SERVER_ERROR,
153 Json(json!({"error": "InternalError"})),
154 )
155 .into_response();
156 }
157 };
158 let insert = sqlx::query!(
159 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
160 cid_str,
161 mime_type,
162 size,
163 user_id,
164 storage_key
165 )
166 .fetch_optional(&mut *tx)
167 .await;
168 let was_inserted = match insert {
169 Ok(Some(_)) => true,
170 Ok(None) => false,
171 Err(e) => {
172 error!("Failed to insert blob record: {:?}", e);
173 return (
174 StatusCode::INTERNAL_SERVER_ERROR,
175 Json(json!({"error": "InternalError"})),
176 )
177 .into_response();
178 }
179 };
180 if was_inserted
181 && let Err(e) = state
182 .blob_store
183 .put_bytes(&storage_key, bytes::Bytes::from(data))
184 .await
185 {
186 error!("Failed to upload blob to storage: {:?}", e);
187 return (
188 StatusCode::INTERNAL_SERVER_ERROR,
189 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
190 )
191 .into_response();
192 }
193 if let Err(e) = tx.commit().await {
194 error!("Failed to commit blob transaction: {:?}", e);
195 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
196 error!(
197 "Failed to cleanup orphaned blob {}: {:?}",
198 storage_key, cleanup_err
199 );
200 }
201 return (
202 StatusCode::INTERNAL_SERVER_ERROR,
203 Json(json!({"error": "InternalError"})),
204 )
205 .into_response();
206 }
207 Json(json!({
208 "blob": {
209 "$type": "blob",
210 "ref": {
211 "$link": cid_str
212 },
213 "mimeType": mime_type,
214 "size": size
215 }
216 }))
217 .into_response()
218}
219
220#[derive(Deserialize)]
221pub struct ListMissingBlobsParams {
222 pub limit: Option<i64>,
223 pub cursor: Option<String>,
224}
225
226#[derive(Serialize)]
227#[serde(rename_all = "camelCase")]
228pub struct RecordBlob {
229 pub cid: String,
230 pub record_uri: String,
231}
232
233#[derive(Serialize)]
234pub struct ListMissingBlobsOutput {
235 #[serde(skip_serializing_if = "Option::is_none")]
236 pub cursor: Option<String>,
237 pub blobs: Vec<RecordBlob>,
238}
239
240fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
241 if let Some(obj) = val.as_object() {
242 if let Some(type_val) = obj.get("$type")
243 && type_val == "blob"
244 && let Some(r) = obj.get("ref")
245 && let Some(link) = r.get("$link")
246 && let Some(s) = link.as_str()
247 {
248 blobs.push(s.to_string());
249 }
250 for (_, v) in obj {
251 find_blobs(v, blobs);
252 }
253 } else if let Some(arr) = val.as_array() {
254 for v in arr {
255 find_blobs(v, blobs);
256 }
257 }
258}
259
260pub async fn list_missing_blobs(
261 State(state): State<AppState>,
262 headers: axum::http::HeaderMap,
263 Query(params): Query<ListMissingBlobsParams>,
264) -> Response {
265 let token = match crate::auth::extract_bearer_token_from_header(
266 headers.get("Authorization").and_then(|h| h.to_str().ok()),
267 ) {
268 Some(t) => t,
269 None => {
270 return (
271 StatusCode::UNAUTHORIZED,
272 Json(json!({"error": "AuthenticationRequired"})),
273 )
274 .into_response();
275 }
276 };
277 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
278 Ok(user) => user,
279 Err(_) => {
280 return (
281 StatusCode::UNAUTHORIZED,
282 Json(json!({"error": "AuthenticationFailed"})),
283 )
284 .into_response();
285 }
286 };
287 let did = auth_user.did;
288 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
289 .fetch_optional(&state.db)
290 .await;
291 let user_id = match user_query {
292 Ok(Some(row)) => row.id,
293 _ => {
294 return (
295 StatusCode::INTERNAL_SERVER_ERROR,
296 Json(json!({"error": "InternalError"})),
297 )
298 .into_response();
299 }
300 };
301 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
302 let cursor_str = params.cursor.unwrap_or_default();
303 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
304 let parts: Vec<&str> = cursor_str.split('|').collect();
305 (parts[0].to_string(), parts[1].to_string())
306 } else {
307 (String::new(), String::new())
308 };
309 let records_query = sqlx::query!(
310 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
311 user_id,
312 cursor_collection,
313 cursor_rkey,
314 limit
315 )
316 .fetch_all(&state.db)
317 .await;
318 let records = match records_query {
319 Ok(r) => r,
320 Err(e) => {
321 error!("DB error fetching records: {:?}", e);
322 return (
323 StatusCode::INTERNAL_SERVER_ERROR,
324 Json(json!({"error": "InternalError"})),
325 )
326 .into_response();
327 }
328 };
329 let mut missing_blobs = Vec::new();
330 let mut last_cursor = None;
331 for row in &records {
332 let collection = &row.collection;
333 let rkey = &row.rkey;
334 let record_cid_str = &row.record_cid;
335 last_cursor = Some(format!("{}|{}", collection, rkey));
336 let record_cid = match Cid::from_str(record_cid_str) {
337 Ok(c) => c,
338 Err(_) => continue,
339 };
340 let block_bytes = match state.block_store.get(&record_cid).await {
341 Ok(Some(b)) => b,
342 _ => continue,
343 };
344 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
345 Ok(v) => v,
346 Err(_) => continue,
347 };
348 let mut blobs = Vec::new();
349 find_blobs(&record_val, &mut blobs);
350 for blob_cid_str in blobs {
351 let exists = sqlx::query!(
352 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
353 blob_cid_str,
354 user_id
355 )
356 .fetch_optional(&state.db)
357 .await;
358 match exists {
359 Ok(None) => {
360 missing_blobs.push(RecordBlob {
361 cid: blob_cid_str,
362 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
363 });
364 }
365 Err(e) => {
366 error!("DB error checking blob existence: {:?}", e);
367 }
368 _ => {}
369 }
370 }
371 }
372 // if we fetched fewer records than limit, we are done, so cursor is None.
373 // otherwise, cursor is the last one we saw.
374 // ...right?
375 let next_cursor = if records.len() < limit as usize {
376 None
377 } else {
378 last_cursor
379 };
380 (
381 StatusCode::OK,
382 Json(ListMissingBlobsOutput {
383 cursor: next_cursor,
384 blobs: missing_blobs,
385 }),
386 )
387 .into_response()
388}