this repo has no description
1use crate::state::AppState;
2use axum::body::Bytes;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use multihash::Multihash;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use sha2::{Digest, Sha256};
14use sqlx::Row;
15use tracing::error;
16
17pub async fn upload_blob(
18 State(state): State<AppState>,
19 headers: axum::http::HeaderMap,
20 body: Bytes,
21) -> Response {
22 let auth_header = headers.get("Authorization");
23 if auth_header.is_none() {
24 return (
25 StatusCode::UNAUTHORIZED,
26 Json(json!({"error": "AuthenticationRequired"})),
27 )
28 .into_response();
29 }
30 let token = auth_header
31 .unwrap()
32 .to_str()
33 .unwrap_or("")
34 .replace("Bearer ", "");
35
36 let session = sqlx::query(
37 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
38 )
39 .bind(&token)
40 .fetch_optional(&state.db)
41 .await
42 .unwrap_or(None);
43
44 let (did, key_bytes) = match session {
45 Some(row) => (
46 row.get::<String, _>("did"),
47 row.get::<Vec<u8>, _>("key_bytes"),
48 ),
49 None => {
50 return (
51 StatusCode::UNAUTHORIZED,
52 Json(json!({"error": "AuthenticationFailed"})),
53 )
54 .into_response();
55 }
56 };
57
58 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
59 return (
60 StatusCode::UNAUTHORIZED,
61 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
62 )
63 .into_response();
64 }
65
66 let mime_type = headers
67 .get("content-type")
68 .and_then(|h| h.to_str().ok())
69 .unwrap_or("application/octet-stream")
70 .to_string();
71
72 let size = body.len() as i64;
73 let data = body.to_vec();
74
75 let mut hasher = Sha256::new();
76 hasher.update(&data);
77 let hash = hasher.finalize();
78 let multihash = Multihash::wrap(0x12, &hash).unwrap();
79 let cid = Cid::new_v1(0x55, multihash);
80 let cid_str = cid.to_string();
81
82 let storage_key = format!("blobs/{}", cid_str);
83
84 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
85 error!("Failed to upload blob to storage: {:?}", e);
86 return (
87 StatusCode::INTERNAL_SERVER_ERROR,
88 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
89 )
90 .into_response();
91 }
92
93 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
94 .bind(&did)
95 .fetch_optional(&state.db)
96 .await;
97
98 let user_id: uuid::Uuid = match user_query {
99 Ok(Some(row)) => row.get("id"),
100 _ => {
101 return (
102 StatusCode::INTERNAL_SERVER_ERROR,
103 Json(json!({"error": "InternalError"})),
104 )
105 .into_response();
106 }
107 };
108
109 let insert = sqlx::query(
110 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING"
111 )
112 .bind(&cid_str)
113 .bind(&mime_type)
114 .bind(size)
115 .bind(user_id)
116 .bind(&storage_key)
117 .execute(&state.db)
118 .await;
119
120 if let Err(e) = insert {
121 error!("Failed to insert blob record: {:?}", e);
122 return (
123 StatusCode::INTERNAL_SERVER_ERROR,
124 Json(json!({"error": "InternalError"})),
125 )
126 .into_response();
127 }
128
129 Json(json!({
130 "blob": {
131 "ref": {
132 "$link": cid_str
133 },
134 "mimeType": mime_type,
135 "size": size
136 }
137 }))
138 .into_response()
139}
140
141#[derive(Deserialize)]
142pub struct ListMissingBlobsParams {
143 pub limit: Option<i64>,
144 pub cursor: Option<String>,
145}
146
147#[derive(Serialize)]
148#[serde(rename_all = "camelCase")]
149pub struct RecordBlob {
150 pub cid: String,
151 pub record_uri: String,
152}
153
154#[derive(Serialize)]
155pub struct ListMissingBlobsOutput {
156 pub cursor: Option<String>,
157 pub blobs: Vec<RecordBlob>,
158}
159
160pub async fn list_missing_blobs(
161 State(_state): State<AppState>,
162 headers: axum::http::HeaderMap,
163 Query(_params): Query<ListMissingBlobsParams>,
164) -> Response {
165 let auth_header = headers.get("Authorization");
166 if auth_header.is_none() {
167 return (
168 StatusCode::UNAUTHORIZED,
169 Json(json!({"error": "AuthenticationRequired"})),
170 )
171 .into_response();
172 }
173
174 (
175 StatusCode::OK,
176 Json(ListMissingBlobsOutput {
177 cursor: None,
178 blobs: vec![],
179 }),
180 )
181 .into_response()
182}