this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
/// Largest request body, in bytes, that `upload_blob` accepts (one decimal megabyte).
const MAX_BLOB_SIZE: usize = 1_000 * 1_000;
19
20pub async fn upload_blob(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 body: Bytes,
24) -> Response {
25 if body.len() > MAX_BLOB_SIZE {
26 return (
27 StatusCode::PAYLOAD_TOO_LARGE,
28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})),
29 )
30 .into_response();
31 }
32 let token = match crate::auth::extract_bearer_token_from_header(
33 headers.get("Authorization").and_then(|h| h.to_str().ok())
34 ) {
35 Some(t) => t,
36 None => {
37 return (
38 StatusCode::UNAUTHORIZED,
39 Json(json!({"error": "AuthenticationRequired"})),
40 )
41 .into_response();
42 }
43 };
44 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
45 Ok(user) => user,
46 Err(_) => {
47 return (
48 StatusCode::UNAUTHORIZED,
49 Json(json!({"error": "AuthenticationFailed"})),
50 )
51 .into_response();
52 }
53 };
54 let did = auth_user.did;
55 let mime_type = headers
56 .get("content-type")
57 .and_then(|h| h.to_str().ok())
58 .unwrap_or("application/octet-stream")
59 .to_string();
60 let size = body.len() as i64;
61 let data = body.to_vec();
62 let mut hasher = Sha256::new();
63 hasher.update(&data);
64 let hash = hasher.finalize();
65 let multihash = match Multihash::wrap(0x12, &hash) {
66 Ok(mh) => mh,
67 Err(e) => {
68 error!("Failed to create multihash for blob: {:?}", e);
69 return (
70 StatusCode::INTERNAL_SERVER_ERROR,
71 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
72 )
73 .into_response();
74 }
75 };
76 let cid = Cid::new_v1(0x55, multihash);
77 let cid_str = cid.to_string();
78 let storage_key = format!("blobs/{}", cid_str);
79 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
80 .fetch_optional(&state.db)
81 .await;
82 let user_id = match user_query {
83 Ok(Some(row)) => row.id,
84 _ => {
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError"})),
88 )
89 .into_response();
90 }
91 };
92 let mut tx = match state.db.begin().await {
93 Ok(tx) => tx,
94 Err(e) => {
95 error!("Failed to begin transaction: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 };
103 let insert = sqlx::query!(
104 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
105 cid_str,
106 mime_type,
107 size,
108 user_id,
109 storage_key
110 )
111 .fetch_optional(&mut *tx)
112 .await;
113 let was_inserted = match insert {
114 Ok(Some(_)) => true,
115 Ok(None) => false,
116 Err(e) => {
117 error!("Failed to insert blob record: {:?}", e);
118 return (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 Json(json!({"error": "InternalError"})),
121 )
122 .into_response();
123 }
124 };
125 if was_inserted {
126 if let Err(e) = state.blob_store.put_bytes(&storage_key, bytes::Bytes::from(data)).await {
127 error!("Failed to upload blob to storage: {:?}", e);
128 return (
129 StatusCode::INTERNAL_SERVER_ERROR,
130 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
131 )
132 .into_response();
133 }
134 }
135 if let Err(e) = tx.commit().await {
136 error!("Failed to commit blob transaction: {:?}", e);
137 if was_inserted {
138 if let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
139 error!("Failed to cleanup orphaned blob {}: {:?}", storage_key, cleanup_err);
140 }
141 }
142 return (
143 StatusCode::INTERNAL_SERVER_ERROR,
144 Json(json!({"error": "InternalError"})),
145 )
146 .into_response();
147 }
148 Json(json!({
149 "blob": {
150 "ref": {
151 "$link": cid_str
152 },
153 "mimeType": mime_type,
154 "size": size
155 }
156 }))
157 .into_response()
158}
159
160#[derive(Deserialize)]
161pub struct ListMissingBlobsParams {
162 pub limit: Option<i64>,
163 pub cursor: Option<String>,
164}
165
166#[derive(Serialize)]
167#[serde(rename_all = "camelCase")]
168pub struct RecordBlob {
169 pub cid: String,
170 pub record_uri: String,
171}
172
173#[derive(Serialize)]
174pub struct ListMissingBlobsOutput {
175 pub cursor: Option<String>,
176 pub blobs: Vec<RecordBlob>,
177}
178
179fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
180 if let Some(obj) = val.as_object() {
181 if let Some(type_val) = obj.get("$type") {
182 if type_val == "blob" {
183 if let Some(r) = obj.get("ref") {
184 if let Some(link) = r.get("$link") {
185 if let Some(s) = link.as_str() {
186 blobs.push(s.to_string());
187 }
188 }
189 }
190 }
191 }
192 for (_, v) in obj {
193 find_blobs(v, blobs);
194 }
195 } else if let Some(arr) = val.as_array() {
196 for v in arr {
197 find_blobs(v, blobs);
198 }
199 }
200}
201
202pub async fn list_missing_blobs(
203 State(state): State<AppState>,
204 headers: axum::http::HeaderMap,
205 Query(params): Query<ListMissingBlobsParams>,
206) -> Response {
207 let token = match crate::auth::extract_bearer_token_from_header(
208 headers.get("Authorization").and_then(|h| h.to_str().ok())
209 ) {
210 Some(t) => t,
211 None => {
212 return (
213 StatusCode::UNAUTHORIZED,
214 Json(json!({"error": "AuthenticationRequired"})),
215 )
216 .into_response();
217 }
218 };
219 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
220 Ok(user) => user,
221 Err(_) => {
222 return (
223 StatusCode::UNAUTHORIZED,
224 Json(json!({"error": "AuthenticationFailed"})),
225 )
226 .into_response();
227 }
228 };
229 let did = auth_user.did;
230 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
231 .fetch_optional(&state.db)
232 .await;
233 let user_id = match user_query {
234 Ok(Some(row)) => row.id,
235 _ => {
236 return (
237 StatusCode::INTERNAL_SERVER_ERROR,
238 Json(json!({"error": "InternalError"})),
239 )
240 .into_response();
241 }
242 };
243 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
244 let cursor_str = params.cursor.unwrap_or_default();
245 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
246 let parts: Vec<&str> = cursor_str.split('|').collect();
247 (parts[0].to_string(), parts[1].to_string())
248 } else {
249 (String::new(), String::new())
250 };
251 let records_query = sqlx::query!(
252 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
253 user_id,
254 cursor_collection,
255 cursor_rkey,
256 limit
257 )
258 .fetch_all(&state.db)
259 .await;
260 let records = match records_query {
261 Ok(r) => r,
262 Err(e) => {
263 error!("DB error fetching records: {:?}", e);
264 return (
265 StatusCode::INTERNAL_SERVER_ERROR,
266 Json(json!({"error": "InternalError"})),
267 )
268 .into_response();
269 }
270 };
271 let mut missing_blobs = Vec::new();
272 let mut last_cursor = None;
273 for row in &records {
274 let collection = &row.collection;
275 let rkey = &row.rkey;
276 let record_cid_str = &row.record_cid;
277 last_cursor = Some(format!("{}|{}", collection, rkey));
278 let record_cid = match Cid::from_str(&record_cid_str) {
279 Ok(c) => c,
280 Err(_) => continue,
281 };
282 let block_bytes = match state.block_store.get(&record_cid).await {
283 Ok(Some(b)) => b,
284 _ => continue,
285 };
286 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
287 Ok(v) => v,
288 Err(_) => continue,
289 };
290 let mut blobs = Vec::new();
291 find_blobs(&record_val, &mut blobs);
292 for blob_cid_str in blobs {
293 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
294 .fetch_optional(&state.db)
295 .await;
296 match exists {
297 Ok(None) => {
298 missing_blobs.push(RecordBlob {
299 cid: blob_cid_str,
300 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
301 });
302 }
303 Err(e) => {
304 error!("DB error checking blob existence: {:?}", e);
305 }
306 _ => {}
307 }
308 }
309 }
310 // if we fetched fewer records than limit, we are done, so cursor is None.
311 // otherwise, cursor is the last one we saw.
312 // ...right?
313 let next_cursor = if records.len() < limit as usize {
314 None
315 } else {
316 last_cursor
317 };
318 (
319 StatusCode::OK,
320 Json(ListMissingBlobsOutput {
321 cursor: next_cursor,
322 blobs: missing_blobs,
323 }),
324 )
325 .into_response()
326}