this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 body::Body,
5 extract::{Query, State},
6 http::StatusCode,
7 http::header,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::error;
13
14#[derive(Deserialize)]
15pub struct GetBlobParams {
16 pub did: String,
17 pub cid: String,
18}
19
20pub async fn get_blob(
21 State(state): State<AppState>,
22 Query(params): Query<GetBlobParams>,
23) -> Response {
24 let did = params.did.trim();
25 let cid = params.cid.trim();
26 if did.is_empty() {
27 return (
28 StatusCode::BAD_REQUEST,
29 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
30 )
31 .into_response();
32 }
33 if cid.is_empty() {
34 return (
35 StatusCode::BAD_REQUEST,
36 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
37 )
38 .into_response();
39 }
40 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
41 .fetch_optional(&state.db)
42 .await;
43 match user_exists {
44 Ok(None) => {
45 return (
46 StatusCode::NOT_FOUND,
47 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
48 )
49 .into_response();
50 }
51 Err(e) => {
52 error!("DB error in get_blob: {:?}", e);
53 return (
54 StatusCode::INTERNAL_SERVER_ERROR,
55 Json(json!({"error": "InternalError"})),
56 )
57 .into_response();
58 }
59 Ok(Some(_)) => {}
60 }
61 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid)
62 .fetch_optional(&state.db)
63 .await;
64 match blob_result {
65 Ok(Some(row)) => {
66 let storage_key = &row.storage_key;
67 let mime_type = &row.mime_type;
68 match state.blob_store.get(&storage_key).await {
69 Ok(data) => Response::builder()
70 .status(StatusCode::OK)
71 .header(header::CONTENT_TYPE, mime_type)
72 .body(Body::from(data))
73 .unwrap(),
74 Err(e) => {
75 error!("Failed to fetch blob from storage: {:?}", e);
76 (
77 StatusCode::NOT_FOUND,
78 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
79 )
80 .into_response()
81 }
82 }
83 }
84 Ok(None) => (
85 StatusCode::NOT_FOUND,
86 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
87 )
88 .into_response(),
89 Err(e) => {
90 error!("DB error in get_blob: {:?}", e);
91 (
92 StatusCode::INTERNAL_SERVER_ERROR,
93 Json(json!({"error": "InternalError"})),
94 )
95 .into_response()
96 }
97 }
98}
99
100#[derive(Deserialize)]
101pub struct ListBlobsParams {
102 pub did: String,
103 pub since: Option<String>,
104 pub limit: Option<i64>,
105 pub cursor: Option<String>,
106}
107
108#[derive(Serialize)]
109pub struct ListBlobsOutput {
110 pub cursor: Option<String>,
111 pub cids: Vec<String>,
112}
113
114pub async fn list_blobs(
115 State(state): State<AppState>,
116 Query(params): Query<ListBlobsParams>,
117) -> Response {
118 let did = params.did.trim();
119 if did.is_empty() {
120 return (
121 StatusCode::BAD_REQUEST,
122 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
123 )
124 .into_response();
125 }
126 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
127 let cursor_cid = params.cursor.as_deref().unwrap_or("");
128 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
129 .fetch_optional(&state.db)
130 .await;
131 let user_id = match user_result {
132 Ok(Some(row)) => row.id,
133 Ok(None) => {
134 return (
135 StatusCode::NOT_FOUND,
136 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
137 )
138 .into_response();
139 }
140 Err(e) => {
141 error!("DB error in list_blobs: {:?}", e);
142 return (
143 StatusCode::INTERNAL_SERVER_ERROR,
144 Json(json!({"error": "InternalError"})),
145 )
146 .into_response();
147 }
148 };
149 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
150 let since_time = chrono::DateTime::parse_from_rfc3339(since)
151 .map(|dt| dt.with_timezone(&chrono::Utc))
152 .unwrap_or_else(|_| chrono::Utc::now());
153 sqlx::query!(
154 r#"
155 SELECT cid FROM blobs
156 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
157 ORDER BY cid ASC
158 LIMIT $4
159 "#,
160 user_id,
161 cursor_cid,
162 since_time,
163 limit + 1
164 )
165 .fetch_all(&state.db)
166 .await
167 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
168 } else {
169 sqlx::query!(
170 r#"
171 SELECT cid FROM blobs
172 WHERE created_by_user = $1 AND cid > $2
173 ORDER BY cid ASC
174 LIMIT $3
175 "#,
176 user_id,
177 cursor_cid,
178 limit + 1
179 )
180 .fetch_all(&state.db)
181 .await
182 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
183 };
184 match cids_result {
185 Ok(cids) => {
186 let has_more = cids.len() as i64 > limit;
187 let cids: Vec<String> = cids
188 .into_iter()
189 .take(limit as usize)
190 .collect();
191 let next_cursor = if has_more {
192 cids.last().cloned()
193 } else {
194 None
195 };
196 (
197 StatusCode::OK,
198 Json(ListBlobsOutput {
199 cursor: next_cursor,
200 cids,
201 }),
202 )
203 .into_response()
204 }
205 Err(e) => {
206 error!("DB error in list_blobs: {:?}", e);
207 (
208 StatusCode::INTERNAL_SERVER_ERROR,
209 Json(json!({"error": "InternalError"})),
210 )
211 .into_response()
212 }
213 }
214}