this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use chrono::Utc;
9use cid::Cid;
10use jacquard::types::{
11 did::Did,
12 integer::LimitedU32,
13 string::{Nsid, Tid},
14};
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::Row;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22
23#[derive(Deserialize)]
24#[allow(dead_code)]
25pub struct CreateRecordInput {
26 pub repo: String,
27 pub collection: String,
28 pub rkey: Option<String>,
29 pub validate: Option<bool>,
30 pub record: serde_json::Value,
31 #[serde(rename = "swapCommit")]
32 pub swap_commit: Option<String>,
33}
34
35#[derive(Serialize)]
36#[serde(rename_all = "camelCase")]
37pub struct CreateRecordOutput {
38 pub uri: String,
39 pub cid: String,
40}
41
42pub async fn create_record(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<CreateRecordInput>,
46) -> Response {
47 let auth_header = headers.get("Authorization");
48 if auth_header.is_none() {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationRequired"})),
52 )
53 .into_response();
54 }
55 let token = auth_header
56 .unwrap()
57 .to_str()
58 .unwrap_or("")
59 .replace("Bearer ", "");
60
61 let session = sqlx::query(
62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
63 )
64 .bind(&token)
65 .fetch_optional(&state.db)
66 .await
67 .unwrap_or(None);
68
69 let (did, key_bytes) = match session {
70 Some(row) => (
71 row.get::<String, _>("did"),
72 row.get::<Vec<u8>, _>("key_bytes"),
73 ),
74 None => {
75 return (
76 StatusCode::UNAUTHORIZED,
77 Json(json!({"error": "AuthenticationFailed"})),
78 )
79 .into_response();
80 }
81 };
82
83 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
84 return (
85 StatusCode::UNAUTHORIZED,
86 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
87 )
88 .into_response();
89 }
90
91 if input.repo != did {
92 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
93 }
94
95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
96 .bind(&did)
97 .fetch_optional(&state.db)
98 .await;
99
100 let user_id: uuid::Uuid = match user_query {
101 Ok(Some(row)) => row.get("id"),
102 _ => {
103 return (
104 StatusCode::INTERNAL_SERVER_ERROR,
105 Json(json!({"error": "InternalError", "message": "User not found"})),
106 )
107 .into_response();
108 }
109 };
110
111 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
112 .bind(user_id)
113 .fetch_optional(&state.db)
114 .await;
115
116 let current_root_cid = match repo_root_query {
117 Ok(Some(row)) => {
118 let cid_str: String = row.get("repo_root_cid");
119 Cid::from_str(&cid_str).ok()
120 }
121 _ => None,
122 };
123
124 if current_root_cid.is_none() {
125 error!("Repo root not found for user {}", did);
126 return (
127 StatusCode::INTERNAL_SERVER_ERROR,
128 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
129 )
130 .into_response();
131 }
132 let current_root_cid = current_root_cid.unwrap();
133
134 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
135 Ok(Some(b)) => b,
136 Ok(None) => {
137 error!("Commit block not found: {}", current_root_cid);
138 return (
139 StatusCode::INTERNAL_SERVER_ERROR,
140 Json(json!({"error": "InternalError"})),
141 )
142 .into_response();
143 }
144 Err(e) => {
145 error!("Failed to load commit block: {:?}", e);
146 return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response();
151 }
152 };
153
154 let commit = match Commit::from_cbor(&commit_bytes) {
155 Ok(c) => c,
156 Err(e) => {
157 error!("Failed to parse commit: {:?}", e);
158 return (
159 StatusCode::INTERNAL_SERVER_ERROR,
160 Json(json!({"error": "InternalError"})),
161 )
162 .into_response();
163 }
164 };
165
166 let mst_root = commit.data;
167 let store = Arc::new(state.block_store.clone());
168 let mst = Mst::load(store.clone(), mst_root, None);
169
170 let collection_nsid = match input.collection.parse::<Nsid>() {
171 Ok(n) => n,
172 Err(_) => {
173 return (
174 StatusCode::BAD_REQUEST,
175 Json(json!({"error": "InvalidCollection"})),
176 )
177 .into_response();
178 }
179 };
180
181 let rkey = input
182 .rkey
183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
184
185 let mut record_bytes = Vec::new();
186 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
187 error!("Error serializing record: {:?}", e);
188 return (
189 StatusCode::BAD_REQUEST,
190 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
191 )
192 .into_response();
193 }
194
195 let record_cid = match state.block_store.put(&record_bytes).await {
196 Ok(c) => c,
197 Err(e) => {
198 error!("Failed to save record block: {:?}", e);
199 return (
200 StatusCode::INTERNAL_SERVER_ERROR,
201 Json(json!({"error": "InternalError"})),
202 )
203 .into_response();
204 }
205 };
206
207 let key = format!("{}/{}", collection_nsid, rkey);
208 let new_mst = match mst.add(&key, record_cid).await {
209 Ok(m) => m,
210 Err(e) => {
211 error!("Failed to add to MST: {:?}", e);
212 return (
213 StatusCode::INTERNAL_SERVER_ERROR,
214 Json(json!({"error": "InternalError"})),
215 )
216 .into_response();
217 }
218 };
219
220 let new_mst_root = match new_mst.persist().await {
221 Ok(c) => c,
222 Err(e) => {
223 error!("Failed to persist MST: {:?}", e);
224 return (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 Json(json!({"error": "InternalError"})),
227 )
228 .into_response();
229 }
230 };
231
232 let did_obj = match Did::new(&did) {
233 Ok(d) => d,
234 Err(_) => {
235 return (
236 StatusCode::INTERNAL_SERVER_ERROR,
237 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
238 )
239 .into_response();
240 }
241 };
242
243 let rev = Tid::now(LimitedU32::MIN);
244
245 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
246
247 let new_commit_bytes = match new_commit.to_cbor() {
248 Ok(b) => b,
249 Err(e) => {
250 error!("Failed to serialize new commit: {:?}", e);
251 return (
252 StatusCode::INTERNAL_SERVER_ERROR,
253 Json(json!({"error": "InternalError"})),
254 )
255 .into_response();
256 }
257 };
258
259 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
260 Ok(c) => c,
261 Err(e) => {
262 error!("Failed to save new commit: {:?}", e);
263 return (
264 StatusCode::INTERNAL_SERVER_ERROR,
265 Json(json!({"error": "InternalError"})),
266 )
267 .into_response();
268 }
269 };
270
271 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
272 .bind(new_root_cid.to_string())
273 .bind(user_id)
274 .execute(&state.db)
275 .await;
276
277 if let Err(e) = update_repo {
278 error!("Failed to update repo root in DB: {:?}", e);
279 return (
280 StatusCode::INTERNAL_SERVER_ERROR,
281 Json(json!({"error": "InternalError"})),
282 )
283 .into_response();
284 }
285
286 let record_insert = sqlx::query(
287 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
288 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
289 )
290 .bind(user_id)
291 .bind(&input.collection)
292 .bind(&rkey)
293 .bind(record_cid.to_string())
294 .execute(&state.db)
295 .await;
296
297 if let Err(e) = record_insert {
298 error!("Error inserting record index: {:?}", e);
299 return (
300 StatusCode::INTERNAL_SERVER_ERROR,
301 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
302 )
303 .into_response();
304 }
305
306 let output = CreateRecordOutput {
307 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
308 cid: record_cid.to_string(),
309 };
310 (StatusCode::OK, Json(output)).into_response()
311}
312
313#[derive(Deserialize)]
314#[allow(dead_code)]
315pub struct PutRecordInput {
316 pub repo: String,
317 pub collection: String,
318 pub rkey: String,
319 pub validate: Option<bool>,
320 pub record: serde_json::Value,
321 #[serde(rename = "swapCommit")]
322 pub swap_commit: Option<String>,
323 #[serde(rename = "swapRecord")]
324 pub swap_record: Option<String>,
325}
326
327#[derive(Serialize)]
328#[serde(rename_all = "camelCase")]
329pub struct PutRecordOutput {
330 pub uri: String,
331 pub cid: String,
332}
333
334pub async fn put_record(
335 State(state): State<AppState>,
336 headers: axum::http::HeaderMap,
337 Json(input): Json<PutRecordInput>,
338) -> Response {
339 let auth_header = headers.get("Authorization");
340 if auth_header.is_none() {
341 return (
342 StatusCode::UNAUTHORIZED,
343 Json(json!({"error": "AuthenticationRequired"})),
344 )
345 .into_response();
346 }
347 let token = auth_header
348 .unwrap()
349 .to_str()
350 .unwrap_or("")
351 .replace("Bearer ", "");
352
353 let session = sqlx::query(
354 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
355 )
356 .bind(&token)
357 .fetch_optional(&state.db)
358 .await
359 .unwrap_or(None);
360
361 let (did, key_bytes) = match session {
362 Some(row) => (
363 row.get::<String, _>("did"),
364 row.get::<Vec<u8>, _>("key_bytes"),
365 ),
366 None => {
367 return (
368 StatusCode::UNAUTHORIZED,
369 Json(json!({"error": "AuthenticationFailed"})),
370 )
371 .into_response();
372 }
373 };
374
375 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
376 return (
377 StatusCode::UNAUTHORIZED,
378 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
379 )
380 .into_response();
381 }
382
383 if input.repo != did {
384 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
385 }
386
387 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
388 .bind(&did)
389 .fetch_optional(&state.db)
390 .await;
391
392 let user_id: uuid::Uuid = match user_query {
393 Ok(Some(row)) => row.get("id"),
394 _ => {
395 return (
396 StatusCode::INTERNAL_SERVER_ERROR,
397 Json(json!({"error": "InternalError", "message": "User not found"})),
398 )
399 .into_response();
400 }
401 };
402
403 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
404 .bind(user_id)
405 .fetch_optional(&state.db)
406 .await;
407
408 let current_root_cid = match repo_root_query {
409 Ok(Some(row)) => {
410 let cid_str: String = row.get("repo_root_cid");
411 Cid::from_str(&cid_str).ok()
412 }
413 _ => None,
414 };
415
416 if current_root_cid.is_none() {
417 error!("Repo root not found for user {}", did);
418 return (
419 StatusCode::INTERNAL_SERVER_ERROR,
420 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
421 )
422 .into_response();
423 }
424 let current_root_cid = current_root_cid.unwrap();
425
426 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
427 Ok(Some(b)) => b,
428 Ok(None) => {
429 error!("Commit block not found: {}", current_root_cid);
430 return (
431 StatusCode::INTERNAL_SERVER_ERROR,
432 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
433 )
434 .into_response();
435 }
436 Err(e) => {
437 error!("Failed to load commit block: {:?}", e);
438 return (
439 StatusCode::INTERNAL_SERVER_ERROR,
440 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})),
441 )
442 .into_response();
443 }
444 };
445
446 let commit = match Commit::from_cbor(&commit_bytes) {
447 Ok(c) => c,
448 Err(e) => {
449 error!("Failed to parse commit: {:?}", e);
450 return (
451 StatusCode::INTERNAL_SERVER_ERROR,
452 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
453 )
454 .into_response();
455 }
456 };
457
458 let mst_root = commit.data;
459 let store = Arc::new(state.block_store.clone());
460 let mst = Mst::load(store.clone(), mst_root, None);
461
462 let collection_nsid = match input.collection.parse::<Nsid>() {
463 Ok(n) => n,
464 Err(_) => {
465 return (
466 StatusCode::BAD_REQUEST,
467 Json(json!({"error": "InvalidCollection"})),
468 )
469 .into_response();
470 }
471 };
472
473 let rkey = input.rkey.clone();
474
475 let mut record_bytes = Vec::new();
476 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
477 error!("Error serializing record: {:?}", e);
478 return (
479 StatusCode::BAD_REQUEST,
480 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
481 )
482 .into_response();
483 }
484
485 let record_cid = match state.block_store.put(&record_bytes).await {
486 Ok(c) => c,
487 Err(e) => {
488 error!("Failed to save record block: {:?}", e);
489 return (
490 StatusCode::INTERNAL_SERVER_ERROR,
491 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
492 )
493 .into_response();
494 }
495 };
496
497 let key = format!("{}/{}", collection_nsid, rkey);
498
499 let existing = match mst.get(&key).await {
500 Ok(v) => v,
501 Err(e) => {
502 error!("Failed to check MST key: {:?}", e);
503 return (
504 StatusCode::INTERNAL_SERVER_ERROR,
505 Json(
506 json!({"error": "InternalError", "message": "Failed to check existing record"}),
507 ),
508 )
509 .into_response();
510 }
511 };
512
513 if let Some(swap_record_str) = &input.swap_record {
514 let swap_record_cid = match Cid::from_str(swap_record_str) {
515 Ok(c) => c,
516 Err(_) => {
517 return (
518 StatusCode::BAD_REQUEST,
519 Json(
520 json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}),
521 ),
522 )
523 .into_response();
524 }
525 };
526 match &existing {
527 Some(current_cid) if *current_cid != swap_record_cid => {
528 return (
529 StatusCode::CONFLICT,
530 Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})),
531 )
532 .into_response();
533 }
534 None => {
535 return (
536 StatusCode::CONFLICT,
537 Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})),
538 )
539 .into_response();
540 }
541 _ => {}
542 }
543 }
544
545 let new_mst = if existing.is_some() {
546 match mst.update(&key, record_cid).await {
547 Ok(m) => m,
548 Err(e) => {
549 error!("Failed to update MST: {:?}", e);
550 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
551 }
552 }
553 } else {
554 match mst.add(&key, record_cid).await {
555 Ok(m) => m,
556 Err(e) => {
557 error!("Failed to add to MST: {:?}", e);
558 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response();
559 }
560 }
561 };
562
563 let new_mst_root = match new_mst.persist().await {
564 Ok(c) => c,
565 Err(e) => {
566 error!("Failed to persist MST: {:?}", e);
567 return (
568 StatusCode::INTERNAL_SERVER_ERROR,
569 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
570 )
571 .into_response();
572 }
573 };
574
575 let did_obj = match Did::new(&did) {
576 Ok(d) => d,
577 Err(_) => {
578 return (
579 StatusCode::INTERNAL_SERVER_ERROR,
580 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
581 )
582 .into_response();
583 }
584 };
585
586 let rev = Tid::now(LimitedU32::MIN);
587
588 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
589
590 let new_commit_bytes = match new_commit.to_cbor() {
591 Ok(b) => b,
592 Err(e) => {
593 error!("Failed to serialize new commit: {:?}", e);
594 return (
595 StatusCode::INTERNAL_SERVER_ERROR,
596 Json(
597 json!({"error": "InternalError", "message": "Failed to serialize new commit"}),
598 ),
599 )
600 .into_response();
601 }
602 };
603
604 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
605 Ok(c) => c,
606 Err(e) => {
607 error!("Failed to save new commit: {:?}", e);
608 return (
609 StatusCode::INTERNAL_SERVER_ERROR,
610 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})),
611 )
612 .into_response();
613 }
614 };
615
616 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
617 .bind(new_root_cid.to_string())
618 .bind(user_id)
619 .execute(&state.db)
620 .await;
621
622 if let Err(e) = update_repo {
623 error!("Failed to update repo root in DB: {:?}", e);
624 return (
625 StatusCode::INTERNAL_SERVER_ERROR,
626 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})),
627 )
628 .into_response();
629 }
630
631 let record_insert = sqlx::query(
632 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
633 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
634 )
635 .bind(user_id)
636 .bind(&input.collection)
637 .bind(&rkey)
638 .bind(record_cid.to_string())
639 .execute(&state.db)
640 .await;
641
642 if let Err(e) = record_insert {
643 error!("Error inserting record index: {:?}", e);
644 return (
645 StatusCode::INTERNAL_SERVER_ERROR,
646 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
647 )
648 .into_response();
649 }
650
651 let output = PutRecordOutput {
652 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
653 cid: record_cid.to_string(),
654 };
655 (StatusCode::OK, Json(output)).into_response()
656}