this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
/// Upper bound, in bytes, on the request body accepted by `upload_blob`.
const MAX_BLOB_SIZE: usize = 1_000_000;
19
20pub async fn upload_blob(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 body: Bytes,
24) -> Response {
25 if body.len() > MAX_BLOB_SIZE {
26 return (
27 StatusCode::PAYLOAD_TOO_LARGE,
28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})),
29 )
30 .into_response();
31 }
32 let token = match crate::auth::extract_bearer_token_from_header(
33 headers.get("Authorization").and_then(|h| h.to_str().ok())
34 ) {
35 Some(t) => t,
36 None => {
37 return (
38 StatusCode::UNAUTHORIZED,
39 Json(json!({"error": "AuthenticationRequired"})),
40 )
41 .into_response();
42 }
43 };
44 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
45 Ok(user) => user,
46 Err(_) => {
47 return (
48 StatusCode::UNAUTHORIZED,
49 Json(json!({"error": "AuthenticationFailed"})),
50 )
51 .into_response();
52 }
53 };
54 let did = auth_user.did;
55 let mime_type = headers
56 .get("content-type")
57 .and_then(|h| h.to_str().ok())
58 .unwrap_or("application/octet-stream")
59 .to_string();
60 let size = body.len() as i64;
61 let data = body.to_vec();
62 let mut hasher = Sha256::new();
63 hasher.update(&data);
64 let hash = hasher.finalize();
65 let multihash = match Multihash::wrap(0x12, &hash) {
66 Ok(mh) => mh,
67 Err(e) => {
68 error!("Failed to create multihash for blob: {:?}", e);
69 return (
70 StatusCode::INTERNAL_SERVER_ERROR,
71 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
72 )
73 .into_response();
74 }
75 };
76 let cid = Cid::new_v1(0x55, multihash);
77 let cid_str = cid.to_string();
78 let storage_key = format!("blobs/{}", cid_str);
79 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
80 .fetch_optional(&state.db)
81 .await;
82 let user_id = match user_query {
83 Ok(Some(row)) => row.id,
84 _ => {
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError"})),
88 )
89 .into_response();
90 }
91 };
92 let mut tx = match state.db.begin().await {
93 Ok(tx) => tx,
94 Err(e) => {
95 error!("Failed to begin transaction: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 };
103 let insert = sqlx::query!(
104 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
105 cid_str,
106 mime_type,
107 size,
108 user_id,
109 storage_key
110 )
111 .fetch_optional(&mut *tx)
112 .await;
113 let was_inserted = match insert {
114 Ok(Some(_)) => true,
115 Ok(None) => false,
116 Err(e) => {
117 error!("Failed to insert blob record: {:?}", e);
118 return (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 Json(json!({"error": "InternalError"})),
121 )
122 .into_response();
123 }
124 };
125 if was_inserted {
126 if let Err(e) = state.blob_store.put_bytes(&storage_key, bytes::Bytes::from(data)).await {
127 error!("Failed to upload blob to storage: {:?}", e);
128 return (
129 StatusCode::INTERNAL_SERVER_ERROR,
130 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
131 )
132 .into_response();
133 }
134 }
135 if let Err(e) = tx.commit().await {
136 error!("Failed to commit blob transaction: {:?}", e);
137 if was_inserted {
138 if let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
139 error!("Failed to cleanup orphaned blob {}: {:?}", storage_key, cleanup_err);
140 }
141 }
142 return (
143 StatusCode::INTERNAL_SERVER_ERROR,
144 Json(json!({"error": "InternalError"})),
145 )
146 .into_response();
147 }
148 Json(json!({
149 "blob": {
150 "$type": "blob",
151 "ref": {
152 "$link": cid_str
153 },
154 "mimeType": mime_type,
155 "size": size
156 }
157 }))
158 .into_response()
159}
160
161#[derive(Deserialize)]
162pub struct ListMissingBlobsParams {
163 pub limit: Option<i64>,
164 pub cursor: Option<String>,
165}
166
167#[derive(Serialize)]
168#[serde(rename_all = "camelCase")]
169pub struct RecordBlob {
170 pub cid: String,
171 pub record_uri: String,
172}
173
174#[derive(Serialize)]
175pub struct ListMissingBlobsOutput {
176 pub cursor: Option<String>,
177 pub blobs: Vec<RecordBlob>,
178}
179
180fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
181 if let Some(obj) = val.as_object() {
182 if let Some(type_val) = obj.get("$type") {
183 if type_val == "blob" {
184 if let Some(r) = obj.get("ref") {
185 if let Some(link) = r.get("$link") {
186 if let Some(s) = link.as_str() {
187 blobs.push(s.to_string());
188 }
189 }
190 }
191 }
192 }
193 for (_, v) in obj {
194 find_blobs(v, blobs);
195 }
196 } else if let Some(arr) = val.as_array() {
197 for v in arr {
198 find_blobs(v, blobs);
199 }
200 }
201}
202
203pub async fn list_missing_blobs(
204 State(state): State<AppState>,
205 headers: axum::http::HeaderMap,
206 Query(params): Query<ListMissingBlobsParams>,
207) -> Response {
208 let token = match crate::auth::extract_bearer_token_from_header(
209 headers.get("Authorization").and_then(|h| h.to_str().ok())
210 ) {
211 Some(t) => t,
212 None => {
213 return (
214 StatusCode::UNAUTHORIZED,
215 Json(json!({"error": "AuthenticationRequired"})),
216 )
217 .into_response();
218 }
219 };
220 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
221 Ok(user) => user,
222 Err(_) => {
223 return (
224 StatusCode::UNAUTHORIZED,
225 Json(json!({"error": "AuthenticationFailed"})),
226 )
227 .into_response();
228 }
229 };
230 let did = auth_user.did;
231 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
232 .fetch_optional(&state.db)
233 .await;
234 let user_id = match user_query {
235 Ok(Some(row)) => row.id,
236 _ => {
237 return (
238 StatusCode::INTERNAL_SERVER_ERROR,
239 Json(json!({"error": "InternalError"})),
240 )
241 .into_response();
242 }
243 };
244 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
245 let cursor_str = params.cursor.unwrap_or_default();
246 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
247 let parts: Vec<&str> = cursor_str.split('|').collect();
248 (parts[0].to_string(), parts[1].to_string())
249 } else {
250 (String::new(), String::new())
251 };
252 let records_query = sqlx::query!(
253 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
254 user_id,
255 cursor_collection,
256 cursor_rkey,
257 limit
258 )
259 .fetch_all(&state.db)
260 .await;
261 let records = match records_query {
262 Ok(r) => r,
263 Err(e) => {
264 error!("DB error fetching records: {:?}", e);
265 return (
266 StatusCode::INTERNAL_SERVER_ERROR,
267 Json(json!({"error": "InternalError"})),
268 )
269 .into_response();
270 }
271 };
272 let mut missing_blobs = Vec::new();
273 let mut last_cursor = None;
274 for row in &records {
275 let collection = &row.collection;
276 let rkey = &row.rkey;
277 let record_cid_str = &row.record_cid;
278 last_cursor = Some(format!("{}|{}", collection, rkey));
279 let record_cid = match Cid::from_str(&record_cid_str) {
280 Ok(c) => c,
281 Err(_) => continue,
282 };
283 let block_bytes = match state.block_store.get(&record_cid).await {
284 Ok(Some(b)) => b,
285 _ => continue,
286 };
287 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
288 Ok(v) => v,
289 Err(_) => continue,
290 };
291 let mut blobs = Vec::new();
292 find_blobs(&record_val, &mut blobs);
293 for blob_cid_str in blobs {
294 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
295 .fetch_optional(&state.db)
296 .await;
297 match exists {
298 Ok(None) => {
299 missing_blobs.push(RecordBlob {
300 cid: blob_cid_str,
301 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
302 });
303 }
304 Err(e) => {
305 error!("DB error checking blob existence: {:?}", e);
306 }
307 _ => {}
308 }
309 }
310 }
311 // if we fetched fewer records than limit, we are done, so cursor is None.
312 // otherwise, cursor is the last one we saw.
313 // ...right?
314 let next_cursor = if records.len() < limit as usize {
315 None
316 } else {
317 last_cursor
318 };
319 (
320 StatusCode::OK,
321 Json(ListMissingBlobsOutput {
322 cursor: next_cursor,
323 blobs: missing_blobs,
324 }),
325 )
326 .into_response()
327}