this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::state::AppState;
3use axum::body::Bytes;
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use jacquard_repo::storage::BlockStore;
12use multihash::Multihash;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use sha2::{Digest, Sha256};
16use std::str::FromStr;
17use tracing::{debug, error};
18
/// Upper bound, in bytes, on an uploaded blob for ordinary session-authenticated
/// callers whose account is not deactivated.
const MAX_BLOB_SIZE: usize = 1_000_000;

/// Upper bound, in bytes, on an uploaded blob when the caller presents a
/// service token or the account is deactivated (treated as a migration).
const MAX_VIDEO_BLOB_SIZE: usize = 100_000_000;
21
22pub async fn upload_blob(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let token = match crate::auth::extract_bearer_token_from_header(
28 headers.get("Authorization").and_then(|h| h.to_str().ok()),
29 ) {
30 Some(t) => t,
31 None => {
32 return (
33 StatusCode::UNAUTHORIZED,
34 Json(json!({"error": "AuthenticationRequired"})),
35 )
36 .into_response();
37 }
38 };
39
40 let is_service_auth = is_service_token(&token);
41
42 let (did, is_migration) = if is_service_auth {
43 debug!("Verifying service token for blob upload");
44 let verifier = ServiceTokenVerifier::new();
45 match verifier
46 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
47 .await
48 {
49 Ok(claims) => {
50 debug!("Service token verified for DID: {}", claims.iss);
51 (claims.iss, false)
52 }
53 Err(e) => {
54 error!("Service token verification failed: {:?}", e);
55 return (
56 StatusCode::UNAUTHORIZED,
57 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
58 )
59 .into_response();
60 }
61 }
62 } else {
63 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
64 Ok(user) => {
65 let deactivated = sqlx::query_scalar!(
66 "SELECT deactivated_at FROM users WHERE did = $1",
67 user.did
68 )
69 .fetch_optional(&state.db)
70 .await
71 .ok()
72 .flatten()
73 .flatten();
74 (user.did, deactivated.is_some())
75 }
76 Err(_) => {
77 return (
78 StatusCode::UNAUTHORIZED,
79 Json(json!({"error": "AuthenticationFailed"})),
80 )
81 .into_response();
82 }
83 }
84 };
85
86 let max_size = if is_service_auth || is_migration {
87 MAX_VIDEO_BLOB_SIZE
88 } else {
89 MAX_BLOB_SIZE
90 };
91
92 if body.len() > max_size {
93 return (
94 StatusCode::PAYLOAD_TOO_LARGE,
95 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
96 )
97 .into_response();
98 }
99 let mime_type = headers
100 .get("content-type")
101 .and_then(|h| h.to_str().ok())
102 .unwrap_or("application/octet-stream")
103 .to_string();
104 let size = body.len() as i64;
105 let data = body.to_vec();
106 let mut hasher = Sha256::new();
107 hasher.update(&data);
108 let hash = hasher.finalize();
109 let multihash = match Multihash::wrap(0x12, &hash) {
110 Ok(mh) => mh,
111 Err(e) => {
112 error!("Failed to create multihash for blob: {:?}", e);
113 return (
114 StatusCode::INTERNAL_SERVER_ERROR,
115 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
116 )
117 .into_response();
118 }
119 };
120 let cid = Cid::new_v1(0x55, multihash);
121 let cid_str = cid.to_string();
122 let storage_key = format!("blobs/{}", cid_str);
123 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
124 .fetch_optional(&state.db)
125 .await;
126 let user_id = match user_query {
127 Ok(Some(row)) => row.id,
128 _ => {
129 return (
130 StatusCode::INTERNAL_SERVER_ERROR,
131 Json(json!({"error": "InternalError"})),
132 )
133 .into_response();
134 }
135 };
136 let mut tx = match state.db.begin().await {
137 Ok(tx) => tx,
138 Err(e) => {
139 error!("Failed to begin transaction: {:?}", e);
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError"})),
143 )
144 .into_response();
145 }
146 };
147 let insert = sqlx::query!(
148 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
149 cid_str,
150 mime_type,
151 size,
152 user_id,
153 storage_key
154 )
155 .fetch_optional(&mut *tx)
156 .await;
157 let was_inserted = match insert {
158 Ok(Some(_)) => true,
159 Ok(None) => false,
160 Err(e) => {
161 error!("Failed to insert blob record: {:?}", e);
162 return (
163 StatusCode::INTERNAL_SERVER_ERROR,
164 Json(json!({"error": "InternalError"})),
165 )
166 .into_response();
167 }
168 };
169 if was_inserted
170 && let Err(e) = state
171 .blob_store
172 .put_bytes(&storage_key, bytes::Bytes::from(data))
173 .await
174 {
175 error!("Failed to upload blob to storage: {:?}", e);
176 return (
177 StatusCode::INTERNAL_SERVER_ERROR,
178 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
179 )
180 .into_response();
181 }
182 if let Err(e) = tx.commit().await {
183 error!("Failed to commit blob transaction: {:?}", e);
184 if was_inserted
185 && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
186 error!(
187 "Failed to cleanup orphaned blob {}: {:?}",
188 storage_key, cleanup_err
189 );
190 }
191 return (
192 StatusCode::INTERNAL_SERVER_ERROR,
193 Json(json!({"error": "InternalError"})),
194 )
195 .into_response();
196 }
197 Json(json!({
198 "blob": {
199 "$type": "blob",
200 "ref": {
201 "$link": cid_str
202 },
203 "mimeType": mime_type,
204 "size": size
205 }
206 }))
207 .into_response()
208}
209
210#[derive(Deserialize)]
211pub struct ListMissingBlobsParams {
212 pub limit: Option<i64>,
213 pub cursor: Option<String>,
214}
215
216#[derive(Serialize)]
217#[serde(rename_all = "camelCase")]
218pub struct RecordBlob {
219 pub cid: String,
220 pub record_uri: String,
221}
222
223#[derive(Serialize)]
224pub struct ListMissingBlobsOutput {
225 pub cursor: Option<String>,
226 pub blobs: Vec<RecordBlob>,
227}
228
229fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
230 if let Some(obj) = val.as_object() {
231 if let Some(type_val) = obj.get("$type")
232 && type_val == "blob"
233 && let Some(r) = obj.get("ref")
234 && let Some(link) = r.get("$link")
235 && let Some(s) = link.as_str() {
236 blobs.push(s.to_string());
237 }
238 for (_, v) in obj {
239 find_blobs(v, blobs);
240 }
241 } else if let Some(arr) = val.as_array() {
242 for v in arr {
243 find_blobs(v, blobs);
244 }
245 }
246}
247
248pub async fn list_missing_blobs(
249 State(state): State<AppState>,
250 headers: axum::http::HeaderMap,
251 Query(params): Query<ListMissingBlobsParams>,
252) -> Response {
253 let token = match crate::auth::extract_bearer_token_from_header(
254 headers.get("Authorization").and_then(|h| h.to_str().ok()),
255 ) {
256 Some(t) => t,
257 None => {
258 return (
259 StatusCode::UNAUTHORIZED,
260 Json(json!({"error": "AuthenticationRequired"})),
261 )
262 .into_response();
263 }
264 };
265 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
266 Ok(user) => user,
267 Err(_) => {
268 return (
269 StatusCode::UNAUTHORIZED,
270 Json(json!({"error": "AuthenticationFailed"})),
271 )
272 .into_response();
273 }
274 };
275 let did = auth_user.did;
276 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
277 .fetch_optional(&state.db)
278 .await;
279 let user_id = match user_query {
280 Ok(Some(row)) => row.id,
281 _ => {
282 return (
283 StatusCode::INTERNAL_SERVER_ERROR,
284 Json(json!({"error": "InternalError"})),
285 )
286 .into_response();
287 }
288 };
289 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
290 let cursor_str = params.cursor.unwrap_or_default();
291 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
292 let parts: Vec<&str> = cursor_str.split('|').collect();
293 (parts[0].to_string(), parts[1].to_string())
294 } else {
295 (String::new(), String::new())
296 };
297 let records_query = sqlx::query!(
298 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
299 user_id,
300 cursor_collection,
301 cursor_rkey,
302 limit
303 )
304 .fetch_all(&state.db)
305 .await;
306 let records = match records_query {
307 Ok(r) => r,
308 Err(e) => {
309 error!("DB error fetching records: {:?}", e);
310 return (
311 StatusCode::INTERNAL_SERVER_ERROR,
312 Json(json!({"error": "InternalError"})),
313 )
314 .into_response();
315 }
316 };
317 let mut missing_blobs = Vec::new();
318 let mut last_cursor = None;
319 for row in &records {
320 let collection = &row.collection;
321 let rkey = &row.rkey;
322 let record_cid_str = &row.record_cid;
323 last_cursor = Some(format!("{}|{}", collection, rkey));
324 let record_cid = match Cid::from_str(record_cid_str) {
325 Ok(c) => c,
326 Err(_) => continue,
327 };
328 let block_bytes = match state.block_store.get(&record_cid).await {
329 Ok(Some(b)) => b,
330 _ => continue,
331 };
332 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
333 Ok(v) => v,
334 Err(_) => continue,
335 };
336 let mut blobs = Vec::new();
337 find_blobs(&record_val, &mut blobs);
338 for blob_cid_str in blobs {
339 let exists = sqlx::query!(
340 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
341 blob_cid_str,
342 user_id
343 )
344 .fetch_optional(&state.db)
345 .await;
346 match exists {
347 Ok(None) => {
348 missing_blobs.push(RecordBlob {
349 cid: blob_cid_str,
350 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
351 });
352 }
353 Err(e) => {
354 error!("DB error checking blob existence: {:?}", e);
355 }
356 _ => {}
357 }
358 }
359 }
360 // if we fetched fewer records than limit, we are done, so cursor is None.
361 // otherwise, cursor is the last one we saw.
362 // ...right?
363 let next_cursor = if records.len() < limit as usize {
364 None
365 } else {
366 last_cursor
367 };
368 (
369 StatusCode::OK,
370 Json(ListMissingBlobsOutput {
371 cursor: next_cursor,
372 blobs: missing_blobs,
373 }),
374 )
375 .into_response()
376}