A better Rust ATProto crate

Sync v1.1 proofs are now validating properly.

Orual d996d252 d989e800

+1558 -86
+726 -15
crates/jacquard-repo/src/commit/firehose.rs
··· 9 9 use jacquard_common::types::crypto::PublicKey; 10 10 use jacquard_common::types::string::{Datetime, Did, Tid}; 11 11 use jacquard_common::{CowStr, IntoStatic}; 12 + use smol_str::ToSmolStr; 12 13 13 14 /// Firehose commit message (sync v1.0 and v1.1) 14 15 /// ··· 114 115 pub prev: Option<CidLink<'a>>, 115 116 } 116 117 118 + impl<'a> RepoOp<'a> { 119 + /// Convert to VerifiedWriteOp for v1.1 validation 120 + /// 121 + /// Validates that all required fields are present for inversion. 122 + pub fn to_invertible_op(&self) -> Result<VerifiedWriteOp> { 123 + let key = self.path.to_smolstr(); 124 + 125 + match self.action.as_ref() { 126 + "create" => { 127 + let cid = self 128 + .cid 129 + .as_ref() 130 + .ok_or_else(|| RepoError::invalid_commit("create operation missing cid field"))? 131 + .to_ipld() 132 + .map_err(|e| RepoError::invalid_cid_conversion(e, "create cid"))?; 133 + 134 + Ok(VerifiedWriteOp::Create { key, cid }) 135 + } 136 + "update" => { 137 + let cid = self 138 + .cid 139 + .as_ref() 140 + .ok_or_else(|| RepoError::invalid_commit("update operation missing cid field"))? 141 + .to_ipld() 142 + .map_err(|e| RepoError::invalid_cid_conversion(e, "update cid"))?; 143 + 144 + let prev = self 145 + .prev 146 + .as_ref() 147 + .ok_or_else(|| { 148 + RepoError::invalid_commit( 149 + "update operation missing prev field for v1.1 validation", 150 + ) 151 + })? 152 + .to_ipld() 153 + .map_err(|e| RepoError::invalid_cid_conversion(e, "update prev"))?; 154 + 155 + Ok(VerifiedWriteOp::Update { key, cid, prev }) 156 + } 157 + "delete" => { 158 + let prev = self 159 + .prev 160 + .as_ref() 161 + .ok_or_else(|| { 162 + RepoError::invalid_commit( 163 + "delete operation missing prev field for v1.1 validation", 164 + ) 165 + })? 
166 + .to_ipld() 167 + .map_err(|e| RepoError::invalid_cid_conversion(e, "delete prev"))?; 168 + 169 + Ok(VerifiedWriteOp::Delete { key, prev }) 170 + } 171 + action => Err(RepoError::invalid_commit(format!( 172 + "unknown action type: {}", 173 + action 174 + ))), 175 + } 176 + } 177 + } 178 + 117 179 impl IntoStatic for FirehoseCommit<'_> { 118 180 type Output = FirehoseCommit<'static>; 119 181 ··· 153 215 /// 154 216 /// These functions validate commits from the `com.atproto.sync.subscribeRepos` firehose. 155 217 use crate::error::{RepoError, Result}; 156 - use crate::mst::Mst; 218 + use crate::mst::{Mst, VerifiedWriteOp}; 157 219 use crate::storage::{BlockStore, LayeredBlockStore, MemoryBlockStore}; 158 220 use cid::Cid as IpldCid; 159 221 use std::sync::Arc; ··· 302 364 // Verify signature 303 365 commit.verify(pubkey)?; 304 366 305 - // 4. Load previous MST from prev_data (all blocks should be in temp_storage) 306 - let prev_mst = Mst::load(temp_storage.clone(), prev_data_cid, None); 307 - 308 367 // 5. Load new MST from commit.data (claimed result) 309 368 let expected_root = *commit.data(); 310 - let new_mst = Mst::load(temp_storage, expected_root, None); 369 + let mut new_mst = Mst::load(temp_storage, expected_root, None); 311 370 312 - // 6. Compute diff to get verified write ops (with actual prev values from tree state) 313 - let diff = prev_mst.diff(&new_mst).await?; 314 - let verified_ops = diff.to_verified_ops(); 315 - 316 - // 7. Apply verified ops to prev MST 317 - let computed_mst = prev_mst.batch(&verified_ops).await?; 371 + let verified_ops = self 372 + .ops 373 + .iter() 374 + .filter_map(|op| op.to_invertible_op().ok()) 375 + .collect::<Vec<_>>(); 376 + if verified_ops.len() != self.ops.len() { 377 + return Err(RepoError::invalid_commit(format!( 378 + "Invalid commit: expected {} ops, got {}", 379 + self.ops.len(), 380 + verified_ops.len() 381 + ))); 382 + } 318 383 319 - // 8. 
Verify computed result matches claimed result 320 - let computed_root = computed_mst.get_pointer().await?; 384 + for op in verified_ops { 385 + if let Ok(inverted) = new_mst.invert_op(op.clone()).await { 386 + if !inverted { 387 + return Err(RepoError::invalid_commit(format!( 388 + "Invalid commit: op {:?} is not invertible", 389 + op 390 + ))); 391 + } 392 + } 393 + } 394 + // 8. Verify computed previous state matches claimed previous state 395 + let computed_root = new_mst.get_pointer().await?; 321 396 322 - if computed_root != expected_root { 397 + if computed_root != prev_data_cid { 323 398 return Err(RepoError::cid_mismatch(format!( 324 399 "MST root mismatch: expected {}, got {}", 325 - expected_root, computed_root 400 + prev_data_cid, computed_root 326 401 ))); 327 402 } 328 403 329 404 Ok(expected_root) 330 405 } 331 406 } 407 + 408 + #[cfg(test)] 409 + mod tests { 410 + use super::*; 411 + use crate::commit::{Commit, SigningKey as _}; 412 + use crate::mst::{Mst, RecordWriteOp}; 413 + use crate::storage::MemoryBlockStore; 414 + use crate::{CommitData, Repository}; 415 + use jacquard_common::types::crypto::{KeyCodec, PublicKey}; 416 + use jacquard_common::types::recordkey::Rkey; 417 + use jacquard_common::types::string::{Nsid, RecordKey}; 418 + use jacquard_common::types::tid::Ticker; 419 + use jacquard_common::types::value::RawData; 420 + use smol_str::SmolStr; 421 + use std::collections::BTreeMap; 422 + 423 + fn make_test_record(n: u32) -> BTreeMap<SmolStr, RawData<'static>> { 424 + let mut record = BTreeMap::new(); 425 + record.insert( 426 + SmolStr::new("$type"), 427 + RawData::String("app.bsky.feed.post".into()), 428 + ); 429 + record.insert( 430 + SmolStr::new("text"), 431 + RawData::String(format!("Test post #{}", n).into()), 432 + ); 433 + record.insert( 434 + SmolStr::new("createdAt"), 435 + RawData::String("2024-01-01T00:00:00Z".to_string().into()), 436 + ); 437 + record 438 + } 439 + 440 + async fn create_test_repo(storage: Arc<MemoryBlockStore>) 
-> Repository<MemoryBlockStore> { 441 + let did = Did::new("did:plc:test").unwrap(); 442 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 443 + 444 + let mst = Mst::new(storage.clone()); 445 + let data = mst.persist().await.unwrap(); 446 + 447 + let rev = Ticker::new().next(None); 448 + let commit = Commit::new_unsigned(did.into_static(), data, rev, None) 449 + .sign(&signing_key) 450 + .unwrap(); 451 + 452 + let commit_cbor = commit.to_cbor().unwrap(); 453 + let commit_cid = storage.put(&commit_cbor).await.unwrap(); 454 + 455 + Repository::new(storage, mst, commit.into_static(), commit_cid) 456 + } 457 + 458 + fn get_public_key(signing_key: &k256::ecdsa::SigningKey) -> PublicKey<'static> { 459 + let verifying_key = signing_key.verifying_key(); 460 + let pubkey_bytes = verifying_key.to_encoded_point(true).as_bytes().to_vec(); 461 + PublicKey { 462 + codec: KeyCodec::Secp256k1, 463 + bytes: pubkey_bytes.into(), 464 + } 465 + } 466 + 467 + #[tokio::test] 468 + async fn test_valid_v1_1_commit_roundtrip() { 469 + let storage = Arc::new(MemoryBlockStore::new()); 470 + let mut repo = create_test_repo(storage.clone()).await; 471 + 472 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 473 + let rkey = RecordKey(Rkey::new("test1").unwrap()); 474 + 475 + let did = Did::new("did:plc:test").unwrap(); 476 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 477 + let pubkey = get_public_key(&signing_key); 478 + 479 + // Create operation 480 + let ops = vec![RecordWriteOp::Create { 481 + collection: collection.clone(), 482 + rkey: rkey.clone(), 483 + record: make_test_record(1), 484 + }]; 485 + 486 + let (repo_ops, commit_data) = repo 487 + .create_commit( 488 + &ops, 489 + &did, 490 + Some(repo.current_commit_cid().clone()), 491 + &signing_key, 492 + ) 493 + .await 494 + .unwrap(); 495 + 496 + // Convert to firehose commit (v1.1 includes prev_data) 497 + let firehose_commit = commit_data 498 + 
.to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 499 + .await 500 + .unwrap(); 501 + 502 + // Validate using v1.1 validation 503 + let result = firehose_commit.validate_v1_1(&pubkey).await; 504 + if let Err(ref e) = result { 505 + eprintln!("Validation error: {}", e); 506 + } 507 + assert!(result.is_ok(), "Valid v1.1 commit should pass validation"); 508 + 509 + let validated_root = result.unwrap(); 510 + assert_eq!( 511 + validated_root, commit_data.data, 512 + "Validated root should match commit data root" 513 + ); 514 + } 515 + 516 + #[tokio::test] 517 + async fn test_valid_v1_0_commit_with_prev_storage() { 518 + let storage = Arc::new(MemoryBlockStore::new()); 519 + let mut repo = create_test_repo(storage.clone()).await; 520 + 521 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 522 + let rkey = RecordKey(Rkey::new("test1").unwrap()); 523 + 524 + let did = Did::new("did:plc:test").unwrap(); 525 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 526 + let pubkey = get_public_key(&signing_key); 527 + 528 + let prev_root = *repo.current_commit().data(); 529 + 530 + // Create operation 531 + let ops = vec![RecordWriteOp::Create { 532 + collection: collection.clone(), 533 + rkey: rkey.clone(), 534 + record: make_test_record(1), 535 + }]; 536 + 537 + let (repo_ops, commit_data) = repo 538 + .create_commit( 539 + &ops, 540 + &did, 541 + Some(repo.current_commit_cid().clone()), 542 + &signing_key, 543 + ) 544 + .await 545 + .unwrap(); 546 + 547 + // For v1.0, we strip prev_data 548 + let mut firehose_commit = commit_data 549 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 550 + .await 551 + .unwrap(); 552 + 553 + firehose_commit.prev_data = None; 554 + 555 + // Validate using v1.0 validation with previous storage 556 + let result = firehose_commit 557 + .validate_v1_0(Some(prev_root), storage.clone(), &pubkey) 558 + .await; 559 + 560 + assert!(result.is_ok(), "Valid v1.0 commit should pass 
validation"); 561 + 562 + let validated_root = result.unwrap(); 563 + assert_eq!( 564 + validated_root, commit_data.data, 565 + "Validated root should match commit data root" 566 + ); 567 + } 568 + 569 + #[tokio::test] 570 + async fn test_multiple_operations_roundtrip() { 571 + let storage = Arc::new(MemoryBlockStore::new()); 572 + let mut repo = create_test_repo(storage.clone()).await; 573 + 574 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 575 + let did = Did::new("did:plc:test").unwrap(); 576 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 577 + let pubkey = get_public_key(&signing_key); 578 + 579 + // First commit: create two records 580 + let ops1 = vec![ 581 + RecordWriteOp::Create { 582 + collection: collection.clone(), 583 + rkey: RecordKey(Rkey::new("post1").unwrap()), 584 + record: make_test_record(1), 585 + }, 586 + RecordWriteOp::Create { 587 + collection: collection.clone(), 588 + rkey: RecordKey(Rkey::new("post2").unwrap()), 589 + record: make_test_record(2), 590 + }, 591 + ]; 592 + 593 + let (repo_ops, commit_data) = repo 594 + .create_commit( 595 + &ops1, 596 + &did, 597 + Some(repo.current_commit_cid().clone()), 598 + &signing_key, 599 + ) 600 + .await 601 + .unwrap(); 602 + 603 + let firehose_commit = commit_data 604 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 605 + .await 606 + .unwrap(); 607 + 608 + let result = firehose_commit.validate_v1_1(&pubkey).await; 609 + assert!(result.is_ok(), "Multiple creates should validate"); 610 + } 611 + 612 + #[tokio::test] 613 + async fn test_update_and_delete_operations_roundtrip() { 614 + let storage = Arc::new(MemoryBlockStore::new()); 615 + let mut repo = create_test_repo(storage.clone()).await; 616 + 617 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 618 + let did = Did::new("did:plc:test").unwrap(); 619 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 620 + let pubkey = get_public_key(&signing_key); 
621 + 622 + // First: create records 623 + let rkey1 = RecordKey(Rkey::new("post1").unwrap()); 624 + let rkey2 = RecordKey(Rkey::new("post2").unwrap()); 625 + 626 + let create_ops = vec![ 627 + RecordWriteOp::Create { 628 + collection: collection.clone(), 629 + rkey: rkey1.clone(), 630 + record: make_test_record(1), 631 + }, 632 + RecordWriteOp::Create { 633 + collection: collection.clone(), 634 + rkey: rkey2.clone(), 635 + record: make_test_record(2), 636 + }, 637 + ]; 638 + 639 + let (_, commit_data) = repo 640 + .create_commit( 641 + &create_ops, 642 + &did, 643 + Some(repo.current_commit_cid().clone()), 644 + &signing_key, 645 + ) 646 + .await 647 + .unwrap(); 648 + 649 + repo.apply_commit(commit_data).await.unwrap(); 650 + 651 + // Second: update one, delete the other 652 + let update_ops = vec![ 653 + RecordWriteOp::Update { 654 + collection: collection.clone(), 655 + rkey: rkey1.clone(), 656 + record: make_test_record(10), 657 + prev: None, 658 + }, 659 + RecordWriteOp::Delete { 660 + collection: collection.clone(), 661 + rkey: rkey2.clone(), 662 + prev: None, 663 + }, 664 + ]; 665 + 666 + let (repo_ops, commit_data) = repo 667 + .create_commit( 668 + &update_ops, 669 + &did, 670 + Some(repo.current_commit_cid().clone()), 671 + &signing_key, 672 + ) 673 + .await 674 + .unwrap(); 675 + 676 + let firehose_commit = commit_data 677 + .to_firehose_commit(&did, 2, Datetime::now(), repo_ops, vec![]) 678 + .await 679 + .unwrap(); 680 + 681 + let result = firehose_commit.validate_v1_1(&pubkey).await; 682 + assert!( 683 + result.is_ok(), 684 + "Update and delete operations should validate" 685 + ); 686 + } 687 + 688 + #[tokio::test] 689 + async fn test_missing_commit_block_fails() { 690 + let storage = Arc::new(MemoryBlockStore::new()); 691 + let mut repo = create_test_repo(storage.clone()).await; 692 + 693 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 694 + let did = Did::new("did:plc:test").unwrap(); 695 + let signing_key = 
k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 696 + let pubkey = get_public_key(&signing_key); 697 + 698 + let ops = vec![RecordWriteOp::Create { 699 + collection: collection.clone(), 700 + rkey: RecordKey(Rkey::new("test1").unwrap()), 701 + record: make_test_record(1), 702 + }]; 703 + 704 + let (repo_ops, commit_data) = repo 705 + .create_commit( 706 + &ops, 707 + &did, 708 + Some(repo.current_commit_cid().clone()), 709 + &signing_key, 710 + ) 711 + .await 712 + .unwrap(); 713 + 714 + let mut firehose_commit = commit_data 715 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 716 + .await 717 + .unwrap(); 718 + 719 + // Parse CAR and remove commit block 720 + let parsed = parse_car_bytes(&firehose_commit.blocks).await.unwrap(); 721 + let commit_cid: IpldCid = firehose_commit.commit.to_ipld().unwrap(); 722 + 723 + let mut blocks_without_commit: BTreeMap<IpldCid, bytes::Bytes> = parsed 724 + .blocks 725 + .into_iter() 726 + .filter(|(cid, _)| cid != &commit_cid) 727 + .collect(); 728 + 729 + // Rebuild CAR without commit block 730 + let bad_car = crate::car::write_car_bytes(commit_cid, blocks_without_commit) 731 + .await 732 + .unwrap(); 733 + 734 + firehose_commit.blocks = bad_car.into(); 735 + 736 + let result = firehose_commit.validate_v1_1(&pubkey).await; 737 + assert!( 738 + result.is_err(), 739 + "Validation should fail when commit block is missing" 740 + ); 741 + } 742 + 743 + #[tokio::test] 744 + async fn test_missing_mst_blocks_fails() { 745 + let storage = Arc::new(MemoryBlockStore::new()); 746 + let mut repo = create_test_repo(storage.clone()).await; 747 + 748 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 749 + let did = Did::new("did:plc:test").unwrap(); 750 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 751 + let pubkey = get_public_key(&signing_key); 752 + 753 + // Create multiple records to ensure MST has nodes 754 + let ops = vec![ 755 + RecordWriteOp::Create { 756 + collection: 
collection.clone(), 757 + rkey: RecordKey(Rkey::new("aaa").unwrap()), 758 + record: make_test_record(1), 759 + }, 760 + RecordWriteOp::Create { 761 + collection: collection.clone(), 762 + rkey: RecordKey(Rkey::new("zzz").unwrap()), 763 + record: make_test_record(2), 764 + }, 765 + ]; 766 + 767 + let (repo_ops, commit_data) = repo 768 + .create_commit( 769 + &ops, 770 + &did, 771 + Some(repo.current_commit_cid().clone()), 772 + &signing_key, 773 + ) 774 + .await 775 + .unwrap(); 776 + 777 + let mut firehose_commit = commit_data 778 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 779 + .await 780 + .unwrap(); 781 + 782 + // Parse CAR and keep only commit block (remove MST nodes) 783 + let parsed = parse_car_bytes(&firehose_commit.blocks).await.unwrap(); 784 + let commit_cid: IpldCid = firehose_commit.commit.to_ipld().unwrap(); 785 + 786 + let blocks_commit_only: BTreeMap<IpldCid, bytes::Bytes> = parsed 787 + .blocks 788 + .into_iter() 789 + .filter(|(cid, _)| cid == &commit_cid) 790 + .collect(); 791 + 792 + let bad_car = crate::car::write_car_bytes(commit_cid, blocks_commit_only) 793 + .await 794 + .unwrap(); 795 + 796 + firehose_commit.blocks = bad_car.into(); 797 + 798 + let result = firehose_commit.validate_v1_1(&pubkey).await; 799 + assert!( 800 + result.is_err(), 801 + "Validation should fail when MST blocks are missing" 802 + ); 803 + } 804 + 805 + #[tokio::test] 806 + async fn test_wrong_mst_root_in_commit_fails() { 807 + let storage = Arc::new(MemoryBlockStore::new()); 808 + let mut repo = create_test_repo(storage.clone()).await; 809 + 810 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 811 + let did = Did::new("did:plc:test").unwrap(); 812 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 813 + let pubkey = get_public_key(&signing_key); 814 + 815 + let ops = vec![RecordWriteOp::Create { 816 + collection: collection.clone(), 817 + rkey: RecordKey(Rkey::new("test1").unwrap()), 818 + record: 
make_test_record(1), 819 + }]; 820 + 821 + let (repo_ops, mut commit_data) = repo 822 + .create_commit( 823 + &ops, 824 + &did, 825 + Some(repo.current_commit_cid().clone()), 826 + &signing_key, 827 + ) 828 + .await 829 + .unwrap(); 830 + 831 + // Create a fake commit with wrong data root 832 + use crate::mst::util::compute_cid; 833 + let wrong_root = compute_cid(&[1, 2, 3, 4]).unwrap(); 834 + 835 + let fake_commit = Commit::new_unsigned( 836 + did.clone().into_static(), 837 + wrong_root, 838 + commit_data.rev.clone(), 839 + commit_data.prev, 840 + ) 841 + .sign(&signing_key) 842 + .unwrap(); 843 + 844 + let fake_commit_cbor = fake_commit.to_cbor().unwrap(); 845 + let fake_commit_cid = compute_cid(&fake_commit_cbor).unwrap(); 846 + 847 + // Replace commit block in blocks 848 + commit_data.blocks.remove(&commit_data.cid); 849 + commit_data 850 + .blocks 851 + .insert(fake_commit_cid, bytes::Bytes::from(fake_commit_cbor)); 852 + commit_data.cid = fake_commit_cid; 853 + 854 + let mut firehose_commit = commit_data 855 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 856 + .await 857 + .unwrap(); 858 + 859 + let result = firehose_commit.validate_v1_1(&pubkey).await; 860 + assert!( 861 + result.is_err(), 862 + "Validation should fail when commit has wrong MST root" 863 + ); 864 + } 865 + 866 + #[tokio::test] 867 + async fn test_mismatched_did_fails() { 868 + let storage = Arc::new(MemoryBlockStore::new()); 869 + let mut repo = create_test_repo(storage.clone()).await; 870 + 871 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 872 + let did = Did::new("did:plc:test").unwrap(); 873 + let wrong_did = Did::new("did:plc:wrong").unwrap(); 874 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 875 + let pubkey = get_public_key(&signing_key); 876 + 877 + let ops = vec![RecordWriteOp::Create { 878 + collection: collection.clone(), 879 + rkey: RecordKey(Rkey::new("test1").unwrap()), 880 + record: make_test_record(1), 881 + }]; 
882 + 883 + let (repo_ops, commit_data) = repo 884 + .create_commit( 885 + &ops, 886 + &did, 887 + Some(repo.current_commit_cid().clone()), 888 + &signing_key, 889 + ) 890 + .await 891 + .unwrap(); 892 + 893 + // Create firehose commit with wrong DID 894 + let mut firehose_commit = commit_data 895 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 896 + .await 897 + .unwrap(); 898 + 899 + firehose_commit.repo = wrong_did; 900 + 901 + let result = firehose_commit.validate_v1_1(&pubkey).await; 902 + assert!( 903 + result.is_err(), 904 + "Validation should fail with mismatched DID" 905 + ); 906 + 907 + let err_msg = result.unwrap_err().to_string(); 908 + assert!( 909 + err_msg.contains("DID mismatch"), 910 + "Error should mention DID mismatch" 911 + ); 912 + } 913 + 914 + #[tokio::test] 915 + async fn test_invalid_signature_fails() { 916 + let storage = Arc::new(MemoryBlockStore::new()); 917 + let mut repo = create_test_repo(storage.clone()).await; 918 + 919 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 920 + let did = Did::new("did:plc:test").unwrap(); 921 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 922 + 923 + // Use a different key for verification 924 + let wrong_signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 925 + let wrong_pubkey = get_public_key(&wrong_signing_key); 926 + 927 + let ops = vec![RecordWriteOp::Create { 928 + collection: collection.clone(), 929 + rkey: RecordKey(Rkey::new("test1").unwrap()), 930 + record: make_test_record(1), 931 + }]; 932 + 933 + let (repo_ops, commit_data) = repo 934 + .create_commit( 935 + &ops, 936 + &did, 937 + Some(repo.current_commit_cid().clone()), 938 + &signing_key, 939 + ) 940 + .await 941 + .unwrap(); 942 + 943 + let firehose_commit = commit_data 944 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 945 + .await 946 + .unwrap(); 947 + 948 + let result = firehose_commit.validate_v1_1(&wrong_pubkey).await; 949 + assert!( 
950 + result.is_err(), 951 + "Validation should fail with wrong public key" 952 + ); 953 + } 954 + 955 + #[tokio::test] 956 + async fn test_missing_prev_data_for_v1_1_fails() { 957 + let storage = Arc::new(MemoryBlockStore::new()); 958 + let mut repo = create_test_repo(storage.clone()).await; 959 + 960 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 961 + let did = Did::new("did:plc:test").unwrap(); 962 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 963 + let pubkey = get_public_key(&signing_key); 964 + 965 + let ops = vec![RecordWriteOp::Create { 966 + collection: collection.clone(), 967 + rkey: RecordKey(Rkey::new("test1").unwrap()), 968 + record: make_test_record(1), 969 + }]; 970 + 971 + let (repo_ops, commit_data) = repo 972 + .create_commit( 973 + &ops, 974 + &did, 975 + Some(repo.current_commit_cid().clone()), 976 + &signing_key, 977 + ) 978 + .await 979 + .unwrap(); 980 + 981 + let mut firehose_commit = commit_data 982 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 983 + .await 984 + .unwrap(); 985 + 986 + // Strip prev_data to make it invalid for v1.1 987 + firehose_commit.prev_data = None; 988 + 989 + let result = firehose_commit.validate_v1_1(&pubkey).await; 990 + assert!( 991 + result.is_err(), 992 + "v1.1 validation should fail without prev_data" 993 + ); 994 + 995 + let err_msg = result.unwrap_err().to_string(); 996 + assert!( 997 + err_msg.contains("prev_data"), 998 + "Error should mention missing prev_data" 999 + ); 1000 + } 1001 + 1002 + #[tokio::test] 1003 + async fn test_wrong_prev_data_cid_fails() { 1004 + let storage = Arc::new(MemoryBlockStore::new()); 1005 + let mut repo = create_test_repo(storage.clone()).await; 1006 + 1007 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 1008 + let did = Did::new("did:plc:test").unwrap(); 1009 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 1010 + let pubkey = get_public_key(&signing_key); 1011 + 1012 + 
let ops = vec![RecordWriteOp::Create { 1013 + collection: collection.clone(), 1014 + rkey: RecordKey(Rkey::new("test1").unwrap()), 1015 + record: make_test_record(1), 1016 + }]; 1017 + 1018 + let (repo_ops, commit_data) = repo 1019 + .create_commit( 1020 + &ops, 1021 + &did, 1022 + Some(repo.current_commit_cid().clone()), 1023 + &signing_key, 1024 + ) 1025 + .await 1026 + .unwrap(); 1027 + 1028 + let mut firehose_commit = commit_data 1029 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 1030 + .await 1031 + .unwrap(); 1032 + 1033 + // Use wrong prev_data CID (point to commit instead of MST root) 1034 + firehose_commit.prev_data = Some(firehose_commit.commit.clone()); 1035 + 1036 + let result = firehose_commit.validate_v1_1(&pubkey).await; 1037 + assert!( 1038 + result.is_err(), 1039 + "Validation should fail with wrong prev_data CID" 1040 + ); 1041 + } 1042 + }
+2
crates/jacquard-repo/src/lib.rs
··· 15 15 //! - Zero-copy deserialization where possible 16 16 //! - Support for both current and future sync protocol versions 17 17 //! 18 + //! Note: thank you very much to Rudy and Clinton, rsky was very helpful in figuring this all out. 19 + //! 18 20 //! # Example 19 21 //! 20 22 //! ```rust,ignore
+1 -1
crates/jacquard-repo/src/mst/node.rs
··· 42 42 } 43 43 } 44 44 45 - impl<S: BlockStore> NodeEntry<S> { 45 + impl<S> NodeEntry<S> { 46 46 /// Check if this is a tree entry 47 47 pub fn is_tree(&self) -> bool { 48 48 matches!(self, NodeEntry::Tree(_))
+191 -27
crates/jacquard-repo/src/mst/tree.rs
··· 5 5 use crate::error::{RepoError, Result}; 6 6 use crate::mst::util::validate_key; 7 7 use crate::storage::BlockStore; 8 + use bytes::Bytes; 8 9 use cid::Cid as IpldCid; 9 10 use core::fmt; 10 11 use jacquard_common::types::recordkey::Rkey; ··· 12 13 use jacquard_common::types::value::RawData; 13 14 use n0_future::try_join_all; 14 15 use smol_str::SmolStr; 16 + use std::collections::BTreeMap; 15 17 use std::fmt::{Display, Formatter}; 16 18 use std::future::Future; 17 19 use std::pin::Pin; ··· 220 222 ) -> Result<Self> { 221 223 // Serialize and compute CID (don't persist yet) 222 224 let node_data = util::serialize_node_data(&entries).await?; 223 - let cbor = serde_ipld_dagcbor::to_vec(&node_data) 224 - .map_err(|e| RepoError::serialization(e).with_context("serializing MST node during creation"))?; 225 + let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| { 226 + RepoError::serialization(e).with_context("serializing MST node during creation") 227 + })?; 225 228 let cid = util::compute_cid(&cbor)?; 226 229 227 230 let mst = Self { ··· 282 285 })?; 283 286 284 287 let node_data: super::node::NodeData = serde_ipld_dagcbor::from_slice(&node_bytes) 285 - .map_err(|e| RepoError::serialization(e).with_context(format!("deserializing MST node from storage: {}", pointer)))?; 288 + .map_err(|e| { 289 + RepoError::serialization(e) 290 + .with_context(format!("deserializing MST node from storage: {}", pointer)) 291 + })?; 286 292 287 293 let entries = util::deserialize_node_data(self.storage.clone(), &node_data, self.layer)?; 288 294 ··· 333 339 334 340 // Now serialize and compute CID with fresh child CIDs 335 341 let node_data = util::serialize_node_data(&entries).await?; 336 - let cbor = serde_ipld_dagcbor::to_vec(&node_data) 337 - .map_err(|e| RepoError::serialization(e).with_context("serializing MST node for CID computation"))?; 342 + let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| { 343 + RepoError::serialization(e).with_context("serializing 
MST node for CID computation") 344 + })?; 338 345 let cid = util::compute_cid(&cbor)?; 339 346 340 347 // Update pointer and mark as fresh ··· 447 454 448 455 Ok(None) 449 456 }) 457 + } 458 + 459 + /// Add a key-value pair, mutating the current tree 460 + pub async fn add_mut<'a>(&'a mut self, key: &'a str, cid: IpldCid) -> Result<()> { 461 + *self = self.add(key, cid).await?; 462 + Ok(()) 450 463 } 451 464 452 465 /// Add a key-value pair (returns new tree) ··· 573 586 }) 574 587 } 575 588 589 + /// invert an update function, returning the previous cid 590 + pub async fn check_update(&mut self, key: &str, cid: IpldCid) -> Result<IpldCid> { 591 + validate_key(key)?; 592 + 593 + // Check key exists 594 + let Ok(Some(prev)) = self.get(key).await else { 595 + return Err(RepoError::not_found("key", key)); 596 + }; 597 + 598 + if prev == cid { 599 + return Ok(prev); 600 + } 601 + 602 + // Update is just add (which replaces) 603 + *self = self.add(key, cid).await?; 604 + Ok(prev) 605 + } 606 + 576 607 /// Delete a key (returns new tree) 577 608 pub fn delete<'a>( 578 609 &'a self, ··· 586 617 }) 587 618 } 588 619 620 + /// mutates a tree in place to delete, returns the CID of what was deleted 621 + /// 622 + /// Used to invert tree operations for verification 623 + pub fn delete_cid<'a>( 624 + &'a mut self, 625 + key: &'a str, 626 + ) -> Pin<Box<dyn Future<Output = Result<IpldCid>> + Send + 'a>> { 627 + Box::pin(async move { 628 + let cid = self 629 + .get(key) 630 + .await? 
631 + .ok_or(RepoError::not_found("cid for key", key))?; 632 + *self = self.delete(key).await?; 633 + Ok(cid) 634 + }) 635 + } 636 + 589 637 /// Recursively delete a key 590 638 fn delete_recurse<'a>( 591 639 &'a self, ··· 924 972 self.new_tree(entries).await 925 973 } 926 974 975 + /// invert a tree operation in-place for validation 976 + pub async fn invert_op(&mut self, op: VerifiedWriteOp) -> Result<bool> { 977 + //println!("tree before op inversion:\n{}", self); 978 + match op { 979 + VerifiedWriteOp::Create { key, cid: expected } => { 980 + let Ok(found) = self.delete_cid(&key).await else { 981 + //println!("tree at failure:\n{}", self); 982 + return Ok(false); 983 + }; 984 + if found == expected { 985 + Ok(true) 986 + } else { 987 + //println!("tree at failure:\n{}", self); 988 + Ok(false) 989 + } 990 + } 991 + VerifiedWriteOp::Update { 992 + key, 993 + cid: expected, 994 + prev, 995 + } => { 996 + let Ok(found) = self.check_update(&key, prev).await else { 997 + //println!("tree at failure:\n{}", self); 998 + return Ok(false); 999 + }; 1000 + if found == expected { 1001 + Ok(true) 1002 + } else { 1003 + //println!("tree at failure:\n{}", self); 1004 + Ok(false) 1005 + } 1006 + } 1007 + VerifiedWriteOp::Delete { key, prev } => { 1008 + if let Ok(Some(_)) = self.get(&key).await { 1009 + Ok(false) 1010 + } else { 1011 + self.add_mut(&key, prev).await?; 1012 + Ok(true) 1013 + } 1014 + } 1015 + } 1016 + } 1017 + 927 1018 /// Apply batch of verified write operations (returns new tree) 928 1019 /// 929 1020 /// More efficient than individual operations as it only rebuilds ··· 1038 1129 // Serialize this node 1039 1130 let entries = self.get_entries().await?; 1040 1131 let node_data = util::serialize_node_data(&entries).await?; 1041 - let cbor = serde_ipld_dagcbor::to_vec(&node_data) 1042 - .map_err(|e| RepoError::serialization(e).with_context("serializing MST node for block collection"))?; 1132 + let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| { 1133 
+ RepoError::serialization(e) 1134 + .with_context("serializing MST node for block collection") 1135 + })?; 1043 1136 blocks.insert(pointer, Bytes::from(cbor)); 1044 1137 1045 1138 // Recursively collect from subtrees ··· 1119 1212 1120 1213 let mut cids = vec![self.get_pointer().await?]; 1121 1214 let entries = self.get_entries().await?; 1122 - let index = Self::find_gt_or_equal_leaf_index_in(&entries, key); 1215 + let index = Self::find_gt_or_equal_leaf_index_in(&entries, key) as isize; 1123 1216 1124 - // Check if we found exact match at this level 1125 - if index < entries.len() { 1126 - if let NodeEntry::Leaf { 1127 - key: leaf_key, 1128 - value, 1129 - } = &entries[index] 1130 - { 1131 - if leaf_key.as_str() == key { 1132 - cids.push(*value); 1133 - return Ok(cids); 1217 + let found = self.at_index(index).await?; 1218 + 1219 + if let Some(NodeEntry::Leaf { 1220 + key: leaf_key, 1221 + value, 1222 + }) = found 1223 + { 1224 + if leaf_key.as_str() == key { 1225 + cids.push(value); 1226 + return Ok(cids); 1227 + } 1228 + } 1229 + 1230 + // Not found at this level - check subtree before this index 1231 + if let Some(NodeEntry::Tree(subtree)) = self.at_index(index - 1).await? 
{ 1232 + let mut subtree_cids = subtree.cids_for_path(key).await?; 1233 + cids.append(&mut subtree_cids); 1234 + return Ok(cids); 1235 + } 1236 + 1237 + // Key not found in tree 1238 + Ok(cids) 1239 + }) 1240 + } 1241 + 1242 + /// serialize the tree as car bytes, returning the cid of the car and the car bytes 1243 + pub fn serialize_tree<'a>( 1244 + &'a self, 1245 + ) -> Pin<Box<dyn Future<Output = Result<(IpldCid, Bytes)>> + Send + 'a>> { 1246 + Box::pin(async move { 1247 + let mut entries = self.get_entries().await?; 1248 + let mut outdated: Vec<Self> = Vec::new(); 1249 + for entry in &entries { 1250 + if let NodeEntry::Tree(mst) = entry { 1251 + let is_outdated = *mst.outdated_pointer.read().await; 1252 + if is_outdated { 1253 + outdated.push(mst.clone()); 1134 1254 } 1135 1255 } 1136 1256 } 1137 1257 1138 - // Not found at this level - check subtree before this index 1139 - if index > 0 { 1140 - if let NodeEntry::Tree(subtree) = &entries[index - 1] { 1141 - let mut subtree_cids = subtree.cids_for_path(key).await?; 1142 - cids.append(&mut subtree_cids); 1143 - return Ok(cids); 1258 + if outdated.len() > 0 { 1259 + for outdated_entry in &outdated { 1260 + let _ = outdated_entry.get_pointer().await?; 1261 + } 1262 + entries = self.get_entries().await? 
1263 + } 1264 + let data = util::serialize_node_data(entries.as_slice()).await?; 1265 + let bytes = serde_ipld_dagcbor::to_vec(&data).map_err(|e| RepoError::car(e))?; 1266 + let cid = util::compute_cid(&bytes)?; 1267 + 1268 + Ok((cid, Bytes::from_owner(bytes))) 1269 + }) 1270 + } 1271 + 1272 + /// Find the node at the given index if any 1273 + pub async fn at_index(&self, index: isize) -> Result<Option<NodeEntry<S>>> { 1274 + let entries = self.get_entries().await?; 1275 + if index < 0 || index as usize >= entries.len() { 1276 + return Ok(None); 1277 + } 1278 + Ok(entries 1279 + .into_iter() 1280 + .nth(index as usize) 1281 + .map(|entry| entry.clone())) 1282 + } 1283 + 1284 + /// Add any relevant blocks along the path to the given key to the map 1285 + pub fn blocks_for_path<'a>( 1286 + &'a self, 1287 + key: &'a str, 1288 + blocks: &'a mut BTreeMap<IpldCid, bytes::Bytes>, 1289 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 1290 + Box::pin(async move { 1291 + validate_key(key)?; 1292 + let (cid, bytes) = self.serialize_tree().await?; 1293 + blocks.insert(cid, bytes); 1294 + 1295 + let entries = self.get_entries().await?; 1296 + let index = Self::find_gt_or_equal_leaf_index_in(&entries, key) as isize; 1297 + let found = self.at_index(index).await?; 1298 + 1299 + if let Some(NodeEntry::Leaf { key: leaf_key, .. }) = found { 1300 + if leaf_key.as_str() == key { 1301 + return Ok(()); 1144 1302 } 1145 1303 } 1304 + if let Some(NodeEntry::Tree(subtree)) = self.at_index(index - 1).await? 
{ 1305 + subtree.blocks_for_path(key, blocks).await?; 1306 + return Ok(()); 1307 + } 1146 1308 1147 1309 // Key not found in tree 1148 - Ok(cids) 1310 + Ok(()) 1149 1311 }) 1150 1312 } 1151 1313 ··· 1334 1496 // Serialize this node 1335 1497 let entries = tree.get_entries().await?; 1336 1498 let node_data = util::serialize_node_data(&entries).await?; 1337 - let cbor = serde_ipld_dagcbor::to_vec(&node_data) 1338 - .map_err(|e| RepoError::serialization(e).with_context("serializing MST node for parallel block collection"))?; 1499 + let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| { 1500 + RepoError::serialization(e) 1501 + .with_context("serializing MST node for parallel block collection") 1502 + })?; 1339 1503 blocks.insert(pointer, Bytes::from(cbor)); 1340 1504 1341 1505 // Spawn tasks for each subtree
+186 -38
crates/jacquard-repo/src/repo.rs
··· 8 8 use crate::error::{RepoError, Result}; 9 9 use crate::mst::{Mst, MstDiff, RecordWriteOp}; 10 10 use crate::storage::BlockStore; 11 + use bytes::Bytes; 11 12 use cid::Cid as IpldCid; 12 13 use jacquard_common::IntoStatic; 13 14 use jacquard_common::types::cid::CidLink; ··· 16 17 use jacquard_common::types::tid::Ticker; 17 18 use smol_str::format_smolstr; 18 19 use std::collections::BTreeMap; 20 + use std::fmt::{self, Display, Formatter}; 19 21 use std::path::Path; 20 22 use std::sync::Arc; 21 23 ··· 71 73 ops: Vec<RepoOp<'static>>, 72 74 blobs: Vec<CidLink<'static>>, 73 75 ) -> Result<FirehoseCommit<'static>> { 76 + let mut proof_blocks = self.blocks.clone(); 77 + proof_blocks.append(&mut self.relevant_blocks.clone()); 74 78 // Convert relevant blocks to CAR format 75 - let blocks_car = 76 - crate::car::write_car_bytes(self.cid, self.relevant_blocks.clone()).await?; 79 + let blocks_car = crate::car::write_car_bytes(self.cid, proof_blocks).await?; 77 80 78 81 Ok(FirehoseCommit { 79 82 repo: repo.clone().into_static(), ··· 163 166 }) 164 167 } 165 168 169 + /// Format an initial commit for a new repository 170 + /// 171 + /// Creates an empty MST, optionally applies initial record writes, signs the commit, 172 + /// and returns CommitData ready to apply to storage. 173 + /// 174 + /// This does NOT persist to storage - use `create_from_commit` or `create` for that. 175 + pub async fn format_init_commit<K>( 176 + storage: Arc<S>, 177 + did: Did<'static>, 178 + signing_key: &K, 179 + initial_writes: Option<&[RecordWriteOp<'_>]>, 180 + ) -> Result<CommitData> 181 + where 182 + K: SigningKey, 183 + { 184 + let mut mst = Mst::new(storage.clone()); 185 + let mut blocks = BTreeMap::new(); 186 + 187 + // Apply initial writes if provided 188 + if let Some(ops) = initial_writes { 189 + for op in ops { 190 + let key = format_smolstr!("{}/{}", op.collection().as_ref(), op.rkey().as_ref()); 191 + 192 + match op { 193 + RecordWriteOp::Create { record, .. 
} => { 194 + // Serialize and store record 195 + let cbor = serde_ipld_dagcbor::to_vec(record) 196 + .map_err(|e| RepoError::serialization(e))?; 197 + let cid = storage.put(&cbor).await?; 198 + blocks.insert(cid, bytes::Bytes::from(cbor)); 199 + 200 + mst = mst.add(key.as_str(), cid).await?; 201 + } 202 + RecordWriteOp::Update { .. } | RecordWriteOp::Delete { .. } => { 203 + return Err(RepoError::invalid_commit( 204 + "Initial commit can only contain creates", 205 + )); 206 + } 207 + } 208 + } 209 + } 210 + 211 + // Persist MST and collect blocks 212 + let data = mst.persist().await?; 213 + let diff = Mst::new(storage.clone()).diff(&mst).await?; 214 + blocks.extend(diff.new_mst_blocks); 215 + 216 + // Create and sign initial commit 217 + let rev = Ticker::new().next(None); 218 + let commit = Commit::new_unsigned(did, data, rev.clone(), None).sign(signing_key)?; 219 + 220 + let commit_cbor = commit.to_cbor()?; 221 + let commit_cid = crate::mst::util::compute_cid(&commit_cbor)?; 222 + let commit_bytes = bytes::Bytes::from(commit_cbor); 223 + 224 + blocks.insert(commit_cid, commit_bytes.clone()); 225 + 226 + Ok(CommitData { 227 + cid: commit_cid, 228 + rev, 229 + since: None, 230 + prev: None, 231 + data, 232 + prev_data: None, 233 + blocks: blocks.clone(), 234 + relevant_blocks: blocks, 235 + deleted_cids: Vec::new(), 236 + }) 237 + } 238 + 239 + /// Create repository from CommitData 240 + /// 241 + /// Applies the commit to storage and loads the repository from it. 242 + pub async fn create_from_commit(storage: Arc<S>, commit_data: CommitData) -> Result<Self> { 243 + let commit_cid = commit_data.cid; 244 + storage.apply_commit(commit_data).await?; 245 + Self::from_commit(storage, &commit_cid).await 246 + } 247 + 248 + /// Create a new repository 249 + /// 250 + /// Convenience method that formats an initial commit and applies it to storage. 
251 + pub async fn create<K>( 252 + storage: Arc<S>, 253 + did: Did<'static>, 254 + signing_key: &K, 255 + initial_writes: Option<&[RecordWriteOp<'_>]>, 256 + ) -> Result<Self> 257 + where 258 + K: SigningKey, 259 + { 260 + let commit = 261 + Self::format_init_commit(storage.clone(), did, signing_key, initial_writes).await?; 262 + Self::create_from_commit(storage, commit).await 263 + } 264 + 166 265 /// Get a record by collection and rkey 167 266 pub async fn get_record<T: RecordKeyType>( 168 267 &self, ··· 268 367 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 269 368 270 369 // Serialize record to DAG-CBOR 271 - let cbor = serde_ipld_dagcbor::to_vec(record) 272 - .map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?; 370 + let cbor = serde_ipld_dagcbor::to_vec(record).map_err(|e| { 371 + RepoError::serialization(e).with_context(format!( 372 + "serializing record data for {}/{}", 373 + collection.as_ref(), 374 + rkey.as_ref() 375 + )) 376 + })?; 273 377 274 378 // Compute CID and store data 275 379 let cid = self.storage.put(&cbor).await?; ··· 285 389 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 286 390 287 391 // Serialize record to DAG-CBOR 288 - let cbor = serde_ipld_dagcbor::to_vec(record) 289 - .map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?; 392 + let cbor = serde_ipld_dagcbor::to_vec(record).map_err(|e| { 393 + RepoError::serialization(e).with_context(format!( 394 + "serializing record data for {}/{}", 395 + collection.as_ref(), 396 + rkey.as_ref() 397 + )) 398 + })?; 290 399 291 400 // Compute CID and store data 292 401 let cid = self.storage.put(&cbor).await?; ··· 335 444 // Compute diff before updating 336 445 let diff = self.mst.diff(&updated_tree).await?; 337 446 447 + println!("Repo before:\n{}", self); 338 448 // Update mst 339 
449 self.mst = updated_tree; 340 450 451 + println!("Repo after:\n{}", self); 341 452 Ok(diff) 342 453 } 343 454 ··· 360 471 where 361 472 K: SigningKey, 362 473 { 363 - // Step 1: Apply all write operations to build new MST 474 + // Step 1: Apply all write operations to build new MST and collect leaf blocks 364 475 let mut updated_tree = self.mst.clone(); 476 + let mut leaf_blocks = BTreeMap::new(); 365 477 366 478 for op in ops { 367 479 updated_tree = match op { ··· 373 485 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 374 486 375 487 // Serialize record to DAG-CBOR 376 - let cbor = serde_ipld_dagcbor::to_vec(record) 377 - .map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?; 488 + let cbor = serde_ipld_dagcbor::to_vec(record).map_err(|e| { 489 + RepoError::serialization(e).with_context(format!( 490 + "serializing record data for {}/{}", 491 + collection.as_ref(), 492 + rkey.as_ref() 493 + )) 494 + })?; 378 495 379 496 // Compute CID and store data 380 497 let cid = self.storage.put(&cbor).await?; 498 + leaf_blocks.insert(cid.clone(), Bytes::from(cbor)); 381 499 382 500 updated_tree.add(key.as_str(), cid).await? 
383 501 } ··· 390 508 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 391 509 392 510 // Serialize record to DAG-CBOR 393 - let cbor = serde_ipld_dagcbor::to_vec(record) 394 - .map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?; 511 + let cbor = serde_ipld_dagcbor::to_vec(record).map_err(|e| { 512 + RepoError::serialization(e).with_context(format!( 513 + "serializing record data for {}/{}", 514 + collection.as_ref(), 515 + rkey.as_ref() 516 + )) 517 + })?; 395 518 396 519 // Compute CID and store data 397 520 let cid = self.storage.put(&cbor).await?; ··· 405 528 ))); 406 529 } 407 530 } 531 + 532 + leaf_blocks.insert(cid.clone(), Bytes::from(cbor)); 408 533 409 534 updated_tree.add(key.as_str(), cid).await? 410 535 } ··· 414 539 prev, 415 540 } => { 416 541 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 417 - 418 - // Check exists 419 - let current = self 420 - .mst 421 - .get(key.as_str()) 422 - .await? 423 - .ok_or_else(|| RepoError::not_found("record", key.as_str()))?; 424 542 425 543 // Validate prev if provided 426 544 if let Some(prev_cid) = prev { 545 + // Check exists 546 + let current = self 547 + .mst 548 + .get(key.as_str()) 549 + .await? 
550 + .ok_or_else(|| RepoError::not_found("record", key.as_str()))?; 427 551 if &current != prev_cid { 428 552 return Err(RepoError::cid_mismatch(format!( 429 553 "Delete prev CID mismatch for key {}: expected {}, got {}", ··· 442 566 let prev_data = *self.commit.data(); 443 567 let diff = self.mst.diff(&updated_tree).await?; 444 568 445 - // Step 3: Extract everything we need from diff before moving it 446 - let new_leaf_blocks = diff.fetch_new_blocks(self.storage.as_ref()).await?; 569 + // Step 3: Extract everything we need from diff 447 570 let repo_ops = diff 448 571 .to_repo_ops() 449 572 .into_iter() ··· 451 574 .collect(); 452 575 let deleted_cids = diff.removed_cids; 453 576 454 - // Step 4: Use diff.new_mst_blocks instead of collect_blocks() 577 + // Step 4: Build blocks and relevant_blocks collections 455 578 let mut blocks = diff.new_mst_blocks; 579 + let mut relevant_blocks = BTreeMap::new(); 456 580 457 - // Step 5: Build relevant_blocks by walking paths for ORIGINAL operations 458 - let mut relevant_blocks = BTreeMap::new(); 581 + // Add the previous MST root block (needed to load prev_data in validation) 582 + if let Some(prev_root_block) = self.storage.get(&prev_data).await? { 583 + relevant_blocks.insert(prev_data, prev_root_block); 584 + } 585 + 586 + // Walk paths in both old and new trees for each operation 459 587 for op in ops { 460 588 let key = format_smolstr!("{}/{}", op.collection().as_ref(), op.rkey().as_ref()); 461 - let path_cids = updated_tree.cids_for_path(key.as_str()).await?; 462 589 463 - for path_cid in path_cids { 464 - if let Some(block) = blocks.get(&path_cid) { 465 - relevant_blocks.insert(path_cid, block.clone()); 466 - } else if let Some(block) = self.storage.get(&path_cid).await? 
{ 467 - relevant_blocks.insert(path_cid, block); 468 - } 469 - } 590 + updated_tree 591 + .blocks_for_path(&key, &mut relevant_blocks) 592 + .await?; 593 + 594 + self.mst.blocks_for_path(&key, &mut relevant_blocks).await?; 470 595 } 471 596 472 - // Step 6: Add new leaf blocks (record data) to both collections 473 - for (cid, block) in new_leaf_blocks { 474 - blocks.insert(cid, block.clone()); 475 - relevant_blocks.insert(cid, block); 597 + // Add new leaf blocks to both collections (single iteration) 598 + for (cid, block) in &leaf_blocks { 599 + if diff.new_leaf_cids.contains(cid) { 600 + blocks.insert(*cid, block.clone()); 601 + relevant_blocks.insert(*cid, block.clone()); 602 + } 476 603 } 477 604 478 - // Step 7: Create and sign commit 605 + // Step 6: Create and sign commit 479 606 let rev = Ticker::new().next(Some(self.commit.rev.clone())); 480 607 let commit = Commit::new_unsigned(did.clone().into_static(), data, rev.clone(), prev) 481 608 .sign(signing_key)?; ··· 484 611 let commit_cid = crate::mst::util::compute_cid(&commit_cbor)?; 485 612 let commit_bytes = bytes::Bytes::from(commit_cbor); 486 613 487 - // Step 8: Add commit block to both collections 614 + // Step 7: Add commit block to both collections 488 615 blocks.insert(commit_cid, commit_bytes.clone()); 489 616 relevant_blocks.insert(commit_cid, commit_bytes); 490 617 491 - // Step 9: Update internal MST state 618 + // Step 8: Update internal MST state 492 619 self.mst = updated_tree; 493 620 494 621 Ok(( ··· 583 710 /// Get the DID from the current commit 584 711 pub fn did(&self) -> &Did<'_> { 585 712 self.commit.did() 713 + } 714 + } 715 + 716 + impl<S: BlockStore> Display for Repository<S> { 717 + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 718 + use crate::mst::tree::short_cid; 719 + 720 + writeln!(f, "Repository {{")?; 721 + writeln!(f, " DID: {}", self.commit.did())?; 722 + writeln!(f, " Commit: {}", short_cid(&self.commit_cid))?; 723 + writeln!(f, " Rev: {}", self.commit.rev)?; 
724 + writeln!(f, " Data: {}", short_cid(self.commit.data()))?; 725 + writeln!(f, " MST:")?; 726 + 727 + // Format MST with indentation 728 + let mst_display = format!("{}", self.mst); 729 + for line in mst_display.lines() { 730 + writeln!(f, " {}", line)?; 731 + } 732 + 733 + write!(f, "}}") 586 734 } 587 735 } 588 736
+451
crates/jacquard-repo/tests/firehose_stress.rs
··· 1 + //! Stress tests for firehose commit validation 2 + //! 3 + //! Generates thousands of random operations to catch edge cases in v1.1 validation. 4 + 5 + use jacquard_common::IntoStatic; 6 + use jacquard_common::types::crypto::{KeyCodec, PublicKey}; 7 + use jacquard_common::types::recordkey::Rkey; 8 + use jacquard_common::types::string::{Datetime, Did, Nsid, RecordKey}; 9 + use jacquard_common::types::tid::Ticker; 10 + use jacquard_common::types::value::RawData; 11 + use jacquard_repo::Repository; 12 + use jacquard_repo::car::read_car_header; 13 + use jacquard_repo::mst::RecordWriteOp; 14 + use jacquard_repo::storage::{BlockStore, MemoryBlockStore}; 15 + use rand::Rng; 16 + use rand::seq::SliceRandom; 17 + use smol_str::SmolStr; 18 + use std::collections::{BTreeMap, HashMap}; 19 + use std::sync::Arc; 20 + 21 + // Test configuration 22 + const INITIAL_RECORDS: usize = 50; 23 + const STRESS_OPERATIONS: usize = 100; 24 + const BATCH_SIZE_RANGE: (usize, usize) = (1, 10); 25 + 26 + fn make_test_record(n: u32, text: &str) -> BTreeMap<SmolStr, RawData<'static>> { 27 + let mut record = BTreeMap::new(); 28 + record.insert( 29 + SmolStr::new("$type"), 30 + RawData::String("app.bsky.feed.post".into()), 31 + ); 32 + record.insert( 33 + SmolStr::new("text"), 34 + RawData::String(format!("{} #{}", text, n).into()), 35 + ); 36 + record.insert( 37 + SmolStr::new("createdAt"), 38 + RawData::String("2024-01-01T00:00:00Z".to_string().into()), 39 + ); 40 + record 41 + } 42 + 43 + fn get_public_key(signing_key: &k256::ecdsa::SigningKey) -> PublicKey<'static> { 44 + let verifying_key = signing_key.verifying_key(); 45 + let pubkey_bytes = verifying_key.to_encoded_point(true).as_bytes().to_vec(); 46 + PublicKey { 47 + codec: KeyCodec::Secp256k1, 48 + bytes: pubkey_bytes.into(), 49 + } 50 + } 51 + 52 + async fn create_test_repo(storage: Arc<MemoryBlockStore>) -> Repository<MemoryBlockStore> { 53 + let did = Did::new("did:plc:stresstest").unwrap(); 54 + let signing_key = 
k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 55 + 56 + Repository::create(storage, did.into_static(), &signing_key, None) 57 + .await 58 + .unwrap() 59 + } 60 + 61 + /// Track existing records for generating realistic updates/deletes 62 + struct RecordTracker { 63 + records: HashMap<String, u32>, 64 + ticker: Ticker, 65 + } 66 + 67 + impl RecordTracker { 68 + fn new() -> Self { 69 + Self { 70 + records: HashMap::new(), 71 + ticker: Ticker::new(), 72 + } 73 + } 74 + 75 + fn gen_new_rkey(&mut self) -> String { 76 + self.ticker.next(None).into_static().to_string() 77 + } 78 + 79 + fn pick_random_existing<R: Rng>(&self, rng: &mut R) -> Option<String> { 80 + let keys: Vec<_> = self.records.keys().cloned().collect(); 81 + keys.choose(rng).cloned() 82 + } 83 + 84 + fn add(&mut self, rkey: String, counter: u32) { 85 + self.records.insert(rkey, counter); 86 + } 87 + 88 + fn remove(&mut self, rkey: &str) { 89 + self.records.remove(rkey); 90 + } 91 + 92 + fn len(&self) -> usize { 93 + self.records.len() 94 + } 95 + } 96 + 97 + #[derive(Debug, Clone)] 98 + enum TestOp { 99 + Create { rkey: String, counter: u32 }, 100 + Update { rkey: String, counter: u32 }, 101 + Delete { rkey: String }, 102 + } 103 + 104 + fn generate_creates_only<R: Rng>( 105 + rng: &mut R, 106 + tracker: &mut RecordTracker, 107 + count: usize, 108 + ) -> Vec<TestOp> { 109 + let mut ops = Vec::new(); 110 + for _ in 0..count { 111 + let rkey = tracker.gen_new_rkey(); 112 + let counter: u32 = rng.r#gen(); 113 + tracker.add(rkey.clone(), counter); 114 + ops.push(TestOp::Create { rkey, counter }); 115 + } 116 + ops 117 + } 118 + 119 + fn generate_random_ops<R: Rng>( 120 + rng: &mut R, 121 + tracker: &mut RecordTracker, 122 + count: usize, 123 + ) -> Vec<TestOp> { 124 + let mut ops = Vec::new(); 125 + 126 + for _ in 0..count { 127 + // Weighted random choice: 50% create, 30% update, 20% delete 128 + let action = rng.gen_range(0..100); 129 + 130 + let op = if action < 50 || tracker.len() == 0 { 131 + 
// Create 132 + let rkey = tracker.gen_new_rkey(); 133 + let counter: u32 = rng.r#gen(); 134 + tracker.add(rkey.clone(), counter); 135 + TestOp::Create { rkey, counter } 136 + } else if action < 80 { 137 + // Update 138 + if let Some(rkey) = tracker.pick_random_existing(rng) { 139 + let counter: u32 = rng.r#gen(); 140 + tracker.add(rkey.clone(), counter); 141 + TestOp::Update { rkey, counter } 142 + } else { 143 + // Fall back to create if no records exist 144 + let rkey = tracker.gen_new_rkey(); 145 + let counter: u32 = rng.r#gen(); 146 + tracker.add(rkey.clone(), counter); 147 + TestOp::Create { rkey, counter } 148 + } 149 + } else { 150 + // Delete 151 + if let Some(rkey) = tracker.pick_random_existing(rng) { 152 + tracker.remove(&rkey); 153 + TestOp::Delete { rkey } 154 + } else { 155 + // Fall back to create if no records exist 156 + let rkey = tracker.gen_new_rkey(); 157 + let counter: u32 = rng.r#gen(); 158 + tracker.add(rkey.clone(), counter); 159 + TestOp::Create { rkey, counter } 160 + } 161 + }; 162 + 163 + ops.push(op); 164 + } 165 + 166 + ops 167 + } 168 + 169 + fn test_ops_to_record_writes(ops: Vec<TestOp>, collection: &Nsid) -> Vec<RecordWriteOp<'static>> { 170 + let collection_static = collection.clone().into_static(); 171 + ops.into_iter() 172 + .map(|op| match op { 173 + TestOp::Create { rkey, counter } => RecordWriteOp::Create { 174 + collection: collection_static.clone(), 175 + rkey: RecordKey(Rkey::new(&rkey).unwrap()).into_static(), 176 + record: make_test_record(counter, "Random post"), 177 + }, 178 + TestOp::Update { rkey, counter } => RecordWriteOp::Update { 179 + collection: collection_static.clone(), 180 + rkey: RecordKey(Rkey::new(&rkey).unwrap()).into_static(), 181 + record: make_test_record(counter, "Updated post"), 182 + prev: None, 183 + }, 184 + TestOp::Delete { rkey } => RecordWriteOp::Delete { 185 + collection: collection_static.clone(), 186 + rkey: RecordKey(Rkey::new(&rkey).unwrap()).into_static(), 187 + prev: None, 188 + }, 189 
+ }) 190 + .collect() 191 + } 192 + 193 + #[tokio::test] 194 + async fn test_stress_random_operations() { 195 + let storage = Arc::new(MemoryBlockStore::new()); 196 + let mut repo = create_test_repo(storage.clone()).await; 197 + 198 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 199 + let did = Did::new("did:plc:stresstest").unwrap(); 200 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 201 + let pubkey = get_public_key(&signing_key); 202 + 203 + let mut rng = rand::thread_rng(); 204 + let mut tracker = RecordTracker::new(); 205 + 206 + // Step 1: Create initial batch of records 207 + println!("Creating {} initial records...", INITIAL_RECORDS); 208 + println!("Repo before initial commit:\n{}", repo); 209 + 210 + let initial_ops = generate_creates_only(&mut rng, &mut tracker, INITIAL_RECORDS); 211 + let record_writes = test_ops_to_record_writes(initial_ops, &collection); 212 + 213 + let (repo_ops, commit_data) = repo 214 + .create_commit(&record_writes, &did, None, &signing_key) 215 + .await 216 + .unwrap(); 217 + 218 + repo.apply_commit(commit_data.clone()).await.unwrap(); 219 + println!("Repo after initial commit:\n{}", repo); 220 + 221 + // Validate initial commit 222 + let firehose_commit = commit_data 223 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 224 + .await 225 + .unwrap(); 226 + 227 + firehose_commit 228 + .validate_v1_1(&pubkey) 229 + .await 230 + .expect("Initial batch should validate"); 231 + 232 + println!( 233 + "Initial repo created with {} records", 234 + tracker.records.len() 235 + ); 236 + 237 + // Step 2: Generate and apply random operations in batches 238 + let mut commit_count = 1; 239 + let mut total_ops = 0; 240 + 241 + while total_ops < STRESS_OPERATIONS { 242 + let batch_size = rng.gen_range(BATCH_SIZE_RANGE.0..=BATCH_SIZE_RANGE.1); 243 + let remaining = STRESS_OPERATIONS - total_ops; 244 + let ops_count = batch_size.min(remaining); 245 + 246 + let ops = 
generate_random_ops(&mut rng, &mut tracker, ops_count); 247 + let record_writes = test_ops_to_record_writes(ops, &collection); 248 + 249 + let (repo_ops, commit_data) = repo 250 + .create_commit(&record_writes, &did, None, &signing_key) 251 + .await 252 + .unwrap(); 253 + 254 + repo.apply_commit(commit_data.clone()).await.unwrap(); 255 + 256 + // Validate firehose commit 257 + commit_count += 1; 258 + let firehose_commit = commit_data 259 + .to_firehose_commit( 260 + &did, 261 + commit_count, 262 + Datetime::now(), 263 + repo_ops.clone(), 264 + vec![], 265 + ) 266 + .await 267 + .unwrap(); 268 + 269 + firehose_commit 270 + .validate_v1_1(&pubkey) 271 + .await 272 + .unwrap_or_else(|e| { 273 + eprintln!( 274 + "Validation failed at commit {} (batch size {})", 275 + commit_count, ops_count 276 + ); 277 + eprintln!("Error: {}", e); 278 + eprintln!("Operations:\n{:?}", repo_ops); 279 + eprintln!("Relevant blocks:\n{:?}", commit_data.relevant_blocks.keys()); 280 + eprintln!("All blocks:\n{:?}", commit_data.blocks.keys()); 281 + panic!( 282 + "Validation failed at commit {} (batch size {}): {}", 283 + commit_count, ops_count, e 284 + ) 285 + }); 286 + 287 + total_ops += ops_count; 288 + 289 + if commit_count % 50 == 0 { 290 + println!( 291 + "Processed {} commits, {} total operations, {} records in repo", 292 + commit_count, 293 + total_ops, 294 + tracker.records.len() 295 + ); 296 + } 297 + } 298 + 299 + println!( 300 + "Stress test complete: {} commits, {} operations, {} final records", 301 + commit_count, 302 + total_ops, 303 + tracker.records.len() 304 + ); 305 + } 306 + 307 + #[tokio::test] 308 + async fn test_stress_large_batches() { 309 + let storage = Arc::new(MemoryBlockStore::new()); 310 + let mut repo = create_test_repo(storage.clone()).await; 311 + 312 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 313 + let did = Did::new("did:plc:stresstest").unwrap(); 314 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 315 + let 
pubkey = get_public_key(&signing_key); 316 + 317 + let mut rng = rand::thread_rng(); 318 + let mut tracker = RecordTracker::new(); 319 + 320 + // Create initial records 321 + let initial_ops = generate_creates_only(&mut rng, &mut tracker, 100); 322 + let record_writes = test_ops_to_record_writes(initial_ops, &collection); 323 + let (repo_ops, commit_data) = repo 324 + .create_commit( 325 + &record_writes, 326 + &did, 327 + Some(repo.current_commit_cid().clone()), 328 + &signing_key, 329 + ) 330 + .await 331 + .unwrap(); 332 + repo.apply_commit(commit_data.clone()).await.unwrap(); 333 + 334 + let firehose_commit = commit_data 335 + .to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![]) 336 + .await 337 + .unwrap(); 338 + 339 + firehose_commit.validate_v1_1(&pubkey).await.unwrap(); 340 + 341 + for batch_num in 1..=2000 { 342 + let batch_size = rng.gen_range(1..=20); 343 + let ops = generate_random_ops(&mut rng, &mut tracker, batch_size); 344 + let record_writes = test_ops_to_record_writes(ops, &collection); 345 + 346 + let (repo_ops, commit_data) = repo 347 + .create_commit(&record_writes, &did, None, &signing_key) 348 + .await 349 + .unwrap(); 350 + 351 + repo.apply_commit(commit_data.clone()).await.unwrap(); 352 + 353 + let firehose_commit = commit_data 354 + .to_firehose_commit(&did, batch_num + 1, Datetime::now(), repo_ops, vec![]) 355 + .await 356 + .unwrap(); 357 + 358 + firehose_commit 359 + .validate_v1_1(&pubkey) 360 + .await 361 + .unwrap_or_else(|e| { 362 + panic!( 363 + "Large batch validation failed (batch size {}): {}", 364 + batch_size, e 365 + ) 366 + }); 367 + 368 + repo.apply_commit(commit_data).await.unwrap(); 369 + // println!( 370 + // "Validated large batch {} with {} ops", 371 + // batch_num, batch_size 372 + // ); 373 + } 374 + } 375 + 376 + #[tokio::test] 377 + async fn test_stress_with_fixture() { 378 + use jacquard_repo::car::read_car; 379 + use std::path::Path; 380 + let fixture_path = 381 + 
Path::new("tests/fixtures/repo-nonbinary.computer-2025-10-21T13_05_55.090Z.car"); 382 + 383 + // Skip test in CI if fixture doesn't exist 384 + if !fixture_path.exists() { 385 + println!( 386 + "Skipping fixture test - fixture not found at {:?}", 387 + fixture_path 388 + ); 389 + return; 390 + } 391 + 392 + println!("Loading fixture repo from {:?}", fixture_path); 393 + 394 + // Import CAR into storage 395 + let storage = Arc::new(MemoryBlockStore::new()); 396 + let header = read_car_header(fixture_path).await.unwrap(); 397 + let parsed_car = read_car(fixture_path).await.unwrap(); 398 + 399 + storage.put_many(parsed_car).await.unwrap(); 400 + 401 + let root_cid = header.first().unwrap(); 402 + 403 + // Load repository from fixture 404 + let mut repo = Repository::from_commit(storage.clone(), root_cid) 405 + .await 406 + .unwrap(); 407 + 408 + println!( 409 + "Loaded fixture repo with commit at {}", 410 + repo.current_commit_cid() 411 + ); 412 + 413 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 414 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 415 + let pubkey = get_public_key(&signing_key); 416 + let did = repo.did().clone().into_static(); 417 + 418 + let mut rng = rand::thread_rng(); 419 + let mut tracker = RecordTracker::new(); 420 + 421 + // Perform random operations on fixture repo 422 + for batch_num in 1..=20 { 423 + let batch_size = rng.gen_range(10..=50); 424 + let ops = generate_random_ops(&mut rng, &mut tracker, batch_size); 425 + let record_writes = test_ops_to_record_writes(ops, &collection); 426 + 427 + let (repo_ops, commit_data) = repo 428 + .create_commit( 429 + &record_writes, 430 + &did, 431 + Some(repo.current_commit_cid().clone()), 432 + &signing_key, 433 + ) 434 + .await 435 + .unwrap(); 436 + 437 + repo.apply_commit(commit_data.clone()).await.unwrap(); 438 + 439 + let firehose_commit = commit_data 440 + .to_firehose_commit(&did, batch_num, Datetime::now(), repo_ops, vec![]) 441 + .await 442 + 
.unwrap(); 443 + 444 + firehose_commit 445 + .validate_v1_1(&pubkey) 446 + .await 447 + .unwrap_or_else(|e| panic!("Fixture validation failed at batch {}: {}", batch_num, e)); 448 + } 449 + 450 + println!("Fixture stress test complete - 20 batches validated"); 451 + }
+1 -5
crates/jacquard-repo/tests/interop.rs
··· 888 888 let blocks = read_car(fixture_path).await.expect("Failed to read CAR"); 889 889 let storage = Arc::new(MemoryBlockStore::new()); 890 890 891 - let mut block_vec = Vec::new(); 892 - for (cid, data) in blocks.iter() { 893 - block_vec.push((*cid, data.clone())); 894 - } 895 891 storage 896 - .put_many(block_vec) 892 + .put_many(blocks) 897 893 .await 898 894 .expect("Failed to store blocks"); 899 895