this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 body::Body,
5 extract::{Query, State},
6 http::StatusCode,
7 http::header,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::{error, info};
13
14#[derive(Deserialize)]
15pub struct GetLatestCommitParams {
16 pub did: String,
17}
18
19#[derive(Serialize)]
20pub struct GetLatestCommitOutput {
21 pub cid: String,
22 pub rev: String,
23}
24
25pub async fn get_latest_commit(
26 State(state): State<AppState>,
27 Query(params): Query<GetLatestCommitParams>,
28) -> Response {
29 let did = params.did.trim();
30
31 if did.is_empty() {
32 return (
33 StatusCode::BAD_REQUEST,
34 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
35 )
36 .into_response();
37 }
38
39 let result = sqlx::query!(
40 r#"
41 SELECT r.repo_root_cid
42 FROM repos r
43 JOIN users u ON r.user_id = u.id
44 WHERE u.did = $1
45 "#,
46 did
47 )
48 .fetch_optional(&state.db)
49 .await;
50
51 match result {
52 Ok(Some(row)) => {
53 (
54 StatusCode::OK,
55 Json(GetLatestCommitOutput {
56 cid: row.repo_root_cid,
57 rev: chrono::Utc::now().timestamp_millis().to_string(),
58 }),
59 )
60 .into_response()
61 }
62 Ok(None) => (
63 StatusCode::NOT_FOUND,
64 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
65 )
66 .into_response(),
67 Err(e) => {
68 error!("DB error in get_latest_commit: {:?}", e);
69 (
70 StatusCode::INTERNAL_SERVER_ERROR,
71 Json(json!({"error": "InternalError"})),
72 )
73 .into_response()
74 }
75 }
76}
77
78#[derive(Deserialize)]
79pub struct ListReposParams {
80 pub limit: Option<i64>,
81 pub cursor: Option<String>,
82}
83
84#[derive(Serialize)]
85#[serde(rename_all = "camelCase")]
86pub struct RepoInfo {
87 pub did: String,
88 pub head: String,
89 pub rev: String,
90 pub active: bool,
91}
92
93#[derive(Serialize)]
94pub struct ListReposOutput {
95 pub cursor: Option<String>,
96 pub repos: Vec<RepoInfo>,
97}
98
99pub async fn list_repos(
100 State(state): State<AppState>,
101 Query(params): Query<ListReposParams>,
102) -> Response {
103 let limit = params.limit.unwrap_or(50).min(1000);
104 let cursor_did = params.cursor.as_deref().unwrap_or("");
105
106 let result = sqlx::query!(
107 r#"
108 SELECT u.did, r.repo_root_cid
109 FROM repos r
110 JOIN users u ON r.user_id = u.id
111 WHERE u.did > $1
112 ORDER BY u.did ASC
113 LIMIT $2
114 "#,
115 cursor_did,
116 limit + 1
117 )
118 .fetch_all(&state.db)
119 .await;
120
121 match result {
122 Ok(rows) => {
123 let has_more = rows.len() as i64 > limit;
124 let repos: Vec<RepoInfo> = rows
125 .iter()
126 .take(limit as usize)
127 .map(|row| {
128 RepoInfo {
129 did: row.did.clone(),
130 head: row.repo_root_cid.clone(),
131 rev: chrono::Utc::now().timestamp_millis().to_string(),
132 active: true,
133 }
134 })
135 .collect();
136
137 let next_cursor = if has_more {
138 repos.last().map(|r| r.did.clone())
139 } else {
140 None
141 };
142
143 (
144 StatusCode::OK,
145 Json(ListReposOutput {
146 cursor: next_cursor,
147 repos,
148 }),
149 )
150 .into_response()
151 }
152 Err(e) => {
153 error!("DB error in list_repos: {:?}", e);
154 (
155 StatusCode::INTERNAL_SERVER_ERROR,
156 Json(json!({"error": "InternalError"})),
157 )
158 .into_response()
159 }
160 }
161}
162
163#[derive(Deserialize)]
164pub struct GetBlobParams {
165 pub did: String,
166 pub cid: String,
167}
168
169pub async fn get_blob(
170 State(state): State<AppState>,
171 Query(params): Query<GetBlobParams>,
172) -> Response {
173 let did = params.did.trim();
174 let cid = params.cid.trim();
175
176 if did.is_empty() {
177 return (
178 StatusCode::BAD_REQUEST,
179 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
180 )
181 .into_response();
182 }
183
184 if cid.is_empty() {
185 return (
186 StatusCode::BAD_REQUEST,
187 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
188 )
189 .into_response();
190 }
191
192 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
193 .fetch_optional(&state.db)
194 .await;
195
196 match user_exists {
197 Ok(None) => {
198 return (
199 StatusCode::NOT_FOUND,
200 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
201 )
202 .into_response();
203 }
204 Err(e) => {
205 error!("DB error in get_blob: {:?}", e);
206 return (
207 StatusCode::INTERNAL_SERVER_ERROR,
208 Json(json!({"error": "InternalError"})),
209 )
210 .into_response();
211 }
212 Ok(Some(_)) => {}
213 }
214
215 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid)
216 .fetch_optional(&state.db)
217 .await;
218
219 match blob_result {
220 Ok(Some(row)) => {
221 let storage_key = &row.storage_key;
222 let mime_type = &row.mime_type;
223
224 match state.blob_store.get(&storage_key).await {
225 Ok(data) => Response::builder()
226 .status(StatusCode::OK)
227 .header(header::CONTENT_TYPE, mime_type)
228 .body(Body::from(data))
229 .unwrap(),
230 Err(e) => {
231 error!("Failed to fetch blob from storage: {:?}", e);
232 (
233 StatusCode::NOT_FOUND,
234 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
235 )
236 .into_response()
237 }
238 }
239 }
240 Ok(None) => (
241 StatusCode::NOT_FOUND,
242 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
243 )
244 .into_response(),
245 Err(e) => {
246 error!("DB error in get_blob: {:?}", e);
247 (
248 StatusCode::INTERNAL_SERVER_ERROR,
249 Json(json!({"error": "InternalError"})),
250 )
251 .into_response()
252 }
253 }
254}
255
256#[derive(Deserialize)]
257pub struct ListBlobsParams {
258 pub did: String,
259 pub since: Option<String>,
260 pub limit: Option<i64>,
261 pub cursor: Option<String>,
262}
263
264#[derive(Serialize)]
265pub struct ListBlobsOutput {
266 pub cursor: Option<String>,
267 pub cids: Vec<String>,
268}
269
270pub async fn list_blobs(
271 State(state): State<AppState>,
272 Query(params): Query<ListBlobsParams>,
273) -> Response {
274 let did = params.did.trim();
275
276 if did.is_empty() {
277 return (
278 StatusCode::BAD_REQUEST,
279 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
280 )
281 .into_response();
282 }
283
284 let limit = params.limit.unwrap_or(500).min(1000);
285 let cursor_cid = params.cursor.as_deref().unwrap_or("");
286
287 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
288 .fetch_optional(&state.db)
289 .await;
290
291 let user_id = match user_result {
292 Ok(Some(row)) => row.id,
293 Ok(None) => {
294 return (
295 StatusCode::NOT_FOUND,
296 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
297 )
298 .into_response();
299 }
300 Err(e) => {
301 error!("DB error in list_blobs: {:?}", e);
302 return (
303 StatusCode::INTERNAL_SERVER_ERROR,
304 Json(json!({"error": "InternalError"})),
305 )
306 .into_response();
307 }
308 };
309
310 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
311 let since_time = chrono::DateTime::parse_from_rfc3339(since)
312 .map(|dt| dt.with_timezone(&chrono::Utc))
313 .unwrap_or_else(|_| chrono::Utc::now());
314 sqlx::query!(
315 r#"
316 SELECT cid FROM blobs
317 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
318 ORDER BY cid ASC
319 LIMIT $4
320 "#,
321 user_id,
322 cursor_cid,
323 since_time,
324 limit + 1
325 )
326 .fetch_all(&state.db)
327 .await
328 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
329 } else {
330 sqlx::query!(
331 r#"
332 SELECT cid FROM blobs
333 WHERE created_by_user = $1 AND cid > $2
334 ORDER BY cid ASC
335 LIMIT $3
336 "#,
337 user_id,
338 cursor_cid,
339 limit + 1
340 )
341 .fetch_all(&state.db)
342 .await
343 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
344 };
345
346 match cids_result {
347 Ok(cids) => {
348 let has_more = cids.len() as i64 > limit;
349 let cids: Vec<String> = cids
350 .into_iter()
351 .take(limit as usize)
352 .collect();
353
354 let next_cursor = if has_more {
355 cids.last().cloned()
356 } else {
357 None
358 };
359
360 (
361 StatusCode::OK,
362 Json(ListBlobsOutput {
363 cursor: next_cursor,
364 cids,
365 }),
366 )
367 .into_response()
368 }
369 Err(e) => {
370 error!("DB error in list_blobs: {:?}", e);
371 (
372 StatusCode::INTERNAL_SERVER_ERROR,
373 Json(json!({"error": "InternalError"})),
374 )
375 .into_response()
376 }
377 }
378}
379
380#[derive(Deserialize)]
381pub struct GetRepoStatusParams {
382 pub did: String,
383}
384
385#[derive(Serialize)]
386pub struct GetRepoStatusOutput {
387 pub did: String,
388 pub active: bool,
389 pub rev: Option<String>,
390}
391
392pub async fn get_repo_status(
393 State(state): State<AppState>,
394 Query(params): Query<GetRepoStatusParams>,
395) -> Response {
396 let did = params.did.trim();
397
398 if did.is_empty() {
399 return (
400 StatusCode::BAD_REQUEST,
401 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
402 )
403 .into_response();
404 }
405
406 let result = sqlx::query!(
407 r#"
408 SELECT u.did, r.repo_root_cid
409 FROM users u
410 LEFT JOIN repos r ON u.id = r.user_id
411 WHERE u.did = $1
412 "#,
413 did
414 )
415 .fetch_optional(&state.db)
416 .await;
417
418 match result {
419 Ok(Some(row)) => {
420 let rev = Some(chrono::Utc::now().timestamp_millis().to_string());
421
422 (
423 StatusCode::OK,
424 Json(GetRepoStatusOutput {
425 did: row.did,
426 active: true,
427 rev,
428 }),
429 )
430 .into_response()
431 }
432 Ok(None) => (
433 StatusCode::NOT_FOUND,
434 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
435 )
436 .into_response(),
437 Err(e) => {
438 error!("DB error in get_repo_status: {:?}", e);
439 (
440 StatusCode::INTERNAL_SERVER_ERROR,
441 Json(json!({"error": "InternalError"})),
442 )
443 .into_response()
444 }
445 }
446}
447
448#[derive(Deserialize)]
449pub struct NotifyOfUpdateParams {
450 pub hostname: String,
451}
452
453pub async fn notify_of_update(
454 State(_state): State<AppState>,
455 Query(params): Query<NotifyOfUpdateParams>,
456) -> Response {
457 info!("Received notifyOfUpdate from hostname: {}", params.hostname);
458 // TODO: Queue job for crawler interaction or relay notification
459 info!("TODO: Queue job for notifyOfUpdate (not implemented)");
460
461 (StatusCode::OK, Json(json!({}))).into_response()
462}
463
464#[derive(Deserialize)]
465pub struct RequestCrawlInput {
466 pub hostname: String,
467}
468
469pub async fn request_crawl(
470 State(_state): State<AppState>,
471 Json(input): Json<RequestCrawlInput>,
472) -> Response {
473 info!("Received requestCrawl for hostname: {}", input.hostname);
474 // TODO: Queue job for crawling
475 info!("TODO: Queue job for requestCrawl (not implemented)");
476
477 (StatusCode::OK, Json(json!({}))).into_response()
478}