// This repo has no description.
1use super::validation::validate_record_with_status; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::repo::tracking::TrackingBlockStore; 5use crate::state::AppState; 6use crate::validation::ValidationStatus; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use cid::Cid; 14use jacquard::types::{ 15 integer::LimitedU32, 16 string::{Nsid, Tid}, 17}; 18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 19use serde::{Deserialize, Serialize}; 20use serde_json::json; 21use sqlx::{PgPool, Row}; 22use std::str::FromStr; 23use std::sync::Arc; 24use tracing::error; 25use uuid::Uuid; 26 27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 28 let row = sqlx::query( 29 r#" 30 SELECT 31 email_verified, 32 discord_verified, 33 telegram_verified, 34 signal_verified 35 FROM users 36 WHERE did = $1 37 "#, 38 ) 39 .bind(did) 40 .fetch_optional(db) 41 .await?; 42 match row { 43 Some(r) => { 44 let email_verified: bool = r.get("email_verified"); 45 let discord_verified: bool = r.get("discord_verified"); 46 let telegram_verified: bool = r.get("telegram_verified"); 47 let signal_verified: bool = r.get("signal_verified"); 48 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 49 } 50 None => Ok(false), 51 } 52} 53 54pub struct RepoWriteAuth { 55 pub did: String, 56 pub user_id: Uuid, 57 pub current_root_cid: Cid, 58 pub is_oauth: bool, 59 pub scope: Option<String>, 60 pub controller_did: Option<String>, 61} 62 63pub async fn prepare_repo_write( 64 state: &AppState, 65 headers: &HeaderMap, 66 repo_did: &str, 67 http_method: &str, 68 http_uri: &str, 69) -> Result<RepoWriteAuth, Response> { 70 let extracted = crate::auth::extract_auth_token_from_header( 71 headers.get("Authorization").and_then(|h| h.to_str().ok()), 72 ) 73 
.ok_or_else(|| { 74 ( 75 StatusCode::UNAUTHORIZED, 76 Json(json!({"error": "AuthenticationRequired"})), 77 ) 78 .into_response() 79 })?; 80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 81 let auth_user = crate::auth::validate_token_with_dpop( 82 &state.db, 83 &extracted.token, 84 extracted.is_dpop, 85 dpop_proof, 86 http_method, 87 http_uri, 88 false, 89 ) 90 .await 91 .map_err(|e| { 92 ( 93 StatusCode::UNAUTHORIZED, 94 Json(json!({"error": e.to_string()})), 95 ) 96 .into_response() 97 })?; 98 if repo_did != auth_user.did { 99 return Err(( 100 StatusCode::FORBIDDEN, 101 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 102 ) 103 .into_response()); 104 } 105 if crate::util::is_account_migrated(&state.db, &auth_user.did) 106 .await 107 .unwrap_or(false) 108 { 109 return Err(( 110 StatusCode::FORBIDDEN, 111 Json(json!({ 112 "error": "AccountMigrated", 113 "message": "Account has been migrated to another PDS. Repo operations are not allowed." 114 })), 115 ) 116 .into_response()); 117 } 118 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 119 .await 120 .unwrap_or(false); 121 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 122 .await 123 .unwrap_or(false); 124 if !is_verified && !is_delegated { 125 return Err(( 126 StatusCode::FORBIDDEN, 127 Json(json!({ 128 "error": "AccountNotVerified", 129 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 130 })), 131 ) 132 .into_response()); 133 } 134 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 135 .fetch_optional(&state.db) 136 .await 137 .map_err(|e| { 138 error!("DB error fetching user: {}", e); 139 ( 140 StatusCode::INTERNAL_SERVER_ERROR, 141 Json(json!({"error": "InternalError"})), 142 ) 143 .into_response() 144 })? 
145 .ok_or_else(|| { 146 ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError", "message": "User not found"})), 149 ) 150 .into_response() 151 })?; 152 let root_cid_str: String = sqlx::query_scalar!( 153 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 154 user_id 155 ) 156 .fetch_optional(&state.db) 157 .await 158 .map_err(|e| { 159 error!("DB error fetching repo root: {}", e); 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError"})), 163 ) 164 .into_response() 165 })? 166 .ok_or_else(|| { 167 ( 168 StatusCode::INTERNAL_SERVER_ERROR, 169 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 170 ) 171 .into_response() 172 })?; 173 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 174 ( 175 StatusCode::INTERNAL_SERVER_ERROR, 176 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 177 ) 178 .into_response() 179 })?; 180 Ok(RepoWriteAuth { 181 did: auth_user.did, 182 user_id, 183 current_root_cid, 184 is_oauth: auth_user.is_oauth, 185 scope: auth_user.scope, 186 controller_did: auth_user.controller_did, 187 }) 188} 189#[derive(Deserialize)] 190#[allow(dead_code)] 191pub struct CreateRecordInput { 192 pub repo: String, 193 pub collection: String, 194 pub rkey: Option<String>, 195 pub validate: Option<bool>, 196 pub record: serde_json::Value, 197 #[serde(rename = "swapCommit")] 198 pub swap_commit: Option<String>, 199} 200#[derive(Serialize)] 201#[serde(rename_all = "camelCase")] 202pub struct CommitInfo { 203 pub cid: String, 204 pub rev: String, 205} 206 207#[derive(Serialize)] 208#[serde(rename_all = "camelCase")] 209pub struct CreateRecordOutput { 210 pub uri: String, 211 pub cid: String, 212 pub commit: CommitInfo, 213 #[serde(skip_serializing_if = "Option::is_none")] 214 pub validation_status: Option<String>, 215} 216pub async fn create_record( 217 State(state): State<AppState>, 218 headers: HeaderMap, 219 axum::extract::OriginalUri(uri): 
axum::extract::OriginalUri, 220 Json(input): Json<CreateRecordInput>, 221) -> Response { 222 let auth = 223 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 224 Ok(res) => res, 225 Err(err_res) => return err_res, 226 }; 227 228 if let Err(e) = crate::auth::scope_check::check_repo_scope( 229 auth.is_oauth, 230 auth.scope.as_deref(), 231 crate::oauth::RepoAction::Create, 232 &input.collection, 233 ) { 234 return e; 235 } 236 237 let did = auth.did; 238 let user_id = auth.user_id; 239 let current_root_cid = auth.current_root_cid; 240 let controller_did = auth.controller_did; 241 242 if let Some(swap_commit) = &input.swap_commit 243 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 244 { 245 return ( 246 StatusCode::CONFLICT, 247 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 248 ) 249 .into_response(); 250 } 251 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 252 let commit_bytes = match tracking_store.get(&current_root_cid).await { 253 Ok(Some(b)) => b, 254 _ => { 255 return ( 256 StatusCode::INTERNAL_SERVER_ERROR, 257 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 258 ) 259 .into_response(); 260 } 261 }; 262 let commit = match Commit::from_cbor(&commit_bytes) { 263 Ok(c) => c, 264 _ => { 265 return ( 266 StatusCode::INTERNAL_SERVER_ERROR, 267 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 268 ) 269 .into_response(); 270 } 271 }; 272 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 273 let collection_nsid = match input.collection.parse::<Nsid>() { 274 Ok(n) => n, 275 Err(_) => { 276 return ( 277 StatusCode::BAD_REQUEST, 278 Json(json!({"error": "InvalidCollection"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let validation_status = if input.validate == Some(false) { 284 None 285 } else { 286 let require_lexicon = input.validate == Some(true); 287 match 
validate_record_with_status( 288 &input.record, 289 &input.collection, 290 input.rkey.as_deref(), 291 require_lexicon, 292 ) { 293 Ok(status) => Some(status), 294 Err(err_response) => return *err_response, 295 } 296 }; 297 let rkey = input 298 .rkey 299 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 300 let mut record_bytes = Vec::new(); 301 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 302 return ( 303 StatusCode::BAD_REQUEST, 304 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 305 ) 306 .into_response(); 307 } 308 let record_cid = match tracking_store.put(&record_bytes).await { 309 Ok(c) => c, 310 _ => { 311 return ( 312 StatusCode::INTERNAL_SERVER_ERROR, 313 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 314 ) 315 .into_response(); 316 } 317 }; 318 let key = format!("{}/{}", collection_nsid, rkey); 319 let new_mst = match mst.add(&key, record_cid).await { 320 Ok(m) => m, 321 _ => { 322 return ( 323 StatusCode::INTERNAL_SERVER_ERROR, 324 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 325 ) 326 .into_response(); 327 } 328 }; 329 let new_mst_root = match new_mst.persist().await { 330 Ok(c) => c, 331 _ => { 332 return ( 333 StatusCode::INTERNAL_SERVER_ERROR, 334 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 335 ) 336 .into_response(); 337 } 338 }; 339 let op = RecordOp::Create { 340 collection: input.collection.clone(), 341 rkey: rkey.clone(), 342 cid: record_cid, 343 }; 344 let mut relevant_blocks = std::collections::BTreeMap::new(); 345 if new_mst 346 .blocks_for_path(&key, &mut relevant_blocks) 347 .await 348 .is_err() 349 { 350 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 351 } 352 if mst 353 .blocks_for_path(&key, &mut relevant_blocks) 354 .await 355 .is_err() 356 { 357 return 
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 358 } 359 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 360 let mut written_cids = tracking_store.get_all_relevant_cids(); 361 for cid in relevant_blocks.keys() { 362 if !written_cids.contains(cid) { 363 written_cids.push(*cid); 364 } 365 } 366 let written_cids_str = written_cids 367 .iter() 368 .map(|c| c.to_string()) 369 .collect::<Vec<_>>(); 370 let blob_cids = extract_blob_cids(&input.record); 371 let commit_result = match commit_and_log( 372 &state, 373 CommitParams { 374 did: &did, 375 user_id, 376 current_root_cid: Some(current_root_cid), 377 prev_data_cid: Some(commit.data), 378 new_mst_root, 379 ops: vec![op], 380 blocks_cids: &written_cids_str, 381 blobs: &blob_cids, 382 }, 383 ) 384 .await 385 { 386 Ok(res) => res, 387 Err(e) => { 388 return ( 389 StatusCode::INTERNAL_SERVER_ERROR, 390 Json(json!({"error": "InternalError", "message": e})), 391 ) 392 .into_response(); 393 } 394 }; 395 396 if let Some(ref controller) = controller_did { 397 let _ = delegation::log_delegation_action( 398 &state.db, 399 &did, 400 controller, 401 Some(controller), 402 DelegationActionType::RepoWrite, 403 Some(json!({ 404 "action": "create", 405 "collection": input.collection, 406 "rkey": rkey 407 })), 408 None, 409 None, 410 ) 411 .await; 412 } 413 414 ( 415 StatusCode::OK, 416 Json(CreateRecordOutput { 417 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 418 cid: record_cid.to_string(), 419 commit: CommitInfo { 420 cid: commit_result.commit_cid.to_string(), 421 rev: commit_result.rev, 422 }, 423 validation_status: validation_status.map(|s| match s { 424 ValidationStatus::Valid => "valid".to_string(), 425 ValidationStatus::Unknown => "unknown".to_string(), 426 ValidationStatus::Invalid => "invalid".to_string(), 427 }), 428 }), 429 ) 430 .into_response() 431} 432#[derive(Deserialize)] 
433#[allow(dead_code)] 434pub struct PutRecordInput { 435 pub repo: String, 436 pub collection: String, 437 pub rkey: String, 438 pub validate: Option<bool>, 439 pub record: serde_json::Value, 440 #[serde(rename = "swapCommit")] 441 pub swap_commit: Option<String>, 442 #[serde(rename = "swapRecord")] 443 pub swap_record: Option<String>, 444} 445#[derive(Serialize)] 446#[serde(rename_all = "camelCase")] 447pub struct PutRecordOutput { 448 pub uri: String, 449 pub cid: String, 450 #[serde(skip_serializing_if = "Option::is_none")] 451 pub commit: Option<CommitInfo>, 452 #[serde(skip_serializing_if = "Option::is_none")] 453 pub validation_status: Option<String>, 454} 455pub async fn put_record( 456 State(state): State<AppState>, 457 headers: HeaderMap, 458 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 459 Json(input): Json<PutRecordInput>, 460) -> Response { 461 let auth = 462 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 463 Ok(res) => res, 464 Err(err_res) => return err_res, 465 }; 466 467 if let Err(e) = crate::auth::scope_check::check_repo_scope( 468 auth.is_oauth, 469 auth.scope.as_deref(), 470 crate::oauth::RepoAction::Create, 471 &input.collection, 472 ) { 473 return e; 474 } 475 if let Err(e) = crate::auth::scope_check::check_repo_scope( 476 auth.is_oauth, 477 auth.scope.as_deref(), 478 crate::oauth::RepoAction::Update, 479 &input.collection, 480 ) { 481 return e; 482 } 483 484 let did = auth.did; 485 let user_id = auth.user_id; 486 let current_root_cid = auth.current_root_cid; 487 let controller_did = auth.controller_did; 488 489 if let Some(swap_commit) = &input.swap_commit 490 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 491 { 492 return ( 493 StatusCode::CONFLICT, 494 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 495 ) 496 .into_response(); 497 } 498 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 499 let commit_bytes = match 
tracking_store.get(&current_root_cid).await { 500 Ok(Some(b)) => b, 501 _ => { 502 return ( 503 StatusCode::INTERNAL_SERVER_ERROR, 504 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 505 ) 506 .into_response(); 507 } 508 }; 509 let commit = match Commit::from_cbor(&commit_bytes) { 510 Ok(c) => c, 511 _ => { 512 return ( 513 StatusCode::INTERNAL_SERVER_ERROR, 514 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 515 ) 516 .into_response(); 517 } 518 }; 519 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 520 let collection_nsid = match input.collection.parse::<Nsid>() { 521 Ok(n) => n, 522 Err(_) => { 523 return ( 524 StatusCode::BAD_REQUEST, 525 Json(json!({"error": "InvalidCollection"})), 526 ) 527 .into_response(); 528 } 529 }; 530 let key = format!("{}/{}", collection_nsid, input.rkey); 531 let validation_status = if input.validate == Some(false) { 532 None 533 } else { 534 let require_lexicon = input.validate == Some(true); 535 match validate_record_with_status( 536 &input.record, 537 &input.collection, 538 Some(&input.rkey), 539 require_lexicon, 540 ) { 541 Ok(status) => Some(status), 542 Err(err_response) => return *err_response, 543 } 544 }; 545 if let Some(swap_record_str) = &input.swap_record { 546 let expected_cid = Cid::from_str(swap_record_str).ok(); 547 let actual_cid = mst.get(&key).await.ok().flatten(); 548 if expected_cid != actual_cid { 549 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 550 } 551 } 552 let existing_cid = mst.get(&key).await.ok().flatten(); 553 let mut record_bytes = Vec::new(); 554 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 555 return ( 556 StatusCode::BAD_REQUEST, 557 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 558 ) 559 .into_response(); 560 } 561 let record_cid = match 
tracking_store.put(&record_bytes).await { 562 Ok(c) => c, 563 _ => { 564 return ( 565 StatusCode::INTERNAL_SERVER_ERROR, 566 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 567 ) 568 .into_response(); 569 } 570 }; 571 if existing_cid == Some(record_cid) { 572 return ( 573 StatusCode::OK, 574 Json(PutRecordOutput { 575 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 576 cid: record_cid.to_string(), 577 commit: None, 578 validation_status: validation_status.map(|s| match s { 579 ValidationStatus::Valid => "valid".to_string(), 580 ValidationStatus::Unknown => "unknown".to_string(), 581 ValidationStatus::Invalid => "invalid".to_string(), 582 }), 583 }), 584 ) 585 .into_response(); 586 } 587 let new_mst = if existing_cid.is_some() { 588 match mst.update(&key, record_cid).await { 589 Ok(m) => m, 590 Err(_) => { 591 return ( 592 StatusCode::INTERNAL_SERVER_ERROR, 593 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 594 ) 595 .into_response(); 596 } 597 } 598 } else { 599 match mst.add(&key, record_cid).await { 600 Ok(m) => m, 601 Err(_) => { 602 return ( 603 StatusCode::INTERNAL_SERVER_ERROR, 604 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 605 ) 606 .into_response(); 607 } 608 } 609 }; 610 let new_mst_root = match new_mst.persist().await { 611 Ok(c) => c, 612 Err(_) => { 613 return ( 614 StatusCode::INTERNAL_SERVER_ERROR, 615 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 616 ) 617 .into_response(); 618 } 619 }; 620 let op = if existing_cid.is_some() { 621 RecordOp::Update { 622 collection: input.collection.clone(), 623 rkey: input.rkey.clone(), 624 cid: record_cid, 625 prev: existing_cid, 626 } 627 } else { 628 RecordOp::Create { 629 collection: input.collection.clone(), 630 rkey: input.rkey.clone(), 631 cid: record_cid, 632 } 633 }; 634 let mut relevant_blocks = std::collections::BTreeMap::new(); 635 if new_mst 636 
.blocks_for_path(&key, &mut relevant_blocks) 637 .await 638 .is_err() 639 { 640 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 641 } 642 if mst 643 .blocks_for_path(&key, &mut relevant_blocks) 644 .await 645 .is_err() 646 { 647 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 648 } 649 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 650 let mut written_cids = tracking_store.get_all_relevant_cids(); 651 for cid in relevant_blocks.keys() { 652 if !written_cids.contains(cid) { 653 written_cids.push(*cid); 654 } 655 } 656 let written_cids_str = written_cids 657 .iter() 658 .map(|c| c.to_string()) 659 .collect::<Vec<_>>(); 660 let is_update = existing_cid.is_some(); 661 let blob_cids = extract_blob_cids(&input.record); 662 let commit_result = match commit_and_log( 663 &state, 664 CommitParams { 665 did: &did, 666 user_id, 667 current_root_cid: Some(current_root_cid), 668 prev_data_cid: Some(commit.data), 669 new_mst_root, 670 ops: vec![op], 671 blocks_cids: &written_cids_str, 672 blobs: &blob_cids, 673 }, 674 ) 675 .await 676 { 677 Ok(res) => res, 678 Err(e) => { 679 return ( 680 StatusCode::INTERNAL_SERVER_ERROR, 681 Json(json!({"error": "InternalError", "message": e})), 682 ) 683 .into_response(); 684 } 685 }; 686 687 if let Some(ref controller) = controller_did { 688 let _ = delegation::log_delegation_action( 689 &state.db, 690 &did, 691 controller, 692 Some(controller), 693 DelegationActionType::RepoWrite, 694 Some(json!({ 695 "action": if is_update { "update" } else { "create" }, 696 "collection": input.collection, 697 "rkey": input.rkey 698 })), 699 None, 700 None, 701 ) 702 .await; 703 } 704 705 ( 706 StatusCode::OK, 707 Json(PutRecordOutput { 708 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 709 cid: 
record_cid.to_string(), 710 commit: Some(CommitInfo { 711 cid: commit_result.commit_cid.to_string(), 712 rev: commit_result.rev, 713 }), 714 validation_status: validation_status.map(|s| match s { 715 ValidationStatus::Valid => "valid".to_string(), 716 ValidationStatus::Unknown => "unknown".to_string(), 717 ValidationStatus::Invalid => "invalid".to_string(), 718 }), 719 }), 720 ) 721 .into_response() 722}