this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
18pub async fn upload_blob(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 let auth_header = headers.get("Authorization");
24 if auth_header.is_none() {
25 return (
26 StatusCode::UNAUTHORIZED,
27 Json(json!({"error": "AuthenticationRequired"})),
28 )
29 .into_response();
30 }
31 let token = auth_header
32 .unwrap()
33 .to_str()
34 .unwrap_or("")
35 .replace("Bearer ", "");
36
37 let session = sqlx::query!(
38 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
39 token
40 )
41 .fetch_optional(&state.db)
42 .await
43 .unwrap_or(None);
44
45 let (did, key_bytes) = match session {
46 Some(row) => (row.did, row.key_bytes),
47 None => {
48 return (
49 StatusCode::UNAUTHORIZED,
50 Json(json!({"error": "AuthenticationFailed"})),
51 )
52 .into_response();
53 }
54 };
55
56 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
57 return (
58 StatusCode::UNAUTHORIZED,
59 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
60 )
61 .into_response();
62 }
63
64 let mime_type = headers
65 .get("content-type")
66 .and_then(|h| h.to_str().ok())
67 .unwrap_or("application/octet-stream")
68 .to_string();
69
70 let size = body.len() as i64;
71 let data = body.to_vec();
72
73 let mut hasher = Sha256::new();
74 hasher.update(&data);
75 let hash = hasher.finalize();
76 let multihash = Multihash::wrap(0x12, &hash).unwrap();
77 let cid = Cid::new_v1(0x55, multihash);
78 let cid_str = cid.to_string();
79
80 let storage_key = format!("blobs/{}", cid_str);
81
82 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
83 error!("Failed to upload blob to storage: {:?}", e);
84 return (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
87 )
88 .into_response();
89 }
90
91 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
92 .fetch_optional(&state.db)
93 .await;
94
95 let user_id = match user_query {
96 Ok(Some(row)) => row.id,
97 _ => {
98 return (
99 StatusCode::INTERNAL_SERVER_ERROR,
100 Json(json!({"error": "InternalError"})),
101 )
102 .into_response();
103 }
104 };
105
106 let insert = sqlx::query!(
107 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING",
108 cid_str,
109 mime_type,
110 size,
111 user_id,
112 storage_key
113 )
114 .execute(&state.db)
115 .await;
116
117 if let Err(e) = insert {
118 error!("Failed to insert blob record: {:?}", e);
119 return (
120 StatusCode::INTERNAL_SERVER_ERROR,
121 Json(json!({"error": "InternalError"})),
122 )
123 .into_response();
124 }
125
126 Json(json!({
127 "blob": {
128 "ref": {
129 "$link": cid_str
130 },
131 "mimeType": mime_type,
132 "size": size
133 }
134 }))
135 .into_response()
136}
137
138#[derive(Deserialize)]
139pub struct ListMissingBlobsParams {
140 pub limit: Option<i64>,
141 pub cursor: Option<String>,
142}
143
144#[derive(Serialize)]
145#[serde(rename_all = "camelCase")]
146pub struct RecordBlob {
147 pub cid: String,
148 pub record_uri: String,
149}
150
151#[derive(Serialize)]
152pub struct ListMissingBlobsOutput {
153 pub cursor: Option<String>,
154 pub blobs: Vec<RecordBlob>,
155}
156
157fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
158 if let Some(obj) = val.as_object() {
159 if let Some(type_val) = obj.get("$type") {
160 if type_val == "blob" {
161 if let Some(r) = obj.get("ref") {
162 if let Some(link) = r.get("$link") {
163 if let Some(s) = link.as_str() {
164 blobs.push(s.to_string());
165 }
166 }
167 }
168 }
169 }
170 for (_, v) in obj {
171 find_blobs(v, blobs);
172 }
173 } else if let Some(arr) = val.as_array() {
174 for v in arr {
175 find_blobs(v, blobs);
176 }
177 }
178}
179
180pub async fn list_missing_blobs(
181 State(state): State<AppState>,
182 headers: axum::http::HeaderMap,
183 Query(params): Query<ListMissingBlobsParams>,
184) -> Response {
185 let auth_header = headers.get("Authorization");
186 if auth_header.is_none() {
187 return (
188 StatusCode::UNAUTHORIZED,
189 Json(json!({"error": "AuthenticationRequired"})),
190 )
191 .into_response();
192 }
193
194 let token = auth_header
195 .unwrap()
196 .to_str()
197 .unwrap_or("")
198 .replace("Bearer ", "");
199
200 let session = sqlx::query!(
201 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
202 token
203 )
204 .fetch_optional(&state.db)
205 .await
206 .unwrap_or(None);
207
208 let (did, key_bytes) = match session {
209 Some(row) => (row.did, row.key_bytes),
210 None => {
211 return (
212 StatusCode::UNAUTHORIZED,
213 Json(json!({"error": "AuthenticationFailed"})),
214 )
215 .into_response();
216 }
217 };
218
219 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
220 return (
221 StatusCode::UNAUTHORIZED,
222 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
223 )
224 .into_response();
225 }
226
227 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
228 .fetch_optional(&state.db)
229 .await;
230
231 let user_id = match user_query {
232 Ok(Some(row)) => row.id,
233 _ => {
234 return (
235 StatusCode::INTERNAL_SERVER_ERROR,
236 Json(json!({"error": "InternalError"})),
237 )
238 .into_response();
239 }
240 };
241
242 let limit = params.limit.unwrap_or(500).min(1000);
243 let cursor_str = params.cursor.unwrap_or_default();
244 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
245 let parts: Vec<&str> = cursor_str.split('|').collect();
246 (parts[0].to_string(), parts[1].to_string())
247 } else {
248 (String::new(), String::new())
249 };
250
251 let records_query = sqlx::query!(
252 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
253 user_id,
254 cursor_collection,
255 cursor_rkey,
256 limit
257 )
258 .fetch_all(&state.db)
259 .await;
260
261 let records = match records_query {
262 Ok(r) => r,
263 Err(e) => {
264 error!("DB error fetching records: {:?}", e);
265 return (
266 StatusCode::INTERNAL_SERVER_ERROR,
267 Json(json!({"error": "InternalError"})),
268 )
269 .into_response();
270 }
271 };
272
273 let mut missing_blobs = Vec::new();
274 let mut last_cursor = None;
275
276 for row in &records {
277 let collection = &row.collection;
278 let rkey = &row.rkey;
279 let record_cid_str = &row.record_cid;
280
281 last_cursor = Some(format!("{}|{}", collection, rkey));
282
283 let record_cid = match Cid::from_str(&record_cid_str) {
284 Ok(c) => c,
285 Err(_) => continue,
286 };
287
288 let block_bytes = match state.block_store.get(&record_cid).await {
289 Ok(Some(b)) => b,
290 _ => continue,
291 };
292
293 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
294 Ok(v) => v,
295 Err(_) => continue,
296 };
297
298 let mut blobs = Vec::new();
299 find_blobs(&record_val, &mut blobs);
300
301 for blob_cid_str in blobs {
302 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
303 .fetch_optional(&state.db)
304 .await;
305
306 match exists {
307 Ok(None) => {
308 missing_blobs.push(RecordBlob {
309 cid: blob_cid_str,
310 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
311 });
312 }
313 Err(e) => {
314 error!("DB error checking blob existence: {:?}", e);
315 }
316 _ => {}
317 }
318 }
319 }
320
321 // if we fetched fewer records than limit, we are done, so cursor is None.
322 // otherwise, cursor is the last one we saw.
323 // ...right?
324 let next_cursor = if records.len() < limit as usize {
325 None
326 } else {
327 last_cursor
328 };
329
330 (
331 StatusCode::OK,
332 Json(ListMissingBlobsOutput {
333 cursor: next_cursor,
334 blobs: missing_blobs,
335 }),
336 )
337 .into_response()
338}