This repo has no description.
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
/// Largest request body, in bytes, that `upload_blob` accepts (1 MB, decimal).
const MAX_BLOB_SIZE: usize = 1000 * 1000;
19
20pub async fn upload_blob(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 body: Bytes,
24) -> Response {
25 if body.len() > MAX_BLOB_SIZE {
26 return (
27 StatusCode::PAYLOAD_TOO_LARGE,
28 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), MAX_BLOB_SIZE)})),
29 )
30 .into_response();
31 }
32
33 let token = match crate::auth::extract_bearer_token_from_header(
34 headers.get("Authorization").and_then(|h| h.to_str().ok())
35 ) {
36 Some(t) => t,
37 None => {
38 return (
39 StatusCode::UNAUTHORIZED,
40 Json(json!({"error": "AuthenticationRequired"})),
41 )
42 .into_response();
43 }
44 };
45
46 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
47 Ok(user) => user,
48 Err(_) => {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationFailed"})),
52 )
53 .into_response();
54 }
55 };
56 let did = auth_user.did;
57
58 let mime_type = headers
59 .get("content-type")
60 .and_then(|h| h.to_str().ok())
61 .unwrap_or("application/octet-stream")
62 .to_string();
63
64 let size = body.len() as i64;
65 let data = body.to_vec();
66
67 let mut hasher = Sha256::new();
68 hasher.update(&data);
69 let hash = hasher.finalize();
70 let multihash = match Multihash::wrap(0x12, &hash) {
71 Ok(mh) => mh,
72 Err(e) => {
73 error!("Failed to create multihash for blob: {:?}", e);
74 return (
75 StatusCode::INTERNAL_SERVER_ERROR,
76 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
77 )
78 .into_response();
79 }
80 };
81 let cid = Cid::new_v1(0x55, multihash);
82 let cid_str = cid.to_string();
83
84 let storage_key = format!("blobs/{}", cid_str);
85
86 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
87 error!("Failed to upload blob to storage: {:?}", e);
88 return (
89 StatusCode::INTERNAL_SERVER_ERROR,
90 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
91 )
92 .into_response();
93 }
94
95 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
96 .fetch_optional(&state.db)
97 .await;
98
99 let user_id = match user_query {
100 Ok(Some(row)) => row.id,
101 _ => {
102 return (
103 StatusCode::INTERNAL_SERVER_ERROR,
104 Json(json!({"error": "InternalError"})),
105 )
106 .into_response();
107 }
108 };
109
110 let insert = sqlx::query!(
111 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING",
112 cid_str,
113 mime_type,
114 size,
115 user_id,
116 storage_key
117 )
118 .execute(&state.db)
119 .await;
120
121 if let Err(e) = insert {
122 error!("Failed to insert blob record: {:?}", e);
123 return (
124 StatusCode::INTERNAL_SERVER_ERROR,
125 Json(json!({"error": "InternalError"})),
126 )
127 .into_response();
128 }
129
130 Json(json!({
131 "blob": {
132 "ref": {
133 "$link": cid_str
134 },
135 "mimeType": mime_type,
136 "size": size
137 }
138 }))
139 .into_response()
140}
141
142#[derive(Deserialize)]
143pub struct ListMissingBlobsParams {
144 pub limit: Option<i64>,
145 pub cursor: Option<String>,
146}
147
148#[derive(Serialize)]
149#[serde(rename_all = "camelCase")]
150pub struct RecordBlob {
151 pub cid: String,
152 pub record_uri: String,
153}
154
155#[derive(Serialize)]
156pub struct ListMissingBlobsOutput {
157 pub cursor: Option<String>,
158 pub blobs: Vec<RecordBlob>,
159}
160
161fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
162 if let Some(obj) = val.as_object() {
163 if let Some(type_val) = obj.get("$type") {
164 if type_val == "blob" {
165 if let Some(r) = obj.get("ref") {
166 if let Some(link) = r.get("$link") {
167 if let Some(s) = link.as_str() {
168 blobs.push(s.to_string());
169 }
170 }
171 }
172 }
173 }
174 for (_, v) in obj {
175 find_blobs(v, blobs);
176 }
177 } else if let Some(arr) = val.as_array() {
178 for v in arr {
179 find_blobs(v, blobs);
180 }
181 }
182}
183
184pub async fn list_missing_blobs(
185 State(state): State<AppState>,
186 headers: axum::http::HeaderMap,
187 Query(params): Query<ListMissingBlobsParams>,
188) -> Response {
189 let token = match crate::auth::extract_bearer_token_from_header(
190 headers.get("Authorization").and_then(|h| h.to_str().ok())
191 ) {
192 Some(t) => t,
193 None => {
194 return (
195 StatusCode::UNAUTHORIZED,
196 Json(json!({"error": "AuthenticationRequired"})),
197 )
198 .into_response();
199 }
200 };
201
202 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
203 Ok(user) => user,
204 Err(_) => {
205 return (
206 StatusCode::UNAUTHORIZED,
207 Json(json!({"error": "AuthenticationFailed"})),
208 )
209 .into_response();
210 }
211 };
212
213 let did = auth_user.did;
214
215 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
216 .fetch_optional(&state.db)
217 .await;
218
219 let user_id = match user_query {
220 Ok(Some(row)) => row.id,
221 _ => {
222 return (
223 StatusCode::INTERNAL_SERVER_ERROR,
224 Json(json!({"error": "InternalError"})),
225 )
226 .into_response();
227 }
228 };
229
230 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
231 let cursor_str = params.cursor.unwrap_or_default();
232 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
233 let parts: Vec<&str> = cursor_str.split('|').collect();
234 (parts[0].to_string(), parts[1].to_string())
235 } else {
236 (String::new(), String::new())
237 };
238
239 let records_query = sqlx::query!(
240 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
241 user_id,
242 cursor_collection,
243 cursor_rkey,
244 limit
245 )
246 .fetch_all(&state.db)
247 .await;
248
249 let records = match records_query {
250 Ok(r) => r,
251 Err(e) => {
252 error!("DB error fetching records: {:?}", e);
253 return (
254 StatusCode::INTERNAL_SERVER_ERROR,
255 Json(json!({"error": "InternalError"})),
256 )
257 .into_response();
258 }
259 };
260
261 let mut missing_blobs = Vec::new();
262 let mut last_cursor = None;
263
264 for row in &records {
265 let collection = &row.collection;
266 let rkey = &row.rkey;
267 let record_cid_str = &row.record_cid;
268
269 last_cursor = Some(format!("{}|{}", collection, rkey));
270
271 let record_cid = match Cid::from_str(&record_cid_str) {
272 Ok(c) => c,
273 Err(_) => continue,
274 };
275
276 let block_bytes = match state.block_store.get(&record_cid).await {
277 Ok(Some(b)) => b,
278 _ => continue,
279 };
280
281 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
282 Ok(v) => v,
283 Err(_) => continue,
284 };
285
286 let mut blobs = Vec::new();
287 find_blobs(&record_val, &mut blobs);
288
289 for blob_cid_str in blobs {
290 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
291 .fetch_optional(&state.db)
292 .await;
293
294 match exists {
295 Ok(None) => {
296 missing_blobs.push(RecordBlob {
297 cid: blob_cid_str,
298 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
299 });
300 }
301 Err(e) => {
302 error!("DB error checking blob existence: {:?}", e);
303 }
304 _ => {}
305 }
306 }
307 }
308
309 // if we fetched fewer records than limit, we are done, so cursor is None.
310 // otherwise, cursor is the last one we saw.
311 // ...right?
312 let next_cursor = if records.len() < limit as usize {
313 None
314 } else {
315 last_cursor
316 };
317
318 (
319 StatusCode::OK,
320 Json(ListMissingBlobsOutput {
321 cursor: next_cursor,
322 blobs: missing_blobs,
323 }),
324 )
325 .into_response()
326}