// this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
/// Largest request body, in bytes, that `upload_blob` accepts; anything
/// longer is rejected with `413 Payload Too Large`.
const MAX_BLOB_SIZE: usize = 1_000_000;
18pub async fn upload_blob(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 if body.len() > MAX_BLOB_SIZE {
24 return (
25 StatusCode::PAYLOAD_TOO_LARGE,
26 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})),
27 )
28 .into_response();
29 }
30 let token = match crate::auth::extract_bearer_token_from_header(
31 headers.get("Authorization").and_then(|h| h.to_str().ok())
32 ) {
33 Some(t) => t,
34 None => {
35 return (
36 StatusCode::UNAUTHORIZED,
37 Json(json!({"error": "AuthenticationRequired"})),
38 )
39 .into_response();
40 }
41 };
42 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
43 Ok(user) => user,
44 Err(_) => {
45 return (
46 StatusCode::UNAUTHORIZED,
47 Json(json!({"error": "AuthenticationFailed"})),
48 )
49 .into_response();
50 }
51 };
52 let did = auth_user.did;
53 let mime_type = headers
54 .get("content-type")
55 .and_then(|h| h.to_str().ok())
56 .unwrap_or("application/octet-stream")
57 .to_string();
58 let size = body.len() as i64;
59 let data = body.to_vec();
60 let mut hasher = Sha256::new();
61 hasher.update(&data);
62 let hash = hasher.finalize();
63 let multihash = match Multihash::wrap(0x12, &hash) {
64 Ok(mh) => mh,
65 Err(e) => {
66 error!("Failed to create multihash for blob: {:?}", e);
67 return (
68 StatusCode::INTERNAL_SERVER_ERROR,
69 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
70 )
71 .into_response();
72 }
73 };
74 let cid = Cid::new_v1(0x55, multihash);
75 let cid_str = cid.to_string();
76 let storage_key = format!("blobs/{}", cid_str);
77 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
78 .fetch_optional(&state.db)
79 .await;
80 let user_id = match user_query {
81 Ok(Some(row)) => row.id,
82 _ => {
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError"})),
86 )
87 .into_response();
88 }
89 };
90 let mut tx = match state.db.begin().await {
91 Ok(tx) => tx,
92 Err(e) => {
93 error!("Failed to begin transaction: {:?}", e);
94 return (
95 StatusCode::INTERNAL_SERVER_ERROR,
96 Json(json!({"error": "InternalError"})),
97 )
98 .into_response();
99 }
100 };
101 let insert = sqlx::query!(
102 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
103 cid_str,
104 mime_type,
105 size,
106 user_id,
107 storage_key
108 )
109 .fetch_optional(&mut *tx)
110 .await;
111 let was_inserted = match insert {
112 Ok(Some(_)) => true,
113 Ok(None) => false,
114 Err(e) => {
115 error!("Failed to insert blob record: {:?}", e);
116 return (
117 StatusCode::INTERNAL_SERVER_ERROR,
118 Json(json!({"error": "InternalError"})),
119 )
120 .into_response();
121 }
122 };
123 if was_inserted {
124 if let Err(e) = state.blob_store.put_bytes(&storage_key, bytes::Bytes::from(data)).await {
125 error!("Failed to upload blob to storage: {:?}", e);
126 return (
127 StatusCode::INTERNAL_SERVER_ERROR,
128 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
129 )
130 .into_response();
131 }
132 }
133 if let Err(e) = tx.commit().await {
134 error!("Failed to commit blob transaction: {:?}", e);
135 if was_inserted {
136 if let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
137 error!("Failed to cleanup orphaned blob {}: {:?}", storage_key, cleanup_err);
138 }
139 }
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError"})),
143 )
144 .into_response();
145 }
146 Json(json!({
147 "blob": {
148 "ref": {
149 "$link": cid_str
150 },
151 "mimeType": mime_type,
152 "size": size
153 }
154 }))
155 .into_response()
156}
157#[derive(Deserialize)]
158pub struct ListMissingBlobsParams {
159 pub limit: Option<i64>,
160 pub cursor: Option<String>,
161}
162#[derive(Serialize)]
163#[serde(rename_all = "camelCase")]
164pub struct RecordBlob {
165 pub cid: String,
166 pub record_uri: String,
167}
168#[derive(Serialize)]
169pub struct ListMissingBlobsOutput {
170 pub cursor: Option<String>,
171 pub blobs: Vec<RecordBlob>,
172}
173fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
174 if let Some(obj) = val.as_object() {
175 if let Some(type_val) = obj.get("$type") {
176 if type_val == "blob" {
177 if let Some(r) = obj.get("ref") {
178 if let Some(link) = r.get("$link") {
179 if let Some(s) = link.as_str() {
180 blobs.push(s.to_string());
181 }
182 }
183 }
184 }
185 }
186 for (_, v) in obj {
187 find_blobs(v, blobs);
188 }
189 } else if let Some(arr) = val.as_array() {
190 for v in arr {
191 find_blobs(v, blobs);
192 }
193 }
194}
195pub async fn list_missing_blobs(
196 State(state): State<AppState>,
197 headers: axum::http::HeaderMap,
198 Query(params): Query<ListMissingBlobsParams>,
199) -> Response {
200 let token = match crate::auth::extract_bearer_token_from_header(
201 headers.get("Authorization").and_then(|h| h.to_str().ok())
202 ) {
203 Some(t) => t,
204 None => {
205 return (
206 StatusCode::UNAUTHORIZED,
207 Json(json!({"error": "AuthenticationRequired"})),
208 )
209 .into_response();
210 }
211 };
212 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
213 Ok(user) => user,
214 Err(_) => {
215 return (
216 StatusCode::UNAUTHORIZED,
217 Json(json!({"error": "AuthenticationFailed"})),
218 )
219 .into_response();
220 }
221 };
222 let did = auth_user.did;
223 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
224 .fetch_optional(&state.db)
225 .await;
226 let user_id = match user_query {
227 Ok(Some(row)) => row.id,
228 _ => {
229 return (
230 StatusCode::INTERNAL_SERVER_ERROR,
231 Json(json!({"error": "InternalError"})),
232 )
233 .into_response();
234 }
235 };
236 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
237 let cursor_str = params.cursor.unwrap_or_default();
238 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
239 let parts: Vec<&str> = cursor_str.split('|').collect();
240 (parts[0].to_string(), parts[1].to_string())
241 } else {
242 (String::new(), String::new())
243 };
244 let records_query = sqlx::query!(
245 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
246 user_id,
247 cursor_collection,
248 cursor_rkey,
249 limit
250 )
251 .fetch_all(&state.db)
252 .await;
253 let records = match records_query {
254 Ok(r) => r,
255 Err(e) => {
256 error!("DB error fetching records: {:?}", e);
257 return (
258 StatusCode::INTERNAL_SERVER_ERROR,
259 Json(json!({"error": "InternalError"})),
260 )
261 .into_response();
262 }
263 };
264 let mut missing_blobs = Vec::new();
265 let mut last_cursor = None;
266 for row in &records {
267 let collection = &row.collection;
268 let rkey = &row.rkey;
269 let record_cid_str = &row.record_cid;
270 last_cursor = Some(format!("{}|{}", collection, rkey));
271 let record_cid = match Cid::from_str(&record_cid_str) {
272 Ok(c) => c,
273 Err(_) => continue,
274 };
275 let block_bytes = match state.block_store.get(&record_cid).await {
276 Ok(Some(b)) => b,
277 _ => continue,
278 };
279 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
280 Ok(v) => v,
281 Err(_) => continue,
282 };
283 let mut blobs = Vec::new();
284 find_blobs(&record_val, &mut blobs);
285 for blob_cid_str in blobs {
286 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
287 .fetch_optional(&state.db)
288 .await;
289 match exists {
290 Ok(None) => {
291 missing_blobs.push(RecordBlob {
292 cid: blob_cid_str,
293 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
294 });
295 }
296 Err(e) => {
297 error!("DB error checking blob existence: {:?}", e);
298 }
299 _ => {}
300 }
301 }
302 }
303 // if we fetched fewer records than limit, we are done, so cursor is None.
304 // otherwise, cursor is the last one we saw.
305 // ...right?
306 let next_cursor = if records.len() < limit as usize {
307 None
308 } else {
309 last_cursor
310 };
311 (
312 StatusCode::OK,
313 Json(ListMissingBlobsOutput {
314 cursor: next_cursor,
315 blobs: missing_blobs,
316 }),
317 )
318 .into_response()
319}