this repo has no description
1use axum::{
2 extract::{State, Query},
3 Json,
4 response::{IntoResponse, Response},
5 http::StatusCode,
6};
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use crate::state::AppState;
10use chrono::Utc;
11use sqlx::Row;
12use cid::Cid;
13use std::str::FromStr;
14use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
15use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32};
16use tracing::error;
17use std::sync::Arc;
18use sha2::{Sha256, Digest};
19use multihash::Multihash;
20use axum::body::Bytes;
21
22#[derive(Deserialize)]
23#[allow(dead_code)]
24pub struct CreateRecordInput {
25 pub repo: String,
26 pub collection: String,
27 pub rkey: Option<String>,
28 pub validate: Option<bool>,
29 pub record: serde_json::Value,
30 #[serde(rename = "swapCommit")]
31 pub swap_commit: Option<String>,
32}
33
34#[derive(Serialize)]
35#[serde(rename_all = "camelCase")]
36pub struct CreateRecordOutput {
37 pub uri: String,
38 pub cid: String,
39}
40
41pub async fn create_record(
42 State(state): State<AppState>,
43 headers: axum::http::HeaderMap,
44 Json(input): Json<CreateRecordInput>,
45) -> Response {
46 let auth_header = headers.get("Authorization");
47 if auth_header.is_none() {
48 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
49 }
50 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
51
52 let session = sqlx::query(
53 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
54 )
55 .bind(&token)
56 .fetch_optional(&state.db)
57 .await
58 .unwrap_or(None);
59
60 let (did, key_bytes) = match session {
61 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
62 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
63 };
64
65 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
66 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
67 }
68
69 if input.repo != did {
70 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
71 }
72
73 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
74 .bind(&did)
75 .fetch_optional(&state.db)
76 .await;
77
78 let user_id: uuid::Uuid = match user_query {
79 Ok(Some(row)) => row.get("id"),
80 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
81 };
82
83 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
84 .bind(user_id)
85 .fetch_optional(&state.db)
86 .await;
87
88 let current_root_cid = match repo_root_query {
89 Ok(Some(row)) => {
90 let cid_str: String = row.get("repo_root_cid");
91 Cid::from_str(&cid_str).ok()
92 },
93 _ => None,
94 };
95
96 if current_root_cid.is_none() {
97 error!("Repo root not found for user {}", did);
98 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
99 }
100 let current_root_cid = current_root_cid.unwrap();
101
102 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
103 Ok(Some(b)) => b,
104 Ok(None) => {
105 error!("Commit block not found: {}", current_root_cid);
106 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
107 },
108 Err(e) => {
109 error!("Failed to load commit block: {:?}", e);
110 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
111 }
112 };
113
114 let commit = match Commit::from_cbor(&commit_bytes) {
115 Ok(c) => c,
116 Err(e) => {
117 error!("Failed to parse commit: {:?}", e);
118 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
119 }
120 };
121
122 let mst_root = commit.data;
123 let store = Arc::new(state.block_store.clone());
124 let mst = Mst::load(store.clone(), mst_root, None);
125
126 let collection_nsid = match input.collection.parse::<Nsid>() {
127 Ok(n) => n,
128 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
129 };
130
131 let rkey = input.rkey.unwrap_or_else(|| {
132 Utc::now().format("%Y%m%d%H%M%S%f").to_string()
133 });
134
135 let mut record_bytes = Vec::new();
136 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
137 error!("Error serializing record: {:?}", e);
138 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
139 }
140
141 let record_cid = match state.block_store.put(&record_bytes).await {
142 Ok(c) => c,
143 Err(e) => {
144 error!("Failed to save record block: {:?}", e);
145 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
146 }
147 };
148
149 let key = format!("{}/{}", collection_nsid, rkey);
150 if let Err(e) = mst.update(&key, record_cid).await {
151 error!("Failed to update MST: {:?}", e);
152 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
153 }
154
155 let new_mst_root = match mst.root().await {
156 Ok(c) => c,
157 Err(e) => {
158 error!("Failed to get new MST root: {:?}", e);
159 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
160 }
161 };
162
163 let did_obj = match Did::new(&did) {
164 Ok(d) => d,
165 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
166 };
167
168 let rev = Tid::now(LimitedU32::MIN);
169
170 let new_commit = Commit::new_unsigned(
171 did_obj,
172 new_mst_root,
173 rev,
174 Some(current_root_cid)
175 );
176
177 let new_commit_bytes = match new_commit.to_cbor() {
178 Ok(b) => b,
179 Err(e) => {
180 error!("Failed to serialize new commit: {:?}", e);
181 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
182 }
183 };
184
185 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
186 Ok(c) => c,
187 Err(e) => {
188 error!("Failed to save new commit: {:?}", e);
189 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
190 }
191 };
192
193 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
194 .bind(new_root_cid.to_string())
195 .bind(user_id)
196 .execute(&state.db)
197 .await;
198
199 if let Err(e) = update_repo {
200 error!("Failed to update repo root in DB: {:?}", e);
201 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
202 }
203
204 let record_insert = sqlx::query(
205 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
206 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()"
207 )
208 .bind(user_id)
209 .bind(&input.collection)
210 .bind(&rkey)
211 .bind(record_cid.to_string())
212 .execute(&state.db)
213 .await;
214
215 if let Err(e) = record_insert {
216 error!("Error inserting record index: {:?}", e);
217 }
218
219 let output = CreateRecordOutput {
220 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
221 cid: record_cid.to_string(),
222 };
223 (StatusCode::OK, Json(output)).into_response()
224}
225
226#[derive(Deserialize)]
227#[allow(dead_code)]
228pub struct PutRecordInput {
229 pub repo: String,
230 pub collection: String,
231 pub rkey: String,
232 pub validate: Option<bool>,
233 pub record: serde_json::Value,
234 #[serde(rename = "swapCommit")]
235 pub swap_commit: Option<String>,
236}
237
238#[derive(Serialize)]
239#[serde(rename_all = "camelCase")]
240pub struct PutRecordOutput {
241 pub uri: String,
242 pub cid: String,
243}
244
245pub async fn put_record(
246 State(state): State<AppState>,
247 headers: axum::http::HeaderMap,
248 Json(input): Json<PutRecordInput>,
249) -> Response {
250 let auth_header = headers.get("Authorization");
251 if auth_header.is_none() {
252 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
253 }
254 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
255
256 let session = sqlx::query(
257 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
258 )
259 .bind(&token)
260 .fetch_optional(&state.db)
261 .await
262 .unwrap_or(None);
263
264 let (did, key_bytes) = match session {
265 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
266 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
267 };
268
269 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
270 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
271 }
272
273 if input.repo != did {
274 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
275 }
276
277 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
278 .bind(&did)
279 .fetch_optional(&state.db)
280 .await;
281
282 let user_id: uuid::Uuid = match user_query {
283 Ok(Some(row)) => row.get("id"),
284 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
285 };
286
287 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
288 .bind(user_id)
289 .fetch_optional(&state.db)
290 .await;
291
292 let current_root_cid = match repo_root_query {
293 Ok(Some(row)) => {
294 let cid_str: String = row.get("repo_root_cid");
295 Cid::from_str(&cid_str).ok()
296 },
297 _ => None,
298 };
299
300 if current_root_cid.is_none() {
301 error!("Repo root not found for user {}", did);
302 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
303 }
304 let current_root_cid = current_root_cid.unwrap();
305
306 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
307 Ok(Some(b)) => b,
308 Ok(None) => {
309 error!("Commit block not found: {}", current_root_cid);
310 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response();
311 },
312 Err(e) => {
313 error!("Failed to load commit block: {:?}", e);
314 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to load commit block"}))).into_response();
315 }
316 };
317
318 let commit = match Commit::from_cbor(&commit_bytes) {
319 Ok(c) => c,
320 Err(e) => {
321 error!("Failed to parse commit: {:?}", e);
322 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response();
323 }
324 };
325
326 let mst_root = commit.data;
327 let store = Arc::new(state.block_store.clone());
328 let mst = Mst::load(store.clone(), mst_root, None);
329
330 let collection_nsid = match input.collection.parse::<Nsid>() {
331 Ok(n) => n,
332 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
333 };
334
335 let rkey = input.rkey.clone();
336
337 let mut record_bytes = Vec::new();
338 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
339 error!("Error serializing record: {:?}", e);
340 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
341 }
342
343 let record_cid = match state.block_store.put(&record_bytes).await {
344 Ok(c) => c,
345 Err(e) => {
346 error!("Failed to save record block: {:?}", e);
347 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response();
348 }
349 };
350
351 let key = format!("{}/{}", collection_nsid, rkey);
352 if let Err(e) = mst.update(&key, record_cid).await {
353 error!("Failed to update MST: {:?}", e);
354 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
355 }
356
357 let new_mst_root = match mst.root().await {
358 Ok(c) => c,
359 Err(e) => {
360 error!("Failed to get new MST root: {:?}", e);
361 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST root"}))).into_response();
362 }
363 };
364
365 let did_obj = match Did::new(&did) {
366 Ok(d) => d,
367 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
368 };
369
370 let rev = Tid::now(LimitedU32::MIN);
371
372 let new_commit = Commit::new_unsigned(
373 did_obj,
374 new_mst_root,
375 rev,
376 Some(current_root_cid)
377 );
378
379 let new_commit_bytes = match new_commit.to_cbor() {
380 Ok(b) => b,
381 Err(e) => {
382 error!("Failed to serialize new commit: {:?}", e);
383 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to serialize new commit"}))).into_response();
384 }
385 };
386
387 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
388 Ok(c) => c,
389 Err(e) => {
390 error!("Failed to save new commit: {:?}", e);
391 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save new commit"}))).into_response();
392 }
393 };
394
395 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
396 .bind(new_root_cid.to_string())
397 .bind(user_id)
398 .execute(&state.db)
399 .await;
400
401 if let Err(e) = update_repo {
402 error!("Failed to update repo root in DB: {:?}", e);
403 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"}))).into_response();
404 }
405
406 let record_insert = sqlx::query(
407 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
408 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()"
409 )
410 .bind(user_id)
411 .bind(&input.collection)
412 .bind(&rkey)
413 .bind(record_cid.to_string())
414 .execute(&state.db)
415 .await;
416
417 if let Err(e) = record_insert {
418 error!("Error inserting record index: {:?}", e);
419 }
420
421 let output = PutRecordOutput {
422 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
423 cid: record_cid.to_string(),
424 };
425 (StatusCode::OK, Json(output)).into_response()
426}
427
428#[derive(Deserialize)]
429pub struct GetRecordInput {
430 pub repo: String,
431 pub collection: String,
432 pub rkey: String,
433 pub cid: Option<String>,
434}
435
436pub async fn get_record(
437 State(state): State<AppState>,
438 Query(input): Query<GetRecordInput>,
439) -> Response {
440 let user_row = if input.repo.starts_with("did:") {
441 sqlx::query("SELECT id FROM users WHERE did = $1")
442 .bind(&input.repo)
443 .fetch_optional(&state.db)
444 .await
445 } else {
446 sqlx::query("SELECT id FROM users WHERE handle = $1")
447 .bind(&input.repo)
448 .fetch_optional(&state.db)
449 .await
450 };
451
452 let user_id: uuid::Uuid = match user_row {
453 Ok(Some(row)) => row.get("id"),
454 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(),
455 };
456
457 let record_row = sqlx::query("SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3")
458 .bind(user_id)
459 .bind(&input.collection)
460 .bind(&input.rkey)
461 .fetch_optional(&state.db)
462 .await;
463
464 let record_cid_str: String = match record_row {
465 Ok(Some(row)) => row.get("record_cid"),
466 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Record not found"}))).into_response(),
467 };
468
469 if let Some(expected_cid) = &input.cid {
470 if &record_cid_str != expected_cid {
471 return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Record CID mismatch"}))).into_response();
472 }
473 }
474
475 let cid = match Cid::from_str(&record_cid_str) {
476 Ok(c) => c,
477 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid CID in DB"}))).into_response(),
478 };
479
480 let block = match state.block_store.get(&cid).await {
481 Ok(Some(b)) => b,
482 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Record block not found"}))).into_response(),
483 };
484
485 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
486 Ok(v) => v,
487 Err(e) => {
488 error!("Failed to deserialize record: {:?}", e);
489 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
490 }
491 };
492
493 Json(json!({
494 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
495 "cid": record_cid_str,
496 "value": value
497 })).into_response()
498}
499
500#[derive(Deserialize)]
501pub struct DeleteRecordInput {
502 pub repo: String,
503 pub collection: String,
504 pub rkey: String,
505 #[serde(rename = "swapRecord")]
506 pub swap_record: Option<String>,
507 #[serde(rename = "swapCommit")]
508 pub swap_commit: Option<String>,
509}
510
511pub async fn delete_record(
512 State(state): State<AppState>,
513 headers: axum::http::HeaderMap,
514 Json(input): Json<DeleteRecordInput>,
515) -> Response {
516 let auth_header = headers.get("Authorization");
517 if auth_header.is_none() {
518 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
519 }
520 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
521
522 let session = sqlx::query(
523 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
524 )
525 .bind(&token)
526 .fetch_optional(&state.db)
527 .await
528 .unwrap_or(None);
529
530 let (did, key_bytes) = match session {
531 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
532 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
533 };
534
535 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
536 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
537 }
538
539 if input.repo != did {
540 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
541 }
542
543 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
544 .bind(&did)
545 .fetch_optional(&state.db)
546 .await;
547
548 let user_id: uuid::Uuid = match user_query {
549 Ok(Some(row)) => row.get("id"),
550 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
551 };
552
553 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
554 .bind(user_id)
555 .fetch_optional(&state.db)
556 .await;
557
558 let current_root_cid = match repo_root_query {
559 Ok(Some(row)) => {
560 let cid_str: String = row.get("repo_root_cid");
561 Cid::from_str(&cid_str).ok()
562 },
563 _ => None,
564 };
565
566 if current_root_cid.is_none() {
567 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
568 }
569 let current_root_cid = current_root_cid.unwrap();
570
571 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
572 Ok(Some(b)) => b,
573 Ok(None) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
574 Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to load commit block: {:?}", e)}))).into_response(),
575 };
576
577 let commit = match Commit::from_cbor(&commit_bytes) {
578 Ok(c) => c,
579 Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to parse commit: {:?}", e)}))).into_response(),
580 };
581
582 let mst_root = commit.data;
583 let store = Arc::new(state.block_store.clone());
584 let mst = Mst::load(store.clone(), mst_root, None);
585
586 let collection_nsid = match input.collection.parse::<Nsid>() {
587 Ok(n) => n,
588 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
589 };
590
591 let key = format!("{}/{}", collection_nsid, input.rkey);
592
593 // TODO: Check swapRecord if provided? Skipping for brevity/robustness
594
595 if let Err(e) = mst.delete(&key).await {
596 error!("Failed to delete from MST: {:?}", e);
597 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to delete from MST: {:?}", e)}))).into_response();
598 }
599
600 let new_mst_root = match mst.root().await {
601 Ok(c) => c,
602 Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST root"}))).into_response(),
603 };
604
605 let did_obj = match Did::new(&did) {
606 Ok(d) => d,
607 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
608 };
609
610 let rev = Tid::now(LimitedU32::MIN);
611
612 let new_commit = Commit::new_unsigned(
613 did_obj,
614 new_mst_root,
615 rev,
616 Some(current_root_cid)
617 );
618
619 let new_commit_bytes = match new_commit.to_cbor() {
620 Ok(b) => b,
621 Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to serialize new commit"}))).into_response(),
622 };
623
624 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
625 Ok(c) => c,
626 Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save new commit"}))).into_response(),
627 };
628
629 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
630 .bind(new_root_cid.to_string())
631 .bind(user_id)
632 .execute(&state.db)
633 .await;
634
635 if let Err(e) = update_repo {
636 error!("Failed to update repo root in DB: {:?}", e);
637 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"}))).into_response();
638 }
639
640 let record_delete = sqlx::query("DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3")
641 .bind(user_id)
642 .bind(&input.collection)
643 .bind(&input.rkey)
644 .execute(&state.db)
645 .await;
646
647 if let Err(e) = record_delete {
648 error!("Error deleting record index: {:?}", e);
649 }
650
651 (StatusCode::OK, Json(json!({}))).into_response()
652}
653
654#[derive(Deserialize)]
655pub struct ListRecordsInput {
656 pub repo: String,
657 pub collection: String,
658 pub limit: Option<i32>,
659 pub cursor: Option<String>,
660 #[serde(rename = "rkeyStart")]
661 pub rkey_start: Option<String>,
662 #[serde(rename = "rkeyEnd")]
663 pub rkey_end: Option<String>,
664 pub reverse: Option<bool>,
665}
666
667#[derive(Serialize)]
668pub struct ListRecordsOutput {
669 pub cursor: Option<String>,
670 pub records: Vec<serde_json::Value>,
671}
672
673pub async fn list_records(
674 State(state): State<AppState>,
675 Query(input): Query<ListRecordsInput>,
676) -> Response {
677 let user_row = if input.repo.starts_with("did:") {
678 sqlx::query("SELECT id FROM users WHERE did = $1")
679 .bind(&input.repo)
680 .fetch_optional(&state.db)
681 .await
682 } else {
683 sqlx::query("SELECT id FROM users WHERE handle = $1")
684 .bind(&input.repo)
685 .fetch_optional(&state.db)
686 .await
687 };
688
689 let user_id: uuid::Uuid = match user_row {
690 Ok(Some(row)) => row.get("id"),
691 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(),
692 };
693
694 let limit = input.limit.unwrap_or(50).clamp(1, 100);
695 let reverse = input.reverse.unwrap_or(false);
696
697 // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination
698 // TODO: Implement rkeyStart/End and correct cursor logic
699
700 let query_str = format!(
701 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 {} ORDER BY rkey {} LIMIT {}",
702 if let Some(_c) = &input.cursor {
703 if reverse { "AND rkey < $3" } else { "AND rkey > $3" }
704 } else {
705 ""
706 },
707 if reverse { "DESC" } else { "ASC" },
708 limit
709 );
710
711 let mut query = sqlx::query(&query_str)
712 .bind(user_id)
713 .bind(&input.collection);
714
715 if let Some(c) = &input.cursor {
716 query = query.bind(c);
717 }
718
719 let rows = match query.fetch_all(&state.db).await {
720 Ok(r) => r,
721 Err(e) => {
722 error!("Error listing records: {:?}", e);
723 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
724 }
725 };
726
727 let mut records = Vec::new();
728 let mut last_rkey = None;
729
730 for row in rows {
731 let rkey: String = row.get("rkey");
732 let cid_str: String = row.get("record_cid");
733 last_rkey = Some(rkey.clone());
734
735 if let Ok(cid) = Cid::from_str(&cid_str) {
736 if let Ok(Some(block)) = state.block_store.get(&cid).await {
737 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
738 records.push(json!({
739 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
740 "cid": cid_str,
741 "value": value
742 }));
743 }
744 }
745 }
746 }
747
748 Json(ListRecordsOutput {
749 cursor: last_rkey,
750 records,
751 }).into_response()
752}
753
754#[derive(Deserialize)]
755pub struct DescribeRepoInput {
756 pub repo: String,
757}
758
759pub async fn describe_repo(
760 State(state): State<AppState>,
761 Query(input): Query<DescribeRepoInput>,
762) -> Response {
763 let user_row = if input.repo.starts_with("did:") {
764 sqlx::query("SELECT id, handle, did FROM users WHERE did = $1")
765 .bind(&input.repo)
766 .fetch_optional(&state.db)
767 .await
768 } else {
769 sqlx::query("SELECT id, handle, did FROM users WHERE handle = $1")
770 .bind(&input.repo)
771 .fetch_optional(&state.db)
772 .await
773 };
774
775 let (user_id, handle, did) = match user_row {
776 Ok(Some(row)) => (row.get::<uuid::Uuid, _>("id"), row.get::<String, _>("handle"), row.get::<String, _>("did")),
777 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(),
778 };
779
780 let collections_query = sqlx::query("SELECT DISTINCT collection FROM records WHERE repo_id = $1")
781 .bind(user_id)
782 .fetch_all(&state.db)
783 .await;
784
785 let collections: Vec<String> = match collections_query {
786 Ok(rows) => rows.iter().map(|r| r.get("collection")).collect(),
787 Err(_) => Vec::new(),
788 };
789
790 let did_doc = json!({
791 "id": did,
792 "alsoKnownAs": [format!("at://{}", handle)]
793 });
794
795 Json(json!({
796 "handle": handle,
797 "did": did,
798 "didDoc": did_doc,
799 "collections": collections,
800 "handleIsCorrect": true
801 })).into_response()
802}
803
804pub async fn upload_blob(
805 State(state): State<AppState>,
806 headers: axum::http::HeaderMap,
807 body: Bytes,
808) -> Response {
809 let auth_header = headers.get("Authorization");
810 if auth_header.is_none() {
811 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
812 }
813 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
814
815 let session = sqlx::query(
816 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
817 )
818 .bind(&token)
819 .fetch_optional(&state.db)
820 .await
821 .unwrap_or(None);
822
823 let (did, key_bytes) = match session {
824 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
825 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
826 };
827
828 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
829 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
830 }
831
832 let mime_type = headers.get("content-type")
833 .and_then(|h| h.to_str().ok())
834 .unwrap_or("application/octet-stream")
835 .to_string();
836
837 let size = body.len() as i64;
838 let data = body.to_vec();
839
840 let mut hasher = Sha256::new();
841 hasher.update(&data);
842 let hash = hasher.finalize();
843 let multihash = Multihash::wrap(0x12, &hash).unwrap();
844 let cid = Cid::new_v1(0x55, multihash);
845 let cid_str = cid.to_string();
846
847 let storage_key = format!("blobs/{}", cid_str);
848
849 if let Err(e) = state.blob_store.put(&storage_key, &data).await {
850 error!("Failed to upload blob to storage: {:?}", e);
851 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store blob"}))).into_response();
852 }
853
854 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
855 .bind(&did)
856 .fetch_optional(&state.db)
857 .await;
858
859 let user_id: uuid::Uuid = match user_query {
860 Ok(Some(row)) => row.get("id"),
861 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(),
862 };
863
864 let insert = sqlx::query(
865 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING"
866 )
867 .bind(&cid_str)
868 .bind(&mime_type)
869 .bind(size)
870 .bind(user_id)
871 .bind(&storage_key)
872 .execute(&state.db)
873 .await;
874
875 if let Err(e) = insert {
876 error!("Failed to insert blob record: {:?}", e);
877 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
878 }
879
880 Json(json!({
881 "blob": {
882 "ref": {
883 "$link": cid_str
884 },
885 "mimeType": mime_type,
886 "size": size
887 }
888 })).into_response()
889}