this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use sqlx::Row;
16use std::str::FromStr;
17use tracing::error;
18
19pub async fn upload_blob(
20 State(state): State<AppState>,
21 headers: axum::http::HeaderMap,
22 body: Bytes,
23) -> Response {
24 let auth_header = headers.get("Authorization");
25 if auth_header.is_none() {
26 return (
27 StatusCode::UNAUTHORIZED,
28 Json(json!({"error": "AuthenticationRequired"})),
29 )
30 .into_response();
31 }
32 let token = auth_header
33 .unwrap()
34 .to_str()
35 .unwrap_or("")
36 .replace("Bearer ", "");
37
38 let session = sqlx::query(
39 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
40 )
41 .bind(&token)
42 .fetch_optional(&state.db)
43 .await
44 .unwrap_or(None);
45
46 let (did, key_bytes) = match session {
47 Some(row) => (
48 row.get::<String, _>("did"),
49 row.get::<Vec<u8>, _>("key_bytes"),
50 ),
51 None => {
52 return (
53 StatusCode::UNAUTHORIZED,
54 Json(json!({"error": "AuthenticationFailed"})),
55 )
56 .into_response();
57 }
58 };
59
60 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
61 return (
62 StatusCode::UNAUTHORIZED,
63 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
64 )
65 .into_response();
66 }
67
68 let mime_type = headers
69 .get("content-type")
70 .and_then(|h| h.to_str().ok())
71 .unwrap_or("application/octet-stream")
72 .to_string();
73
74 let size = body.len() as i64;
75 let data = body.to_vec();
76
77 let mut hasher = Sha256::new();
78 hasher.update(&data);
79 let hash = hasher.finalize();
80 let multihash = Multihash::wrap(0x12, &hash).unwrap();
81 let cid = Cid::new_v1(0x55, multihash);
82 let cid_str = cid.to_string();
83
84 let storage_key = format!("blobs/{}", cid_str);
85
86 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
87 error!("Failed to upload blob to storage: {:?}", e);
88 return (
89 StatusCode::INTERNAL_SERVER_ERROR,
90 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
91 )
92 .into_response();
93 }
94
95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
96 .bind(&did)
97 .fetch_optional(&state.db)
98 .await;
99
100 let user_id: uuid::Uuid = match user_query {
101 Ok(Some(row)) => row.get("id"),
102 _ => {
103 return (
104 StatusCode::INTERNAL_SERVER_ERROR,
105 Json(json!({"error": "InternalError"})),
106 )
107 .into_response();
108 }
109 };
110
111 let insert = sqlx::query(
112 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING"
113 )
114 .bind(&cid_str)
115 .bind(&mime_type)
116 .bind(size)
117 .bind(user_id)
118 .bind(&storage_key)
119 .execute(&state.db)
120 .await;
121
122 if let Err(e) = insert {
123 error!("Failed to insert blob record: {:?}", e);
124 return (
125 StatusCode::INTERNAL_SERVER_ERROR,
126 Json(json!({"error": "InternalError"})),
127 )
128 .into_response();
129 }
130
131 Json(json!({
132 "blob": {
133 "ref": {
134 "$link": cid_str
135 },
136 "mimeType": mime_type,
137 "size": size
138 }
139 }))
140 .into_response()
141}
142
143#[derive(Deserialize)]
144pub struct ListMissingBlobsParams {
145 pub limit: Option<i64>,
146 pub cursor: Option<String>,
147}
148
149#[derive(Serialize)]
150#[serde(rename_all = "camelCase")]
151pub struct RecordBlob {
152 pub cid: String,
153 pub record_uri: String,
154}
155
156#[derive(Serialize)]
157pub struct ListMissingBlobsOutput {
158 pub cursor: Option<String>,
159 pub blobs: Vec<RecordBlob>,
160}
161
162fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
163 if let Some(obj) = val.as_object() {
164 if let Some(type_val) = obj.get("$type") {
165 if type_val == "blob" {
166 if let Some(r) = obj.get("ref") {
167 if let Some(link) = r.get("$link") {
168 if let Some(s) = link.as_str() {
169 blobs.push(s.to_string());
170 }
171 }
172 }
173 }
174 }
175 for (_, v) in obj {
176 find_blobs(v, blobs);
177 }
178 } else if let Some(arr) = val.as_array() {
179 for v in arr {
180 find_blobs(v, blobs);
181 }
182 }
183}
184
185pub async fn list_missing_blobs(
186 State(state): State<AppState>,
187 headers: axum::http::HeaderMap,
188 Query(params): Query<ListMissingBlobsParams>,
189) -> Response {
190 let auth_header = headers.get("Authorization");
191 if auth_header.is_none() {
192 return (
193 StatusCode::UNAUTHORIZED,
194 Json(json!({"error": "AuthenticationRequired"})),
195 )
196 .into_response();
197 }
198
199 let token = auth_header
200 .unwrap()
201 .to_str()
202 .unwrap_or("")
203 .replace("Bearer ", "");
204
205 let session = sqlx::query(
206 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
207 )
208 .bind(&token)
209 .fetch_optional(&state.db)
210 .await
211 .unwrap_or(None);
212
213 let (did, key_bytes) = match session {
214 Some(row) => (
215 row.get::<String, _>("did"),
216 row.get::<Vec<u8>, _>("key_bytes"),
217 ),
218 None => {
219 return (
220 StatusCode::UNAUTHORIZED,
221 Json(json!({"error": "AuthenticationFailed"})),
222 )
223 .into_response();
224 }
225 };
226
227 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
228 return (
229 StatusCode::UNAUTHORIZED,
230 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
231 )
232 .into_response();
233 }
234
235 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
236 .bind(&did)
237 .fetch_optional(&state.db)
238 .await;
239
240 let user_id: uuid::Uuid = match user_query {
241 Ok(Some(row)) => row.get("id"),
242 _ => {
243 return (
244 StatusCode::INTERNAL_SERVER_ERROR,
245 Json(json!({"error": "InternalError"})),
246 )
247 .into_response();
248 }
249 };
250
251 let limit = params.limit.unwrap_or(500).min(1000);
252 let cursor_str = params.cursor.unwrap_or_default();
253 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
254 let parts: Vec<&str> = cursor_str.split('|').collect();
255 (parts[0].to_string(), parts[1].to_string())
256 } else {
257 (String::new(), String::new())
258 };
259
260 let records_query = sqlx::query(
261 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4"
262 )
263 .bind(user_id)
264 .bind(cursor_collection)
265 .bind(cursor_rkey)
266 .bind(limit)
267 .fetch_all(&state.db)
268 .await;
269
270 let records = match records_query {
271 Ok(r) => r,
272 Err(e) => {
273 error!("DB error fetching records: {:?}", e);
274 return (
275 StatusCode::INTERNAL_SERVER_ERROR,
276 Json(json!({"error": "InternalError"})),
277 )
278 .into_response();
279 }
280 };
281
282 let mut missing_blobs = Vec::new();
283 let mut last_cursor = None;
284
285 for row in &records {
286 let collection: String = row.get("collection");
287 let rkey: String = row.get("rkey");
288 let record_cid_str: String = row.get("record_cid");
289
290 last_cursor = Some(format!("{}|{}", collection, rkey));
291
292 let record_cid = match Cid::from_str(&record_cid_str) {
293 Ok(c) => c,
294 Err(_) => continue,
295 };
296
297 let block_bytes = match state.block_store.get(&record_cid).await {
298 Ok(Some(b)) => b,
299 _ => continue,
300 };
301
302 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
303 Ok(v) => v,
304 Err(_) => continue,
305 };
306
307 let mut blobs = Vec::new();
308 find_blobs(&record_val, &mut blobs);
309
310 for blob_cid_str in blobs {
311 let exists = sqlx::query("SELECT 1 FROM blobs WHERE cid = $1 AND created_by_user = $2")
312 .bind(&blob_cid_str)
313 .bind(user_id)
314 .fetch_optional(&state.db)
315 .await;
316
317 match exists {
318 Ok(None) => {
319 missing_blobs.push(RecordBlob {
320 cid: blob_cid_str,
321 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
322 });
323 }
324 Err(e) => {
325 error!("DB error checking blob existence: {:?}", e);
326 }
327 _ => {}
328 }
329 }
330 }
331
332 // if we fetched fewer records than limit, we are done, so cursor is None.
333 // otherwise, cursor is the last one we saw.
334 // ...right?
335 let next_cursor = if records.len() < limit as usize {
336 None
337 } else {
338 last_cursor
339 };
340
341 (
342 StatusCode::OK,
343 Json(ListMissingBlobsOutput {
344 cursor: next_cursor,
345 blobs: missing_blobs,
346 }),
347 )
348 .into_response()
349}