this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::state::AppState;
3use axum::body::Bytes;
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use jacquard_repo::storage::BlockStore;
12use multihash::Multihash;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use sha2::{Digest, Sha256};
16use std::str::FromStr;
17use tracing::{debug, error};
18
/// Upload size limit, in bytes, for ordinary bearer-token sessions on active accounts.
const MAX_BLOB_SIZE: usize = 1_000_000;
/// Raised upload size limit, in bytes, applied by `upload_blob` when the caller
/// authenticates with a service token or the account is deactivated (treated as a migration).
const MAX_VIDEO_BLOB_SIZE: usize = 100_000_000;
21
22pub async fn upload_blob(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let token = match crate::auth::extract_bearer_token_from_header(
28 headers.get("Authorization").and_then(|h| h.to_str().ok()),
29 ) {
30 Some(t) => t,
31 None => {
32 return (
33 StatusCode::UNAUTHORIZED,
34 Json(json!({"error": "AuthenticationRequired"})),
35 )
36 .into_response();
37 }
38 };
39
40 let is_service_auth = is_service_token(&token);
41
42 let (did, is_migration) = if is_service_auth {
43 debug!("Verifying service token for blob upload");
44 let verifier = ServiceTokenVerifier::new();
45 match verifier
46 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
47 .await
48 {
49 Ok(claims) => {
50 debug!("Service token verified for DID: {}", claims.iss);
51 (claims.iss, false)
52 }
53 Err(e) => {
54 error!("Service token verification failed: {:?}", e);
55 return (
56 StatusCode::UNAUTHORIZED,
57 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
58 )
59 .into_response();
60 }
61 }
62 } else {
63 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
64 Ok(user) => {
65 let deactivated = sqlx::query_scalar!(
66 "SELECT deactivated_at FROM users WHERE did = $1",
67 user.did
68 )
69 .fetch_optional(&state.db)
70 .await
71 .ok()
72 .flatten()
73 .flatten();
74 (user.did, deactivated.is_some())
75 }
76 Err(_) => {
77 return (
78 StatusCode::UNAUTHORIZED,
79 Json(json!({"error": "AuthenticationFailed"})),
80 )
81 .into_response();
82 }
83 }
84 };
85
86 let max_size = if is_service_auth || is_migration {
87 MAX_VIDEO_BLOB_SIZE
88 } else {
89 MAX_BLOB_SIZE
90 };
91
92 if body.len() > max_size {
93 return (
94 StatusCode::PAYLOAD_TOO_LARGE,
95 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
96 )
97 .into_response();
98 }
99 let mime_type = headers
100 .get("content-type")
101 .and_then(|h| h.to_str().ok())
102 .unwrap_or("application/octet-stream")
103 .to_string();
104 let size = body.len() as i64;
105 let data = body.to_vec();
106 let mut hasher = Sha256::new();
107 hasher.update(&data);
108 let hash = hasher.finalize();
109 let multihash = match Multihash::wrap(0x12, &hash) {
110 Ok(mh) => mh,
111 Err(e) => {
112 error!("Failed to create multihash for blob: {:?}", e);
113 return (
114 StatusCode::INTERNAL_SERVER_ERROR,
115 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
116 )
117 .into_response();
118 }
119 };
120 let cid = Cid::new_v1(0x55, multihash);
121 let cid_str = cid.to_string();
122 let storage_key = format!("blobs/{}", cid_str);
123 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
124 .fetch_optional(&state.db)
125 .await;
126 let user_id = match user_query {
127 Ok(Some(row)) => row.id,
128 _ => {
129 return (
130 StatusCode::INTERNAL_SERVER_ERROR,
131 Json(json!({"error": "InternalError"})),
132 )
133 .into_response();
134 }
135 };
136 let mut tx = match state.db.begin().await {
137 Ok(tx) => tx,
138 Err(e) => {
139 error!("Failed to begin transaction: {:?}", e);
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError"})),
143 )
144 .into_response();
145 }
146 };
147 let insert = sqlx::query!(
148 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
149 cid_str,
150 mime_type,
151 size,
152 user_id,
153 storage_key
154 )
155 .fetch_optional(&mut *tx)
156 .await;
157 let was_inserted = match insert {
158 Ok(Some(_)) => true,
159 Ok(None) => false,
160 Err(e) => {
161 error!("Failed to insert blob record: {:?}", e);
162 return (
163 StatusCode::INTERNAL_SERVER_ERROR,
164 Json(json!({"error": "InternalError"})),
165 )
166 .into_response();
167 }
168 };
169 if was_inserted
170 && let Err(e) = state
171 .blob_store
172 .put_bytes(&storage_key, bytes::Bytes::from(data))
173 .await
174 {
175 error!("Failed to upload blob to storage: {:?}", e);
176 return (
177 StatusCode::INTERNAL_SERVER_ERROR,
178 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
179 )
180 .into_response();
181 }
182 if let Err(e) = tx.commit().await {
183 error!("Failed to commit blob transaction: {:?}", e);
184 if was_inserted
185 && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
186 error!(
187 "Failed to cleanup orphaned blob {}: {:?}",
188 storage_key, cleanup_err
189 );
190 }
191 return (
192 StatusCode::INTERNAL_SERVER_ERROR,
193 Json(json!({"error": "InternalError"})),
194 )
195 .into_response();
196 }
197 Json(json!({
198 "blob": {
199 "$type": "blob",
200 "ref": {
201 "$link": cid_str
202 },
203 "mimeType": mime_type,
204 "size": size
205 }
206 }))
207 .into_response()
208}
209
/// Query-string parameters accepted by `list_missing_blobs`.
#[derive(Deserialize)]
pub struct ListMissingBlobsParams {
    /// Maximum number of records to scan for this page (not the number of blobs returned).
    /// The handler defaults this to 500 and clamps it to `1..=1000`.
    pub limit: Option<i64>,
    /// Pagination cursor of the form `collection|rkey`, as returned by a previous page.
    /// Absent or malformed values start from the beginning.
    pub cursor: Option<String>,
}
215
/// One blob referenced by a record but not found in the `blobs` table for the user.
/// Serialized with camelCase keys (`cid`, `recordUri`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RecordBlob {
    /// The blob's CID string, taken from the record's `ref.$link`.
    pub cid: String,
    /// URI of the referencing record, formatted as `at://{did}/{collection}/{rkey}`.
    pub record_uri: String,
}
222
/// Response body of `list_missing_blobs`.
#[derive(Serialize)]
pub struct ListMissingBlobsOutput {
    /// Cursor for the next page; omitted from the JSON entirely when there is no next page.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
    /// Missing blobs found among the records scanned for this page.
    pub blobs: Vec<RecordBlob>,
}
229
230fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
231 if let Some(obj) = val.as_object() {
232 if let Some(type_val) = obj.get("$type")
233 && type_val == "blob"
234 && let Some(r) = obj.get("ref")
235 && let Some(link) = r.get("$link")
236 && let Some(s) = link.as_str() {
237 blobs.push(s.to_string());
238 }
239 for (_, v) in obj {
240 find_blobs(v, blobs);
241 }
242 } else if let Some(arr) = val.as_array() {
243 for v in arr {
244 find_blobs(v, blobs);
245 }
246 }
247}
248
249pub async fn list_missing_blobs(
250 State(state): State<AppState>,
251 headers: axum::http::HeaderMap,
252 Query(params): Query<ListMissingBlobsParams>,
253) -> Response {
254 let token = match crate::auth::extract_bearer_token_from_header(
255 headers.get("Authorization").and_then(|h| h.to_str().ok()),
256 ) {
257 Some(t) => t,
258 None => {
259 return (
260 StatusCode::UNAUTHORIZED,
261 Json(json!({"error": "AuthenticationRequired"})),
262 )
263 .into_response();
264 }
265 };
266 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
267 Ok(user) => user,
268 Err(_) => {
269 return (
270 StatusCode::UNAUTHORIZED,
271 Json(json!({"error": "AuthenticationFailed"})),
272 )
273 .into_response();
274 }
275 };
276 let did = auth_user.did;
277 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
278 .fetch_optional(&state.db)
279 .await;
280 let user_id = match user_query {
281 Ok(Some(row)) => row.id,
282 _ => {
283 return (
284 StatusCode::INTERNAL_SERVER_ERROR,
285 Json(json!({"error": "InternalError"})),
286 )
287 .into_response();
288 }
289 };
290 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
291 let cursor_str = params.cursor.unwrap_or_default();
292 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
293 let parts: Vec<&str> = cursor_str.split('|').collect();
294 (parts[0].to_string(), parts[1].to_string())
295 } else {
296 (String::new(), String::new())
297 };
298 let records_query = sqlx::query!(
299 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
300 user_id,
301 cursor_collection,
302 cursor_rkey,
303 limit
304 )
305 .fetch_all(&state.db)
306 .await;
307 let records = match records_query {
308 Ok(r) => r,
309 Err(e) => {
310 error!("DB error fetching records: {:?}", e);
311 return (
312 StatusCode::INTERNAL_SERVER_ERROR,
313 Json(json!({"error": "InternalError"})),
314 )
315 .into_response();
316 }
317 };
318 let mut missing_blobs = Vec::new();
319 let mut last_cursor = None;
320 for row in &records {
321 let collection = &row.collection;
322 let rkey = &row.rkey;
323 let record_cid_str = &row.record_cid;
324 last_cursor = Some(format!("{}|{}", collection, rkey));
325 let record_cid = match Cid::from_str(record_cid_str) {
326 Ok(c) => c,
327 Err(_) => continue,
328 };
329 let block_bytes = match state.block_store.get(&record_cid).await {
330 Ok(Some(b)) => b,
331 _ => continue,
332 };
333 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
334 Ok(v) => v,
335 Err(_) => continue,
336 };
337 let mut blobs = Vec::new();
338 find_blobs(&record_val, &mut blobs);
339 for blob_cid_str in blobs {
340 let exists = sqlx::query!(
341 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
342 blob_cid_str,
343 user_id
344 )
345 .fetch_optional(&state.db)
346 .await;
347 match exists {
348 Ok(None) => {
349 missing_blobs.push(RecordBlob {
350 cid: blob_cid_str,
351 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
352 });
353 }
354 Err(e) => {
355 error!("DB error checking blob existence: {:?}", e);
356 }
357 _ => {}
358 }
359 }
360 }
361 // if we fetched fewer records than limit, we are done, so cursor is None.
362 // otherwise, cursor is the last one we saw.
363 // ...right?
364 let next_cursor = if records.len() < limit as usize {
365 None
366 } else {
367 last_cursor
368 };
369 (
370 StatusCode::OK,
371 Json(ListMissingBlobsOutput {
372 cursor: next_cursor,
373 blobs: missing_blobs,
374 }),
375 )
376 .into_response()
377}