this repo has no description
1use super::validation::validate_record_with_status;
2use crate::validation::ValidationStatus;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use sqlx::{PgPool, Row};
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::error;
25use uuid::Uuid;
26
27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
28 let row = sqlx::query(
29 r#"
30 SELECT
31 email_verified,
32 discord_verified,
33 telegram_verified,
34 signal_verified
35 FROM users
36 WHERE did = $1
37 "#,
38 )
39 .bind(did)
40 .fetch_optional(db)
41 .await?;
42 match row {
43 Some(r) => {
44 let email_verified: bool = r.get("email_verified");
45 let discord_verified: bool = r.get("discord_verified");
46 let telegram_verified: bool = r.get("telegram_verified");
47 let signal_verified: bool = r.get("signal_verified");
48 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
49 }
50 None => Ok(false),
51 }
52}
53
54pub struct RepoWriteAuth {
55 pub did: String,
56 pub user_id: Uuid,
57 pub current_root_cid: Cid,
58 pub is_oauth: bool,
59 pub scope: Option<String>,
60 pub controller_did: Option<String>,
61}
62
63pub async fn prepare_repo_write(
64 state: &AppState,
65 headers: &HeaderMap,
66 repo_did: &str,
67 http_method: &str,
68 http_uri: &str,
69) -> Result<RepoWriteAuth, Response> {
70 let extracted = crate::auth::extract_auth_token_from_header(
71 headers.get("Authorization").and_then(|h| h.to_str().ok()),
72 )
73 .ok_or_else(|| {
74 (
75 StatusCode::UNAUTHORIZED,
76 Json(json!({"error": "AuthenticationRequired"})),
77 )
78 .into_response()
79 })?;
80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
81 let auth_user = crate::auth::validate_token_with_dpop(
82 &state.db,
83 &extracted.token,
84 extracted.is_dpop,
85 dpop_proof,
86 http_method,
87 http_uri,
88 false,
89 )
90 .await
91 .map_err(|e| {
92 (
93 StatusCode::UNAUTHORIZED,
94 Json(json!({"error": e.to_string()})),
95 )
96 .into_response()
97 })?;
98 if repo_did != auth_user.did {
99 return Err((
100 StatusCode::FORBIDDEN,
101 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
102 )
103 .into_response());
104 }
105 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
106 .await
107 .unwrap_or(false);
108 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
109 .await
110 .unwrap_or(false);
111 if !is_verified && !is_delegated {
112 return Err((
113 StatusCode::FORBIDDEN,
114 Json(json!({
115 "error": "AccountNotVerified",
116 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
117 })),
118 )
119 .into_response());
120 }
121 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
122 .fetch_optional(&state.db)
123 .await
124 .map_err(|e| {
125 error!("DB error fetching user: {}", e);
126 (
127 StatusCode::INTERNAL_SERVER_ERROR,
128 Json(json!({"error": "InternalError"})),
129 )
130 .into_response()
131 })?
132 .ok_or_else(|| {
133 (
134 StatusCode::INTERNAL_SERVER_ERROR,
135 Json(json!({"error": "InternalError", "message": "User not found"})),
136 )
137 .into_response()
138 })?;
139 let root_cid_str: String = sqlx::query_scalar!(
140 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
141 user_id
142 )
143 .fetch_optional(&state.db)
144 .await
145 .map_err(|e| {
146 error!("DB error fetching repo root: {}", e);
147 (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError"})),
150 )
151 .into_response()
152 })?
153 .ok_or_else(|| {
154 (
155 StatusCode::INTERNAL_SERVER_ERROR,
156 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
157 )
158 .into_response()
159 })?;
160 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
161 (
162 StatusCode::INTERNAL_SERVER_ERROR,
163 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
164 )
165 .into_response()
166 })?;
167 Ok(RepoWriteAuth {
168 did: auth_user.did,
169 user_id,
170 current_root_cid,
171 is_oauth: auth_user.is_oauth,
172 scope: auth_user.scope,
173 controller_did: auth_user.controller_did,
174 })
175}
176#[derive(Deserialize)]
177#[allow(dead_code)]
178pub struct CreateRecordInput {
179 pub repo: String,
180 pub collection: String,
181 pub rkey: Option<String>,
182 pub validate: Option<bool>,
183 pub record: serde_json::Value,
184 #[serde(rename = "swapCommit")]
185 pub swap_commit: Option<String>,
186}
187#[derive(Serialize)]
188#[serde(rename_all = "camelCase")]
189pub struct CommitInfo {
190 pub cid: String,
191 pub rev: String,
192}
193
194#[derive(Serialize)]
195#[serde(rename_all = "camelCase")]
196pub struct CreateRecordOutput {
197 pub uri: String,
198 pub cid: String,
199 pub commit: CommitInfo,
200 #[serde(skip_serializing_if = "Option::is_none")]
201 pub validation_status: Option<String>,
202}
203pub async fn create_record(
204 State(state): State<AppState>,
205 headers: HeaderMap,
206 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
207 Json(input): Json<CreateRecordInput>,
208) -> Response {
209 let auth =
210 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
211 Ok(res) => res,
212 Err(err_res) => return err_res,
213 };
214
215 if let Err(e) = crate::auth::scope_check::check_repo_scope(
216 auth.is_oauth,
217 auth.scope.as_deref(),
218 crate::oauth::RepoAction::Create,
219 &input.collection,
220 ) {
221 return e;
222 }
223
224 let did = auth.did;
225 let user_id = auth.user_id;
226 let current_root_cid = auth.current_root_cid;
227 let controller_did = auth.controller_did;
228
229 if let Some(swap_commit) = &input.swap_commit
230 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
231 {
232 return (
233 StatusCode::CONFLICT,
234 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
235 )
236 .into_response();
237 }
238 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
239 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
240 Ok(Some(b)) => b,
241 _ => {
242 return (
243 StatusCode::INTERNAL_SERVER_ERROR,
244 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
245 )
246 .into_response();
247 }
248 };
249 let commit = match Commit::from_cbor(&commit_bytes) {
250 Ok(c) => c,
251 _ => {
252 return (
253 StatusCode::INTERNAL_SERVER_ERROR,
254 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
255 )
256 .into_response();
257 }
258 };
259 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
260 let collection_nsid = match input.collection.parse::<Nsid>() {
261 Ok(n) => n,
262 Err(_) => {
263 return (
264 StatusCode::BAD_REQUEST,
265 Json(json!({"error": "InvalidCollection"})),
266 )
267 .into_response();
268 }
269 };
270 let validation_status = if input.validate == Some(false) {
271 None
272 } else {
273 let require_lexicon = input.validate == Some(true);
274 match validate_record_with_status(
275 &input.record,
276 &input.collection,
277 input.rkey.as_deref(),
278 require_lexicon,
279 ) {
280 Ok(status) => Some(status),
281 Err(err_response) => return *err_response,
282 }
283 };
284 let rkey = input
285 .rkey
286 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
287 let mut record_bytes = Vec::new();
288 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
289 return (
290 StatusCode::BAD_REQUEST,
291 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
292 )
293 .into_response();
294 }
295 let record_cid = match tracking_store.put(&record_bytes).await {
296 Ok(c) => c,
297 _ => {
298 return (
299 StatusCode::INTERNAL_SERVER_ERROR,
300 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
301 )
302 .into_response();
303 }
304 };
305 let key = format!("{}/{}", collection_nsid, rkey);
306 let new_mst = match mst.add(&key, record_cid).await {
307 Ok(m) => m,
308 _ => {
309 return (
310 StatusCode::INTERNAL_SERVER_ERROR,
311 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
312 )
313 .into_response();
314 }
315 };
316 let new_mst_root = match new_mst.persist().await {
317 Ok(c) => c,
318 _ => {
319 return (
320 StatusCode::INTERNAL_SERVER_ERROR,
321 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
322 )
323 .into_response();
324 }
325 };
326 let op = RecordOp::Create {
327 collection: input.collection.clone(),
328 rkey: rkey.clone(),
329 cid: record_cid,
330 };
331 let mut relevant_blocks = std::collections::BTreeMap::new();
332 if new_mst
333 .blocks_for_path(&key, &mut relevant_blocks)
334 .await
335 .is_err()
336 {
337 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
338 }
339 if mst
340 .blocks_for_path(&key, &mut relevant_blocks)
341 .await
342 .is_err()
343 {
344 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
345 }
346 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
347 let mut written_cids = tracking_store.get_all_relevant_cids();
348 for cid in relevant_blocks.keys() {
349 if !written_cids.contains(cid) {
350 written_cids.push(*cid);
351 }
352 }
353 let written_cids_str = written_cids
354 .iter()
355 .map(|c| c.to_string())
356 .collect::<Vec<_>>();
357 let blob_cids = extract_blob_cids(&input.record);
358 let commit_result = match commit_and_log(
359 &state,
360 CommitParams {
361 did: &did,
362 user_id,
363 current_root_cid: Some(current_root_cid),
364 prev_data_cid: Some(commit.data),
365 new_mst_root,
366 ops: vec![op],
367 blocks_cids: &written_cids_str,
368 blobs: &blob_cids,
369 },
370 )
371 .await
372 {
373 Ok(res) => res,
374 Err(e) => {
375 return (
376 StatusCode::INTERNAL_SERVER_ERROR,
377 Json(json!({"error": "InternalError", "message": e})),
378 )
379 .into_response();
380 }
381 };
382
383 if let Some(ref controller) = controller_did {
384 let _ = delegation::log_delegation_action(
385 &state.db,
386 &did,
387 controller,
388 Some(controller),
389 DelegationActionType::RepoWrite,
390 Some(json!({
391 "action": "create",
392 "collection": input.collection,
393 "rkey": rkey
394 })),
395 None,
396 None,
397 )
398 .await;
399 }
400
401 (
402 StatusCode::OK,
403 Json(CreateRecordOutput {
404 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
405 cid: record_cid.to_string(),
406 commit: CommitInfo {
407 cid: commit_result.commit_cid.to_string(),
408 rev: commit_result.rev,
409 },
410 validation_status: validation_status.map(|s| match s {
411 ValidationStatus::Valid => "valid".to_string(),
412 ValidationStatus::Unknown => "unknown".to_string(),
413 ValidationStatus::Invalid => "invalid".to_string(),
414 }),
415 }),
416 )
417 .into_response()
418}
419#[derive(Deserialize)]
420#[allow(dead_code)]
421pub struct PutRecordInput {
422 pub repo: String,
423 pub collection: String,
424 pub rkey: String,
425 pub validate: Option<bool>,
426 pub record: serde_json::Value,
427 #[serde(rename = "swapCommit")]
428 pub swap_commit: Option<String>,
429 #[serde(rename = "swapRecord")]
430 pub swap_record: Option<String>,
431}
432#[derive(Serialize)]
433#[serde(rename_all = "camelCase")]
434pub struct PutRecordOutput {
435 pub uri: String,
436 pub cid: String,
437 #[serde(skip_serializing_if = "Option::is_none")]
438 pub commit: Option<CommitInfo>,
439 #[serde(skip_serializing_if = "Option::is_none")]
440 pub validation_status: Option<String>,
441}
442pub async fn put_record(
443 State(state): State<AppState>,
444 headers: HeaderMap,
445 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
446 Json(input): Json<PutRecordInput>,
447) -> Response {
448 let auth =
449 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
450 Ok(res) => res,
451 Err(err_res) => return err_res,
452 };
453
454 if let Err(e) = crate::auth::scope_check::check_repo_scope(
455 auth.is_oauth,
456 auth.scope.as_deref(),
457 crate::oauth::RepoAction::Create,
458 &input.collection,
459 ) {
460 return e;
461 }
462 if let Err(e) = crate::auth::scope_check::check_repo_scope(
463 auth.is_oauth,
464 auth.scope.as_deref(),
465 crate::oauth::RepoAction::Update,
466 &input.collection,
467 ) {
468 return e;
469 }
470
471 let did = auth.did;
472 let user_id = auth.user_id;
473 let current_root_cid = auth.current_root_cid;
474 let controller_did = auth.controller_did;
475
476 if let Some(swap_commit) = &input.swap_commit
477 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
478 {
479 return (
480 StatusCode::CONFLICT,
481 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
482 )
483 .into_response();
484 }
485 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
486 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
487 Ok(Some(b)) => b,
488 _ => {
489 return (
490 StatusCode::INTERNAL_SERVER_ERROR,
491 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
492 )
493 .into_response();
494 }
495 };
496 let commit = match Commit::from_cbor(&commit_bytes) {
497 Ok(c) => c,
498 _ => {
499 return (
500 StatusCode::INTERNAL_SERVER_ERROR,
501 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
502 )
503 .into_response();
504 }
505 };
506 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
507 let collection_nsid = match input.collection.parse::<Nsid>() {
508 Ok(n) => n,
509 Err(_) => {
510 return (
511 StatusCode::BAD_REQUEST,
512 Json(json!({"error": "InvalidCollection"})),
513 )
514 .into_response();
515 }
516 };
517 let key = format!("{}/{}", collection_nsid, input.rkey);
518 let validation_status = if input.validate == Some(false) {
519 None
520 } else {
521 let require_lexicon = input.validate == Some(true);
522 match validate_record_with_status(
523 &input.record,
524 &input.collection,
525 Some(&input.rkey),
526 require_lexicon,
527 ) {
528 Ok(status) => Some(status),
529 Err(err_response) => return *err_response,
530 }
531 };
532 if let Some(swap_record_str) = &input.swap_record {
533 let expected_cid = Cid::from_str(swap_record_str).ok();
534 let actual_cid = mst.get(&key).await.ok().flatten();
535 if expected_cid != actual_cid {
536 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
537 }
538 }
539 let existing_cid = mst.get(&key).await.ok().flatten();
540 let mut record_bytes = Vec::new();
541 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
542 return (
543 StatusCode::BAD_REQUEST,
544 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
545 )
546 .into_response();
547 }
548 let record_cid = match tracking_store.put(&record_bytes).await {
549 Ok(c) => c,
550 _ => {
551 return (
552 StatusCode::INTERNAL_SERVER_ERROR,
553 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
554 )
555 .into_response();
556 }
557 };
558 if existing_cid == Some(record_cid) {
559 return (
560 StatusCode::OK,
561 Json(PutRecordOutput {
562 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
563 cid: record_cid.to_string(),
564 commit: None,
565 validation_status: validation_status.map(|s| match s {
566 ValidationStatus::Valid => "valid".to_string(),
567 ValidationStatus::Unknown => "unknown".to_string(),
568 ValidationStatus::Invalid => "invalid".to_string(),
569 }),
570 }),
571 )
572 .into_response();
573 }
574 let new_mst = if existing_cid.is_some() {
575 match mst.update(&key, record_cid).await {
576 Ok(m) => m,
577 Err(_) => {
578 return (
579 StatusCode::INTERNAL_SERVER_ERROR,
580 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
581 )
582 .into_response();
583 }
584 }
585 } else {
586 match mst.add(&key, record_cid).await {
587 Ok(m) => m,
588 Err(_) => {
589 return (
590 StatusCode::INTERNAL_SERVER_ERROR,
591 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
592 )
593 .into_response();
594 }
595 }
596 };
597 let new_mst_root = match new_mst.persist().await {
598 Ok(c) => c,
599 Err(_) => {
600 return (
601 StatusCode::INTERNAL_SERVER_ERROR,
602 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
603 )
604 .into_response();
605 }
606 };
607 let op = if existing_cid.is_some() {
608 RecordOp::Update {
609 collection: input.collection.clone(),
610 rkey: input.rkey.clone(),
611 cid: record_cid,
612 prev: existing_cid,
613 }
614 } else {
615 RecordOp::Create {
616 collection: input.collection.clone(),
617 rkey: input.rkey.clone(),
618 cid: record_cid,
619 }
620 };
621 let mut relevant_blocks = std::collections::BTreeMap::new();
622 if new_mst
623 .blocks_for_path(&key, &mut relevant_blocks)
624 .await
625 .is_err()
626 {
627 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
628 }
629 if mst
630 .blocks_for_path(&key, &mut relevant_blocks)
631 .await
632 .is_err()
633 {
634 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
635 }
636 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
637 let mut written_cids = tracking_store.get_all_relevant_cids();
638 for cid in relevant_blocks.keys() {
639 if !written_cids.contains(cid) {
640 written_cids.push(*cid);
641 }
642 }
643 let written_cids_str = written_cids
644 .iter()
645 .map(|c| c.to_string())
646 .collect::<Vec<_>>();
647 let is_update = existing_cid.is_some();
648 let blob_cids = extract_blob_cids(&input.record);
649 let commit_result = match commit_and_log(
650 &state,
651 CommitParams {
652 did: &did,
653 user_id,
654 current_root_cid: Some(current_root_cid),
655 prev_data_cid: Some(commit.data),
656 new_mst_root,
657 ops: vec![op],
658 blocks_cids: &written_cids_str,
659 blobs: &blob_cids,
660 },
661 )
662 .await
663 {
664 Ok(res) => res,
665 Err(e) => {
666 return (
667 StatusCode::INTERNAL_SERVER_ERROR,
668 Json(json!({"error": "InternalError", "message": e})),
669 )
670 .into_response();
671 }
672 };
673
674 if let Some(ref controller) = controller_did {
675 let _ = delegation::log_delegation_action(
676 &state.db,
677 &did,
678 controller,
679 Some(controller),
680 DelegationActionType::RepoWrite,
681 Some(json!({
682 "action": if is_update { "update" } else { "create" },
683 "collection": input.collection,
684 "rkey": input.rkey
685 })),
686 None,
687 None,
688 )
689 .await;
690 }
691
692 (
693 StatusCode::OK,
694 Json(PutRecordOutput {
695 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
696 cid: record_cid.to_string(),
697 commit: Some(CommitInfo {
698 cid: commit_result.commit_cid.to_string(),
699 rev: commit_result.rev,
700 }),
701 validation_status: validation_status.map(|s| match s {
702 ValidationStatus::Valid => "valid".to_string(),
703 ValidationStatus::Unknown => "unknown".to_string(),
704 ValidationStatus::Invalid => "invalid".to_string(),
705 }),
706 }),
707 )
708 .into_response()
709}