use crate::state::AppState; use axum::body::Bytes; use axum::{ Json, extract::{Query, State}, http::StatusCode, response::{IntoResponse, Response}, }; use cid::Cid; use jacquard_repo::storage::BlockStore; use multihash::Multihash; use serde::{Deserialize, Serialize}; use serde_json::json; use sha2::{Digest, Sha256}; use std::str::FromStr; use tracing::error; pub async fn upload_blob( State(state): State, headers: axum::http::HeaderMap, body: Bytes, ) -> Response { let auth_header = headers.get("Authorization"); if auth_header.is_none() { return ( StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"})), ) .into_response(); } let token = auth_header .unwrap() .to_str() .unwrap_or("") .replace("Bearer ", ""); let session = sqlx::query!( "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", token ) .fetch_optional(&state.db) .await .unwrap_or(None); let (did, key_bytes) = match session { Some(row) => (row.did, row.key_bytes), None => { return ( StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"})), ) .into_response(); } }; if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { return ( StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), ) .into_response(); } let mime_type = headers .get("content-type") .and_then(|h| h.to_str().ok()) .unwrap_or("application/octet-stream") .to_string(); let size = body.len() as i64; let data = body.to_vec(); let mut hasher = Sha256::new(); hasher.update(&data); let hash = hasher.finalize(); let multihash = Multihash::wrap(0x12, &hash).unwrap(); let cid = Cid::new_v1(0x55, multihash); let cid_str = cid.to_string(); let storage_key = format!("blobs/{}", cid_str); if let Err(e) = state.blob_store.put(&storage_key, &data).await { error!("Failed to upload blob to storage: {:?}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store blob"})), ) .into_response(); } let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) .fetch_optional(&state.db) .await; let user_id = match user_query { Ok(Some(row)) => row.id, _ => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"})), ) .into_response(); } }; let insert = sqlx::query!( "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING", cid_str, mime_type, size, user_id, storage_key ) .execute(&state.db) .await; if let Err(e) = insert { error!("Failed to insert blob record: {:?}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"})), ) .into_response(); } Json(json!({ "blob": { "ref": { "$link": cid_str }, "mimeType": mime_type, "size": size } })) .into_response() } #[derive(Deserialize)] pub struct ListMissingBlobsParams { pub limit: Option, pub cursor: Option, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] pub struct RecordBlob { pub cid: String, pub record_uri: String, } #[derive(Serialize)] pub struct ListMissingBlobsOutput { pub cursor: Option, pub blobs: Vec, } fn find_blobs(val: &serde_json::Value, blobs: &mut Vec) { if let Some(obj) = val.as_object() { if let Some(type_val) = obj.get("$type") { if type_val == "blob" { if let Some(r) = obj.get("ref") { if let Some(link) = r.get("$link") { if let Some(s) = link.as_str() { blobs.push(s.to_string()); } } } } } for (_, v) in obj { find_blobs(v, blobs); } } else if let Some(arr) = val.as_array() { for v in arr { find_blobs(v, blobs); } } } pub async fn list_missing_blobs( State(state): State, headers: axum::http::HeaderMap, Query(params): Query, ) -> Response { let auth_header = headers.get("Authorization"); if auth_header.is_none() { return ( StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"})), ) .into_response(); } let token = auth_header .unwrap() .to_str() .unwrap_or("") .replace("Bearer ", ""); let session = sqlx::query!( "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", token ) .fetch_optional(&state.db) .await .unwrap_or(None); let (did, key_bytes) = match session { Some(row) => (row.did, row.key_bytes), None => { return ( StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"})), ) .into_response(); } }; if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { return ( StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), ) .into_response(); } let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) .fetch_optional(&state.db) .await; let user_id = match user_query { Ok(Some(row)) => row.id, _ => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"})), ) .into_response(); } }; let limit = params.limit.unwrap_or(500).min(1000); let cursor_str = params.cursor.unwrap_or_default(); let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { let parts: Vec<&str> = cursor_str.split('|').collect(); (parts[0].to_string(), parts[1].to_string()) } else { (String::new(), String::new()) }; let records_query = sqlx::query!( "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4", user_id, cursor_collection, cursor_rkey, limit ) .fetch_all(&state.db) .await; let records = match records_query { Ok(r) => r, Err(e) => { error!("DB error fetching records: {:?}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"})), ) .into_response(); } }; let mut missing_blobs = Vec::new(); let mut last_cursor = None; for row in &records { let collection = &row.collection; let rkey = &row.rkey; let record_cid_str = &row.record_cid; last_cursor = Some(format!("{}|{}", collection, rkey)); let record_cid = match Cid::from_str(&record_cid_str) { Ok(c) => c, Err(_) => continue, }; let block_bytes = match state.block_store.get(&record_cid).await { Ok(Some(b)) => b, _ => continue, }; let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { Ok(v) => v, Err(_) => continue, }; let mut blobs = Vec::new(); find_blobs(&record_val, &mut blobs); for blob_cid_str in blobs { let exists = sqlx::query!("SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2", blob_cid_str, user_id) .fetch_optional(&state.db) .await; match exists { Ok(None) => { missing_blobs.push(RecordBlob { cid: blob_cid_str, record_uri: format!("at://{}/{}/{}", did, collection, rkey), }); } Err(e) => { error!("DB error checking blob existence: {:?}", e); } _ => {} } } } // if we fetched fewer records than limit, we are done, so cursor is None. // otherwise, cursor is the last one we saw. // ...right? let next_cursor = if records.len() < limit as usize { None } else { last_cursor }; ( StatusCode::OK, Json(ListMissingBlobsOutput { cursor: next_cursor, blobs: missing_blobs, }), ) .into_response() }