this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use chrono::Utc;
9use cid::Cid;
10use jacquard::types::{
11 did::Did,
12 integer::LimitedU32,
13 string::{Nsid, Tid},
14};
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::Row;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22
23#[derive(Deserialize)]
24#[allow(dead_code)]
25pub struct CreateRecordInput {
26 pub repo: String,
27 pub collection: String,
28 pub rkey: Option<String>,
29 pub validate: Option<bool>,
30 pub record: serde_json::Value,
31 #[serde(rename = "swapCommit")]
32 pub swap_commit: Option<String>,
33}
34
35#[derive(Serialize)]
36#[serde(rename_all = "camelCase")]
37pub struct CreateRecordOutput {
38 pub uri: String,
39 pub cid: String,
40}
41
42pub async fn create_record(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<CreateRecordInput>,
46) -> Response {
47 let auth_header = headers.get("Authorization");
48 if auth_header.is_none() {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationRequired"})),
52 )
53 .into_response();
54 }
55 let token = auth_header
56 .unwrap()
57 .to_str()
58 .unwrap_or("")
59 .replace("Bearer ", "");
60
61 let session = sqlx::query(
62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
63 )
64 .bind(&token)
65 .fetch_optional(&state.db)
66 .await
67 .unwrap_or(None);
68
69 let (did, key_bytes) = match session {
70 Some(row) => (
71 row.get::<String, _>("did"),
72 row.get::<Vec<u8>, _>("key_bytes"),
73 ),
74 None => {
75 return (
76 StatusCode::UNAUTHORIZED,
77 Json(json!({"error": "AuthenticationFailed"})),
78 )
79 .into_response();
80 }
81 };
82
83 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
84 return (
85 StatusCode::UNAUTHORIZED,
86 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
87 )
88 .into_response();
89 }
90
91 if input.repo != did {
92 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
93 }
94
95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
96 .bind(&did)
97 .fetch_optional(&state.db)
98 .await;
99
100 let user_id: uuid::Uuid = match user_query {
101 Ok(Some(row)) => row.get("id"),
102 _ => {
103 return (
104 StatusCode::INTERNAL_SERVER_ERROR,
105 Json(json!({"error": "InternalError", "message": "User not found"})),
106 )
107 .into_response();
108 }
109 };
110
111 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
112 .bind(user_id)
113 .fetch_optional(&state.db)
114 .await;
115
116 let current_root_cid = match repo_root_query {
117 Ok(Some(row)) => {
118 let cid_str: String = row.get("repo_root_cid");
119 Cid::from_str(&cid_str).ok()
120 }
121 _ => None,
122 };
123
124 if current_root_cid.is_none() {
125 error!("Repo root not found for user {}", did);
126 return (
127 StatusCode::INTERNAL_SERVER_ERROR,
128 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
129 )
130 .into_response();
131 }
132 let current_root_cid = current_root_cid.unwrap();
133
134 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
135 Ok(Some(b)) => b,
136 Ok(None) => {
137 error!("Commit block not found: {}", current_root_cid);
138 return (
139 StatusCode::INTERNAL_SERVER_ERROR,
140 Json(json!({"error": "InternalError"})),
141 )
142 .into_response();
143 }
144 Err(e) => {
145 error!("Failed to load commit block: {:?}", e);
146 return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response();
151 }
152 };
153
154 let commit = match Commit::from_cbor(&commit_bytes) {
155 Ok(c) => c,
156 Err(e) => {
157 error!("Failed to parse commit: {:?}", e);
158 return (
159 StatusCode::INTERNAL_SERVER_ERROR,
160 Json(json!({"error": "InternalError"})),
161 )
162 .into_response();
163 }
164 };
165
166 let mst_root = commit.data;
167 let store = Arc::new(state.block_store.clone());
168 let mst = Mst::load(store.clone(), mst_root, None);
169
170 let collection_nsid = match input.collection.parse::<Nsid>() {
171 Ok(n) => n,
172 Err(_) => {
173 return (
174 StatusCode::BAD_REQUEST,
175 Json(json!({"error": "InvalidCollection"})),
176 )
177 .into_response();
178 }
179 };
180
181 let rkey = input
182 .rkey
183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
184
185 let mut record_bytes = Vec::new();
186 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
187 error!("Error serializing record: {:?}", e);
188 return (
189 StatusCode::BAD_REQUEST,
190 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
191 )
192 .into_response();
193 }
194
195 let record_cid = match state.block_store.put(&record_bytes).await {
196 Ok(c) => c,
197 Err(e) => {
198 error!("Failed to save record block: {:?}", e);
199 return (
200 StatusCode::INTERNAL_SERVER_ERROR,
201 Json(json!({"error": "InternalError"})),
202 )
203 .into_response();
204 }
205 };
206
207 let key = format!("{}/{}", collection_nsid, rkey);
208 if let Err(e) = mst.update(&key, record_cid).await {
209 error!("Failed to update MST: {:?}", e);
210 return (
211 StatusCode::INTERNAL_SERVER_ERROR,
212 Json(json!({"error": "InternalError"})),
213 )
214 .into_response();
215 }
216
217 let new_mst_root = match mst.root().await {
218 Ok(c) => c,
219 Err(e) => {
220 error!("Failed to get new MST root: {:?}", e);
221 return (
222 StatusCode::INTERNAL_SERVER_ERROR,
223 Json(json!({"error": "InternalError"})),
224 )
225 .into_response();
226 }
227 };
228
229 let did_obj = match Did::new(&did) {
230 Ok(d) => d,
231 Err(_) => {
232 return (
233 StatusCode::INTERNAL_SERVER_ERROR,
234 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
235 )
236 .into_response();
237 }
238 };
239
240 let rev = Tid::now(LimitedU32::MIN);
241
242 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
243
244 let new_commit_bytes = match new_commit.to_cbor() {
245 Ok(b) => b,
246 Err(e) => {
247 error!("Failed to serialize new commit: {:?}", e);
248 return (
249 StatusCode::INTERNAL_SERVER_ERROR,
250 Json(json!({"error": "InternalError"})),
251 )
252 .into_response();
253 }
254 };
255
256 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
257 Ok(c) => c,
258 Err(e) => {
259 error!("Failed to save new commit: {:?}", e);
260 return (
261 StatusCode::INTERNAL_SERVER_ERROR,
262 Json(json!({"error": "InternalError"})),
263 )
264 .into_response();
265 }
266 };
267
268 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
269 .bind(new_root_cid.to_string())
270 .bind(user_id)
271 .execute(&state.db)
272 .await;
273
274 if let Err(e) = update_repo {
275 error!("Failed to update repo root in DB: {:?}", e);
276 return (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError"})),
279 )
280 .into_response();
281 }
282
283 let record_insert = sqlx::query(
284 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
285 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
286 )
287 .bind(user_id)
288 .bind(&input.collection)
289 .bind(&rkey)
290 .bind(record_cid.to_string())
291 .execute(&state.db)
292 .await;
293
294 if let Err(e) = record_insert {
295 error!("Error inserting record index: {:?}", e);
296 return (
297 StatusCode::INTERNAL_SERVER_ERROR,
298 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
299 )
300 .into_response();
301 }
302
303 let output = CreateRecordOutput {
304 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
305 cid: record_cid.to_string(),
306 };
307 (StatusCode::OK, Json(output)).into_response()
308}
309
310#[derive(Deserialize)]
311#[allow(dead_code)]
312pub struct PutRecordInput {
313 pub repo: String,
314 pub collection: String,
315 pub rkey: String,
316 pub validate: Option<bool>,
317 pub record: serde_json::Value,
318 #[serde(rename = "swapCommit")]
319 pub swap_commit: Option<String>,
320}
321
322#[derive(Serialize)]
323#[serde(rename_all = "camelCase")]
324pub struct PutRecordOutput {
325 pub uri: String,
326 pub cid: String,
327}
328
329pub async fn put_record(
330 State(state): State<AppState>,
331 headers: axum::http::HeaderMap,
332 Json(input): Json<PutRecordInput>,
333) -> Response {
334 let auth_header = headers.get("Authorization");
335 if auth_header.is_none() {
336 return (
337 StatusCode::UNAUTHORIZED,
338 Json(json!({"error": "AuthenticationRequired"})),
339 )
340 .into_response();
341 }
342 let token = auth_header
343 .unwrap()
344 .to_str()
345 .unwrap_or("")
346 .replace("Bearer ", "");
347
348 let session = sqlx::query(
349 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
350 )
351 .bind(&token)
352 .fetch_optional(&state.db)
353 .await
354 .unwrap_or(None);
355
356 let (did, key_bytes) = match session {
357 Some(row) => (
358 row.get::<String, _>("did"),
359 row.get::<Vec<u8>, _>("key_bytes"),
360 ),
361 None => {
362 return (
363 StatusCode::UNAUTHORIZED,
364 Json(json!({"error": "AuthenticationFailed"})),
365 )
366 .into_response();
367 }
368 };
369
370 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
371 return (
372 StatusCode::UNAUTHORIZED,
373 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
374 )
375 .into_response();
376 }
377
378 if input.repo != did {
379 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
380 }
381
382 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
383 .bind(&did)
384 .fetch_optional(&state.db)
385 .await;
386
387 let user_id: uuid::Uuid = match user_query {
388 Ok(Some(row)) => row.get("id"),
389 _ => {
390 return (
391 StatusCode::INTERNAL_SERVER_ERROR,
392 Json(json!({"error": "InternalError", "message": "User not found"})),
393 )
394 .into_response();
395 }
396 };
397
398 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
399 .bind(user_id)
400 .fetch_optional(&state.db)
401 .await;
402
403 let current_root_cid = match repo_root_query {
404 Ok(Some(row)) => {
405 let cid_str: String = row.get("repo_root_cid");
406 Cid::from_str(&cid_str).ok()
407 }
408 _ => None,
409 };
410
411 if current_root_cid.is_none() {
412 error!("Repo root not found for user {}", did);
413 return (
414 StatusCode::INTERNAL_SERVER_ERROR,
415 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
416 )
417 .into_response();
418 }
419 let current_root_cid = current_root_cid.unwrap();
420
421 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
422 Ok(Some(b)) => b,
423 Ok(None) => {
424 error!("Commit block not found: {}", current_root_cid);
425 return (
426 StatusCode::INTERNAL_SERVER_ERROR,
427 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
428 )
429 .into_response();
430 }
431 Err(e) => {
432 error!("Failed to load commit block: {:?}", e);
433 return (
434 StatusCode::INTERNAL_SERVER_ERROR,
435 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})),
436 )
437 .into_response();
438 }
439 };
440
441 let commit = match Commit::from_cbor(&commit_bytes) {
442 Ok(c) => c,
443 Err(e) => {
444 error!("Failed to parse commit: {:?}", e);
445 return (
446 StatusCode::INTERNAL_SERVER_ERROR,
447 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
448 )
449 .into_response();
450 }
451 };
452
453 let mst_root = commit.data;
454 let store = Arc::new(state.block_store.clone());
455 let mst = Mst::load(store.clone(), mst_root, None);
456
457 let collection_nsid = match input.collection.parse::<Nsid>() {
458 Ok(n) => n,
459 Err(_) => {
460 return (
461 StatusCode::BAD_REQUEST,
462 Json(json!({"error": "InvalidCollection"})),
463 )
464 .into_response();
465 }
466 };
467
468 let rkey = input.rkey.clone();
469
470 let mut record_bytes = Vec::new();
471 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
472 error!("Error serializing record: {:?}", e);
473 return (
474 StatusCode::BAD_REQUEST,
475 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
476 )
477 .into_response();
478 }
479
480 let record_cid = match state.block_store.put(&record_bytes).await {
481 Ok(c) => c,
482 Err(e) => {
483 error!("Failed to save record block: {:?}", e);
484 return (
485 StatusCode::INTERNAL_SERVER_ERROR,
486 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
487 )
488 .into_response();
489 }
490 };
491
492 let key = format!("{}/{}", collection_nsid, rkey);
493 if let Err(e) = mst.update(&key, record_cid).await {
494 error!("Failed to update MST: {:?}", e);
495 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
496 }
497
498 let new_mst_root = match mst.root().await {
499 Ok(c) => c,
500 Err(e) => {
501 error!("Failed to get new MST root: {:?}", e);
502 return (
503 StatusCode::INTERNAL_SERVER_ERROR,
504 Json(json!({"error": "InternalError", "message": "Failed to get new MST root"})),
505 )
506 .into_response();
507 }
508 };
509
510 let did_obj = match Did::new(&did) {
511 Ok(d) => d,
512 Err(_) => {
513 return (
514 StatusCode::INTERNAL_SERVER_ERROR,
515 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
516 )
517 .into_response();
518 }
519 };
520
521 let rev = Tid::now(LimitedU32::MIN);
522
523 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
524
525 let new_commit_bytes = match new_commit.to_cbor() {
526 Ok(b) => b,
527 Err(e) => {
528 error!("Failed to serialize new commit: {:?}", e);
529 return (
530 StatusCode::INTERNAL_SERVER_ERROR,
531 Json(
532 json!({"error": "InternalError", "message": "Failed to serialize new commit"}),
533 ),
534 )
535 .into_response();
536 }
537 };
538
539 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
540 Ok(c) => c,
541 Err(e) => {
542 error!("Failed to save new commit: {:?}", e);
543 return (
544 StatusCode::INTERNAL_SERVER_ERROR,
545 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})),
546 )
547 .into_response();
548 }
549 };
550
551 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
552 .bind(new_root_cid.to_string())
553 .bind(user_id)
554 .execute(&state.db)
555 .await;
556
557 if let Err(e) = update_repo {
558 error!("Failed to update repo root in DB: {:?}", e);
559 return (
560 StatusCode::INTERNAL_SERVER_ERROR,
561 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})),
562 )
563 .into_response();
564 }
565
566 let record_insert = sqlx::query(
567 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
568 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
569 )
570 .bind(user_id)
571 .bind(&input.collection)
572 .bind(&rkey)
573 .bind(record_cid.to_string())
574 .execute(&state.db)
575 .await;
576
577 if let Err(e) = record_insert {
578 error!("Error inserting record index: {:?}", e);
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError", "message": "Failed to index record"})),
582 )
583 .into_response();
584 }
585
586 let output = PutRecordOutput {
587 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
588 cid: record_cid.to_string(),
589 };
590 (StatusCode::OK, Json(output)).into_response()
591}