this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use chrono::Utc;
9use cid::Cid;
10use jacquard::types::{
11 did::Did,
12 integer::LimitedU32,
13 string::{Nsid, Tid},
14};
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::str::FromStr;
19use std::sync::Arc;
20use tracing::error;
21
22#[derive(Deserialize)]
23#[allow(dead_code)]
24pub struct CreateRecordInput {
25 pub repo: String,
26 pub collection: String,
27 pub rkey: Option<String>,
28 pub validate: Option<bool>,
29 pub record: serde_json::Value,
30 #[serde(rename = "swapCommit")]
31 pub swap_commit: Option<String>,
32}
33
34#[derive(Serialize)]
35#[serde(rename_all = "camelCase")]
36pub struct CreateRecordOutput {
37 pub uri: String,
38 pub cid: String,
39}
40
41pub async fn create_record(
42 State(state): State<AppState>,
43 headers: axum::http::HeaderMap,
44 Json(input): Json<CreateRecordInput>,
45) -> Response {
46 let auth_header = headers.get("Authorization");
47 if auth_header.is_none() {
48 return (
49 StatusCode::UNAUTHORIZED,
50 Json(json!({"error": "AuthenticationRequired"})),
51 )
52 .into_response();
53 }
54 let token = auth_header
55 .unwrap()
56 .to_str()
57 .unwrap_or("")
58 .replace("Bearer ", "");
59
60 let session = sqlx::query!(
61 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
62 token
63 )
64 .fetch_optional(&state.db)
65 .await
66 .unwrap_or(None);
67
68 let (did, key_bytes) = match session {
69 Some(row) => (
70 row.did,
71 row.key_bytes,
72 ),
73 None => {
74 return (
75 StatusCode::UNAUTHORIZED,
76 Json(json!({"error": "AuthenticationFailed"})),
77 )
78 .into_response();
79 }
80 };
81
82 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
83 return (
84 StatusCode::UNAUTHORIZED,
85 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
86 )
87 .into_response();
88 }
89
90 if input.repo != did {
91 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
92 }
93
94 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
95 .fetch_optional(&state.db)
96 .await;
97
98 let user_id: uuid::Uuid = match user_query {
99 Ok(Some(row)) => row.id,
100 _ => {
101 return (
102 StatusCode::INTERNAL_SERVER_ERROR,
103 Json(json!({"error": "InternalError", "message": "User not found"})),
104 )
105 .into_response();
106 }
107 };
108
109 let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
110 .fetch_optional(&state.db)
111 .await;
112
113 let current_root_cid = match repo_root_query {
114 Ok(Some(row)) => {
115 let cid_str: String = row.repo_root_cid;
116 Cid::from_str(&cid_str).ok()
117 }
118 _ => None,
119 };
120
121 if current_root_cid.is_none() {
122 error!("Repo root not found for user {}", did);
123 return (
124 StatusCode::INTERNAL_SERVER_ERROR,
125 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
126 )
127 .into_response();
128 }
129 let current_root_cid = current_root_cid.unwrap();
130
131 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
132 Ok(Some(b)) => b,
133 Ok(None) => {
134 error!("Commit block not found: {}", current_root_cid);
135 return (
136 StatusCode::INTERNAL_SERVER_ERROR,
137 Json(json!({"error": "InternalError"})),
138 )
139 .into_response();
140 }
141 Err(e) => {
142 error!("Failed to load commit block: {:?}", e);
143 return (
144 StatusCode::INTERNAL_SERVER_ERROR,
145 Json(json!({"error": "InternalError"})),
146 )
147 .into_response();
148 }
149 };
150
151 let commit = match Commit::from_cbor(&commit_bytes) {
152 Ok(c) => c,
153 Err(e) => {
154 error!("Failed to parse commit: {:?}", e);
155 return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(json!({"error": "InternalError"})),
158 )
159 .into_response();
160 }
161 };
162
163 let mst_root = commit.data;
164 let store = Arc::new(state.block_store.clone());
165 let mst = Mst::load(store.clone(), mst_root, None);
166
167 let collection_nsid = match input.collection.parse::<Nsid>() {
168 Ok(n) => n,
169 Err(_) => {
170 return (
171 StatusCode::BAD_REQUEST,
172 Json(json!({"error": "InvalidCollection"})),
173 )
174 .into_response();
175 }
176 };
177
178 let rkey = input
179 .rkey
180 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
181
182 if input.validate.unwrap_or(true) {
183 if input.collection == "app.bsky.feed.post" {
184 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
185 return (
186 StatusCode::BAD_REQUEST,
187 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
188 )
189 .into_response();
190 }
191 }
192 }
193
194 let mut record_bytes = Vec::new();
195 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
196 error!("Error serializing record: {:?}", e);
197 return (
198 StatusCode::BAD_REQUEST,
199 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
200 )
201 .into_response();
202 }
203
204 let record_cid = match state.block_store.put(&record_bytes).await {
205 Ok(c) => c,
206 Err(e) => {
207 error!("Failed to save record block: {:?}", e);
208 return (
209 StatusCode::INTERNAL_SERVER_ERROR,
210 Json(json!({"error": "InternalError"})),
211 )
212 .into_response();
213 }
214 };
215
216 let key = format!("{}/{}", collection_nsid, rkey);
217 let new_mst = match mst.add(&key, record_cid).await {
218 Ok(m) => m,
219 Err(e) => {
220 error!("Failed to add to MST: {:?}", e);
221 return (
222 StatusCode::INTERNAL_SERVER_ERROR,
223 Json(json!({"error": "InternalError"})),
224 )
225 .into_response();
226 }
227 };
228
229 let new_mst_root = match new_mst.persist().await {
230 Ok(c) => c,
231 Err(e) => {
232 error!("Failed to persist MST: {:?}", e);
233 return (
234 StatusCode::INTERNAL_SERVER_ERROR,
235 Json(json!({"error": "InternalError"})),
236 )
237 .into_response();
238 }
239 };
240
241 let did_obj = match Did::new(&did) {
242 Ok(d) => d,
243 Err(_) => {
244 return (
245 StatusCode::INTERNAL_SERVER_ERROR,
246 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
247 )
248 .into_response();
249 }
250 };
251
252 let rev = Tid::now(LimitedU32::MIN);
253
254 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
255
256 let new_commit_bytes = match new_commit.to_cbor() {
257 Ok(b) => b,
258 Err(e) => {
259 error!("Failed to serialize new commit: {:?}", e);
260 return (
261 StatusCode::INTERNAL_SERVER_ERROR,
262 Json(json!({"error": "InternalError"})),
263 )
264 .into_response();
265 }
266 };
267
268 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
269 Ok(c) => c,
270 Err(e) => {
271 error!("Failed to save new commit: {:?}", e);
272 return (
273 StatusCode::INTERNAL_SERVER_ERROR,
274 Json(json!({"error": "InternalError"})),
275 )
276 .into_response();
277 }
278 };
279
280 let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
281 .execute(&state.db)
282 .await;
283
284 if let Err(e) = update_repo {
285 error!("Failed to update repo root in DB: {:?}", e);
286 return (
287 StatusCode::INTERNAL_SERVER_ERROR,
288 Json(json!({"error": "InternalError"})),
289 )
290 .into_response();
291 }
292
293 let record_insert = sqlx::query!(
294 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
295 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
296 user_id,
297 input.collection,
298 rkey,
299 record_cid.to_string()
300 )
301 .execute(&state.db)
302 .await;
303
304 if let Err(e) = record_insert {
305 error!("Error inserting record index: {:?}", e);
306 return (
307 StatusCode::INTERNAL_SERVER_ERROR,
308 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
309 )
310 .into_response();
311 }
312
313 let output = CreateRecordOutput {
314 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
315 cid: record_cid.to_string(),
316 };
317 (StatusCode::OK, Json(output)).into_response()
318}
319
320#[derive(Deserialize)]
321#[allow(dead_code)]
322pub struct PutRecordInput {
323 pub repo: String,
324 pub collection: String,
325 pub rkey: String,
326 pub validate: Option<bool>,
327 pub record: serde_json::Value,
328 #[serde(rename = "swapCommit")]
329 pub swap_commit: Option<String>,
330 #[serde(rename = "swapRecord")]
331 pub swap_record: Option<String>,
332}
333
334#[derive(Serialize)]
335#[serde(rename_all = "camelCase")]
336pub struct PutRecordOutput {
337 pub uri: String,
338 pub cid: String,
339}
340
341pub async fn put_record(
342 State(state): State<AppState>,
343 headers: axum::http::HeaderMap,
344 Json(input): Json<PutRecordInput>,
345) -> Response {
346 let auth_header = headers.get("Authorization");
347 if auth_header.is_none() {
348 return (
349 StatusCode::UNAUTHORIZED,
350 Json(json!({"error": "AuthenticationRequired"})),
351 )
352 .into_response();
353 }
354 let token = auth_header
355 .unwrap()
356 .to_str()
357 .unwrap_or("")
358 .replace("Bearer ", "");
359
360 let session = sqlx::query!(
361 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
362 token
363 )
364 .fetch_optional(&state.db)
365 .await
366 .unwrap_or(None);
367
368 let (did, key_bytes) = match session {
369 Some(row) => (
370 row.did,
371 row.key_bytes,
372 ),
373 None => {
374 return (
375 StatusCode::UNAUTHORIZED,
376 Json(json!({"error": "AuthenticationFailed"})),
377 )
378 .into_response();
379 }
380 };
381
382 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
383 return (
384 StatusCode::UNAUTHORIZED,
385 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
386 )
387 .into_response();
388 }
389
390 if input.repo != did {
391 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
392 }
393
394 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
395 .fetch_optional(&state.db)
396 .await;
397
398 let user_id: uuid::Uuid = match user_query {
399 Ok(Some(row)) => row.id,
400 _ => {
401 return (
402 StatusCode::INTERNAL_SERVER_ERROR,
403 Json(json!({"error": "InternalError", "message": "User not found"})),
404 )
405 .into_response();
406 }
407 };
408
409 let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
410 .fetch_optional(&state.db)
411 .await;
412
413 let current_root_cid = match repo_root_query {
414 Ok(Some(row)) => {
415 let cid_str: String = row.repo_root_cid;
416 Cid::from_str(&cid_str).ok()
417 }
418 _ => None,
419 };
420
421 if current_root_cid.is_none() {
422 error!("Repo root not found for user {}", did);
423 return (
424 StatusCode::INTERNAL_SERVER_ERROR,
425 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
426 )
427 .into_response();
428 }
429 let current_root_cid = current_root_cid.unwrap();
430
431 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
432 Ok(Some(b)) => b,
433 Ok(None) => {
434 error!("Commit block not found: {}", current_root_cid);
435 return (
436 StatusCode::INTERNAL_SERVER_ERROR,
437 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
438 )
439 .into_response();
440 }
441 Err(e) => {
442 error!("Failed to load commit block: {:?}", e);
443 return (
444 StatusCode::INTERNAL_SERVER_ERROR,
445 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})),
446 )
447 .into_response();
448 }
449 };
450
451 let commit = match Commit::from_cbor(&commit_bytes) {
452 Ok(c) => c,
453 Err(e) => {
454 error!("Failed to parse commit: {:?}", e);
455 return (
456 StatusCode::INTERNAL_SERVER_ERROR,
457 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
458 )
459 .into_response();
460 }
461 };
462
463 let mst_root = commit.data;
464 let store = Arc::new(state.block_store.clone());
465 let mst = Mst::load(store.clone(), mst_root, None);
466
467 let collection_nsid = match input.collection.parse::<Nsid>() {
468 Ok(n) => n,
469 Err(_) => {
470 return (
471 StatusCode::BAD_REQUEST,
472 Json(json!({"error": "InvalidCollection"})),
473 )
474 .into_response();
475 }
476 };
477
478 let rkey = input.rkey.clone();
479
480 if input.validate.unwrap_or(true) {
481 if input.collection == "app.bsky.feed.post" {
482 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
483 return (
484 StatusCode::BAD_REQUEST,
485 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
486 )
487 .into_response();
488 }
489 }
490 }
491
492 let mut record_bytes = Vec::new();
493 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
494 error!("Error serializing record: {:?}", e);
495 return (
496 StatusCode::BAD_REQUEST,
497 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
498 )
499 .into_response();
500 }
501
502 let record_cid = match state.block_store.put(&record_bytes).await {
503 Ok(c) => c,
504 Err(e) => {
505 error!("Failed to save record block: {:?}", e);
506 return (
507 StatusCode::INTERNAL_SERVER_ERROR,
508 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
509 )
510 .into_response();
511 }
512 };
513
514 let key = format!("{}/{}", collection_nsid, rkey);
515
516 let existing = match mst.get(&key).await {
517 Ok(v) => v,
518 Err(e) => {
519 error!("Failed to check MST key: {:?}", e);
520 return (
521 StatusCode::INTERNAL_SERVER_ERROR,
522 Json(
523 json!({"error": "InternalError", "message": "Failed to check existing record"}),
524 ),
525 )
526 .into_response();
527 }
528 };
529
530 if let Some(swap_record_str) = &input.swap_record {
531 let swap_record_cid = match Cid::from_str(swap_record_str) {
532 Ok(c) => c,
533 Err(_) => {
534 return (
535 StatusCode::BAD_REQUEST,
536 Json(
537 json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}),
538 ),
539 )
540 .into_response();
541 }
542 };
543 match &existing {
544 Some(current_cid) if *current_cid != swap_record_cid => {
545 return (
546 StatusCode::CONFLICT,
547 Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})),
548 )
549 .into_response();
550 }
551 None => {
552 return (
553 StatusCode::CONFLICT,
554 Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})),
555 )
556 .into_response();
557 }
558 _ => {}
559 }
560 }
561
562 let new_mst = if existing.is_some() {
563 match mst.update(&key, record_cid).await {
564 Ok(m) => m,
565 Err(e) => {
566 error!("Failed to update MST: {:?}", e);
567 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
568 }
569 }
570 } else {
571 match mst.add(&key, record_cid).await {
572 Ok(m) => m,
573 Err(e) => {
574 error!("Failed to add to MST: {:?}", e);
575 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response();
576 }
577 }
578 };
579
580 let new_mst_root = match new_mst.persist().await {
581 Ok(c) => c,
582 Err(e) => {
583 error!("Failed to persist MST: {:?}", e);
584 return (
585 StatusCode::INTERNAL_SERVER_ERROR,
586 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
587 )
588 .into_response();
589 }
590 };
591
592 let did_obj = match Did::new(&did) {
593 Ok(d) => d,
594 Err(_) => {
595 return (
596 StatusCode::INTERNAL_SERVER_ERROR,
597 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
598 )
599 .into_response();
600 }
601 };
602
603 let rev = Tid::now(LimitedU32::MIN);
604
605 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
606
607 let new_commit_bytes = match new_commit.to_cbor() {
608 Ok(b) => b,
609 Err(e) => {
610 error!("Failed to serialize new commit: {:?}", e);
611 return (
612 StatusCode::INTERNAL_SERVER_ERROR,
613 Json(
614 json!({"error": "InternalError", "message": "Failed to serialize new commit"}),
615 ),
616 )
617 .into_response();
618 }
619 };
620
621 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
622 Ok(c) => c,
623 Err(e) => {
624 error!("Failed to save new commit: {:?}", e);
625 return (
626 StatusCode::INTERNAL_SERVER_ERROR,
627 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})),
628 )
629 .into_response();
630 }
631 };
632
633 let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
634 .execute(&state.db)
635 .await;
636
637 if let Err(e) = update_repo {
638 error!("Failed to update repo root in DB: {:?}", e);
639 return (
640 StatusCode::INTERNAL_SERVER_ERROR,
641 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})),
642 )
643 .into_response();
644 }
645
646 let record_insert = sqlx::query!(
647 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
648 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
649 user_id,
650 input.collection,
651 rkey,
652 record_cid.to_string()
653 )
654 .execute(&state.db)
655 .await;
656
657 if let Err(e) = record_insert {
658 error!("Error inserting record index: {:?}", e);
659 return (
660 StatusCode::INTERNAL_SERVER_ERROR,
661 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
662 )
663 .into_response();
664 }
665
666 let output = PutRecordOutput {
667 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
668 cid: record_cid.to_string(),
669 };
670 (StatusCode::OK, Json(output)).into_response()
671}