this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 body::Body,
5 extract::{Query, State},
6 http::StatusCode,
7 http::header,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::error;
13
14#[derive(Deserialize)]
15pub struct GetBlobParams {
16 pub did: String,
17 pub cid: String,
18}
19
20pub async fn get_blob(
21 State(state): State<AppState>,
22 Query(params): Query<GetBlobParams>,
23) -> Response {
24 let did = params.did.trim();
25 let cid = params.cid.trim();
26 if did.is_empty() {
27 return (
28 StatusCode::BAD_REQUEST,
29 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
30 )
31 .into_response();
32 }
33 if cid.is_empty() {
34 return (
35 StatusCode::BAD_REQUEST,
36 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
37 )
38 .into_response();
39 }
40 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
41 .fetch_optional(&state.db)
42 .await;
43 match user_exists {
44 Ok(None) => {
45 return (
46 StatusCode::NOT_FOUND,
47 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
48 )
49 .into_response();
50 }
51 Err(e) => {
52 error!("DB error in get_blob: {:?}", e);
53 return (
54 StatusCode::INTERNAL_SERVER_ERROR,
55 Json(json!({"error": "InternalError"})),
56 )
57 .into_response();
58 }
59 Ok(Some(_)) => {}
60 }
61 let blob_result = sqlx::query!(
62 "SELECT storage_key, mime_type FROM blobs WHERE cid = $1",
63 cid
64 )
65 .fetch_optional(&state.db)
66 .await;
67 match blob_result {
68 Ok(Some(row)) => {
69 let storage_key = &row.storage_key;
70 let mime_type = &row.mime_type;
71 match state.blob_store.get(storage_key).await {
72 Ok(data) => Response::builder()
73 .status(StatusCode::OK)
74 .header(header::CONTENT_TYPE, mime_type)
75 .body(Body::from(data))
76 .unwrap(),
77 Err(e) => {
78 error!("Failed to fetch blob from storage: {:?}", e);
79 (
80 StatusCode::NOT_FOUND,
81 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
82 )
83 .into_response()
84 }
85 }
86 }
87 Ok(None) => (
88 StatusCode::NOT_FOUND,
89 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
90 )
91 .into_response(),
92 Err(e) => {
93 error!("DB error in get_blob: {:?}", e);
94 (
95 StatusCode::INTERNAL_SERVER_ERROR,
96 Json(json!({"error": "InternalError"})),
97 )
98 .into_response()
99 }
100 }
101}
102
103#[derive(Deserialize)]
104pub struct ListBlobsParams {
105 pub did: String,
106 pub since: Option<String>,
107 pub limit: Option<i64>,
108 pub cursor: Option<String>,
109}
110
111#[derive(Serialize)]
112pub struct ListBlobsOutput {
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub cursor: Option<String>,
115 pub cids: Vec<String>,
116}
117
118pub async fn list_blobs(
119 State(state): State<AppState>,
120 Query(params): Query<ListBlobsParams>,
121) -> Response {
122 let did = params.did.trim();
123 if did.is_empty() {
124 return (
125 StatusCode::BAD_REQUEST,
126 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
127 )
128 .into_response();
129 }
130 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
131 let cursor_cid = params.cursor.as_deref().unwrap_or("");
132 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
133 .fetch_optional(&state.db)
134 .await;
135 let user_id = match user_result {
136 Ok(Some(row)) => row.id,
137 Ok(None) => {
138 return (
139 StatusCode::NOT_FOUND,
140 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
141 )
142 .into_response();
143 }
144 Err(e) => {
145 error!("DB error in list_blobs: {:?}", e);
146 return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response();
151 }
152 };
153 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
154 let since_time = chrono::DateTime::parse_from_rfc3339(since)
155 .map(|dt| dt.with_timezone(&chrono::Utc))
156 .unwrap_or_else(|_| chrono::Utc::now());
157 sqlx::query!(
158 r#"
159 SELECT cid FROM blobs
160 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
161 ORDER BY cid ASC
162 LIMIT $4
163 "#,
164 user_id,
165 cursor_cid,
166 since_time,
167 limit + 1
168 )
169 .fetch_all(&state.db)
170 .await
171 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
172 } else {
173 sqlx::query!(
174 r#"
175 SELECT cid FROM blobs
176 WHERE created_by_user = $1 AND cid > $2
177 ORDER BY cid ASC
178 LIMIT $3
179 "#,
180 user_id,
181 cursor_cid,
182 limit + 1
183 )
184 .fetch_all(&state.db)
185 .await
186 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
187 };
188 match cids_result {
189 Ok(cids) => {
190 let has_more = cids.len() as i64 > limit;
191 let cids: Vec<String> = cids.into_iter().take(limit as usize).collect();
192 let next_cursor = if has_more { cids.last().cloned() } else { None };
193 (
194 StatusCode::OK,
195 Json(ListBlobsOutput {
196 cursor: next_cursor,
197 cids,
198 }),
199 )
200 .into_response()
201 }
202 Err(e) => {
203 error!("DB error in list_blobs: {:?}", e);
204 (
205 StatusCode::INTERNAL_SERVER_ERROR,
206 Json(json!({"error": "InternalError"})),
207 )
208 .into_response()
209 }
210 }
211}