this repo has no description
1use super::validation::validate_record_with_status; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::repo::tracking::TrackingBlockStore; 5use crate::state::AppState; 6use crate::validation::ValidationStatus; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use cid::Cid; 14use jacquard::types::{ 15 integer::LimitedU32, 16 string::{Nsid, Tid}, 17}; 18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 19use serde::{Deserialize, Serialize}; 20use serde_json::json; 21use sqlx::{PgPool, Row}; 22use std::str::FromStr; 23use std::sync::Arc; 24use tracing::error; 25use uuid::Uuid; 26 27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 28 let row = sqlx::query( 29 r#" 30 SELECT 31 email_verified, 32 discord_verified, 33 telegram_verified, 34 signal_verified 35 FROM users 36 WHERE did = $1 37 "#, 38 ) 39 .bind(did) 40 .fetch_optional(db) 41 .await?; 42 match row { 43 Some(r) => { 44 let email_verified: bool = r.get("email_verified"); 45 let discord_verified: bool = r.get("discord_verified"); 46 let telegram_verified: bool = r.get("telegram_verified"); 47 let signal_verified: bool = r.get("signal_verified"); 48 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 49 } 50 None => Ok(false), 51 } 52} 53 54pub struct RepoWriteAuth { 55 pub did: String, 56 pub user_id: Uuid, 57 pub current_root_cid: Cid, 58 pub is_oauth: bool, 59 pub scope: Option<String>, 60 pub controller_did: Option<String>, 61} 62 63pub async fn prepare_repo_write( 64 state: &AppState, 65 headers: &HeaderMap, 66 repo_did: &str, 67 http_method: &str, 68 http_uri: &str, 69) -> Result<RepoWriteAuth, Response> { 70 let extracted = crate::auth::extract_auth_token_from_header( 71 headers.get("Authorization").and_then(|h| h.to_str().ok()), 72 ) 73 .ok_or_else(|| { 74 ( 75 StatusCode::UNAUTHORIZED, 76 Json(json!({"error": "AuthenticationRequired"})), 77 ) 78 .into_response() 79 })?; 80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 81 let auth_user = crate::auth::validate_token_with_dpop( 82 &state.db, 83 &extracted.token, 84 extracted.is_dpop, 85 dpop_proof, 86 http_method, 87 http_uri, 88 false, 89 ) 90 .await 91 .map_err(|e| { 92 ( 93 StatusCode::UNAUTHORIZED, 94 Json(json!({"error": e.to_string()})), 95 ) 96 .into_response() 97 })?; 98 if repo_did != auth_user.did { 99 return Err(( 100 StatusCode::FORBIDDEN, 101 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 102 ) 103 .into_response()); 104 } 105 if crate::util::is_account_migrated(&state.db, &auth_user.did) 106 .await 107 .unwrap_or(false) 108 { 109 return Err(( 110 StatusCode::FORBIDDEN, 111 Json(json!({ 112 "error": "AccountMigrated", 113 "message": "Account has been migrated to another PDS. Repo operations are not allowed." 114 })), 115 ) 116 .into_response()); 117 } 118 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 119 .await 120 .unwrap_or(false); 121 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 122 .await 123 .unwrap_or(false); 124 if !is_verified && !is_delegated { 125 return Err(( 126 StatusCode::FORBIDDEN, 127 Json(json!({ 128 "error": "AccountNotVerified", 129 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 130 })), 131 ) 132 .into_response()); 133 } 134 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 135 .fetch_optional(&state.db) 136 .await 137 .map_err(|e| { 138 error!("DB error fetching user: {}", e); 139 ( 140 StatusCode::INTERNAL_SERVER_ERROR, 141 Json(json!({"error": "InternalError"})), 142 ) 143 .into_response() 144 })? 145 .ok_or_else(|| { 146 ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError", "message": "User not found"})), 149 ) 150 .into_response() 151 })?; 152 let root_cid_str: String = sqlx::query_scalar!( 153 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 154 user_id 155 ) 156 .fetch_optional(&state.db) 157 .await 158 .map_err(|e| { 159 error!("DB error fetching repo root: {}", e); 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError"})), 163 ) 164 .into_response() 165 })? 166 .ok_or_else(|| { 167 ( 168 StatusCode::INTERNAL_SERVER_ERROR, 169 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 170 ) 171 .into_response() 172 })?; 173 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 174 ( 175 StatusCode::INTERNAL_SERVER_ERROR, 176 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 177 ) 178 .into_response() 179 })?; 180 Ok(RepoWriteAuth { 181 did: auth_user.did, 182 user_id, 183 current_root_cid, 184 is_oauth: auth_user.is_oauth, 185 scope: auth_user.scope, 186 controller_did: auth_user.controller_did, 187 }) 188} 189#[derive(Deserialize)] 190#[allow(dead_code)] 191pub struct CreateRecordInput { 192 pub repo: String, 193 pub collection: String, 194 pub rkey: Option<String>, 195 pub validate: Option<bool>, 196 pub record: serde_json::Value, 197 #[serde(rename = "swapCommit")] 198 pub swap_commit: Option<String>, 199} 200#[derive(Serialize)] 201#[serde(rename_all = "camelCase")] 202pub struct CommitInfo { 203 pub cid: String, 204 pub rev: String, 205} 206 207#[derive(Serialize)] 208#[serde(rename_all = "camelCase")] 209pub struct CreateRecordOutput { 210 pub uri: String, 211 pub cid: String, 212 pub commit: CommitInfo, 213 #[serde(skip_serializing_if = "Option::is_none")] 214 pub validation_status: Option<String>, 215} 216pub async fn create_record( 217 State(state): State<AppState>, 218 headers: HeaderMap, 219 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 220 Json(input): Json<CreateRecordInput>, 221) -> Response { 222 let auth = 223 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 224 Ok(res) => res, 225 Err(err_res) => return err_res, 226 }; 227 228 if let Err(e) = crate::auth::scope_check::check_repo_scope( 229 auth.is_oauth, 230 auth.scope.as_deref(), 231 crate::oauth::RepoAction::Create, 232 &input.collection, 233 ) { 234 return e; 235 } 236 237 let did = auth.did; 238 let user_id = auth.user_id; 239 let current_root_cid = auth.current_root_cid; 240 let controller_did = auth.controller_did; 241 242 if let Some(swap_commit) = &input.swap_commit 243 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 244 { 245 return ( 246 StatusCode::CONFLICT, 247 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 248 ) 249 .into_response(); 250 } 251 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 252 let commit_bytes = match tracking_store.get(&current_root_cid).await { 253 Ok(Some(b)) => b, 254 _ => { 255 return ( 256 StatusCode::INTERNAL_SERVER_ERROR, 257 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 258 ) 259 .into_response(); 260 } 261 }; 262 let commit = match Commit::from_cbor(&commit_bytes) { 263 Ok(c) => c, 264 _ => { 265 return ( 266 StatusCode::INTERNAL_SERVER_ERROR, 267 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 268 ) 269 .into_response(); 270 } 271 }; 272 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 273 let collection_nsid = match input.collection.parse::<Nsid>() { 274 Ok(n) => n, 275 Err(_) => { 276 return ( 277 StatusCode::BAD_REQUEST, 278 Json(json!({"error": "InvalidCollection"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let validation_status = if input.validate == Some(false) { 284 None 285 } else { 286 let require_lexicon = input.validate == Some(true); 287 match validate_record_with_status( 288 &input.record, 289 &input.collection, 290 input.rkey.as_deref(), 291 require_lexicon, 292 ) { 293 Ok(status) => Some(status), 294 Err(err_response) => return *err_response, 295 } 296 }; 297 let rkey = input 298 .rkey 299 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 300 let record_ipld = crate::util::json_to_ipld(&input.record); 301 let mut record_bytes = Vec::new(); 302 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 303 return ( 304 StatusCode::BAD_REQUEST, 305 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 306 ) 307 .into_response(); 308 } 309 let record_cid = match tracking_store.put(&record_bytes).await { 310 Ok(c) => c, 311 _ => { 312 return ( 313 StatusCode::INTERNAL_SERVER_ERROR, 314 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 315 ) 316 .into_response(); 317 } 318 }; 319 let key = format!("{}/{}", collection_nsid, rkey); 320 let new_mst = match mst.add(&key, record_cid).await { 321 Ok(m) => m, 322 _ => { 323 return ( 324 StatusCode::INTERNAL_SERVER_ERROR, 325 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 326 ) 327 .into_response(); 328 } 329 }; 330 let new_mst_root = match new_mst.persist().await { 331 Ok(c) => c, 332 _ => { 333 return ( 334 StatusCode::INTERNAL_SERVER_ERROR, 335 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 336 ) 337 .into_response(); 338 } 339 }; 340 let op = RecordOp::Create { 341 collection: input.collection.clone(), 342 rkey: rkey.clone(), 343 cid: record_cid, 344 }; 345 let mut relevant_blocks = std::collections::BTreeMap::new(); 346 if new_mst 347 .blocks_for_path(&key, &mut relevant_blocks) 348 .await 349 .is_err() 350 { 351 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 352 } 353 if mst 354 .blocks_for_path(&key, &mut relevant_blocks) 355 .await 356 .is_err() 357 { 358 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 359 } 360 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 361 let mut written_cids = tracking_store.get_all_relevant_cids(); 362 for cid in relevant_blocks.keys() { 363 if !written_cids.contains(cid) { 364 written_cids.push(*cid); 365 } 366 } 367 let written_cids_str = written_cids 368 .iter() 369 .map(|c| c.to_string()) 370 .collect::<Vec<_>>(); 371 let blob_cids = extract_blob_cids(&input.record); 372 let commit_result = match commit_and_log( 373 &state, 374 CommitParams { 375 did: &did, 376 user_id, 377 current_root_cid: Some(current_root_cid), 378 prev_data_cid: Some(commit.data), 379 new_mst_root, 380 ops: vec![op], 381 blocks_cids: &written_cids_str, 382 blobs: &blob_cids, 383 }, 384 ) 385 .await 386 { 387 Ok(res) => res, 388 Err(e) => { 389 return ( 390 StatusCode::INTERNAL_SERVER_ERROR, 391 Json(json!({"error": "InternalError", "message": e})), 392 ) 393 .into_response(); 394 } 395 }; 396 397 if let Some(ref controller) = controller_did { 398 let _ = delegation::log_delegation_action( 399 &state.db, 400 &did, 401 controller, 402 Some(controller), 403 DelegationActionType::RepoWrite, 404 Some(json!({ 405 "action": "create", 406 "collection": input.collection, 407 "rkey": rkey 408 })), 409 None, 410 None, 411 ) 412 .await; 413 } 414 415 ( 416 StatusCode::OK, 417 Json(CreateRecordOutput { 418 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 419 cid: record_cid.to_string(), 420 commit: CommitInfo { 421 cid: commit_result.commit_cid.to_string(), 422 rev: commit_result.rev, 423 }, 424 validation_status: validation_status.map(|s| match s { 425 ValidationStatus::Valid => "valid".to_string(), 426 ValidationStatus::Unknown => "unknown".to_string(), 427 ValidationStatus::Invalid => "invalid".to_string(), 428 }), 429 }), 430 ) 431 .into_response() 432} 433#[derive(Deserialize)] 434#[allow(dead_code)] 435pub struct PutRecordInput { 436 pub repo: String, 437 pub collection: String, 438 pub rkey: String, 439 pub validate: Option<bool>, 440 pub record: serde_json::Value, 441 #[serde(rename = "swapCommit")] 442 pub swap_commit: Option<String>, 443 #[serde(rename = "swapRecord")] 444 pub swap_record: Option<String>, 445} 446#[derive(Serialize)] 447#[serde(rename_all = "camelCase")] 448pub struct PutRecordOutput { 449 pub uri: String, 450 pub cid: String, 451 #[serde(skip_serializing_if = "Option::is_none")] 452 pub commit: Option<CommitInfo>, 453 #[serde(skip_serializing_if = "Option::is_none")] 454 pub validation_status: Option<String>, 455} 456pub async fn put_record( 457 State(state): State<AppState>, 458 headers: HeaderMap, 459 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 460 Json(input): Json<PutRecordInput>, 461) -> Response { 462 let auth = 463 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 464 Ok(res) => res, 465 Err(err_res) => return err_res, 466 }; 467 468 if let Err(e) = crate::auth::scope_check::check_repo_scope( 469 auth.is_oauth, 470 auth.scope.as_deref(), 471 crate::oauth::RepoAction::Create, 472 &input.collection, 473 ) { 474 return e; 475 } 476 if let Err(e) = crate::auth::scope_check::check_repo_scope( 477 auth.is_oauth, 478 auth.scope.as_deref(), 479 crate::oauth::RepoAction::Update, 480 &input.collection, 481 ) { 482 return e; 483 } 484 485 let did = auth.did; 486 let user_id = auth.user_id; 487 let current_root_cid = auth.current_root_cid; 488 let controller_did = auth.controller_did; 489 490 if let Some(swap_commit) = &input.swap_commit 491 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 492 { 493 return ( 494 StatusCode::CONFLICT, 495 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 496 ) 497 .into_response(); 498 } 499 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 500 let commit_bytes = match tracking_store.get(&current_root_cid).await { 501 Ok(Some(b)) => b, 502 _ => { 503 return ( 504 StatusCode::INTERNAL_SERVER_ERROR, 505 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 506 ) 507 .into_response(); 508 } 509 }; 510 let commit = match Commit::from_cbor(&commit_bytes) { 511 Ok(c) => c, 512 _ => { 513 return ( 514 StatusCode::INTERNAL_SERVER_ERROR, 515 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 516 ) 517 .into_response(); 518 } 519 }; 520 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 521 let collection_nsid = match input.collection.parse::<Nsid>() { 522 Ok(n) => n, 523 Err(_) => { 524 return ( 525 StatusCode::BAD_REQUEST, 526 Json(json!({"error": "InvalidCollection"})), 527 ) 528 .into_response(); 529 } 530 }; 531 let key = format!("{}/{}", collection_nsid, input.rkey); 532 let validation_status = if input.validate == Some(false) { 533 None 534 } else { 535 let require_lexicon = input.validate == Some(true); 536 match validate_record_with_status( 537 &input.record, 538 &input.collection, 539 Some(&input.rkey), 540 require_lexicon, 541 ) { 542 Ok(status) => Some(status), 543 Err(err_response) => return *err_response, 544 } 545 }; 546 if let Some(swap_record_str) = &input.swap_record { 547 let expected_cid = Cid::from_str(swap_record_str).ok(); 548 let actual_cid = mst.get(&key).await.ok().flatten(); 549 if expected_cid != actual_cid { 550 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 551 } 552 } 553 let existing_cid = mst.get(&key).await.ok().flatten(); 554 let record_ipld = crate::util::json_to_ipld(&input.record); 555 let mut record_bytes = Vec::new(); 556 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 557 return ( 558 StatusCode::BAD_REQUEST, 559 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 560 ) 561 .into_response(); 562 } 563 let record_cid = match tracking_store.put(&record_bytes).await { 564 Ok(c) => c, 565 _ => { 566 return ( 567 StatusCode::INTERNAL_SERVER_ERROR, 568 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 569 ) 570 .into_response(); 571 } 572 }; 573 if existing_cid == Some(record_cid) { 574 return ( 575 StatusCode::OK, 576 Json(PutRecordOutput { 577 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 578 cid: record_cid.to_string(), 579 commit: None, 580 validation_status: validation_status.map(|s| match s { 581 ValidationStatus::Valid => "valid".to_string(), 582 ValidationStatus::Unknown => "unknown".to_string(), 583 ValidationStatus::Invalid => "invalid".to_string(), 584 }), 585 }), 586 ) 587 .into_response(); 588 } 589 let new_mst = if existing_cid.is_some() { 590 match mst.update(&key, record_cid).await { 591 Ok(m) => m, 592 Err(_) => { 593 return ( 594 StatusCode::INTERNAL_SERVER_ERROR, 595 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 596 ) 597 .into_response(); 598 } 599 } 600 } else { 601 match mst.add(&key, record_cid).await { 602 Ok(m) => m, 603 Err(_) => { 604 return ( 605 StatusCode::INTERNAL_SERVER_ERROR, 606 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 607 ) 608 .into_response(); 609 } 610 } 611 }; 612 let new_mst_root = match new_mst.persist().await { 613 Ok(c) => c, 614 Err(_) => { 615 return ( 616 StatusCode::INTERNAL_SERVER_ERROR, 617 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 618 ) 619 .into_response(); 620 } 621 }; 622 let op = if existing_cid.is_some() { 623 RecordOp::Update { 624 collection: input.collection.clone(), 625 rkey: input.rkey.clone(), 626 cid: record_cid, 627 prev: existing_cid, 628 } 629 } else { 630 RecordOp::Create { 631 collection: input.collection.clone(), 632 rkey: input.rkey.clone(), 633 cid: record_cid, 634 } 635 }; 636 let mut relevant_blocks = std::collections::BTreeMap::new(); 637 if new_mst 638 .blocks_for_path(&key, &mut relevant_blocks) 639 .await 640 .is_err() 641 { 642 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 643 } 644 if mst 645 .blocks_for_path(&key, &mut relevant_blocks) 646 .await 647 .is_err() 648 { 649 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 650 } 651 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 652 let mut written_cids = tracking_store.get_all_relevant_cids(); 653 for cid in relevant_blocks.keys() { 654 if !written_cids.contains(cid) { 655 written_cids.push(*cid); 656 } 657 } 658 let written_cids_str = written_cids 659 .iter() 660 .map(|c| c.to_string()) 661 .collect::<Vec<_>>(); 662 let is_update = existing_cid.is_some(); 663 let blob_cids = extract_blob_cids(&input.record); 664 let commit_result = match commit_and_log( 665 &state, 666 CommitParams { 667 did: &did, 668 user_id, 669 current_root_cid: Some(current_root_cid), 670 prev_data_cid: Some(commit.data), 671 new_mst_root, 672 ops: vec![op], 673 blocks_cids: &written_cids_str, 674 blobs: &blob_cids, 675 }, 676 ) 677 .await 678 { 679 Ok(res) => res, 680 Err(e) => { 681 return ( 682 StatusCode::INTERNAL_SERVER_ERROR, 683 Json(json!({"error": "InternalError", "message": e})), 684 ) 685 .into_response(); 686 } 687 }; 688 689 if let Some(ref controller) = controller_did { 690 let _ = delegation::log_delegation_action( 691 &state.db, 692 &did, 693 controller, 694 Some(controller), 695 DelegationActionType::RepoWrite, 696 Some(json!({ 697 "action": if is_update { "update" } else { "create" }, 698 "collection": input.collection, 699 "rkey": input.rkey 700 })), 701 None, 702 None, 703 ) 704 .await; 705 } 706 707 ( 708 StatusCode::OK, 709 Json(PutRecordOutput { 710 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 711 cid: record_cid.to_string(), 712 commit: Some(CommitInfo { 713 cid: commit_result.commit_cid.to_string(), 714 rev: commit_result.rev, 715 }), 716 validation_status: validation_status.map(|s| match s { 717 ValidationStatus::Valid => "valid".to_string(), 718 ValidationStatus::Unknown => "unknown".to_string(), 719 ValidationStatus::Invalid => "invalid".to_string(), 720 }), 721 }), 722 ) 723 .into_response() 724}