this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use multihash::Multihash; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use sha2::{Digest, Sha256}; 14use sqlx::Row; 15use tracing::error; 16 17pub async fn upload_blob( 18 State(state): State<AppState>, 19 headers: axum::http::HeaderMap, 20 body: Bytes, 21) -> Response { 22 let auth_header = headers.get("Authorization"); 23 if auth_header.is_none() { 24 return ( 25 StatusCode::UNAUTHORIZED, 26 Json(json!({"error": "AuthenticationRequired"})), 27 ) 28 .into_response(); 29 } 30 let token = auth_header 31 .unwrap() 32 .to_str() 33 .unwrap_or("") 34 .replace("Bearer ", ""); 35 36 let session = sqlx::query( 37 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 38 ) 39 .bind(&token) 40 .fetch_optional(&state.db) 41 .await 42 .unwrap_or(None); 43 44 let (did, key_bytes) = match session { 45 Some(row) => ( 46 row.get::<String, _>("did"), 47 row.get::<Vec<u8>, _>("key_bytes"), 48 ), 49 None => { 50 return ( 51 StatusCode::UNAUTHORIZED, 52 Json(json!({"error": "AuthenticationFailed"})), 53 ) 54 .into_response(); 55 } 56 }; 57 58 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 59 return ( 60 StatusCode::UNAUTHORIZED, 61 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 62 ) 63 .into_response(); 64 } 65 66 let mime_type = headers 67 .get("content-type") 68 .and_then(|h| h.to_str().ok()) 69 .unwrap_or("application/octet-stream") 70 .to_string(); 71 72 let size = body.len() as i64; 73 let data = body.to_vec(); 74 75 let mut hasher = Sha256::new(); 76 hasher.update(&data); 77 let hash = hasher.finalize(); 78 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 79 let cid = Cid::new_v1(0x55, multihash); 80 let cid_str = cid.to_string(); 81 82 let storage_key = format!("blobs/{}", cid_str); 83 84 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 85 error!("Failed to upload blob to storage: {:?}", e); 86 return ( 87 StatusCode::INTERNAL_SERVER_ERROR, 88 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 89 ) 90 .into_response(); 91 } 92 93 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 94 .bind(&did) 95 .fetch_optional(&state.db) 96 .await; 97 98 let user_id: uuid::Uuid = match user_query { 99 Ok(Some(row)) => row.get("id"), 100 _ => { 101 return ( 102 StatusCode::INTERNAL_SERVER_ERROR, 103 Json(json!({"error": "InternalError"})), 104 ) 105 .into_response(); 106 } 107 }; 108 109 let insert = sqlx::query( 110 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING" 111 ) 112 .bind(&cid_str) 113 .bind(&mime_type) 114 .bind(size) 115 .bind(user_id) 116 .bind(&storage_key) 117 .execute(&state.db) 118 .await; 119 120 if let Err(e) = insert { 121 error!("Failed to insert blob record: {:?}", e); 122 return ( 123 StatusCode::INTERNAL_SERVER_ERROR, 124 Json(json!({"error": "InternalError"})), 125 ) 126 .into_response(); 127 } 128 129 Json(json!({ 130 "blob": { 131 "ref": { 132 "$link": cid_str 133 }, 134 "mimeType": mime_type, 135 "size": size 136 } 137 })) 138 .into_response() 139} 140 141#[derive(Deserialize)] 142pub struct ListMissingBlobsParams { 143 pub limit: Option<i64>, 144 pub cursor: Option<String>, 145} 146 147#[derive(Serialize)] 148#[serde(rename_all = "camelCase")] 149pub struct RecordBlob { 150 pub cid: String, 151 pub record_uri: String, 152} 153 154#[derive(Serialize)] 155pub struct ListMissingBlobsOutput { 156 pub cursor: Option<String>, 157 pub blobs: Vec<RecordBlob>, 158} 159 160pub async fn list_missing_blobs( 161 State(_state): State<AppState>, 162 headers: axum::http::HeaderMap, 163 Query(_params): Query<ListMissingBlobsParams>, 164) -> Response { 165 let auth_header = headers.get("Authorization"); 166 if auth_header.is_none() { 167 return ( 168 StatusCode::UNAUTHORIZED, 169 Json(json!({"error": "AuthenticationRequired"})), 170 ) 171 .into_response(); 172 } 173 174 ( 175 StatusCode::OK, 176 Json(ListMissingBlobsOutput { 177 cursor: None, 178 blobs: vec![], 179 }), 180 ) 181 .into_response() 182}