this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use multihash::Multihash;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::str::FromStr;
16use tracing::error;
17
18pub async fn upload_blob(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 let token = match crate::auth::extract_bearer_token_from_header(
24 headers.get("Authorization").and_then(|h| h.to_str().ok())
25 ) {
26 Some(t) => t,
27 None => {
28 return (
29 StatusCode::UNAUTHORIZED,
30 Json(json!({"error": "AuthenticationRequired"})),
31 )
32 .into_response();
33 }
34 };
35
36 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
37 Ok(user) => user,
38 Err(_) => {
39 return (
40 StatusCode::UNAUTHORIZED,
41 Json(json!({"error": "AuthenticationFailed"})),
42 )
43 .into_response();
44 }
45 };
46 let did = auth_user.did;
47
48 let mime_type = headers
49 .get("content-type")
50 .and_then(|h| h.to_str().ok())
51 .unwrap_or("application/octet-stream")
52 .to_string();
53
54 let size = body.len() as i64;
55 let data = body.to_vec();
56
57 let mut hasher = Sha256::new();
58 hasher.update(&data);
59 let hash = hasher.finalize();
60 let multihash = Multihash::wrap(0x12, &hash).unwrap();
61 let cid = Cid::new_v1(0x55, multihash);
62 let cid_str = cid.to_string();
63
64 let storage_key = format!("blobs/{}", cid_str);
65
66 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
67 error!("Failed to upload blob to storage: {:?}", e);
68 return (
69 StatusCode::INTERNAL_SERVER_ERROR,
70 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
71 )
72 .into_response();
73 }
74
75 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
76 .fetch_optional(&state.db)
77 .await;
78
79 let user_id = match user_query {
80 Ok(Some(row)) => row.id,
81 _ => {
82 return (
83 StatusCode::INTERNAL_SERVER_ERROR,
84 Json(json!({"error": "InternalError"})),
85 )
86 .into_response();
87 }
88 };
89
90 let insert = sqlx::query!(
91 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING",
92 cid_str,
93 mime_type,
94 size,
95 user_id,
96 storage_key
97 )
98 .execute(&state.db)
99 .await;
100
101 if let Err(e) = insert {
102 error!("Failed to insert blob record: {:?}", e);
103 return (
104 StatusCode::INTERNAL_SERVER_ERROR,
105 Json(json!({"error": "InternalError"})),
106 )
107 .into_response();
108 }
109
110 Json(json!({
111 "blob": {
112 "ref": {
113 "$link": cid_str
114 },
115 "mimeType": mime_type,
116 "size": size
117 }
118 }))
119 .into_response()
120}
121
122#[derive(Deserialize)]
123pub struct ListMissingBlobsParams {
124 pub limit: Option<i64>,
125 pub cursor: Option<String>,
126}
127
128#[derive(Serialize)]
129#[serde(rename_all = "camelCase")]
130pub struct RecordBlob {
131 pub cid: String,
132 pub record_uri: String,
133}
134
135#[derive(Serialize)]
136pub struct ListMissingBlobsOutput {
137 pub cursor: Option<String>,
138 pub blobs: Vec<RecordBlob>,
139}
140
141fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) {
142 if let Some(obj) = val.as_object() {
143 if let Some(type_val) = obj.get("$type") {
144 if type_val == "blob" {
145 if let Some(r) = obj.get("ref") {
146 if let Some(link) = r.get("$link") {
147 if let Some(s) = link.as_str() {
148 blobs.push(s.to_string());
149 }
150 }
151 }
152 }
153 }
154 for (_, v) in obj {
155 find_blobs(v, blobs);
156 }
157 } else if let Some(arr) = val.as_array() {
158 for v in arr {
159 find_blobs(v, blobs);
160 }
161 }
162}
163
164pub async fn list_missing_blobs(
165 State(state): State<AppState>,
166 headers: axum::http::HeaderMap,
167 Query(params): Query<ListMissingBlobsParams>,
168) -> Response {
169 let token = match crate::auth::extract_bearer_token_from_header(
170 headers.get("Authorization").and_then(|h| h.to_str().ok())
171 ) {
172 Some(t) => t,
173 None => {
174 return (
175 StatusCode::UNAUTHORIZED,
176 Json(json!({"error": "AuthenticationRequired"})),
177 )
178 .into_response();
179 }
180 };
181
182 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
183 Ok(user) => user,
184 Err(_) => {
185 return (
186 StatusCode::UNAUTHORIZED,
187 Json(json!({"error": "AuthenticationFailed"})),
188 )
189 .into_response();
190 }
191 };
192
193 let did = auth_user.did;
194
195 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
196 .fetch_optional(&state.db)
197 .await;
198
199 let user_id = match user_query {
200 Ok(Some(row)) => row.id,
201 _ => {
202 return (
203 StatusCode::INTERNAL_SERVER_ERROR,
204 Json(json!({"error": "InternalError"})),
205 )
206 .into_response();
207 }
208 };
209
210 let limit = params.limit.unwrap_or(500).min(1000);
211 let cursor_str = params.cursor.unwrap_or_default();
212 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
213 let parts: Vec<&str> = cursor_str.split('|').collect();
214 (parts[0].to_string(), parts[1].to_string())
215 } else {
216 (String::new(), String::new())
217 };
218
219 let records_query = sqlx::query!(
220 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
221 user_id,
222 cursor_collection,
223 cursor_rkey,
224 limit
225 )
226 .fetch_all(&state.db)
227 .await;
228
229 let records = match records_query {
230 Ok(r) => r,
231 Err(e) => {
232 error!("DB error fetching records: {:?}", e);
233 return (
234 StatusCode::INTERNAL_SERVER_ERROR,
235 Json(json!({"error": "InternalError"})),
236 )
237 .into_response();
238 }
239 };
240
241 let mut missing_blobs = Vec::new();
242 let mut last_cursor = None;
243
244 for row in &records {
245 let collection = &row.collection;
246 let rkey = &row.rkey;
247 let record_cid_str = &row.record_cid;
248
249 last_cursor = Some(format!("{}|{}", collection, rkey));
250
251 let record_cid = match Cid::from_str(&record_cid_str) {
252 Ok(c) => c,
253 Err(_) => continue,
254 };
255
256 let block_bytes = match state.block_store.get(&record_cid).await {
257 Ok(Some(b)) => b,
258 _ => continue,
259 };
260
261 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
262 Ok(v) => v,
263 Err(_) => continue,
264 };
265
266 let mut blobs = Vec::new();
267 find_blobs(&record_val, &mut blobs);
268
269 for blob_cid_str in blobs {
270 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id)
271 .fetch_optional(&state.db)
272 .await;
273
274 match exists {
275 Ok(None) => {
276 missing_blobs.push(RecordBlob {
277 cid: blob_cid_str,
278 record_uri: format!("at://{}/{}/{}", did, collection, rkey),
279 });
280 }
281 Err(e) => {
282 error!("DB error checking blob existence: {:?}", e);
283 }
284 _ => {}
285 }
286 }
287 }
288
289 // if we fetched fewer records than limit, we are done, so cursor is None.
290 // otherwise, cursor is the last one we saw.
291 // ...right?
292 let next_cursor = if records.len() < limit as usize {
293 None
294 } else {
295 last_cursor
296 };
297
298 (
299 StatusCode::OK,
300 Json(ListMissingBlobsOutput {
301 cursor: next_cursor,
302 blobs: missing_blobs,
303 }),
304 )
305 .into_response()
306}