this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 body::Body,
5 extract::{Query, State},
6 http::StatusCode,
7 http::header,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::error;
13#[derive(Deserialize)]
14pub struct GetBlobParams {
15 pub did: String,
16 pub cid: String,
17}
18pub async fn get_blob(
19 State(state): State<AppState>,
20 Query(params): Query<GetBlobParams>,
21) -> Response {
22 let did = params.did.trim();
23 let cid = params.cid.trim();
24 if did.is_empty() {
25 return (
26 StatusCode::BAD_REQUEST,
27 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
28 )
29 .into_response();
30 }
31 if cid.is_empty() {
32 return (
33 StatusCode::BAD_REQUEST,
34 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
35 )
36 .into_response();
37 }
38 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
39 .fetch_optional(&state.db)
40 .await;
41 match user_exists {
42 Ok(None) => {
43 return (
44 StatusCode::NOT_FOUND,
45 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
46 )
47 .into_response();
48 }
49 Err(e) => {
50 error!("DB error in get_blob: {:?}", e);
51 return (
52 StatusCode::INTERNAL_SERVER_ERROR,
53 Json(json!({"error": "InternalError"})),
54 )
55 .into_response();
56 }
57 Ok(Some(_)) => {}
58 }
59 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid)
60 .fetch_optional(&state.db)
61 .await;
62 match blob_result {
63 Ok(Some(row)) => {
64 let storage_key = &row.storage_key;
65 let mime_type = &row.mime_type;
66 match state.blob_store.get(&storage_key).await {
67 Ok(data) => Response::builder()
68 .status(StatusCode::OK)
69 .header(header::CONTENT_TYPE, mime_type)
70 .body(Body::from(data))
71 .unwrap(),
72 Err(e) => {
73 error!("Failed to fetch blob from storage: {:?}", e);
74 (
75 StatusCode::NOT_FOUND,
76 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
77 )
78 .into_response()
79 }
80 }
81 }
82 Ok(None) => (
83 StatusCode::NOT_FOUND,
84 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
85 )
86 .into_response(),
87 Err(e) => {
88 error!("DB error in get_blob: {:?}", e);
89 (
90 StatusCode::INTERNAL_SERVER_ERROR,
91 Json(json!({"error": "InternalError"})),
92 )
93 .into_response()
94 }
95 }
96}
97#[derive(Deserialize)]
98pub struct ListBlobsParams {
99 pub did: String,
100 pub since: Option<String>,
101 pub limit: Option<i64>,
102 pub cursor: Option<String>,
103}
104#[derive(Serialize)]
105pub struct ListBlobsOutput {
106 pub cursor: Option<String>,
107 pub cids: Vec<String>,
108}
109pub async fn list_blobs(
110 State(state): State<AppState>,
111 Query(params): Query<ListBlobsParams>,
112) -> Response {
113 let did = params.did.trim();
114 if did.is_empty() {
115 return (
116 StatusCode::BAD_REQUEST,
117 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
118 )
119 .into_response();
120 }
121 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
122 let cursor_cid = params.cursor.as_deref().unwrap_or("");
123 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
124 .fetch_optional(&state.db)
125 .await;
126 let user_id = match user_result {
127 Ok(Some(row)) => row.id,
128 Ok(None) => {
129 return (
130 StatusCode::NOT_FOUND,
131 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
132 )
133 .into_response();
134 }
135 Err(e) => {
136 error!("DB error in list_blobs: {:?}", e);
137 return (
138 StatusCode::INTERNAL_SERVER_ERROR,
139 Json(json!({"error": "InternalError"})),
140 )
141 .into_response();
142 }
143 };
144 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
145 let since_time = chrono::DateTime::parse_from_rfc3339(since)
146 .map(|dt| dt.with_timezone(&chrono::Utc))
147 .unwrap_or_else(|_| chrono::Utc::now());
148 sqlx::query!(
149 r#"
150 SELECT cid FROM blobs
151 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
152 ORDER BY cid ASC
153 LIMIT $4
154 "#,
155 user_id,
156 cursor_cid,
157 since_time,
158 limit + 1
159 )
160 .fetch_all(&state.db)
161 .await
162 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
163 } else {
164 sqlx::query!(
165 r#"
166 SELECT cid FROM blobs
167 WHERE created_by_user = $1 AND cid > $2
168 ORDER BY cid ASC
169 LIMIT $3
170 "#,
171 user_id,
172 cursor_cid,
173 limit + 1
174 )
175 .fetch_all(&state.db)
176 .await
177 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
178 };
179 match cids_result {
180 Ok(cids) => {
181 let has_more = cids.len() as i64 > limit;
182 let cids: Vec<String> = cids
183 .into_iter()
184 .take(limit as usize)
185 .collect();
186 let next_cursor = if has_more {
187 cids.last().cloned()
188 } else {
189 None
190 };
191 (
192 StatusCode::OK,
193 Json(ListBlobsOutput {
194 cursor: next_cursor,
195 cids,
196 }),
197 )
198 .into_response()
199 }
200 Err(e) => {
201 error!("DB error in list_blobs: {:?}", e);
202 (
203 StatusCode::INTERNAL_SERVER_ERROR,
204 Json(json!({"error": "InternalError"})),
205 )
206 .into_response()
207 }
208 }
209}