this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
3use crate::delegation::{self, DelegationActionType};
4use crate::repo::tracking::TrackingBlockStore;
5use crate::state::AppState;
6use crate::validation::ValidationStatus;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use sqlx::{PgPool, Row};
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::error;
25use uuid::Uuid;
26
27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
28 let row = sqlx::query(
29 r#"
30 SELECT
31 email_verified,
32 discord_verified,
33 telegram_verified,
34 signal_verified
35 FROM users
36 WHERE did = $1
37 "#,
38 )
39 .bind(did)
40 .fetch_optional(db)
41 .await?;
42 match row {
43 Some(r) => {
44 let email_verified: bool = r.get("email_verified");
45 let discord_verified: bool = r.get("discord_verified");
46 let telegram_verified: bool = r.get("telegram_verified");
47 let signal_verified: bool = r.get("signal_verified");
48 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
49 }
50 None => Ok(false),
51 }
52}
53
54pub struct RepoWriteAuth {
55 pub did: String,
56 pub user_id: Uuid,
57 pub current_root_cid: Cid,
58 pub is_oauth: bool,
59 pub scope: Option<String>,
60 pub controller_did: Option<String>,
61}
62
63pub async fn prepare_repo_write(
64 state: &AppState,
65 headers: &HeaderMap,
66 repo_did: &str,
67 http_method: &str,
68 http_uri: &str,
69) -> Result<RepoWriteAuth, Response> {
70 let extracted = crate::auth::extract_auth_token_from_header(
71 headers.get("Authorization").and_then(|h| h.to_str().ok()),
72 )
73 .ok_or_else(|| {
74 (
75 StatusCode::UNAUTHORIZED,
76 Json(json!({"error": "AuthenticationRequired"})),
77 )
78 .into_response()
79 })?;
80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
81 let auth_user = crate::auth::validate_token_with_dpop(
82 &state.db,
83 &extracted.token,
84 extracted.is_dpop,
85 dpop_proof,
86 http_method,
87 http_uri,
88 false,
89 )
90 .await
91 .map_err(|e| {
92 (
93 StatusCode::UNAUTHORIZED,
94 Json(json!({"error": e.to_string()})),
95 )
96 .into_response()
97 })?;
98 if repo_did != auth_user.did {
99 return Err((
100 StatusCode::FORBIDDEN,
101 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
102 )
103 .into_response());
104 }
105 if crate::util::is_account_migrated(&state.db, &auth_user.did)
106 .await
107 .unwrap_or(false)
108 {
109 return Err((
110 StatusCode::FORBIDDEN,
111 Json(json!({
112 "error": "AccountMigrated",
113 "message": "Account has been migrated to another PDS. Repo operations are not allowed."
114 })),
115 )
116 .into_response());
117 }
118 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
119 .await
120 .unwrap_or(false);
121 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
122 .await
123 .unwrap_or(false);
124 if !is_verified && !is_delegated {
125 return Err((
126 StatusCode::FORBIDDEN,
127 Json(json!({
128 "error": "AccountNotVerified",
129 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
130 })),
131 )
132 .into_response());
133 }
134 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
135 .fetch_optional(&state.db)
136 .await
137 .map_err(|e| {
138 error!("DB error fetching user: {}", e);
139 (
140 StatusCode::INTERNAL_SERVER_ERROR,
141 Json(json!({"error": "InternalError"})),
142 )
143 .into_response()
144 })?
145 .ok_or_else(|| {
146 (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError", "message": "User not found"})),
149 )
150 .into_response()
151 })?;
152 let root_cid_str: String = sqlx::query_scalar!(
153 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
154 user_id
155 )
156 .fetch_optional(&state.db)
157 .await
158 .map_err(|e| {
159 error!("DB error fetching repo root: {}", e);
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError"})),
163 )
164 .into_response()
165 })?
166 .ok_or_else(|| {
167 (
168 StatusCode::INTERNAL_SERVER_ERROR,
169 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
170 )
171 .into_response()
172 })?;
173 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
174 (
175 StatusCode::INTERNAL_SERVER_ERROR,
176 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
177 )
178 .into_response()
179 })?;
180 Ok(RepoWriteAuth {
181 did: auth_user.did,
182 user_id,
183 current_root_cid,
184 is_oauth: auth_user.is_oauth,
185 scope: auth_user.scope,
186 controller_did: auth_user.controller_did,
187 })
188}
189#[derive(Deserialize)]
190#[allow(dead_code)]
191pub struct CreateRecordInput {
192 pub repo: String,
193 pub collection: String,
194 pub rkey: Option<String>,
195 pub validate: Option<bool>,
196 pub record: serde_json::Value,
197 #[serde(rename = "swapCommit")]
198 pub swap_commit: Option<String>,
199}
200#[derive(Serialize)]
201#[serde(rename_all = "camelCase")]
202pub struct CommitInfo {
203 pub cid: String,
204 pub rev: String,
205}
206
207#[derive(Serialize)]
208#[serde(rename_all = "camelCase")]
209pub struct CreateRecordOutput {
210 pub uri: String,
211 pub cid: String,
212 pub commit: CommitInfo,
213 #[serde(skip_serializing_if = "Option::is_none")]
214 pub validation_status: Option<String>,
215}
216pub async fn create_record(
217 State(state): State<AppState>,
218 headers: HeaderMap,
219 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
220 Json(input): Json<CreateRecordInput>,
221) -> Response {
222 let auth =
223 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
224 Ok(res) => res,
225 Err(err_res) => return err_res,
226 };
227
228 if let Err(e) = crate::auth::scope_check::check_repo_scope(
229 auth.is_oauth,
230 auth.scope.as_deref(),
231 crate::oauth::RepoAction::Create,
232 &input.collection,
233 ) {
234 return e;
235 }
236
237 let did = auth.did;
238 let user_id = auth.user_id;
239 let current_root_cid = auth.current_root_cid;
240 let controller_did = auth.controller_did;
241
242 if let Some(swap_commit) = &input.swap_commit
243 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
244 {
245 return (
246 StatusCode::CONFLICT,
247 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
248 )
249 .into_response();
250 }
251 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
252 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
253 Ok(Some(b)) => b,
254 _ => {
255 return (
256 StatusCode::INTERNAL_SERVER_ERROR,
257 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
258 )
259 .into_response();
260 }
261 };
262 let commit = match Commit::from_cbor(&commit_bytes) {
263 Ok(c) => c,
264 _ => {
265 return (
266 StatusCode::INTERNAL_SERVER_ERROR,
267 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
268 )
269 .into_response();
270 }
271 };
272 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
273 let collection_nsid = match input.collection.parse::<Nsid>() {
274 Ok(n) => n,
275 Err(_) => {
276 return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "InvalidCollection"})),
279 )
280 .into_response();
281 }
282 };
283 let validation_status = if input.validate == Some(false) {
284 None
285 } else {
286 let require_lexicon = input.validate == Some(true);
287 match validate_record_with_status(
288 &input.record,
289 &input.collection,
290 input.rkey.as_deref(),
291 require_lexicon,
292 ) {
293 Ok(status) => Some(status),
294 Err(err_response) => return *err_response,
295 }
296 };
297 let rkey = input
298 .rkey
299 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
300 let mut record_bytes = Vec::new();
301 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
302 return (
303 StatusCode::BAD_REQUEST,
304 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
305 )
306 .into_response();
307 }
308 let record_cid = match tracking_store.put(&record_bytes).await {
309 Ok(c) => c,
310 _ => {
311 return (
312 StatusCode::INTERNAL_SERVER_ERROR,
313 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
314 )
315 .into_response();
316 }
317 };
318 let key = format!("{}/{}", collection_nsid, rkey);
319 let new_mst = match mst.add(&key, record_cid).await {
320 Ok(m) => m,
321 _ => {
322 return (
323 StatusCode::INTERNAL_SERVER_ERROR,
324 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
325 )
326 .into_response();
327 }
328 };
329 let new_mst_root = match new_mst.persist().await {
330 Ok(c) => c,
331 _ => {
332 return (
333 StatusCode::INTERNAL_SERVER_ERROR,
334 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
335 )
336 .into_response();
337 }
338 };
339 let op = RecordOp::Create {
340 collection: input.collection.clone(),
341 rkey: rkey.clone(),
342 cid: record_cid,
343 };
344 let mut relevant_blocks = std::collections::BTreeMap::new();
345 if new_mst
346 .blocks_for_path(&key, &mut relevant_blocks)
347 .await
348 .is_err()
349 {
350 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
351 }
352 if mst
353 .blocks_for_path(&key, &mut relevant_blocks)
354 .await
355 .is_err()
356 {
357 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
358 }
359 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
360 let mut written_cids = tracking_store.get_all_relevant_cids();
361 for cid in relevant_blocks.keys() {
362 if !written_cids.contains(cid) {
363 written_cids.push(*cid);
364 }
365 }
366 let written_cids_str = written_cids
367 .iter()
368 .map(|c| c.to_string())
369 .collect::<Vec<_>>();
370 let blob_cids = extract_blob_cids(&input.record);
371 let commit_result = match commit_and_log(
372 &state,
373 CommitParams {
374 did: &did,
375 user_id,
376 current_root_cid: Some(current_root_cid),
377 prev_data_cid: Some(commit.data),
378 new_mst_root,
379 ops: vec![op],
380 blocks_cids: &written_cids_str,
381 blobs: &blob_cids,
382 },
383 )
384 .await
385 {
386 Ok(res) => res,
387 Err(e) => {
388 return (
389 StatusCode::INTERNAL_SERVER_ERROR,
390 Json(json!({"error": "InternalError", "message": e})),
391 )
392 .into_response();
393 }
394 };
395
396 if let Some(ref controller) = controller_did {
397 let _ = delegation::log_delegation_action(
398 &state.db,
399 &did,
400 controller,
401 Some(controller),
402 DelegationActionType::RepoWrite,
403 Some(json!({
404 "action": "create",
405 "collection": input.collection,
406 "rkey": rkey
407 })),
408 None,
409 None,
410 )
411 .await;
412 }
413
414 (
415 StatusCode::OK,
416 Json(CreateRecordOutput {
417 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
418 cid: record_cid.to_string(),
419 commit: CommitInfo {
420 cid: commit_result.commit_cid.to_string(),
421 rev: commit_result.rev,
422 },
423 validation_status: validation_status.map(|s| match s {
424 ValidationStatus::Valid => "valid".to_string(),
425 ValidationStatus::Unknown => "unknown".to_string(),
426 ValidationStatus::Invalid => "invalid".to_string(),
427 }),
428 }),
429 )
430 .into_response()
431}
432#[derive(Deserialize)]
433#[allow(dead_code)]
434pub struct PutRecordInput {
435 pub repo: String,
436 pub collection: String,
437 pub rkey: String,
438 pub validate: Option<bool>,
439 pub record: serde_json::Value,
440 #[serde(rename = "swapCommit")]
441 pub swap_commit: Option<String>,
442 #[serde(rename = "swapRecord")]
443 pub swap_record: Option<String>,
444}
445#[derive(Serialize)]
446#[serde(rename_all = "camelCase")]
447pub struct PutRecordOutput {
448 pub uri: String,
449 pub cid: String,
450 #[serde(skip_serializing_if = "Option::is_none")]
451 pub commit: Option<CommitInfo>,
452 #[serde(skip_serializing_if = "Option::is_none")]
453 pub validation_status: Option<String>,
454}
455pub async fn put_record(
456 State(state): State<AppState>,
457 headers: HeaderMap,
458 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
459 Json(input): Json<PutRecordInput>,
460) -> Response {
461 let auth =
462 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
463 Ok(res) => res,
464 Err(err_res) => return err_res,
465 };
466
467 if let Err(e) = crate::auth::scope_check::check_repo_scope(
468 auth.is_oauth,
469 auth.scope.as_deref(),
470 crate::oauth::RepoAction::Create,
471 &input.collection,
472 ) {
473 return e;
474 }
475 if let Err(e) = crate::auth::scope_check::check_repo_scope(
476 auth.is_oauth,
477 auth.scope.as_deref(),
478 crate::oauth::RepoAction::Update,
479 &input.collection,
480 ) {
481 return e;
482 }
483
484 let did = auth.did;
485 let user_id = auth.user_id;
486 let current_root_cid = auth.current_root_cid;
487 let controller_did = auth.controller_did;
488
489 if let Some(swap_commit) = &input.swap_commit
490 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
491 {
492 return (
493 StatusCode::CONFLICT,
494 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
495 )
496 .into_response();
497 }
498 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
499 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
500 Ok(Some(b)) => b,
501 _ => {
502 return (
503 StatusCode::INTERNAL_SERVER_ERROR,
504 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
505 )
506 .into_response();
507 }
508 };
509 let commit = match Commit::from_cbor(&commit_bytes) {
510 Ok(c) => c,
511 _ => {
512 return (
513 StatusCode::INTERNAL_SERVER_ERROR,
514 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
515 )
516 .into_response();
517 }
518 };
519 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
520 let collection_nsid = match input.collection.parse::<Nsid>() {
521 Ok(n) => n,
522 Err(_) => {
523 return (
524 StatusCode::BAD_REQUEST,
525 Json(json!({"error": "InvalidCollection"})),
526 )
527 .into_response();
528 }
529 };
530 let key = format!("{}/{}", collection_nsid, input.rkey);
531 let validation_status = if input.validate == Some(false) {
532 None
533 } else {
534 let require_lexicon = input.validate == Some(true);
535 match validate_record_with_status(
536 &input.record,
537 &input.collection,
538 Some(&input.rkey),
539 require_lexicon,
540 ) {
541 Ok(status) => Some(status),
542 Err(err_response) => return *err_response,
543 }
544 };
545 if let Some(swap_record_str) = &input.swap_record {
546 let expected_cid = Cid::from_str(swap_record_str).ok();
547 let actual_cid = mst.get(&key).await.ok().flatten();
548 if expected_cid != actual_cid {
549 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
550 }
551 }
552 let existing_cid = mst.get(&key).await.ok().flatten();
553 let mut record_bytes = Vec::new();
554 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
555 return (
556 StatusCode::BAD_REQUEST,
557 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
558 )
559 .into_response();
560 }
561 let record_cid = match tracking_store.put(&record_bytes).await {
562 Ok(c) => c,
563 _ => {
564 return (
565 StatusCode::INTERNAL_SERVER_ERROR,
566 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
567 )
568 .into_response();
569 }
570 };
571 if existing_cid == Some(record_cid) {
572 return (
573 StatusCode::OK,
574 Json(PutRecordOutput {
575 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
576 cid: record_cid.to_string(),
577 commit: None,
578 validation_status: validation_status.map(|s| match s {
579 ValidationStatus::Valid => "valid".to_string(),
580 ValidationStatus::Unknown => "unknown".to_string(),
581 ValidationStatus::Invalid => "invalid".to_string(),
582 }),
583 }),
584 )
585 .into_response();
586 }
587 let new_mst = if existing_cid.is_some() {
588 match mst.update(&key, record_cid).await {
589 Ok(m) => m,
590 Err(_) => {
591 return (
592 StatusCode::INTERNAL_SERVER_ERROR,
593 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
594 )
595 .into_response();
596 }
597 }
598 } else {
599 match mst.add(&key, record_cid).await {
600 Ok(m) => m,
601 Err(_) => {
602 return (
603 StatusCode::INTERNAL_SERVER_ERROR,
604 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
605 )
606 .into_response();
607 }
608 }
609 };
610 let new_mst_root = match new_mst.persist().await {
611 Ok(c) => c,
612 Err(_) => {
613 return (
614 StatusCode::INTERNAL_SERVER_ERROR,
615 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
616 )
617 .into_response();
618 }
619 };
620 let op = if existing_cid.is_some() {
621 RecordOp::Update {
622 collection: input.collection.clone(),
623 rkey: input.rkey.clone(),
624 cid: record_cid,
625 prev: existing_cid,
626 }
627 } else {
628 RecordOp::Create {
629 collection: input.collection.clone(),
630 rkey: input.rkey.clone(),
631 cid: record_cid,
632 }
633 };
634 let mut relevant_blocks = std::collections::BTreeMap::new();
635 if new_mst
636 .blocks_for_path(&key, &mut relevant_blocks)
637 .await
638 .is_err()
639 {
640 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
641 }
642 if mst
643 .blocks_for_path(&key, &mut relevant_blocks)
644 .await
645 .is_err()
646 {
647 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
648 }
649 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
650 let mut written_cids = tracking_store.get_all_relevant_cids();
651 for cid in relevant_blocks.keys() {
652 if !written_cids.contains(cid) {
653 written_cids.push(*cid);
654 }
655 }
656 let written_cids_str = written_cids
657 .iter()
658 .map(|c| c.to_string())
659 .collect::<Vec<_>>();
660 let is_update = existing_cid.is_some();
661 let blob_cids = extract_blob_cids(&input.record);
662 let commit_result = match commit_and_log(
663 &state,
664 CommitParams {
665 did: &did,
666 user_id,
667 current_root_cid: Some(current_root_cid),
668 prev_data_cid: Some(commit.data),
669 new_mst_root,
670 ops: vec![op],
671 blocks_cids: &written_cids_str,
672 blobs: &blob_cids,
673 },
674 )
675 .await
676 {
677 Ok(res) => res,
678 Err(e) => {
679 return (
680 StatusCode::INTERNAL_SERVER_ERROR,
681 Json(json!({"error": "InternalError", "message": e})),
682 )
683 .into_response();
684 }
685 };
686
687 if let Some(ref controller) = controller_did {
688 let _ = delegation::log_delegation_action(
689 &state.db,
690 &did,
691 controller,
692 Some(controller),
693 DelegationActionType::RepoWrite,
694 Some(json!({
695 "action": if is_update { "update" } else { "create" },
696 "collection": input.collection,
697 "rkey": input.rkey
698 })),
699 None,
700 None,
701 )
702 .await;
703 }
704
705 (
706 StatusCode::OK,
707 Json(PutRecordOutput {
708 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
709 cid: record_cid.to_string(),
710 commit: Some(CommitInfo {
711 cid: commit_result.commit_cid.to_string(),
712 rev: commit_result.rev,
713 }),
714 validation_status: validation_status.map(|s| match s {
715 ValidationStatus::Valid => "valid".to_string(),
716 ValidationStatus::Unknown => "unknown".to_string(),
717 ValidationStatus::Invalid => "invalid".to_string(),
718 }),
719 }),
720 )
721 .into_response()
722}