this repo has no description
1use crate::state::AppState; 2use axum::body::Bytes; 3use axum::{ 4 Json, 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use multihash::Multihash; 11use serde_json::json; 12use sha2::{Digest, Sha256}; 13use sqlx::Row; 14use tracing::error; 15 16pub async fn upload_blob( 17 State(state): State<AppState>, 18 headers: axum::http::HeaderMap, 19 body: Bytes, 20) -> Response { 21 let auth_header = headers.get("Authorization"); 22 if auth_header.is_none() { 23 return ( 24 StatusCode::UNAUTHORIZED, 25 Json(json!({"error": "AuthenticationRequired"})), 26 ) 27 .into_response(); 28 } 29 let token = auth_header 30 .unwrap() 31 .to_str() 32 .unwrap_or("") 33 .replace("Bearer ", ""); 34 35 let session = sqlx::query( 36 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 37 ) 38 .bind(&token) 39 .fetch_optional(&state.db) 40 .await 41 .unwrap_or(None); 42 43 let (did, key_bytes) = match session { 44 Some(row) => ( 45 row.get::<String, _>("did"), 46 row.get::<Vec<u8>, _>("key_bytes"), 47 ), 48 None => { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationFailed"})), 52 ) 53 .into_response(); 54 } 55 }; 56 57 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 58 return ( 59 StatusCode::UNAUTHORIZED, 60 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 61 ) 62 .into_response(); 63 } 64 65 let mime_type = headers 66 .get("content-type") 67 .and_then(|h| h.to_str().ok()) 68 .unwrap_or("application/octet-stream") 69 .to_string(); 70 71 let size = body.len() as i64; 72 let data = body.to_vec(); 73 74 let mut hasher = Sha256::new(); 75 hasher.update(&data); 76 let hash = hasher.finalize(); 77 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 78 let cid = Cid::new_v1(0x55, multihash); 79 let cid_str = cid.to_string(); 80 81 let storage_key = format!("blobs/{}", cid_str); 82 83 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 84 error!("Failed to upload blob to storage: {:?}", e); 85 return ( 86 StatusCode::INTERNAL_SERVER_ERROR, 87 Json(json!({"error": "InternalError", "message": "Failed to store blob"})), 88 ) 89 .into_response(); 90 } 91 92 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 93 .bind(&did) 94 .fetch_optional(&state.db) 95 .await; 96 97 let user_id: uuid::Uuid = match user_query { 98 Ok(Some(row)) => row.get("id"), 99 _ => { 100 return ( 101 StatusCode::INTERNAL_SERVER_ERROR, 102 Json(json!({"error": "InternalError"})), 103 ) 104 .into_response(); 105 } 106 }; 107 108 let insert = sqlx::query( 109 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING" 110 ) 111 .bind(&cid_str) 112 .bind(&mime_type) 113 .bind(size) 114 .bind(user_id) 115 .bind(&storage_key) 116 .execute(&state.db) 117 .await; 118 119 if let Err(e) = insert { 120 error!("Failed to insert blob record: {:?}", e); 121 return ( 122 StatusCode::INTERNAL_SERVER_ERROR, 123 Json(json!({"error": "InternalError"})), 124 ) 125 .into_response(); 126 } 127 128 Json(json!({ 129 "blob": { 130 "ref": { 131 "$link": cid_str 132 }, 133 "mimeType": mime_type, 134 "size": size 135 } 136 })) 137 .into_response() 138}