this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use multihash::Multihash;
11use serde_json::json;
12use sha2::{Digest, Sha256};
13use sqlx::Row;
14use tracing::error;
15
16pub async fn upload_blob(
17 State(state): State<AppState>,
18 headers: axum::http::HeaderMap,
19 body: Bytes,
20) -> Response {
21 let auth_header = headers.get("Authorization");
22 if auth_header.is_none() {
23 return (
24 StatusCode::UNAUTHORIZED,
25 Json(json!({"error": "AuthenticationRequired"})),
26 )
27 .into_response();
28 }
29 let token = auth_header
30 .unwrap()
31 .to_str()
32 .unwrap_or("")
33 .replace("Bearer ", "");
34
35 let session = sqlx::query(
36 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
37 )
38 .bind(&token)
39 .fetch_optional(&state.db)
40 .await
41 .unwrap_or(None);
42
43 let (did, key_bytes) = match session {
44 Some(row) => (
45 row.get::<String, _>("did"),
46 row.get::<Vec<u8>, _>("key_bytes"),
47 ),
48 None => {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationFailed"})),
52 )
53 .into_response();
54 }
55 };
56
57 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
58 return (
59 StatusCode::UNAUTHORIZED,
60 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
61 )
62 .into_response();
63 }
64
65 let mime_type = headers
66 .get("content-type")
67 .and_then(|h| h.to_str().ok())
68 .unwrap_or("application/octet-stream")
69 .to_string();
70
71 let size = body.len() as i64;
72 let data = body.to_vec();
73
74 let mut hasher = Sha256::new();
75 hasher.update(&data);
76 let hash = hasher.finalize();
77 let multihash = Multihash::wrap(0x12, &hash).unwrap();
78 let cid = Cid::new_v1(0x55, multihash);
79 let cid_str = cid.to_string();
80
81 let storage_key = format!("blobs/{}", cid_str);
82
83 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
84 error!("Failed to upload blob to storage: {:?}", e);
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
88 )
89 .into_response();
90 }
91
92 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
93 .bind(&did)
94 .fetch_optional(&state.db)
95 .await;
96
97 let user_id: uuid::Uuid = match user_query {
98 Ok(Some(row)) => row.get("id"),
99 _ => {
100 return (
101 StatusCode::INTERNAL_SERVER_ERROR,
102 Json(json!({"error": "InternalError"})),
103 )
104 .into_response();
105 }
106 };
107
108 let insert = sqlx::query(
109 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING"
110 )
111 .bind(&cid_str)
112 .bind(&mime_type)
113 .bind(size)
114 .bind(user_id)
115 .bind(&storage_key)
116 .execute(&state.db)
117 .await;
118
119 if let Err(e) = insert {
120 error!("Failed to insert blob record: {:?}", e);
121 return (
122 StatusCode::INTERNAL_SERVER_ERROR,
123 Json(json!({"error": "InternalError"})),
124 )
125 .into_response();
126 }
127
128 Json(json!({
129 "blob": {
130 "ref": {
131 "$link": cid_str
132 },
133 "mimeType": mime_type,
134 "size": size
135 }
136 }))
137 .into_response()
138}