this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
3use crate::delegation::{self, DelegationActionType};
4use crate::repo::tracking::TrackingBlockStore;
5use crate::state::AppState;
6use crate::validation::ValidationStatus;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use sqlx::{PgPool, Row};
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::error;
25use uuid::Uuid;
26
27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
28 let row = sqlx::query(
29 r#"
30 SELECT
31 email_verified,
32 discord_verified,
33 telegram_verified,
34 signal_verified
35 FROM users
36 WHERE did = $1
37 "#,
38 )
39 .bind(did)
40 .fetch_optional(db)
41 .await?;
42 match row {
43 Some(r) => {
44 let email_verified: bool = r.get("email_verified");
45 let discord_verified: bool = r.get("discord_verified");
46 let telegram_verified: bool = r.get("telegram_verified");
47 let signal_verified: bool = r.get("signal_verified");
48 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
49 }
50 None => Ok(false),
51 }
52}
53
54pub struct RepoWriteAuth {
55 pub did: String,
56 pub user_id: Uuid,
57 pub current_root_cid: Cid,
58 pub is_oauth: bool,
59 pub scope: Option<String>,
60 pub controller_did: Option<String>,
61}
62
63pub async fn prepare_repo_write(
64 state: &AppState,
65 headers: &HeaderMap,
66 repo_did: &str,
67 http_method: &str,
68 http_uri: &str,
69) -> Result<RepoWriteAuth, Response> {
70 let extracted = crate::auth::extract_auth_token_from_header(
71 headers.get("Authorization").and_then(|h| h.to_str().ok()),
72 )
73 .ok_or_else(|| {
74 (
75 StatusCode::UNAUTHORIZED,
76 Json(json!({"error": "AuthenticationRequired"})),
77 )
78 .into_response()
79 })?;
80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
81 let auth_user = crate::auth::validate_token_with_dpop(
82 &state.db,
83 &extracted.token,
84 extracted.is_dpop,
85 dpop_proof,
86 http_method,
87 http_uri,
88 false,
89 )
90 .await
91 .map_err(|e| {
92 (
93 StatusCode::UNAUTHORIZED,
94 Json(json!({"error": e.to_string()})),
95 )
96 .into_response()
97 })?;
98 if repo_did != auth_user.did {
99 return Err((
100 StatusCode::FORBIDDEN,
101 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
102 )
103 .into_response());
104 }
105 if crate::util::is_account_migrated(&state.db, &auth_user.did)
106 .await
107 .unwrap_or(false)
108 {
109 return Err((
110 StatusCode::FORBIDDEN,
111 Json(json!({
112 "error": "AccountMigrated",
113 "message": "Account has been migrated to another PDS. Repo operations are not allowed."
114 })),
115 )
116 .into_response());
117 }
118 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
119 .await
120 .unwrap_or(false);
121 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
122 .await
123 .unwrap_or(false);
124 if !is_verified && !is_delegated {
125 return Err((
126 StatusCode::FORBIDDEN,
127 Json(json!({
128 "error": "AccountNotVerified",
129 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
130 })),
131 )
132 .into_response());
133 }
134 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
135 .fetch_optional(&state.db)
136 .await
137 .map_err(|e| {
138 error!("DB error fetching user: {}", e);
139 (
140 StatusCode::INTERNAL_SERVER_ERROR,
141 Json(json!({"error": "InternalError"})),
142 )
143 .into_response()
144 })?
145 .ok_or_else(|| {
146 (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError", "message": "User not found"})),
149 )
150 .into_response()
151 })?;
152 let root_cid_str: String = sqlx::query_scalar!(
153 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
154 user_id
155 )
156 .fetch_optional(&state.db)
157 .await
158 .map_err(|e| {
159 error!("DB error fetching repo root: {}", e);
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError"})),
163 )
164 .into_response()
165 })?
166 .ok_or_else(|| {
167 (
168 StatusCode::INTERNAL_SERVER_ERROR,
169 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
170 )
171 .into_response()
172 })?;
173 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
174 (
175 StatusCode::INTERNAL_SERVER_ERROR,
176 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
177 )
178 .into_response()
179 })?;
180 Ok(RepoWriteAuth {
181 did: auth_user.did,
182 user_id,
183 current_root_cid,
184 is_oauth: auth_user.is_oauth,
185 scope: auth_user.scope,
186 controller_did: auth_user.controller_did,
187 })
188}
189#[derive(Deserialize)]
190#[allow(dead_code)]
191pub struct CreateRecordInput {
192 pub repo: String,
193 pub collection: String,
194 pub rkey: Option<String>,
195 pub validate: Option<bool>,
196 pub record: serde_json::Value,
197 #[serde(rename = "swapCommit")]
198 pub swap_commit: Option<String>,
199}
200#[derive(Serialize)]
201#[serde(rename_all = "camelCase")]
202pub struct CommitInfo {
203 pub cid: String,
204 pub rev: String,
205}
206
207#[derive(Serialize)]
208#[serde(rename_all = "camelCase")]
209pub struct CreateRecordOutput {
210 pub uri: String,
211 pub cid: String,
212 pub commit: CommitInfo,
213 #[serde(skip_serializing_if = "Option::is_none")]
214 pub validation_status: Option<String>,
215}
216pub async fn create_record(
217 State(state): State<AppState>,
218 headers: HeaderMap,
219 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
220 Json(input): Json<CreateRecordInput>,
221) -> Response {
222 let auth =
223 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
224 Ok(res) => res,
225 Err(err_res) => return err_res,
226 };
227
228 if let Err(e) = crate::auth::scope_check::check_repo_scope(
229 auth.is_oauth,
230 auth.scope.as_deref(),
231 crate::oauth::RepoAction::Create,
232 &input.collection,
233 ) {
234 return e;
235 }
236
237 let did = auth.did;
238 let user_id = auth.user_id;
239 let current_root_cid = auth.current_root_cid;
240 let controller_did = auth.controller_did;
241
242 if let Some(swap_commit) = &input.swap_commit
243 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
244 {
245 return (
246 StatusCode::CONFLICT,
247 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
248 )
249 .into_response();
250 }
251 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
252 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
253 Ok(Some(b)) => b,
254 _ => {
255 return (
256 StatusCode::INTERNAL_SERVER_ERROR,
257 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
258 )
259 .into_response();
260 }
261 };
262 let commit = match Commit::from_cbor(&commit_bytes) {
263 Ok(c) => c,
264 _ => {
265 return (
266 StatusCode::INTERNAL_SERVER_ERROR,
267 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
268 )
269 .into_response();
270 }
271 };
272 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
273 let collection_nsid = match input.collection.parse::<Nsid>() {
274 Ok(n) => n,
275 Err(_) => {
276 return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "InvalidCollection"})),
279 )
280 .into_response();
281 }
282 };
283 let validation_status = if input.validate == Some(false) {
284 None
285 } else {
286 let require_lexicon = input.validate == Some(true);
287 match validate_record_with_status(
288 &input.record,
289 &input.collection,
290 input.rkey.as_deref(),
291 require_lexicon,
292 ) {
293 Ok(status) => Some(status),
294 Err(err_response) => return *err_response,
295 }
296 };
297 let rkey = input
298 .rkey
299 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
300 let record_ipld = crate::util::json_to_ipld(&input.record);
301 let mut record_bytes = Vec::new();
302 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
303 return (
304 StatusCode::BAD_REQUEST,
305 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
306 )
307 .into_response();
308 }
309 let record_cid = match tracking_store.put(&record_bytes).await {
310 Ok(c) => c,
311 _ => {
312 return (
313 StatusCode::INTERNAL_SERVER_ERROR,
314 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
315 )
316 .into_response();
317 }
318 };
319 let key = format!("{}/{}", collection_nsid, rkey);
320 let new_mst = match mst.add(&key, record_cid).await {
321 Ok(m) => m,
322 _ => {
323 return (
324 StatusCode::INTERNAL_SERVER_ERROR,
325 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
326 )
327 .into_response();
328 }
329 };
330 let new_mst_root = match new_mst.persist().await {
331 Ok(c) => c,
332 _ => {
333 return (
334 StatusCode::INTERNAL_SERVER_ERROR,
335 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
336 )
337 .into_response();
338 }
339 };
340 let op = RecordOp::Create {
341 collection: input.collection.clone(),
342 rkey: rkey.clone(),
343 cid: record_cid,
344 };
345 let mut relevant_blocks = std::collections::BTreeMap::new();
346 if new_mst
347 .blocks_for_path(&key, &mut relevant_blocks)
348 .await
349 .is_err()
350 {
351 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
352 }
353 if mst
354 .blocks_for_path(&key, &mut relevant_blocks)
355 .await
356 .is_err()
357 {
358 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
359 }
360 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
361 let mut written_cids = tracking_store.get_all_relevant_cids();
362 for cid in relevant_blocks.keys() {
363 if !written_cids.contains(cid) {
364 written_cids.push(*cid);
365 }
366 }
367 let written_cids_str = written_cids
368 .iter()
369 .map(|c| c.to_string())
370 .collect::<Vec<_>>();
371 let blob_cids = extract_blob_cids(&input.record);
372 let commit_result = match commit_and_log(
373 &state,
374 CommitParams {
375 did: &did,
376 user_id,
377 current_root_cid: Some(current_root_cid),
378 prev_data_cid: Some(commit.data),
379 new_mst_root,
380 ops: vec![op],
381 blocks_cids: &written_cids_str,
382 blobs: &blob_cids,
383 },
384 )
385 .await
386 {
387 Ok(res) => res,
388 Err(e) => {
389 return (
390 StatusCode::INTERNAL_SERVER_ERROR,
391 Json(json!({"error": "InternalError", "message": e})),
392 )
393 .into_response();
394 }
395 };
396
397 if let Some(ref controller) = controller_did {
398 let _ = delegation::log_delegation_action(
399 &state.db,
400 &did,
401 controller,
402 Some(controller),
403 DelegationActionType::RepoWrite,
404 Some(json!({
405 "action": "create",
406 "collection": input.collection,
407 "rkey": rkey
408 })),
409 None,
410 None,
411 )
412 .await;
413 }
414
415 (
416 StatusCode::OK,
417 Json(CreateRecordOutput {
418 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
419 cid: record_cid.to_string(),
420 commit: CommitInfo {
421 cid: commit_result.commit_cid.to_string(),
422 rev: commit_result.rev,
423 },
424 validation_status: validation_status.map(|s| match s {
425 ValidationStatus::Valid => "valid".to_string(),
426 ValidationStatus::Unknown => "unknown".to_string(),
427 ValidationStatus::Invalid => "invalid".to_string(),
428 }),
429 }),
430 )
431 .into_response()
432}
433#[derive(Deserialize)]
434#[allow(dead_code)]
435pub struct PutRecordInput {
436 pub repo: String,
437 pub collection: String,
438 pub rkey: String,
439 pub validate: Option<bool>,
440 pub record: serde_json::Value,
441 #[serde(rename = "swapCommit")]
442 pub swap_commit: Option<String>,
443 #[serde(rename = "swapRecord")]
444 pub swap_record: Option<String>,
445}
446#[derive(Serialize)]
447#[serde(rename_all = "camelCase")]
448pub struct PutRecordOutput {
449 pub uri: String,
450 pub cid: String,
451 #[serde(skip_serializing_if = "Option::is_none")]
452 pub commit: Option<CommitInfo>,
453 #[serde(skip_serializing_if = "Option::is_none")]
454 pub validation_status: Option<String>,
455}
456pub async fn put_record(
457 State(state): State<AppState>,
458 headers: HeaderMap,
459 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
460 Json(input): Json<PutRecordInput>,
461) -> Response {
462 let auth =
463 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
464 Ok(res) => res,
465 Err(err_res) => return err_res,
466 };
467
468 if let Err(e) = crate::auth::scope_check::check_repo_scope(
469 auth.is_oauth,
470 auth.scope.as_deref(),
471 crate::oauth::RepoAction::Create,
472 &input.collection,
473 ) {
474 return e;
475 }
476 if let Err(e) = crate::auth::scope_check::check_repo_scope(
477 auth.is_oauth,
478 auth.scope.as_deref(),
479 crate::oauth::RepoAction::Update,
480 &input.collection,
481 ) {
482 return e;
483 }
484
485 let did = auth.did;
486 let user_id = auth.user_id;
487 let current_root_cid = auth.current_root_cid;
488 let controller_did = auth.controller_did;
489
490 if let Some(swap_commit) = &input.swap_commit
491 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
492 {
493 return (
494 StatusCode::CONFLICT,
495 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
496 )
497 .into_response();
498 }
499 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
500 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
501 Ok(Some(b)) => b,
502 _ => {
503 return (
504 StatusCode::INTERNAL_SERVER_ERROR,
505 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
506 )
507 .into_response();
508 }
509 };
510 let commit = match Commit::from_cbor(&commit_bytes) {
511 Ok(c) => c,
512 _ => {
513 return (
514 StatusCode::INTERNAL_SERVER_ERROR,
515 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
516 )
517 .into_response();
518 }
519 };
520 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
521 let collection_nsid = match input.collection.parse::<Nsid>() {
522 Ok(n) => n,
523 Err(_) => {
524 return (
525 StatusCode::BAD_REQUEST,
526 Json(json!({"error": "InvalidCollection"})),
527 )
528 .into_response();
529 }
530 };
531 let key = format!("{}/{}", collection_nsid, input.rkey);
532 let validation_status = if input.validate == Some(false) {
533 None
534 } else {
535 let require_lexicon = input.validate == Some(true);
536 match validate_record_with_status(
537 &input.record,
538 &input.collection,
539 Some(&input.rkey),
540 require_lexicon,
541 ) {
542 Ok(status) => Some(status),
543 Err(err_response) => return *err_response,
544 }
545 };
546 if let Some(swap_record_str) = &input.swap_record {
547 let expected_cid = Cid::from_str(swap_record_str).ok();
548 let actual_cid = mst.get(&key).await.ok().flatten();
549 if expected_cid != actual_cid {
550 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
551 }
552 }
553 let existing_cid = mst.get(&key).await.ok().flatten();
554 let record_ipld = crate::util::json_to_ipld(&input.record);
555 let mut record_bytes = Vec::new();
556 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
557 return (
558 StatusCode::BAD_REQUEST,
559 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
560 )
561 .into_response();
562 }
563 let record_cid = match tracking_store.put(&record_bytes).await {
564 Ok(c) => c,
565 _ => {
566 return (
567 StatusCode::INTERNAL_SERVER_ERROR,
568 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
569 )
570 .into_response();
571 }
572 };
573 if existing_cid == Some(record_cid) {
574 return (
575 StatusCode::OK,
576 Json(PutRecordOutput {
577 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
578 cid: record_cid.to_string(),
579 commit: None,
580 validation_status: validation_status.map(|s| match s {
581 ValidationStatus::Valid => "valid".to_string(),
582 ValidationStatus::Unknown => "unknown".to_string(),
583 ValidationStatus::Invalid => "invalid".to_string(),
584 }),
585 }),
586 )
587 .into_response();
588 }
589 let new_mst = if existing_cid.is_some() {
590 match mst.update(&key, record_cid).await {
591 Ok(m) => m,
592 Err(_) => {
593 return (
594 StatusCode::INTERNAL_SERVER_ERROR,
595 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
596 )
597 .into_response();
598 }
599 }
600 } else {
601 match mst.add(&key, record_cid).await {
602 Ok(m) => m,
603 Err(_) => {
604 return (
605 StatusCode::INTERNAL_SERVER_ERROR,
606 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
607 )
608 .into_response();
609 }
610 }
611 };
612 let new_mst_root = match new_mst.persist().await {
613 Ok(c) => c,
614 Err(_) => {
615 return (
616 StatusCode::INTERNAL_SERVER_ERROR,
617 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
618 )
619 .into_response();
620 }
621 };
622 let op = if existing_cid.is_some() {
623 RecordOp::Update {
624 collection: input.collection.clone(),
625 rkey: input.rkey.clone(),
626 cid: record_cid,
627 prev: existing_cid,
628 }
629 } else {
630 RecordOp::Create {
631 collection: input.collection.clone(),
632 rkey: input.rkey.clone(),
633 cid: record_cid,
634 }
635 };
636 let mut relevant_blocks = std::collections::BTreeMap::new();
637 if new_mst
638 .blocks_for_path(&key, &mut relevant_blocks)
639 .await
640 .is_err()
641 {
642 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
643 }
644 if mst
645 .blocks_for_path(&key, &mut relevant_blocks)
646 .await
647 .is_err()
648 {
649 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
650 }
651 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
652 let mut written_cids = tracking_store.get_all_relevant_cids();
653 for cid in relevant_blocks.keys() {
654 if !written_cids.contains(cid) {
655 written_cids.push(*cid);
656 }
657 }
658 let written_cids_str = written_cids
659 .iter()
660 .map(|c| c.to_string())
661 .collect::<Vec<_>>();
662 let is_update = existing_cid.is_some();
663 let blob_cids = extract_blob_cids(&input.record);
664 let commit_result = match commit_and_log(
665 &state,
666 CommitParams {
667 did: &did,
668 user_id,
669 current_root_cid: Some(current_root_cid),
670 prev_data_cid: Some(commit.data),
671 new_mst_root,
672 ops: vec![op],
673 blocks_cids: &written_cids_str,
674 blobs: &blob_cids,
675 },
676 )
677 .await
678 {
679 Ok(res) => res,
680 Err(e) => {
681 return (
682 StatusCode::INTERNAL_SERVER_ERROR,
683 Json(json!({"error": "InternalError", "message": e})),
684 )
685 .into_response();
686 }
687 };
688
689 if let Some(ref controller) = controller_did {
690 let _ = delegation::log_delegation_action(
691 &state.db,
692 &did,
693 controller,
694 Some(controller),
695 DelegationActionType::RepoWrite,
696 Some(json!({
697 "action": if is_update { "update" } else { "create" },
698 "collection": input.collection,
699 "rkey": input.rkey
700 })),
701 None,
702 None,
703 )
704 .await;
705 }
706
707 (
708 StatusCode::OK,
709 Json(PutRecordOutput {
710 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
711 cid: record_cid.to_string(),
712 commit: Some(CommitInfo {
713 cid: commit_result.commit_cid.to_string(),
714 rev: commit_result.rev,
715 }),
716 validation_status: validation_status.map(|s| match s {
717 ValidationStatus::Valid => "valid".to_string(),
718 ValidationStatus::Unknown => "unknown".to_string(),
719 ValidationStatus::Invalid => "invalid".to_string(),
720 }),
721 }),
722 )
723 .into_response()
724}