this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 body::Body,
5 extract::{Query, State},
6 http::StatusCode,
7 http::header,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::error;
13
14#[derive(Deserialize)]
15pub struct GetBlobParams {
16 pub did: String,
17 pub cid: String,
18}
19
20pub async fn get_blob(
21 State(state): State<AppState>,
22 Query(params): Query<GetBlobParams>,
23) -> Response {
24 let did = params.did.trim();
25 let cid = params.cid.trim();
26
27 if did.is_empty() {
28 return (
29 StatusCode::BAD_REQUEST,
30 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
31 )
32 .into_response();
33 }
34
35 if cid.is_empty() {
36 return (
37 StatusCode::BAD_REQUEST,
38 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
39 )
40 .into_response();
41 }
42
43 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
44 .fetch_optional(&state.db)
45 .await;
46
47 match user_exists {
48 Ok(None) => {
49 return (
50 StatusCode::NOT_FOUND,
51 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
52 )
53 .into_response();
54 }
55 Err(e) => {
56 error!("DB error in get_blob: {:?}", e);
57 return (
58 StatusCode::INTERNAL_SERVER_ERROR,
59 Json(json!({"error": "InternalError"})),
60 )
61 .into_response();
62 }
63 Ok(Some(_)) => {}
64 }
65
66 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid)
67 .fetch_optional(&state.db)
68 .await;
69
70 match blob_result {
71 Ok(Some(row)) => {
72 let storage_key = &row.storage_key;
73 let mime_type = &row.mime_type;
74
75 match state.blob_store.get(&storage_key).await {
76 Ok(data) => Response::builder()
77 .status(StatusCode::OK)
78 .header(header::CONTENT_TYPE, mime_type)
79 .body(Body::from(data))
80 .unwrap(),
81 Err(e) => {
82 error!("Failed to fetch blob from storage: {:?}", e);
83 (
84 StatusCode::NOT_FOUND,
85 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
86 )
87 .into_response()
88 }
89 }
90 }
91 Ok(None) => (
92 StatusCode::NOT_FOUND,
93 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
94 )
95 .into_response(),
96 Err(e) => {
97 error!("DB error in get_blob: {:?}", e);
98 (
99 StatusCode::INTERNAL_SERVER_ERROR,
100 Json(json!({"error": "InternalError"})),
101 )
102 .into_response()
103 }
104 }
105}
106
107#[derive(Deserialize)]
108pub struct ListBlobsParams {
109 pub did: String,
110 pub since: Option<String>,
111 pub limit: Option<i64>,
112 pub cursor: Option<String>,
113}
114
115#[derive(Serialize)]
116pub struct ListBlobsOutput {
117 pub cursor: Option<String>,
118 pub cids: Vec<String>,
119}
120
121pub async fn list_blobs(
122 State(state): State<AppState>,
123 Query(params): Query<ListBlobsParams>,
124) -> Response {
125 let did = params.did.trim();
126
127 if did.is_empty() {
128 return (
129 StatusCode::BAD_REQUEST,
130 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
131 )
132 .into_response();
133 }
134
135 let limit = params.limit.unwrap_or(500).min(1000);
136 let cursor_cid = params.cursor.as_deref().unwrap_or("");
137
138 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
139 .fetch_optional(&state.db)
140 .await;
141
142 let user_id = match user_result {
143 Ok(Some(row)) => row.id,
144 Ok(None) => {
145 return (
146 StatusCode::NOT_FOUND,
147 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
148 )
149 .into_response();
150 }
151 Err(e) => {
152 error!("DB error in list_blobs: {:?}", e);
153 return (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError"})),
156 )
157 .into_response();
158 }
159 };
160
161 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
162 let since_time = chrono::DateTime::parse_from_rfc3339(since)
163 .map(|dt| dt.with_timezone(&chrono::Utc))
164 .unwrap_or_else(|_| chrono::Utc::now());
165 sqlx::query!(
166 r#"
167 SELECT cid FROM blobs
168 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
169 ORDER BY cid ASC
170 LIMIT $4
171 "#,
172 user_id,
173 cursor_cid,
174 since_time,
175 limit + 1
176 )
177 .fetch_all(&state.db)
178 .await
179 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
180 } else {
181 sqlx::query!(
182 r#"
183 SELECT cid FROM blobs
184 WHERE created_by_user = $1 AND cid > $2
185 ORDER BY cid ASC
186 LIMIT $3
187 "#,
188 user_id,
189 cursor_cid,
190 limit + 1
191 )
192 .fetch_all(&state.db)
193 .await
194 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
195 };
196
197 match cids_result {
198 Ok(cids) => {
199 let has_more = cids.len() as i64 > limit;
200 let cids: Vec<String> = cids
201 .into_iter()
202 .take(limit as usize)
203 .collect();
204
205 let next_cursor = if has_more {
206 cids.last().cloned()
207 } else {
208 None
209 };
210
211 (
212 StatusCode::OK,
213 Json(ListBlobsOutput {
214 cursor: next_cursor,
215 cids,
216 }),
217 )
218 .into_response()
219 }
220 Err(e) => {
221 error!("DB error in list_blobs: {:?}", e);
222 (
223 StatusCode::INTERNAL_SERVER_ERROR,
224 Json(json!({"error": "InternalError"})),
225 )
226 .into_response()
227 }
228 }
229}