this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
/// Largest request body, in bytes, that `upload_blob` accepts; anything bigger is
/// rejected with `413 BlobTooLarge` before any other work is done.
const MAX_BLOB_SIZE: usize = 1_000_000;
19
20pub async fn upload_blob(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 body: Bytes,
24) -> Response {
25 if body.len() > MAX_BLOB_SIZE {
26 return (
27 StatusCode::PAYLOAD_TOO_LARGE,
28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})),
29 )
30 .into_response();
31 }
32 let token = match crate::auth::extract_bearer_token_from_header(
33 headers.get("Authorization").and_then(|h| h.to_str().ok()),
34 ) {
35 Some(t) => t,
36 None => {
37 return (
38 StatusCode::UNAUTHORIZED,
39 Json(json!({"error": "AuthenticationRequired"})),
40 )
41 .into_response();
42 }
43 };
44 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
45 Ok(user) => user,
46 Err(_) => {
47 return (
48 StatusCode::UNAUTHORIZED,
49 Json(json!({"error": "AuthenticationFailed"})),
50 )
51 .into_response();
52 }
53 };
54 let did = auth_user.did;
55 let mime_type = headers
56 .get("content-type")
57 .and_then(|h| h.to_str().ok())
58 .unwrap_or("application/octet-stream")
59 .to_string();
60 let size = body.len() as i64;
61 let data = body.to_vec();
62 let mut hasher = Sha256::new();
63 hasher.update(&data);
64 let hash = hasher.finalize();
65 let multihash = match Multihash::wrap(0x12, &hash) {
66 Ok(mh) => mh,
67 Err(e) => {
68 error!("Failed to create multihash for blob: {:?}", e);
69 return (
70 StatusCode::INTERNAL_SERVER_ERROR,
71 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
72 )
73 .into_response();
74 }
75 };
76 let cid = Cid::new_v1(0x55, multihash);
77 let cid_str = cid.to_string();
78 let storage_key = format!("blobs/{}", cid_str);
79 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
80 .fetch_optional(&state.db)
81 .await;
82 let user_id = match user_query {
83 Ok(Some(row)) => row.id,
84 _ => {
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError"})),
88 )
89 .into_response();
90 }
91 };
92 let mut tx = match state.db.begin().await {
93 Ok(tx) => tx,
94 Err(e) => {
95 error!("Failed to begin transaction: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 };
103 let insert = sqlx::query!(
104 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
105 cid_str,
106 mime_type,
107 size,
108 user_id,
109 storage_key
110 )
111 .fetch_optional(&mut *tx)
112 .await;
113 let was_inserted = match insert {
114 Ok(Some(_)) => true,
115 Ok(None) => false,
116 Err(e) => {
117 error!("Failed to insert blob record: {:?}", e);
118 return (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 Json(json!({"error": "InternalError"})),
121 )
122 .into_response();
123 }
124 };
125 if was_inserted
126 && let Err(e) = state
127 .blob_store
128 .put_bytes(&storage_key, bytes::Bytes::from(data))
129 .await
130 {
131 error!("Failed to upload blob to storage: {:?}", e);
132 return (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
135 )
136 .into_response();
137 }
138 if let Err(e) = tx.commit().await {
139 error!("Failed to commit blob transaction: {:?}", e);
140 if was_inserted
141 && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
142 error!(
143 "Failed to cleanup orphaned blob {}: {:?}",
144 storage_key, cleanup_err
145 );
146 }
147 return (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError"})),
150 )
151 .into_response();
152 }
153 Json(json!({
154 "blob": {
155 "$type": "blob",
156 "ref": {
157 "$link": cid_str
158 },
159 "mimeType": mime_type,
160 "size": size
161 }
162 }))
163 .into_response()
164}
165
166#[derive(Deserialize)]
167pub struct ListMissingBlobsParams {
168 pub limit: Option<i64>,
169 pub cursor: Option<String>,
170}
171
172#[derive(Serialize)]
173#[serde(rename_all = "camelCase")]
174pub struct RecordBlob {
175 pub cid: String,
176 pub record_uri: String,
177}
178
179#[derive(Serialize)]
180pub struct ListMissingBlobsOutput {
181 pub cursor: Option<String>,
182 pub blobs: Vec<RecordBlob>,
183}
184
185fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
186 if let Some(obj) = val.as_object() {
187 if let Some(type_val) = obj.get("$type")
188 && type_val == "blob"
189 && let Some(r) = obj.get("ref")
190 && let Some(link) = r.get("$link")
191 && let Some(s) = link.as_str() {
192 blobs.push(s.to_string());
193 }
194 for (_, v) in obj {
195 find_blobs(v, blobs);
196 }
197 } else if let Some(arr) = val.as_array() {
198 for v in arr {
199 find_blobs(v, blobs);
200 }
201 }
202}
203
204pub async fn list_missing_blobs(
205 State(state): State<AppState>,
206 headers: axum::http::HeaderMap,
207 Query(params): Query<ListMissingBlobsParams>,
208) -> Response {
209 let token = match crate::auth::extract_bearer_token_from_header(
210 headers.get("Authorization").and_then(|h| h.to_str().ok()),
211 ) {
212 Some(t) => t,
213 None => {
214 return (
215 StatusCode::UNAUTHORIZED,
216 Json(json!({"error": "AuthenticationRequired"})),
217 )
218 .into_response();
219 }
220 };
221 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
222 Ok(user) => user,
223 Err(_) => {
224 return (
225 StatusCode::UNAUTHORIZED,
226 Json(json!({"error": "AuthenticationFailed"})),
227 )
228 .into_response();
229 }
230 };
231 let did = auth_user.did;
232 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
233 .fetch_optional(&state.db)
234 .await;
235 let user_id = match user_query {
236 Ok(Some(row)) => row.id,
237 _ => {
238 return (
239 StatusCode::INTERNAL_SERVER_ERROR,
240 Json(json!({"error": "InternalError"})),
241 )
242 .into_response();
243 }
244 };
245 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
246 let cursor_str = params.cursor.unwrap_or_default();
247 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
248 let parts: Vec<&str> = cursor_str.split('|').collect();
249 (parts[0].to_string(), parts[1].to_string())
250 } else {
251 (String::new(), String::new())
252 };
253 let records_query = sqlx::query!(
254 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
255 user_id,
256 cursor_collection,
257 cursor_rkey,
258 limit
259 )
260 .fetch_all(&state.db)
261 .await;
262 let records = match records_query {
263 Ok(r) => r,
264 Err(e) => {
265 error!("DB error fetching records: {:?}", e);
266 return (
267 StatusCode::INTERNAL_SERVER_ERROR,
268 Json(json!({"error": "InternalError"})),
269 )
270 .into_response();
271 }
272 };
273 let mut missing_blobs = Vec::new();
274 let mut last_cursor = None;
275 for row in &records {
276 let collection = &row.collection;
277 let rkey = &row.rkey;
278 let record_cid_str = &row.record_cid;
279 last_cursor = Some(format!("{}|{}", collection, rkey));
280 let record_cid = match Cid::from_str(record_cid_str) {
281 Ok(c) => c,
282 Err(_) => continue,
283 };
284 let block_bytes = match state.block_store.get(&record_cid).await {
285 Ok(Some(b)) => b,
286 _ => continue,
287 };
288 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
289 Ok(v) => v,
290 Err(_) => continue,
291 };
292 let mut blobs = Vec::new();
293 find_blobs(&record_val, &mut blobs);
294 for blob_cid_str in blobs {
295 let exists = sqlx::query!(
296 "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
297 blob_cid_str,
298 user_id
299 )
300 .fetch_optional(&state.db)
301 .await;
302 match exists {
303 Ok(None) => {
304 missing_blobs.push(RecordBlob {
305 cid: blob_cid_str,
306 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
307 });
308 }
309 Err(e) => {
310 error!("DB error checking blob existence: {:?}", e);
311 }
312 _ => {}
313 }
314 }
315 }
316 // if we fetched fewer records than limit, we are done, so cursor is None.
317 // otherwise, cursor is the last one we saw.
318 // ...right?
319 let next_cursor = if records.len() < limit as usize {
320 None
321 } else {
322 last_cursor
323 };
324 (
325 StatusCode::OK,
326 Json(ListMissingBlobsOutput {
327 cursor: next_cursor,
328 blobs: missing_blobs,
329 }),
330 )
331 .into_response()
332}