this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
/// Upper bound, in bytes, on the request body accepted by `upload_blob`.
/// Bodies strictly larger than this are rejected before any other work is done.
const MAX_BLOB_SIZE: usize = 1_000_000;
19
20pub async fn upload_blob(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 body: Bytes,
24) -> Response {
25 if body.len() > MAX_BLOB_SIZE {
26 return (
27 StatusCode::PAYLOAD_TOO_LARGE,
28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})),
29 )
30 .into_response();
31 }
32
33 let token = match crate::auth::extract_bearer_token_from_header(
34 headers.get("Authorization").and_then(|h| h.to_str().ok())
35 ) {
36 Some(t) => t,
37 None => {
38 return (
39 StatusCode::UNAUTHORIZED,
40 Json(json!({"error": "AuthenticationRequired"})),
41 )
42 .into_response();
43 }
44 };
45
46 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
47 Ok(user) => user,
48 Err(_) => {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationFailed"})),
52 )
53 .into_response();
54 }
55 };
56 let did = auth_user.did;
57
58 let mime_type = headers
59 .get("content-type")
60 .and_then(|h| h.to_str().ok())
61 .unwrap_or("application/octet-stream")
62 .to_string();
63
64 let size = body.len() as i64;
65 let data = body.to_vec();
66
67 let mut hasher = Sha256::new();
68 hasher.update(&data);
69 let hash = hasher.finalize();
70 let multihash = match Multihash::wrap(0x12, &hash) {
71 Ok(mh) => mh,
72 Err(e) => {
73 error!("Failed to create multihash for blob: {:?}", e);
74 return (
75 StatusCode::INTERNAL_SERVER_ERROR,
76 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
77 )
78 .into_response();
79 }
80 };
81 let cid = Cid::new_v1(0x55, multihash);
82 let cid_str = cid.to_string();
83
84 let storage_key = format!("blobs/{}", cid_str);
85
86 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
87 .fetch_optional(&state.db)
88 .await;
89
90 let user_id = match user_query {
91 Ok(Some(row)) => row.id,
92 _ => {
93 return (
94 StatusCode::INTERNAL_SERVER_ERROR,
95 Json(json!({"error": "InternalError"})),
96 )
97 .into_response();
98 }
99 };
100
101 let mut tx = match state.db.begin().await {
102 Ok(tx) => tx,
103 Err(e) => {
104 error!("Failed to begin transaction: {:?}", e);
105 return (
106 StatusCode::INTERNAL_SERVER_ERROR,
107 Json(json!({"error": "InternalError"})),
108 )
109 .into_response();
110 }
111 };
112
113 let insert = sqlx::query!(
114 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
115 cid_str,
116 mime_type,
117 size,
118 user_id,
119 storage_key
120 )
121 .fetch_optional(&mut *tx)
122 .await;
123
124 let was_inserted = match insert {
125 Ok(Some(_)) => true,
126 Ok(None) => false,
127 Err(e) => {
128 error!("Failed to insert blob record: {:?}", e);
129 return (
130 StatusCode::INTERNAL_SERVER_ERROR,
131 Json(json!({"error": "InternalError"})),
132 )
133 .into_response();
134 }
135 };
136
137 if was_inserted {
138 if let Err(e) = state.blob_store.put_bytes(&storage_key, bytes::Bytes::from(data)).await {
139 error!("Failed to upload blob to storage: {:?}", e);
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
143 )
144 .into_response();
145 }
146 }
147
148 if let Err(e) = tx.commit().await {
149 error!("Failed to commit blob transaction: {:?}", e);
150 if was_inserted {
151 if let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
152 error!("Failed to cleanup orphaned blob {}: {:?}", storage_key, cleanup_err);
153 }
154 }
155 return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(json!({"error": "InternalError"})),
158 )
159 .into_response();
160 }
161
162 Json(json!({
163 "blob": {
164 "ref": {
165 "$link": cid_str
166 },
167 "mimeType": mime_type,
168 "size": size
169 }
170 }))
171 .into_response()
172}
173
174#[derive(Deserialize)]
175pub struct ListMissingBlobsParams {
176 pub limit: Option<i64>,
177 pub cursor: Option<String>,
178}
179
180#[derive(Serialize)]
181#[serde(rename_all = "camelCase")]
182pub struct RecordBlob {
183 pub cid: String,
184 pub record_uri: String,
185}
186
187#[derive(Serialize)]
188pub struct ListMissingBlobsOutput {
189 pub cursor: Option<String>,
190 pub blobs: Vec<RecordBlob>,
191}
192
193fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
194 if let Some(obj) = val.as_object() {
195 if let Some(type_val) = obj.get("$type") {
196 if type_val == "blob" {
197 if let Some(r) = obj.get("ref") {
198 if let Some(link) = r.get("$link") {
199 if let Some(s) = link.as_str() {
200 blobs.push(s.to_string());
201 }
202 }
203 }
204 }
205 }
206 for (_, v) in obj {
207 find_blobs(v, blobs);
208 }
209 } else if let Some(arr) = val.as_array() {
210 for v in arr {
211 find_blobs(v, blobs);
212 }
213 }
214}
215
216pub async fn list_missing_blobs(
217 State(state): State<AppState>,
218 headers: axum::http::HeaderMap,
219 Query(params): Query<ListMissingBlobsParams>,
220) -> Response {
221 let token = match crate::auth::extract_bearer_token_from_header(
222 headers.get("Authorization").and_then(|h| h.to_str().ok())
223 ) {
224 Some(t) => t,
225 None => {
226 return (
227 StatusCode::UNAUTHORIZED,
228 Json(json!({"error": "AuthenticationRequired"})),
229 )
230 .into_response();
231 }
232 };
233
234 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
235 Ok(user) => user,
236 Err(_) => {
237 return (
238 StatusCode::UNAUTHORIZED,
239 Json(json!({"error": "AuthenticationFailed"})),
240 )
241 .into_response();
242 }
243 };
244
245 let did = auth_user.did;
246
247 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
248 .fetch_optional(&state.db)
249 .await;
250
251 let user_id = match user_query {
252 Ok(Some(row)) => row.id,
253 _ => {
254 return (
255 StatusCode::INTERNAL_SERVER_ERROR,
256 Json(json!({"error": "InternalError"})),
257 )
258 .into_response();
259 }
260 };
261
262 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
263 let cursor_str = params.cursor.unwrap_or_default();
264 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
265 let parts: Vec<&str> = cursor_str.split('|').collect();
266 (parts[0].to_string(), parts[1].to_string())
267 } else {
268 (String::new(), String::new())
269 };
270
271 let records_query = sqlx::query!(
272 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
273 user_id,
274 cursor_collection,
275 cursor_rkey,
276 limit
277 )
278 .fetch_all(&state.db)
279 .await;
280
281 let records = match records_query {
282 Ok(r) => r,
283 Err(e) => {
284 error!("DB error fetching records: {:?}", e);
285 return (
286 StatusCode::INTERNAL_SERVER_ERROR,
287 Json(json!({"error": "InternalError"})),
288 )
289 .into_response();
290 }
291 };
292
293 let mut missing_blobs = Vec::new();
294 let mut last_cursor = None;
295
296 for row in &records {
297 let collection = &row.collection;
298 let rkey = &row.rkey;
299 let record_cid_str = &row.record_cid;
300
301 last_cursor = Some(format!("{}|{}", collection, rkey));
302
303 let record_cid = match Cid::from_str(&record_cid_str) {
304 Ok(c) => c,
305 Err(_) => continue,
306 };
307
308 let block_bytes = match state.block_store.get(&record_cid).await {
309 Ok(Some(b)) => b,
310 _ => continue,
311 };
312
313 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
314 Ok(v) => v,
315 Err(_) => continue,
316 };
317
318 let mut blobs = Vec::new();
319 find_blobs(&record_val, &mut blobs);
320
321 for blob_cid_str in blobs {
322 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
323 .fetch_optional(&state.db)
324 .await;
325
326 match exists {
327 Ok(None) => {
328 missing_blobs.push(RecordBlob {
329 cid: blob_cid_str,
330 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
331 });
332 }
333 Err(e) => {
334 error!("DB error checking blob existence: {:?}", e);
335 }
336 _ => {}
337 }
338 }
339 }
340
341 // if we fetched fewer records than limit, we are done, so cursor is None.
342 // otherwise, cursor is the last one we saw.
343 // ...right?
344 let next_cursor = if records.len() < limit as usize {
345 None
346 } else {
347 last_cursor
348 };
349
350 (
351 StatusCode::OK,
352 Json(ListMissingBlobsOutput {
353 cursor: next_cursor,
354 blobs: missing_blobs,
355 }),
356 )
357 .into_response()
358}