this repo has no description
1use super::validation::validate_record_with_status; 2use crate::validation::ValidationStatus; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use cid::Cid; 14use jacquard::types::{ 15 integer::LimitedU32, 16 string::{Nsid, Tid}, 17}; 18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 19use serde::{Deserialize, Serialize}; 20use serde_json::json; 21use sqlx::{PgPool, Row}; 22use std::str::FromStr; 23use std::sync::Arc; 24use tracing::error; 25use uuid::Uuid; 26 27pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 28 let row = sqlx::query( 29 r#" 30 SELECT 31 email_verified, 32 discord_verified, 33 telegram_verified, 34 signal_verified 35 FROM users 36 WHERE did = $1 37 "#, 38 ) 39 .bind(did) 40 .fetch_optional(db) 41 .await?; 42 match row { 43 Some(r) => { 44 let email_verified: bool = r.get("email_verified"); 45 let discord_verified: bool = r.get("discord_verified"); 46 let telegram_verified: bool = r.get("telegram_verified"); 47 let signal_verified: bool = r.get("signal_verified"); 48 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 49 } 50 None => Ok(false), 51 } 52} 53 54pub struct RepoWriteAuth { 55 pub did: String, 56 pub user_id: Uuid, 57 pub current_root_cid: Cid, 58 pub is_oauth: bool, 59 pub scope: Option<String>, 60 pub controller_did: Option<String>, 61} 62 63pub async fn prepare_repo_write( 64 state: &AppState, 65 headers: &HeaderMap, 66 repo_did: &str, 67 http_method: &str, 68 http_uri: &str, 69) -> Result<RepoWriteAuth, Response> { 70 let extracted = crate::auth::extract_auth_token_from_header( 71 headers.get("Authorization").and_then(|h| h.to_str().ok()), 72 ) 73 .ok_or_else(|| { 74 ( 75 StatusCode::UNAUTHORIZED, 76 Json(json!({"error": "AuthenticationRequired"})), 77 ) 78 .into_response() 79 })?; 80 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 81 let auth_user = crate::auth::validate_token_with_dpop( 82 &state.db, 83 &extracted.token, 84 extracted.is_dpop, 85 dpop_proof, 86 http_method, 87 http_uri, 88 false, 89 ) 90 .await 91 .map_err(|e| { 92 ( 93 StatusCode::UNAUTHORIZED, 94 Json(json!({"error": e.to_string()})), 95 ) 96 .into_response() 97 })?; 98 if repo_did != auth_user.did { 99 return Err(( 100 StatusCode::FORBIDDEN, 101 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 102 ) 103 .into_response()); 104 } 105 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 106 .await 107 .unwrap_or(false); 108 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 109 .await 110 .unwrap_or(false); 111 if !is_verified && !is_delegated { 112 return Err(( 113 StatusCode::FORBIDDEN, 114 Json(json!({ 115 "error": "AccountNotVerified", 116 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 117 })), 118 ) 119 .into_response()); 120 } 121 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 122 .fetch_optional(&state.db) 123 .await 124 .map_err(|e| { 125 error!("DB error fetching user: {}", e); 126 ( 127 StatusCode::INTERNAL_SERVER_ERROR, 128 Json(json!({"error": "InternalError"})), 129 ) 130 .into_response() 131 })? 132 .ok_or_else(|| { 133 ( 134 StatusCode::INTERNAL_SERVER_ERROR, 135 Json(json!({"error": "InternalError", "message": "User not found"})), 136 ) 137 .into_response() 138 })?; 139 let root_cid_str: String = sqlx::query_scalar!( 140 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 141 user_id 142 ) 143 .fetch_optional(&state.db) 144 .await 145 .map_err(|e| { 146 error!("DB error fetching repo root: {}", e); 147 ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError"})), 150 ) 151 .into_response() 152 })? 153 .ok_or_else(|| { 154 ( 155 StatusCode::INTERNAL_SERVER_ERROR, 156 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 157 ) 158 .into_response() 159 })?; 160 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 161 ( 162 StatusCode::INTERNAL_SERVER_ERROR, 163 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 164 ) 165 .into_response() 166 })?; 167 Ok(RepoWriteAuth { 168 did: auth_user.did, 169 user_id, 170 current_root_cid, 171 is_oauth: auth_user.is_oauth, 172 scope: auth_user.scope, 173 controller_did: auth_user.controller_did, 174 }) 175} 176#[derive(Deserialize)] 177#[allow(dead_code)] 178pub struct CreateRecordInput { 179 pub repo: String, 180 pub collection: String, 181 pub rkey: Option<String>, 182 pub validate: Option<bool>, 183 pub record: serde_json::Value, 184 #[serde(rename = "swapCommit")] 185 pub swap_commit: Option<String>, 186} 187#[derive(Serialize)] 188#[serde(rename_all = "camelCase")] 189pub struct CommitInfo { 190 pub cid: String, 191 pub rev: String, 192} 193 194#[derive(Serialize)] 195#[serde(rename_all = "camelCase")] 196pub struct CreateRecordOutput { 197 pub uri: String, 198 pub cid: String, 199 pub commit: CommitInfo, 200 #[serde(skip_serializing_if = "Option::is_none")] 201 pub validation_status: Option<String>, 202} 203pub async fn create_record( 204 State(state): State<AppState>, 205 headers: HeaderMap, 206 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 207 Json(input): Json<CreateRecordInput>, 208) -> Response { 209 let auth = 210 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 211 Ok(res) => res, 212 Err(err_res) => return err_res, 213 }; 214 215 if let Err(e) = crate::auth::scope_check::check_repo_scope( 216 auth.is_oauth, 217 auth.scope.as_deref(), 218 crate::oauth::RepoAction::Create, 219 &input.collection, 220 ) { 221 return e; 222 } 223 224 let did = auth.did; 225 let user_id = auth.user_id; 226 let current_root_cid = auth.current_root_cid; 227 let controller_did = auth.controller_did; 228 229 if let Some(swap_commit) = &input.swap_commit 230 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 231 { 232 return ( 233 StatusCode::CONFLICT, 234 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 235 ) 236 .into_response(); 237 } 238 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 239 let commit_bytes = match tracking_store.get(&current_root_cid).await { 240 Ok(Some(b)) => b, 241 _ => { 242 return ( 243 StatusCode::INTERNAL_SERVER_ERROR, 244 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 245 ) 246 .into_response(); 247 } 248 }; 249 let commit = match Commit::from_cbor(&commit_bytes) { 250 Ok(c) => c, 251 _ => { 252 return ( 253 StatusCode::INTERNAL_SERVER_ERROR, 254 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 255 ) 256 .into_response(); 257 } 258 }; 259 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 260 let collection_nsid = match input.collection.parse::<Nsid>() { 261 Ok(n) => n, 262 Err(_) => { 263 return ( 264 StatusCode::BAD_REQUEST, 265 Json(json!({"error": "InvalidCollection"})), 266 ) 267 .into_response(); 268 } 269 }; 270 let validation_status = if input.validate == Some(false) { 271 None 272 } else { 273 let require_lexicon = input.validate == Some(true); 274 match validate_record_with_status( 275 &input.record, 276 &input.collection, 277 input.rkey.as_deref(), 278 require_lexicon, 279 ) { 280 Ok(status) => Some(status), 281 Err(err_response) => return *err_response, 282 } 283 }; 284 let rkey = input 285 .rkey 286 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 287 let mut record_bytes = Vec::new(); 288 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 289 return ( 290 StatusCode::BAD_REQUEST, 291 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 292 ) 293 .into_response(); 294 } 295 let record_cid = match tracking_store.put(&record_bytes).await { 296 Ok(c) => c, 297 _ => { 298 return ( 299 StatusCode::INTERNAL_SERVER_ERROR, 300 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 301 ) 302 .into_response(); 303 } 304 }; 305 let key = format!("{}/{}", collection_nsid, rkey); 306 let new_mst = match mst.add(&key, record_cid).await { 307 Ok(m) => m, 308 _ => { 309 return ( 310 StatusCode::INTERNAL_SERVER_ERROR, 311 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 312 ) 313 .into_response(); 314 } 315 }; 316 let new_mst_root = match new_mst.persist().await { 317 Ok(c) => c, 318 _ => { 319 return ( 320 StatusCode::INTERNAL_SERVER_ERROR, 321 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 322 ) 323 .into_response(); 324 } 325 }; 326 let op = RecordOp::Create { 327 collection: input.collection.clone(), 328 rkey: rkey.clone(), 329 cid: record_cid, 330 }; 331 let mut relevant_blocks = std::collections::BTreeMap::new(); 332 if new_mst 333 .blocks_for_path(&key, &mut relevant_blocks) 334 .await 335 .is_err() 336 { 337 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 338 } 339 if mst 340 .blocks_for_path(&key, &mut relevant_blocks) 341 .await 342 .is_err() 343 { 344 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 345 } 346 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 347 let mut written_cids = tracking_store.get_all_relevant_cids(); 348 for cid in relevant_blocks.keys() { 349 if !written_cids.contains(cid) { 350 written_cids.push(*cid); 351 } 352 } 353 let written_cids_str = written_cids 354 .iter() 355 .map(|c| c.to_string()) 356 .collect::<Vec<_>>(); 357 let blob_cids = extract_blob_cids(&input.record); 358 let commit_result = match commit_and_log( 359 &state, 360 CommitParams { 361 did: &did, 362 user_id, 363 current_root_cid: Some(current_root_cid), 364 prev_data_cid: Some(commit.data), 365 new_mst_root, 366 ops: vec![op], 367 blocks_cids: &written_cids_str, 368 blobs: &blob_cids, 369 }, 370 ) 371 .await 372 { 373 Ok(res) => res, 374 Err(e) => { 375 return ( 376 StatusCode::INTERNAL_SERVER_ERROR, 377 Json(json!({"error": "InternalError", "message": e})), 378 ) 379 .into_response(); 380 } 381 }; 382 383 if let Some(ref controller) = controller_did { 384 let _ = delegation::log_delegation_action( 385 &state.db, 386 &did, 387 controller, 388 Some(controller), 389 DelegationActionType::RepoWrite, 390 Some(json!({ 391 "action": "create", 392 "collection": input.collection, 393 "rkey": rkey 394 })), 395 None, 396 None, 397 ) 398 .await; 399 } 400 401 ( 402 StatusCode::OK, 403 Json(CreateRecordOutput { 404 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 405 cid: record_cid.to_string(), 406 commit: CommitInfo { 407 cid: commit_result.commit_cid.to_string(), 408 rev: commit_result.rev, 409 }, 410 validation_status: validation_status.map(|s| match s { 411 ValidationStatus::Valid => "valid".to_string(), 412 ValidationStatus::Unknown => "unknown".to_string(), 413 ValidationStatus::Invalid => "invalid".to_string(), 414 }), 415 }), 416 ) 417 .into_response() 418} 419#[derive(Deserialize)] 420#[allow(dead_code)] 421pub struct PutRecordInput { 422 pub repo: String, 423 pub collection: String, 424 pub rkey: String, 425 pub validate: Option<bool>, 426 pub record: serde_json::Value, 427 #[serde(rename = "swapCommit")] 428 pub swap_commit: Option<String>, 429 #[serde(rename = "swapRecord")] 430 pub swap_record: Option<String>, 431} 432#[derive(Serialize)] 433#[serde(rename_all = "camelCase")] 434pub struct PutRecordOutput { 435 pub uri: String, 436 pub cid: String, 437 #[serde(skip_serializing_if = "Option::is_none")] 438 pub commit: Option<CommitInfo>, 439 #[serde(skip_serializing_if = "Option::is_none")] 440 pub validation_status: Option<String>, 441} 442pub async fn put_record( 443 State(state): State<AppState>, 444 headers: HeaderMap, 445 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 446 Json(input): Json<PutRecordInput>, 447) -> Response { 448 let auth = 449 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 450 Ok(res) => res, 451 Err(err_res) => return err_res, 452 }; 453 454 if let Err(e) = crate::auth::scope_check::check_repo_scope( 455 auth.is_oauth, 456 auth.scope.as_deref(), 457 crate::oauth::RepoAction::Create, 458 &input.collection, 459 ) { 460 return e; 461 } 462 if let Err(e) = crate::auth::scope_check::check_repo_scope( 463 auth.is_oauth, 464 auth.scope.as_deref(), 465 crate::oauth::RepoAction::Update, 466 &input.collection, 467 ) { 468 return e; 469 } 470 471 let did = auth.did; 472 let user_id = auth.user_id; 473 let current_root_cid = auth.current_root_cid; 474 let controller_did = auth.controller_did; 475 476 if let Some(swap_commit) = &input.swap_commit 477 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 478 { 479 return ( 480 StatusCode::CONFLICT, 481 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 482 ) 483 .into_response(); 484 } 485 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 486 let commit_bytes = match tracking_store.get(&current_root_cid).await { 487 Ok(Some(b)) => b, 488 _ => { 489 return ( 490 StatusCode::INTERNAL_SERVER_ERROR, 491 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 492 ) 493 .into_response(); 494 } 495 }; 496 let commit = match Commit::from_cbor(&commit_bytes) { 497 Ok(c) => c, 498 _ => { 499 return ( 500 StatusCode::INTERNAL_SERVER_ERROR, 501 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 502 ) 503 .into_response(); 504 } 505 }; 506 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 507 let collection_nsid = match input.collection.parse::<Nsid>() { 508 Ok(n) => n, 509 Err(_) => { 510 return ( 511 StatusCode::BAD_REQUEST, 512 Json(json!({"error": "InvalidCollection"})), 513 ) 514 .into_response(); 515 } 516 }; 517 let key = format!("{}/{}", collection_nsid, input.rkey); 518 let validation_status = if input.validate == Some(false) { 519 None 520 } else { 521 let require_lexicon = input.validate == Some(true); 522 match validate_record_with_status( 523 &input.record, 524 &input.collection, 525 Some(&input.rkey), 526 require_lexicon, 527 ) { 528 Ok(status) => Some(status), 529 Err(err_response) => return *err_response, 530 } 531 }; 532 if let Some(swap_record_str) = &input.swap_record { 533 let expected_cid = Cid::from_str(swap_record_str).ok(); 534 let actual_cid = mst.get(&key).await.ok().flatten(); 535 if expected_cid != actual_cid { 536 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 537 } 538 } 539 let existing_cid = mst.get(&key).await.ok().flatten(); 540 let mut record_bytes = Vec::new(); 541 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 542 return ( 543 StatusCode::BAD_REQUEST, 544 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 545 ) 546 .into_response(); 547 } 548 let record_cid = match tracking_store.put(&record_bytes).await { 549 Ok(c) => c, 550 _ => { 551 return ( 552 StatusCode::INTERNAL_SERVER_ERROR, 553 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 554 ) 555 .into_response(); 556 } 557 }; 558 if existing_cid == Some(record_cid) { 559 return ( 560 StatusCode::OK, 561 Json(PutRecordOutput { 562 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 563 cid: record_cid.to_string(), 564 commit: None, 565 validation_status: validation_status.map(|s| match s { 566 ValidationStatus::Valid => "valid".to_string(), 567 ValidationStatus::Unknown => "unknown".to_string(), 568 ValidationStatus::Invalid => "invalid".to_string(), 569 }), 570 }), 571 ) 572 .into_response(); 573 } 574 let new_mst = if existing_cid.is_some() { 575 match mst.update(&key, record_cid).await { 576 Ok(m) => m, 577 Err(_) => { 578 return ( 579 StatusCode::INTERNAL_SERVER_ERROR, 580 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 581 ) 582 .into_response(); 583 } 584 } 585 } else { 586 match mst.add(&key, record_cid).await { 587 Ok(m) => m, 588 Err(_) => { 589 return ( 590 StatusCode::INTERNAL_SERVER_ERROR, 591 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 592 ) 593 .into_response(); 594 } 595 } 596 }; 597 let new_mst_root = match new_mst.persist().await { 598 Ok(c) => c, 599 Err(_) => { 600 return ( 601 StatusCode::INTERNAL_SERVER_ERROR, 602 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 603 ) 604 .into_response(); 605 } 606 }; 607 let op = if existing_cid.is_some() { 608 RecordOp::Update { 609 collection: input.collection.clone(), 610 rkey: input.rkey.clone(), 611 cid: record_cid, 612 prev: existing_cid, 613 } 614 } else { 615 RecordOp::Create { 616 collection: input.collection.clone(), 617 rkey: input.rkey.clone(), 618 cid: record_cid, 619 } 620 }; 621 let mut relevant_blocks = std::collections::BTreeMap::new(); 622 if new_mst 623 .blocks_for_path(&key, &mut relevant_blocks) 624 .await 625 .is_err() 626 { 627 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 628 } 629 if mst 630 .blocks_for_path(&key, &mut relevant_blocks) 631 .await 632 .is_err() 633 { 634 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 635 } 636 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 637 let mut written_cids = tracking_store.get_all_relevant_cids(); 638 for cid in relevant_blocks.keys() { 639 if !written_cids.contains(cid) { 640 written_cids.push(*cid); 641 } 642 } 643 let written_cids_str = written_cids 644 .iter() 645 .map(|c| c.to_string()) 646 .collect::<Vec<_>>(); 647 let is_update = existing_cid.is_some(); 648 let blob_cids = extract_blob_cids(&input.record); 649 let commit_result = match commit_and_log( 650 &state, 651 CommitParams { 652 did: &did, 653 user_id, 654 current_root_cid: Some(current_root_cid), 655 prev_data_cid: Some(commit.data), 656 new_mst_root, 657 ops: vec![op], 658 blocks_cids: &written_cids_str, 659 blobs: &blob_cids, 660 }, 661 ) 662 .await 663 { 664 Ok(res) => res, 665 Err(e) => { 666 return ( 667 StatusCode::INTERNAL_SERVER_ERROR, 668 Json(json!({"error": "InternalError", "message": e})), 669 ) 670 .into_response(); 671 } 672 }; 673 674 if let Some(ref controller) = controller_did { 675 let _ = delegation::log_delegation_action( 676 &state.db, 677 &did, 678 controller, 679 Some(controller), 680 DelegationActionType::RepoWrite, 681 Some(json!({ 682 "action": if is_update { "update" } else { "create" }, 683 "collection": input.collection, 684 "rkey": input.rkey 685 })), 686 None, 687 None, 688 ) 689 .await; 690 } 691 692 ( 693 StatusCode::OK, 694 Json(PutRecordOutput { 695 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 696 cid: record_cid.to_string(), 697 commit: Some(CommitInfo { 698 cid: commit_result.commit_cid.to_string(), 699 rev: commit_result.rev, 700 }), 701 validation_status: validation_status.map(|s| match s { 702 ValidationStatus::Valid => "valid".to_string(), 703 ValidationStatus::Unknown => "unknown".to_string(), 704 ValidationStatus::Invalid => "invalid".to_string(), 705 }), 706 }), 707 ) 708 .into_response() 709}