this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use multihash::Multihash; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use sha2::{Digest, Sha256}; 15use std::str::FromStr; 16use tracing::error; 17 18pub async fn upload_blob( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let token = match crate::auth::extract_bearer_token_from_header( 24 headers.get("Authorization").and_then(|h| h.to_str().ok()) 25 ) { 26 Some(t) => t, 27 None => { 28 return ( 29 StatusCode::UNAUTHORIZED, 30 Json(json!({"error": "AuthenticationRequired"})), 31 ) 32 .into_response(); 33 } 34 }; 35 36 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 37 Ok(user) => user, 38 Err(_) => { 39 return ( 40 StatusCode::UNAUTHORIZED, 41 Json(json!({"error": "AuthenticationFailed"})), 42 ) 43 .into_response(); 44 } 45 }; 46 let did = auth_user.did; 47 48 let mime_type = headers 49 .get("content-type") 50 .and_then(|h| h.to_str().ok()) 51 .unwrap_or("application/octet-stream") 52 .to_string(); 53 54 let size = body.len() as i64; 55 let data = body.to_vec(); 56 57 let mut hasher = Sha256::new(); 58 hasher.update(&data); 59 let hash = hasher.finalize(); 60 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 61 let cid = Cid::new_v1(0x55, multihash); 62 let cid_str = cid.to_string(); 63 64 let storage_key = format!("blobs/{}", cid_str); 65 66 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 67 error!("Failed to upload blob to storage: {:?}", e); 68 return ( 69 StatusCode::INTERNAL_SERVER_ERROR, 70 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 71 ) 72 .into_response(); 73 } 74 75 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 76 .fetch_optional(&state.db) 77 .await; 78 79 let user_id = match user_query { 80 Ok(Some(row)) => row.id, 81 _ => { 82 return ( 83 StatusCode::INTERNAL_SERVER_ERROR, 84 Json(json!({"error": "InternalError"})), 85 ) 86 .into_response(); 87 } 88 }; 89 90 let insert = sqlx::query!( 91 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING", 92 cid_str, 93 mime_type, 94 size, 95 user_id, 96 storage_key 97 ) 98 .execute(&state.db) 99 .await; 100 101 if let Err(e) = insert { 102 error!("Failed to insert blob record: {:?}", e); 103 return ( 104 StatusCode::INTERNAL_SERVER_ERROR, 105 Json(json!({"error": "InternalError"})), 106 ) 107 .into_response(); 108 } 109 110 Json(json!({ 111 "blob": { 112 "ref": { 113 "$link": cid_str 114 }, 115 "mimeType": mime_type, 116 "size": size 117 } 118 })) 119 .into_response() 120} 121 122#[derive(Deserialize)] 123pub struct ListMissingBlobsParams { 124 pub limit: Option<i64>, 125 pub cursor: Option<String>, 126} 127 128#[derive(Serialize)] 129#[serde(rename_all = "camelCase")] 130pub struct RecordBlob { 131 pub cid: String, 132 pub record_uri: String, 133} 134 135#[derive(Serialize)] 136pub struct ListMissingBlobsOutput { 137 pub cursor: Option<String>, 138 pub blobs: Vec<RecordBlob>, 139} 140 141fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 142 if let Some(obj) = val.as_object() { 143 if let Some(type_val) = obj.get("$type") { 144 if type_val == "blob" { 145 if let Some(r) = obj.get("ref") { 146 if let Some(link) = r.get("$link") { 147 if let Some(s) = link.as_str() { 148 blobs.push(s.to_string()); 149 } 150 } 151 } 152 } 153 } 154 for (_, v) in obj { 155 find_blobs(v, blobs); 156 } 157 } else if let Some(arr) = val.as_array() { 158 for v in arr { 159 find_blobs(v, blobs); 160 } 161 } 162} 163 164pub async fn list_missing_blobs( 165 State(state): State<AppState>, 166 headers: axum::http::HeaderMap, 167 Query(params): Query<ListMissingBlobsParams>, 168) -> Response { 169 let token = match crate::auth::extract_bearer_token_from_header( 170 headers.get("Authorization").and_then(|h| h.to_str().ok()) 171 ) { 172 Some(t) => t, 173 None => { 174 return ( 175 StatusCode::UNAUTHORIZED, 176 Json(json!({"error": "AuthenticationRequired"})), 177 ) 178 .into_response(); 179 } 180 }; 181 182 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 183 Ok(user) => user, 184 Err(_) => { 185 return ( 186 StatusCode::UNAUTHORIZED, 187 Json(json!({"error": "AuthenticationFailed"})), 188 ) 189 .into_response(); 190 } 191 }; 192 193 let did = auth_user.did; 194 195 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 196 .fetch_optional(&state.db) 197 .await; 198 199 let user_id = match user_query { 200 Ok(Some(row)) => row.id, 201 _ => { 202 return ( 203 StatusCode::INTERNAL_SERVER_ERROR, 204 Json(json!({"error": "InternalError"})), 205 ) 206 .into_response(); 207 } 208 }; 209 210 let limit = params.limit.unwrap_or(500).min(1000); 211 let cursor_str = params.cursor.unwrap_or_default(); 212 let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 213 let parts: Vec<&str> = cursor_str.split('|').collect(); 214 (parts[0].to_string(), parts[1].to_string()) 215 } else { 216 (String::new(), String::new()) 217 }; 218 219 let records_query = sqlx::query!( 220 "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", 221 user_id, 222 cursor_collection, 223 cursor_rkey, 224 limit 225 ) 226 .fetch_all(&state.db) 227 .await; 228 229 let records = match records_query { 230 Ok(r) => r, 231 Err(e) => { 232 error!("DB error fetching records: {:?}", e); 233 return ( 234 StatusCode::INTERNAL_SERVER_ERROR, 235 Json(json!({"error": "InternalError"})), 236 ) 237 .into_response(); 238 } 239 }; 240 241 let mut missing_blobs = Vec::new(); 242 let mut last_cursor = None; 243 244 for row in &records { 245 let collection = &row.collection; 246 let rkey = &row.rkey; 247 let record_cid_str = &row.record_cid; 248 249 last_cursor = Some(format!("{}|{}", collection, rkey)); 250 251 let record_cid = match Cid::from_str(&record_cid_str) { 252 Ok(c) => c, 253 Err(_) => continue, 254 }; 255 256 let block_bytes = match state.block_store.get(&record_cid).await { 257 Ok(Some(b)) => b, 258 _ => continue, 259 }; 260 261 let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 262 Ok(v) => v, 263 Err(_) => continue, 264 }; 265 266 let mut blobs = Vec::new(); 267 find_blobs(&record_val, &mut blobs); 268 269 for blob_cid_str in blobs { 270 let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id) 271 .fetch_optional(&state.db) 272 .await; 273 274 match exists { 275 Ok(None) => { 276 missing_blobs.push(RecordBlob { 277 cid: blob_cid_str, 278 record_uri: format!("at://{}/{}/{}", did, collection, rkey), 279 }); 280 } 281 Err(e) => { 282 error!("DB error checking blob existence: {:?}", e); 283 } 284 _ => {} 285 } 286 } 287 } 288 289 // if we fetched fewer records than limit, we are done, so cursor is None. 290 // otherwise, cursor is the last one we saw. 291 // ...right? 292 let next_cursor = if records.len() < limit as usize { 293 None 294 } else { 295 last_cursor 296 }; 297 298 ( 299 StatusCode::OK, 300 Json(ListMissingBlobsOutput { 301 cursor: next_cursor, 302 blobs: missing_blobs, 303 }), 304 ) 305 .into_response() 306}