this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use chrono::Utc;
9use cid::Cid;
10use jacquard::types::{
11 did::Did,
12 integer::LimitedU32,
13 string::{Nsid, Tid},
14};
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::Row;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22
23#[derive(Deserialize)]
24#[allow(dead_code)]
25pub struct CreateRecordInput {
26 pub repo: String,
27 pub collection: String,
28 pub rkey: Option<String>,
29 pub validate: Option<bool>,
30 pub record: serde_json::Value,
31 #[serde(rename = "swapCommit")]
32 pub swap_commit: Option<String>,
33}
34
35#[derive(Serialize)]
36#[serde(rename_all = "camelCase")]
37pub struct CreateRecordOutput {
38 pub uri: String,
39 pub cid: String,
40}
41
42pub async fn create_record(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<CreateRecordInput>,
46) -> Response {
47 let auth_header = headers.get("Authorization");
48 if auth_header.is_none() {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationRequired"})),
52 )
53 .into_response();
54 }
55 let token = auth_header
56 .unwrap()
57 .to_str()
58 .unwrap_or("")
59 .replace("Bearer ", "");
60
61 let session = sqlx::query(
62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
63 )
64 .bind(&token)
65 .fetch_optional(&state.db)
66 .await
67 .unwrap_or(None);
68
69 let (did, key_bytes) = match session {
70 Some(row) => (
71 row.get::<String, _>("did"),
72 row.get::<Vec<u8>, _>("key_bytes"),
73 ),
74 None => {
75 return (
76 StatusCode::UNAUTHORIZED,
77 Json(json!({"error": "AuthenticationFailed"})),
78 )
79 .into_response();
80 }
81 };
82
83 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
84 return (
85 StatusCode::UNAUTHORIZED,
86 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
87 )
88 .into_response();
89 }
90
91 if input.repo != did {
92 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
93 }
94
95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
96 .bind(&did)
97 .fetch_optional(&state.db)
98 .await;
99
100 let user_id: uuid::Uuid = match user_query {
101 Ok(Some(row)) => row.get("id"),
102 _ => {
103 return (
104 StatusCode::INTERNAL_SERVER_ERROR,
105 Json(json!({"error": "InternalError", "message": "User not found"})),
106 )
107 .into_response();
108 }
109 };
110
111 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
112 .bind(user_id)
113 .fetch_optional(&state.db)
114 .await;
115
116 let current_root_cid = match repo_root_query {
117 Ok(Some(row)) => {
118 let cid_str: String = row.get("repo_root_cid");
119 Cid::from_str(&cid_str).ok()
120 }
121 _ => None,
122 };
123
124 if current_root_cid.is_none() {
125 error!("Repo root not found for user {}", did);
126 return (
127 StatusCode::INTERNAL_SERVER_ERROR,
128 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
129 )
130 .into_response();
131 }
132 let current_root_cid = current_root_cid.unwrap();
133
134 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
135 Ok(Some(b)) => b,
136 Ok(None) => {
137 error!("Commit block not found: {}", current_root_cid);
138 return (
139 StatusCode::INTERNAL_SERVER_ERROR,
140 Json(json!({"error": "InternalError"})),
141 )
142 .into_response();
143 }
144 Err(e) => {
145 error!("Failed to load commit block: {:?}", e);
146 return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response();
151 }
152 };
153
154 let commit = match Commit::from_cbor(&commit_bytes) {
155 Ok(c) => c,
156 Err(e) => {
157 error!("Failed to parse commit: {:?}", e);
158 return (
159 StatusCode::INTERNAL_SERVER_ERROR,
160 Json(json!({"error": "InternalError"})),
161 )
162 .into_response();
163 }
164 };
165
166 let mst_root = commit.data;
167 let store = Arc::new(state.block_store.clone());
168 let mst = Mst::load(store.clone(), mst_root, None);
169
170 let collection_nsid = match input.collection.parse::<Nsid>() {
171 Ok(n) => n,
172 Err(_) => {
173 return (
174 StatusCode::BAD_REQUEST,
175 Json(json!({"error": "InvalidCollection"})),
176 )
177 .into_response();
178 }
179 };
180
181 let rkey = input
182 .rkey
183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
184
185 if input.validate.unwrap_or(true) {
186 if input.collection == "app.bsky.feed.post" {
187 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
188 return (
189 StatusCode::BAD_REQUEST,
190 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
191 )
192 .into_response();
193 }
194 }
195 }
196
197 let mut record_bytes = Vec::new();
198 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
199 error!("Error serializing record: {:?}", e);
200 return (
201 StatusCode::BAD_REQUEST,
202 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
203 )
204 .into_response();
205 }
206
207 let record_cid = match state.block_store.put(&record_bytes).await {
208 Ok(c) => c,
209 Err(e) => {
210 error!("Failed to save record block: {:?}", e);
211 return (
212 StatusCode::INTERNAL_SERVER_ERROR,
213 Json(json!({"error": "InternalError"})),
214 )
215 .into_response();
216 }
217 };
218
219 let key = format!("{}/{}", collection_nsid, rkey);
220 let new_mst = match mst.add(&key, record_cid).await {
221 Ok(m) => m,
222 Err(e) => {
223 error!("Failed to add to MST: {:?}", e);
224 return (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 Json(json!({"error": "InternalError"})),
227 )
228 .into_response();
229 }
230 };
231
232 let new_mst_root = match new_mst.persist().await {
233 Ok(c) => c,
234 Err(e) => {
235 error!("Failed to persist MST: {:?}", e);
236 return (
237 StatusCode::INTERNAL_SERVER_ERROR,
238 Json(json!({"error": "InternalError"})),
239 )
240 .into_response();
241 }
242 };
243
244 let did_obj = match Did::new(&did) {
245 Ok(d) => d,
246 Err(_) => {
247 return (
248 StatusCode::INTERNAL_SERVER_ERROR,
249 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
250 )
251 .into_response();
252 }
253 };
254
255 let rev = Tid::now(LimitedU32::MIN);
256
257 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
258
259 let new_commit_bytes = match new_commit.to_cbor() {
260 Ok(b) => b,
261 Err(e) => {
262 error!("Failed to serialize new commit: {:?}", e);
263 return (
264 StatusCode::INTERNAL_SERVER_ERROR,
265 Json(json!({"error": "InternalError"})),
266 )
267 .into_response();
268 }
269 };
270
271 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
272 Ok(c) => c,
273 Err(e) => {
274 error!("Failed to save new commit: {:?}", e);
275 return (
276 StatusCode::INTERNAL_SERVER_ERROR,
277 Json(json!({"error": "InternalError"})),
278 )
279 .into_response();
280 }
281 };
282
283 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
284 .bind(new_root_cid.to_string())
285 .bind(user_id)
286 .execute(&state.db)
287 .await;
288
289 if let Err(e) = update_repo {
290 error!("Failed to update repo root in DB: {:?}", e);
291 return (
292 StatusCode::INTERNAL_SERVER_ERROR,
293 Json(json!({"error": "InternalError"})),
294 )
295 .into_response();
296 }
297
298 let record_insert = sqlx::query(
299 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
300 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
301 )
302 .bind(user_id)
303 .bind(&input.collection)
304 .bind(&rkey)
305 .bind(record_cid.to_string())
306 .execute(&state.db)
307 .await;
308
309 if let Err(e) = record_insert {
310 error!("Error inserting record index: {:?}", e);
311 return (
312 StatusCode::INTERNAL_SERVER_ERROR,
313 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
314 )
315 .into_response();
316 }
317
318 let output = CreateRecordOutput {
319 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
320 cid: record_cid.to_string(),
321 };
322 (StatusCode::OK, Json(output)).into_response()
323}
324
325#[derive(Deserialize)]
326#[allow(dead_code)]
327pub struct PutRecordInput {
328 pub repo: String,
329 pub collection: String,
330 pub rkey: String,
331 pub validate: Option<bool>,
332 pub record: serde_json::Value,
333 #[serde(rename = "swapCommit")]
334 pub swap_commit: Option<String>,
335 #[serde(rename = "swapRecord")]
336 pub swap_record: Option<String>,
337}
338
339#[derive(Serialize)]
340#[serde(rename_all = "camelCase")]
341pub struct PutRecordOutput {
342 pub uri: String,
343 pub cid: String,
344}
345
346pub async fn put_record(
347 State(state): State<AppState>,
348 headers: axum::http::HeaderMap,
349 Json(input): Json<PutRecordInput>,
350) -> Response {
351 let auth_header = headers.get("Authorization");
352 if auth_header.is_none() {
353 return (
354 StatusCode::UNAUTHORIZED,
355 Json(json!({"error": "AuthenticationRequired"})),
356 )
357 .into_response();
358 }
359 let token = auth_header
360 .unwrap()
361 .to_str()
362 .unwrap_or("")
363 .replace("Bearer ", "");
364
365 let session = sqlx::query(
366 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
367 )
368 .bind(&token)
369 .fetch_optional(&state.db)
370 .await
371 .unwrap_or(None);
372
373 let (did, key_bytes) = match session {
374 Some(row) => (
375 row.get::<String, _>("did"),
376 row.get::<Vec<u8>, _>("key_bytes"),
377 ),
378 None => {
379 return (
380 StatusCode::UNAUTHORIZED,
381 Json(json!({"error": "AuthenticationFailed"})),
382 )
383 .into_response();
384 }
385 };
386
387 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
388 return (
389 StatusCode::UNAUTHORIZED,
390 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
391 )
392 .into_response();
393 }
394
395 if input.repo != did {
396 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
397 }
398
399 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
400 .bind(&did)
401 .fetch_optional(&state.db)
402 .await;
403
404 let user_id: uuid::Uuid = match user_query {
405 Ok(Some(row)) => row.get("id"),
406 _ => {
407 return (
408 StatusCode::INTERNAL_SERVER_ERROR,
409 Json(json!({"error": "InternalError", "message": "User not found"})),
410 )
411 .into_response();
412 }
413 };
414
415 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
416 .bind(user_id)
417 .fetch_optional(&state.db)
418 .await;
419
420 let current_root_cid = match repo_root_query {
421 Ok(Some(row)) => {
422 let cid_str: String = row.get("repo_root_cid");
423 Cid::from_str(&cid_str).ok()
424 }
425 _ => None,
426 };
427
428 if current_root_cid.is_none() {
429 error!("Repo root not found for user {}", did);
430 return (
431 StatusCode::INTERNAL_SERVER_ERROR,
432 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
433 )
434 .into_response();
435 }
436 let current_root_cid = current_root_cid.unwrap();
437
438 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
439 Ok(Some(b)) => b,
440 Ok(None) => {
441 error!("Commit block not found: {}", current_root_cid);
442 return (
443 StatusCode::INTERNAL_SERVER_ERROR,
444 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
445 )
446 .into_response();
447 }
448 Err(e) => {
449 error!("Failed to load commit block: {:?}", e);
450 return (
451 StatusCode::INTERNAL_SERVER_ERROR,
452 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})),
453 )
454 .into_response();
455 }
456 };
457
458 let commit = match Commit::from_cbor(&commit_bytes) {
459 Ok(c) => c,
460 Err(e) => {
461 error!("Failed to parse commit: {:?}", e);
462 return (
463 StatusCode::INTERNAL_SERVER_ERROR,
464 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
465 )
466 .into_response();
467 }
468 };
469
470 let mst_root = commit.data;
471 let store = Arc::new(state.block_store.clone());
472 let mst = Mst::load(store.clone(), mst_root, None);
473
474 let collection_nsid = match input.collection.parse::<Nsid>() {
475 Ok(n) => n,
476 Err(_) => {
477 return (
478 StatusCode::BAD_REQUEST,
479 Json(json!({"error": "InvalidCollection"})),
480 )
481 .into_response();
482 }
483 };
484
485 let rkey = input.rkey.clone();
486
487 if input.validate.unwrap_or(true) {
488 if input.collection == "app.bsky.feed.post" {
489 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
490 return (
491 StatusCode::BAD_REQUEST,
492 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
493 )
494 .into_response();
495 }
496 }
497 }
498
499 let mut record_bytes = Vec::new();
500 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
501 error!("Error serializing record: {:?}", e);
502 return (
503 StatusCode::BAD_REQUEST,
504 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
505 )
506 .into_response();
507 }
508
509 let record_cid = match state.block_store.put(&record_bytes).await {
510 Ok(c) => c,
511 Err(e) => {
512 error!("Failed to save record block: {:?}", e);
513 return (
514 StatusCode::INTERNAL_SERVER_ERROR,
515 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
516 )
517 .into_response();
518 }
519 };
520
521 let key = format!("{}/{}", collection_nsid, rkey);
522
523 let existing = match mst.get(&key).await {
524 Ok(v) => v,
525 Err(e) => {
526 error!("Failed to check MST key: {:?}", e);
527 return (
528 StatusCode::INTERNAL_SERVER_ERROR,
529 Json(
530 json!({"error": "InternalError", "message": "Failed to check existing record"}),
531 ),
532 )
533 .into_response();
534 }
535 };
536
537 if let Some(swap_record_str) = &input.swap_record {
538 let swap_record_cid = match Cid::from_str(swap_record_str) {
539 Ok(c) => c,
540 Err(_) => {
541 return (
542 StatusCode::BAD_REQUEST,
543 Json(
544 json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}),
545 ),
546 )
547 .into_response();
548 }
549 };
550 match &existing {
551 Some(current_cid) if *current_cid != swap_record_cid => {
552 return (
553 StatusCode::CONFLICT,
554 Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})),
555 )
556 .into_response();
557 }
558 None => {
559 return (
560 StatusCode::CONFLICT,
561 Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})),
562 )
563 .into_response();
564 }
565 _ => {}
566 }
567 }
568
569 let new_mst = if existing.is_some() {
570 match mst.update(&key, record_cid).await {
571 Ok(m) => m,
572 Err(e) => {
573 error!("Failed to update MST: {:?}", e);
574 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
575 }
576 }
577 } else {
578 match mst.add(&key, record_cid).await {
579 Ok(m) => m,
580 Err(e) => {
581 error!("Failed to add to MST: {:?}", e);
582 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response();
583 }
584 }
585 };
586
587 let new_mst_root = match new_mst.persist().await {
588 Ok(c) => c,
589 Err(e) => {
590 error!("Failed to persist MST: {:?}", e);
591 return (
592 StatusCode::INTERNAL_SERVER_ERROR,
593 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
594 )
595 .into_response();
596 }
597 };
598
599 let did_obj = match Did::new(&did) {
600 Ok(d) => d,
601 Err(_) => {
602 return (
603 StatusCode::INTERNAL_SERVER_ERROR,
604 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
605 )
606 .into_response();
607 }
608 };
609
610 let rev = Tid::now(LimitedU32::MIN);
611
612 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
613
614 let new_commit_bytes = match new_commit.to_cbor() {
615 Ok(b) => b,
616 Err(e) => {
617 error!("Failed to serialize new commit: {:?}", e);
618 return (
619 StatusCode::INTERNAL_SERVER_ERROR,
620 Json(
621 json!({"error": "InternalError", "message": "Failed to serialize new commit"}),
622 ),
623 )
624 .into_response();
625 }
626 };
627
628 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
629 Ok(c) => c,
630 Err(e) => {
631 error!("Failed to save new commit: {:?}", e);
632 return (
633 StatusCode::INTERNAL_SERVER_ERROR,
634 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})),
635 )
636 .into_response();
637 }
638 };
639
640 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
641 .bind(new_root_cid.to_string())
642 .bind(user_id)
643 .execute(&state.db)
644 .await;
645
646 if let Err(e) = update_repo {
647 error!("Failed to update repo root in DB: {:?}", e);
648 return (
649 StatusCode::INTERNAL_SERVER_ERROR,
650 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})),
651 )
652 .into_response();
653 }
654
655 let record_insert = sqlx::query(
656 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
657 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
658 )
659 .bind(user_id)
660 .bind(&input.collection)
661 .bind(&rkey)
662 .bind(record_cid.to_string())
663 .execute(&state.db)
664 .await;
665
666 if let Err(e) = record_insert {
667 error!("Error inserting record index: {:?}", e);
668 return (
669 StatusCode::INTERNAL_SERVER_ERROR,
670 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
671 )
672 .into_response();
673 }
674
675 let output = PutRecordOutput {
676 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
677 cid: record_cid.to_string(),
678 };
679 (StatusCode::OK, Json(output)).into_response()
680}