this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
3use crate::delegation::{self, DelegationActionType};
4use crate::repo::tracking::TrackingBlockStore;
5use crate::state::AppState;
6use crate::validation::ValidationStatus;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use sqlx::{PgPool, Row};
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::error;
25use uuid::Uuid;
26
27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
28 let row = sqlx::query(
29 r#"
30 SELECT
31 email_verified,
32 discord_verified,
33 telegram_verified,
34 signal_verified
35 FROM users
36 WHERE did = $1
37 "#,
38 )
39 .bind(did)
40 .fetch_optional(db)
41 .await?;
42 match row {
43 Some(r) => {
44 let email_verified: bool = r.get("email_verified");
45 let discord_verified: bool = r.get("discord_verified");
46 let telegram_verified: bool = r.get("telegram_verified");
47 let signal_verified: bool = r.get("signal_verified");
48 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
49 }
50 None => Ok(false),
51 }
52}
53
54pub struct RepoWriteAuth {
55 pub did: String,
56 pub user_id: Uuid,
57 pub current_root_cid: Cid,
58 pub is_oauth: bool,
59 pub scope: Option<String>,
60 pub controller_did: Option<String>,
61}
62
63pub async fn prepare_repo_write(
64 state: &AppState,
65 headers: &HeaderMap,
66 repo_did: &str,
67 http_method: &str,
68 http_uri: &str,
69) -> Result<RepoWriteAuth, Response> {
70 let extracted = crate::auth::extract_auth_token_from_header(
71 headers.get("Authorization").and_then(|h| h.to_str().ok()),
72 )
73 .ok_or_else(|| {
74 (
75 StatusCode::UNAUTHORIZED,
76 Json(json!({"error": "AuthenticationRequired"})),
77 )
78 .into_response()
79 })?;
80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
81 let auth_user = crate::auth::validate_token_with_dpop(
82 &state.db,
83 &extracted.token,
84 extracted.is_dpop,
85 dpop_proof,
86 http_method,
87 http_uri,
88 false,
89 )
90 .await
91 .map_err(|e| {
92 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
93 let mut response = (
94 StatusCode::UNAUTHORIZED,
95 Json(json!({"error": e.to_string()})),
96 )
97 .into_response();
98 if matches!(e, crate::auth::TokenValidationError::TokenExpired) {
99 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" };
100 let www_auth = format!(
101 "{} error=\"invalid_token\", error_description=\"Token has expired\"",
102 scheme
103 );
104 response.headers_mut().insert(
105 "WWW-Authenticate",
106 www_auth.parse().unwrap(),
107 );
108 if extracted.is_dpop {
109 let nonce = crate::oauth::verify::generate_dpop_nonce();
110 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap());
111 }
112 }
113 response
114 })?;
115 if repo_did != auth_user.did {
116 return Err((
117 StatusCode::FORBIDDEN,
118 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
119 )
120 .into_response());
121 }
122 if crate::util::is_account_migrated(&state.db, &auth_user.did)
123 .await
124 .unwrap_or(false)
125 {
126 return Err((
127 StatusCode::FORBIDDEN,
128 Json(json!({
129 "error": "AccountMigrated",
130 "message": "Account has been migrated to another PDS. Repo operations are not allowed."
131 })),
132 )
133 .into_response());
134 }
135 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
136 .await
137 .unwrap_or(false);
138 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
139 .await
140 .unwrap_or(false);
141 if !is_verified && !is_delegated {
142 return Err((
143 StatusCode::FORBIDDEN,
144 Json(json!({
145 "error": "AccountNotVerified",
146 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
147 })),
148 )
149 .into_response());
150 }
151 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
152 .fetch_optional(&state.db)
153 .await
154 .map_err(|e| {
155 error!("DB error fetching user: {}", e);
156 (
157 StatusCode::INTERNAL_SERVER_ERROR,
158 Json(json!({"error": "InternalError"})),
159 )
160 .into_response()
161 })?
162 .ok_or_else(|| {
163 (
164 StatusCode::INTERNAL_SERVER_ERROR,
165 Json(json!({"error": "InternalError", "message": "User not found"})),
166 )
167 .into_response()
168 })?;
169 let root_cid_str: String = sqlx::query_scalar!(
170 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
171 user_id
172 )
173 .fetch_optional(&state.db)
174 .await
175 .map_err(|e| {
176 error!("DB error fetching repo root: {}", e);
177 (
178 StatusCode::INTERNAL_SERVER_ERROR,
179 Json(json!({"error": "InternalError"})),
180 )
181 .into_response()
182 })?
183 .ok_or_else(|| {
184 (
185 StatusCode::INTERNAL_SERVER_ERROR,
186 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
187 )
188 .into_response()
189 })?;
190 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
191 (
192 StatusCode::INTERNAL_SERVER_ERROR,
193 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
194 )
195 .into_response()
196 })?;
197 Ok(RepoWriteAuth {
198 did: auth_user.did,
199 user_id,
200 current_root_cid,
201 is_oauth: auth_user.is_oauth,
202 scope: auth_user.scope,
203 controller_did: auth_user.controller_did,
204 })
205}
206#[derive(Deserialize)]
207#[allow(dead_code)]
208pub struct CreateRecordInput {
209 pub repo: String,
210 pub collection: String,
211 pub rkey: Option<String>,
212 pub validate: Option<bool>,
213 pub record: serde_json::Value,
214 #[serde(rename = "swapCommit")]
215 pub swap_commit: Option<String>,
216}
217#[derive(Serialize)]
218#[serde(rename_all = "camelCase")]
219pub struct CommitInfo {
220 pub cid: String,
221 pub rev: String,
222}
223
224#[derive(Serialize)]
225#[serde(rename_all = "camelCase")]
226pub struct CreateRecordOutput {
227 pub uri: String,
228 pub cid: String,
229 pub commit: CommitInfo,
230 #[serde(skip_serializing_if = "Option::is_none")]
231 pub validation_status: Option<String>,
232}
233pub async fn create_record(
234 State(state): State<AppState>,
235 headers: HeaderMap,
236 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
237 Json(input): Json<CreateRecordInput>,
238) -> Response {
239 let auth = match prepare_repo_write(
240 &state,
241 &headers,
242 &input.repo,
243 "POST",
244 &crate::util::build_full_url(&uri.to_string()),
245 )
246 .await
247 {
248 Ok(res) => res,
249 Err(err_res) => return err_res,
250 };
251
252 if let Err(e) = crate::auth::scope_check::check_repo_scope(
253 auth.is_oauth,
254 auth.scope.as_deref(),
255 crate::oauth::RepoAction::Create,
256 &input.collection,
257 ) {
258 return e;
259 }
260
261 let did = auth.did;
262 let user_id = auth.user_id;
263 let current_root_cid = auth.current_root_cid;
264 let controller_did = auth.controller_did;
265
266 if let Some(swap_commit) = &input.swap_commit
267 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
268 {
269 return (
270 StatusCode::CONFLICT,
271 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
272 )
273 .into_response();
274 }
275 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
276 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
277 Ok(Some(b)) => b,
278 _ => {
279 return (
280 StatusCode::INTERNAL_SERVER_ERROR,
281 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
282 )
283 .into_response();
284 }
285 };
286 let commit = match Commit::from_cbor(&commit_bytes) {
287 Ok(c) => c,
288 _ => {
289 return (
290 StatusCode::INTERNAL_SERVER_ERROR,
291 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
292 )
293 .into_response();
294 }
295 };
296 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
297 let collection_nsid = match input.collection.parse::<Nsid>() {
298 Ok(n) => n,
299 Err(_) => {
300 return (
301 StatusCode::BAD_REQUEST,
302 Json(json!({"error": "InvalidCollection"})),
303 )
304 .into_response();
305 }
306 };
307 let validation_status = if input.validate == Some(false) {
308 None
309 } else {
310 let require_lexicon = input.validate == Some(true);
311 match validate_record_with_status(
312 &input.record,
313 &input.collection,
314 input.rkey.as_deref(),
315 require_lexicon,
316 ) {
317 Ok(status) => Some(status),
318 Err(err_response) => return *err_response,
319 }
320 };
321 let rkey = input
322 .rkey
323 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
324 let record_ipld = crate::util::json_to_ipld(&input.record);
325 let mut record_bytes = Vec::new();
326 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
327 return (
328 StatusCode::BAD_REQUEST,
329 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
330 )
331 .into_response();
332 }
333 let record_cid = match tracking_store.put(&record_bytes).await {
334 Ok(c) => c,
335 _ => {
336 return (
337 StatusCode::INTERNAL_SERVER_ERROR,
338 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
339 )
340 .into_response();
341 }
342 };
343 let key = format!("{}/{}", collection_nsid, rkey);
344 let new_mst = match mst.add(&key, record_cid).await {
345 Ok(m) => m,
346 _ => {
347 return (
348 StatusCode::INTERNAL_SERVER_ERROR,
349 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
350 )
351 .into_response();
352 }
353 };
354 let new_mst_root = match new_mst.persist().await {
355 Ok(c) => c,
356 _ => {
357 return (
358 StatusCode::INTERNAL_SERVER_ERROR,
359 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
360 )
361 .into_response();
362 }
363 };
364 let op = RecordOp::Create {
365 collection: input.collection.clone(),
366 rkey: rkey.clone(),
367 cid: record_cid,
368 };
369 let mut relevant_blocks = std::collections::BTreeMap::new();
370 if new_mst
371 .blocks_for_path(&key, &mut relevant_blocks)
372 .await
373 .is_err()
374 {
375 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
376 }
377 if mst
378 .blocks_for_path(&key, &mut relevant_blocks)
379 .await
380 .is_err()
381 {
382 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
383 }
384 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
385 let mut written_cids = tracking_store.get_all_relevant_cids();
386 for cid in relevant_blocks.keys() {
387 if !written_cids.contains(cid) {
388 written_cids.push(*cid);
389 }
390 }
391 let written_cids_str = written_cids
392 .iter()
393 .map(|c| c.to_string())
394 .collect::<Vec<_>>();
395 let blob_cids = extract_blob_cids(&input.record);
396 let commit_result = match commit_and_log(
397 &state,
398 CommitParams {
399 did: &did,
400 user_id,
401 current_root_cid: Some(current_root_cid),
402 prev_data_cid: Some(commit.data),
403 new_mst_root,
404 ops: vec![op],
405 blocks_cids: &written_cids_str,
406 blobs: &blob_cids,
407 },
408 )
409 .await
410 {
411 Ok(res) => res,
412 Err(e) => {
413 return (
414 StatusCode::INTERNAL_SERVER_ERROR,
415 Json(json!({"error": "InternalError", "message": e})),
416 )
417 .into_response();
418 }
419 };
420
421 if let Some(ref controller) = controller_did {
422 let _ = delegation::log_delegation_action(
423 &state.db,
424 &did,
425 controller,
426 Some(controller),
427 DelegationActionType::RepoWrite,
428 Some(json!({
429 "action": "create",
430 "collection": input.collection,
431 "rkey": rkey
432 })),
433 None,
434 None,
435 )
436 .await;
437 }
438
439 (
440 StatusCode::OK,
441 Json(CreateRecordOutput {
442 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
443 cid: record_cid.to_string(),
444 commit: CommitInfo {
445 cid: commit_result.commit_cid.to_string(),
446 rev: commit_result.rev,
447 },
448 validation_status: validation_status.map(|s| match s {
449 ValidationStatus::Valid => "valid".to_string(),
450 ValidationStatus::Unknown => "unknown".to_string(),
451 ValidationStatus::Invalid => "invalid".to_string(),
452 }),
453 }),
454 )
455 .into_response()
456}
457#[derive(Deserialize)]
458#[allow(dead_code)]
459pub struct PutRecordInput {
460 pub repo: String,
461 pub collection: String,
462 pub rkey: String,
463 pub validate: Option<bool>,
464 pub record: serde_json::Value,
465 #[serde(rename = "swapCommit")]
466 pub swap_commit: Option<String>,
467 #[serde(rename = "swapRecord")]
468 pub swap_record: Option<String>,
469}
470#[derive(Serialize)]
471#[serde(rename_all = "camelCase")]
472pub struct PutRecordOutput {
473 pub uri: String,
474 pub cid: String,
475 #[serde(skip_serializing_if = "Option::is_none")]
476 pub commit: Option<CommitInfo>,
477 #[serde(skip_serializing_if = "Option::is_none")]
478 pub validation_status: Option<String>,
479}
480pub async fn put_record(
481 State(state): State<AppState>,
482 headers: HeaderMap,
483 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
484 Json(input): Json<PutRecordInput>,
485) -> Response {
486 let auth = match prepare_repo_write(
487 &state,
488 &headers,
489 &input.repo,
490 "POST",
491 &crate::util::build_full_url(&uri.to_string()),
492 )
493 .await
494 {
495 Ok(res) => res,
496 Err(err_res) => return err_res,
497 };
498
499 if let Err(e) = crate::auth::scope_check::check_repo_scope(
500 auth.is_oauth,
501 auth.scope.as_deref(),
502 crate::oauth::RepoAction::Create,
503 &input.collection,
504 ) {
505 return e;
506 }
507 if let Err(e) = crate::auth::scope_check::check_repo_scope(
508 auth.is_oauth,
509 auth.scope.as_deref(),
510 crate::oauth::RepoAction::Update,
511 &input.collection,
512 ) {
513 return e;
514 }
515
516 let did = auth.did;
517 let user_id = auth.user_id;
518 let current_root_cid = auth.current_root_cid;
519 let controller_did = auth.controller_did;
520
521 if let Some(swap_commit) = &input.swap_commit
522 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
523 {
524 return (
525 StatusCode::CONFLICT,
526 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
527 )
528 .into_response();
529 }
530 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
531 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
532 Ok(Some(b)) => b,
533 _ => {
534 return (
535 StatusCode::INTERNAL_SERVER_ERROR,
536 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
537 )
538 .into_response();
539 }
540 };
541 let commit = match Commit::from_cbor(&commit_bytes) {
542 Ok(c) => c,
543 _ => {
544 return (
545 StatusCode::INTERNAL_SERVER_ERROR,
546 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
547 )
548 .into_response();
549 }
550 };
551 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
552 let collection_nsid = match input.collection.parse::<Nsid>() {
553 Ok(n) => n,
554 Err(_) => {
555 return (
556 StatusCode::BAD_REQUEST,
557 Json(json!({"error": "InvalidCollection"})),
558 )
559 .into_response();
560 }
561 };
562 let key = format!("{}/{}", collection_nsid, input.rkey);
563 let validation_status = if input.validate == Some(false) {
564 None
565 } else {
566 let require_lexicon = input.validate == Some(true);
567 match validate_record_with_status(
568 &input.record,
569 &input.collection,
570 Some(&input.rkey),
571 require_lexicon,
572 ) {
573 Ok(status) => Some(status),
574 Err(err_response) => return *err_response,
575 }
576 };
577 if let Some(swap_record_str) = &input.swap_record {
578 let expected_cid = Cid::from_str(swap_record_str).ok();
579 let actual_cid = mst.get(&key).await.ok().flatten();
580 if expected_cid != actual_cid {
581 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
582 }
583 }
584 let existing_cid = mst.get(&key).await.ok().flatten();
585 let record_ipld = crate::util::json_to_ipld(&input.record);
586 let mut record_bytes = Vec::new();
587 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
588 return (
589 StatusCode::BAD_REQUEST,
590 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
591 )
592 .into_response();
593 }
594 let record_cid = match tracking_store.put(&record_bytes).await {
595 Ok(c) => c,
596 _ => {
597 return (
598 StatusCode::INTERNAL_SERVER_ERROR,
599 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
600 )
601 .into_response();
602 }
603 };
604 if existing_cid == Some(record_cid) {
605 return (
606 StatusCode::OK,
607 Json(PutRecordOutput {
608 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
609 cid: record_cid.to_string(),
610 commit: None,
611 validation_status: validation_status.map(|s| match s {
612 ValidationStatus::Valid => "valid".to_string(),
613 ValidationStatus::Unknown => "unknown".to_string(),
614 ValidationStatus::Invalid => "invalid".to_string(),
615 }),
616 }),
617 )
618 .into_response();
619 }
620 let new_mst = if existing_cid.is_some() {
621 match mst.update(&key, record_cid).await {
622 Ok(m) => m,
623 Err(_) => {
624 return (
625 StatusCode::INTERNAL_SERVER_ERROR,
626 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
627 )
628 .into_response();
629 }
630 }
631 } else {
632 match mst.add(&key, record_cid).await {
633 Ok(m) => m,
634 Err(_) => {
635 return (
636 StatusCode::INTERNAL_SERVER_ERROR,
637 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
638 )
639 .into_response();
640 }
641 }
642 };
643 let new_mst_root = match new_mst.persist().await {
644 Ok(c) => c,
645 Err(_) => {
646 return (
647 StatusCode::INTERNAL_SERVER_ERROR,
648 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
649 )
650 .into_response();
651 }
652 };
653 let op = if existing_cid.is_some() {
654 RecordOp::Update {
655 collection: input.collection.clone(),
656 rkey: input.rkey.clone(),
657 cid: record_cid,
658 prev: existing_cid,
659 }
660 } else {
661 RecordOp::Create {
662 collection: input.collection.clone(),
663 rkey: input.rkey.clone(),
664 cid: record_cid,
665 }
666 };
667 let mut relevant_blocks = std::collections::BTreeMap::new();
668 if new_mst
669 .blocks_for_path(&key, &mut relevant_blocks)
670 .await
671 .is_err()
672 {
673 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
674 }
675 if mst
676 .blocks_for_path(&key, &mut relevant_blocks)
677 .await
678 .is_err()
679 {
680 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
681 }
682 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
683 let mut written_cids = tracking_store.get_all_relevant_cids();
684 for cid in relevant_blocks.keys() {
685 if !written_cids.contains(cid) {
686 written_cids.push(*cid);
687 }
688 }
689 let written_cids_str = written_cids
690 .iter()
691 .map(|c| c.to_string())
692 .collect::<Vec<_>>();
693 let is_update = existing_cid.is_some();
694 let blob_cids = extract_blob_cids(&input.record);
695 let commit_result = match commit_and_log(
696 &state,
697 CommitParams {
698 did: &did,
699 user_id,
700 current_root_cid: Some(current_root_cid),
701 prev_data_cid: Some(commit.data),
702 new_mst_root,
703 ops: vec![op],
704 blocks_cids: &written_cids_str,
705 blobs: &blob_cids,
706 },
707 )
708 .await
709 {
710 Ok(res) => res,
711 Err(e) => {
712 return (
713 StatusCode::INTERNAL_SERVER_ERROR,
714 Json(json!({"error": "InternalError", "message": e})),
715 )
716 .into_response();
717 }
718 };
719
720 if let Some(ref controller) = controller_did {
721 let _ = delegation::log_delegation_action(
722 &state.db,
723 &did,
724 controller,
725 Some(controller),
726 DelegationActionType::RepoWrite,
727 Some(json!({
728 "action": if is_update { "update" } else { "create" },
729 "collection": input.collection,
730 "rkey": input.rkey
731 })),
732 None,
733 None,
734 )
735 .await;
736 }
737
738 (
739 StatusCode::OK,
740 Json(PutRecordOutput {
741 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
742 cid: record_cid.to_string(),
743 commit: Some(CommitInfo {
744 cid: commit_result.commit_cid.to_string(),
745 rev: commit_result.rev,
746 }),
747 validation_status: validation_status.map(|s| match s {
748 ValidationStatus::Valid => "valid".to_string(),
749 ValidationStatus::Unknown => "unknown".to_string(),
750 ValidationStatus::Invalid => "invalid".to_string(),
751 }),
752 }),
753 )
754 .into_response()
755}