this repo has no description
1use crate::state::AppState;
2use crate::sync::util::assert_repo_availability;
3use axum::{
4 Json,
5 body::Body,
6 extract::{Query, State},
7 http::StatusCode,
8 http::header,
9 response::{IntoResponse, Response},
10};
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::error;
14
15#[derive(Deserialize)]
16pub struct GetBlobParams {
17 pub did: String,
18 pub cid: String,
19}
20
21pub async fn get_blob(
22 State(state): State<AppState>,
23 Query(params): Query<GetBlobParams>,
24) -> Response {
25 let did = params.did.trim();
26 let cid = params.cid.trim();
27 if did.is_empty() {
28 return (
29 StatusCode::BAD_REQUEST,
30 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
31 )
32 .into_response();
33 }
34 if cid.is_empty() {
35 return (
36 StatusCode::BAD_REQUEST,
37 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
38 )
39 .into_response();
40 }
41
42 let _account = match assert_repo_availability(&state.db, did, false).await {
43 Ok(a) => a,
44 Err(e) => return e.into_response(),
45 };
46
47 let blob_result = sqlx::query!(
48 "SELECT storage_key, mime_type, size_bytes FROM blobs WHERE cid = $1",
49 cid
50 )
51 .fetch_optional(&state.db)
52 .await;
53 match blob_result {
54 Ok(Some(row)) => {
55 let storage_key = &row.storage_key;
56 let mime_type = &row.mime_type;
57 let size_bytes = row.size_bytes;
58 match state.blob_store.get(storage_key).await {
59 Ok(data) => Response::builder()
60 .status(StatusCode::OK)
61 .header(header::CONTENT_TYPE, mime_type)
62 .header(header::CONTENT_LENGTH, size_bytes.to_string())
63 .header("x-content-type-options", "nosniff")
64 .header("content-security-policy", "default-src 'none'; sandbox")
65 .body(Body::from(data))
66 .unwrap(),
67 Err(e) => {
68 error!("Failed to fetch blob from storage: {:?}", e);
69 (
70 StatusCode::NOT_FOUND,
71 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
72 )
73 .into_response()
74 }
75 }
76 }
77 Ok(None) => (
78 StatusCode::NOT_FOUND,
79 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
80 )
81 .into_response(),
82 Err(e) => {
83 error!("DB error in get_blob: {:?}", e);
84 (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError"})),
87 )
88 .into_response()
89 }
90 }
91}
92
93#[derive(Deserialize)]
94pub struct ListBlobsParams {
95 pub did: String,
96 pub since: Option<String>,
97 pub limit: Option<i64>,
98 pub cursor: Option<String>,
99}
100
101#[derive(Serialize)]
102pub struct ListBlobsOutput {
103 #[serde(skip_serializing_if = "Option::is_none")]
104 pub cursor: Option<String>,
105 pub cids: Vec<String>,
106}
107
108pub async fn list_blobs(
109 State(state): State<AppState>,
110 Query(params): Query<ListBlobsParams>,
111) -> Response {
112 let did = params.did.trim();
113 if did.is_empty() {
114 return (
115 StatusCode::BAD_REQUEST,
116 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
117 )
118 .into_response();
119 }
120
121 let account = match assert_repo_availability(&state.db, did, false).await {
122 Ok(a) => a,
123 Err(e) => return e.into_response(),
124 };
125
126 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
127 let cursor_cid = params.cursor.as_deref().unwrap_or("");
128 let user_id = account.user_id;
129
130 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
131 sqlx::query_scalar!(
132 r#"
133 SELECT DISTINCT unnest(blobs) as "cid!"
134 FROM repo_seq
135 WHERE did = $1 AND rev > $2 AND blobs IS NOT NULL
136 "#,
137 did,
138 since
139 )
140 .fetch_all(&state.db)
141 .await
142 .map(|mut cids| {
143 cids.sort();
144 cids.into_iter()
145 .filter(|c| c.as_str() > cursor_cid)
146 .take((limit + 1) as usize)
147 .collect()
148 })
149 } else {
150 sqlx::query!(
151 r#"
152 SELECT cid FROM blobs
153 WHERE created_by_user = $1 AND cid > $2
154 ORDER BY cid ASC
155 LIMIT $3
156 "#,
157 user_id,
158 cursor_cid,
159 limit + 1
160 )
161 .fetch_all(&state.db)
162 .await
163 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
164 };
165 match cids_result {
166 Ok(cids) => {
167 let has_more = cids.len() as i64 > limit;
168 let cids: Vec<String> = cids.into_iter().take(limit as usize).collect();
169 let next_cursor = if has_more { cids.last().cloned() } else { None };
170 (
171 StatusCode::OK,
172 Json(ListBlobsOutput {
173 cursor: next_cursor,
174 cids,
175 }),
176 )
177 .into_response()
178 }
179 Err(e) => {
180 error!("DB error in list_blobs: {:?}", e);
181 (
182 StatusCode::INTERNAL_SERVER_ERROR,
183 Json(json!({"error": "InternalError"})),
184 )
185 .into_response()
186 }
187 }
188}