A better Rust ATProto crate

okay more robust, should work

Orual 62fb1f0f 574f3097

+1470 -212
+12
crates/jacquard-common/src/types/crypto.rs
··· 192 192 } 193 193 } 194 194 195 + /// Decode unsigned varint from bytes 196 + /// 197 + /// Returns `Some((value, bytes_read))` on success, `None` on invalid input. 198 + /// Used for decoding multicodec prefixes in multibase-encoded keys. 195 199 pub fn decode_uvarint(data: &[u8]) -> Option<(u64, usize)> { 196 200 let mut x: u64 = 0; 197 201 let mut s: u32 = 0; ··· 208 212 None 209 213 } 210 214 215 + /// Encode unsigned varint to bytes 216 + /// 217 + /// Encodes a u64 value as a multicodec-style varint. 218 + /// Used for encoding multicodec prefixes in multibase-encoded keys. 211 219 pub fn encode_uvarint(mut x: u64) -> Vec<u8> { 212 220 let mut out = Vec::new(); 213 221 while x >= 0x80 { ··· 218 226 out 219 227 } 220 228 229 + /// Encode public key as multibase multikey string 230 + /// 231 + /// Creates a multikey string with the given multicodec code and key bytes. 232 + /// Returns base58btc-encoded string with varint prefix. 221 233 pub fn multikey(code: u64, key: &[u8]) -> String { 222 234 let mut buf = encode_uvarint(code); 223 235 buf.extend_from_slice(key);
+25
crates/jacquard-common/src/types/recordkey.rs
··· 32 32 #[repr(transparent)] 33 33 pub struct RecordKey<T: RecordKeyType>(pub T); 34 34 35 + impl<'a> RecordKey<Rkey<'a>> { 36 + /// Create a new `RecordKey` from a string slice 37 + pub fn any(str: &'a str) -> Result<Self, AtStrError> { 38 + Ok(RecordKey(Rkey::new(str)?)) 39 + } 40 + 41 + /// Create a new `RecordKey` from a CowStr 42 + pub fn any_cow(str: CowStr<'a>) -> Result<Self, AtStrError> { 43 + Ok(RecordKey(Rkey::new_cow(str)?)) 44 + } 45 + 46 + /// Create a new `RecordKey` from a static string slice 47 + pub fn any_static(str: &'static str) -> Result<Self, AtStrError> { 48 + Ok(RecordKey(Rkey::new_static(str)?)) 49 + } 50 + } 51 + 35 52 impl<T> From<T> for RecordKey<Rkey<'_>> 36 53 where 37 54 T: RecordKeyType, 38 55 { 39 56 fn from(value: T) -> Self { 40 57 RecordKey(Rkey::from_str(value.as_str()).expect("Invalid rkey")) 58 + } 59 + } 60 + 61 + impl FromStr for RecordKey<Rkey<'_>> { 62 + type Err = AtStrError; 63 + 64 + fn from_str(s: &str) -> Result<Self, Self::Err> { 65 + Ok(RecordKey(Rkey::from_str(s)?)) 41 66 } 42 67 } 43 68
+1 -1
crates/jacquard-repo/src/car/writer.rs
··· 46 46 47 47 /// Write blocks to CAR bytes (in-memory) 48 48 /// 49 - /// Like `write_car()` but writes to a Vec<u8> instead of a file. 49 + /// Like `write_car()` but writes to a `Vec<u8>` instead of a file. 50 50 /// Useful for tests and proof generation. 51 51 pub async fn write_car_bytes( 52 52 root: IpldCid,
+320
crates/jacquard-repo/src/mst/cursor.rs
··· 1 + //! MST cursor for efficient tree traversal 2 + 3 + use super::node::NodeEntry; 4 + use super::tree::Mst; 5 + use crate::error::Result; 6 + use crate::storage::BlockStore; 7 + use cid::Cid as IpldCid; 8 + use smol_str::SmolStr; 9 + 10 + /// Position within an MST traversal 11 + #[derive(Debug, Clone)] 12 + pub enum CursorPosition<S: BlockStore> { 13 + /// Pointing at a leaf entry 14 + Leaf { 15 + /// Leaf key 16 + key: SmolStr, 17 + /// Leaf CID (record value) 18 + cid: IpldCid, 19 + }, 20 + 21 + /// Pointing at a tree (subtree root) 22 + Tree { 23 + /// Subtree MST 24 + mst: Mst<S>, 25 + }, 26 + 27 + /// Past the end of traversal 28 + End, 29 + } 30 + 31 + /// Cursor for navigating an MST in sorted order 32 + /// 33 + /// Maintains a position in the tree and supports efficient navigation: 34 + /// - `advance()`: Move to next item in sorted order 35 + /// - `skip_subtree()`: Skip entire subtree at current position 36 + /// - `current()`: Get current position without moving 37 + /// 38 + /// # Example traversal 39 + /// 40 + /// ```ignore 41 + /// let mut cursor = MstCursor::new(tree); 42 + /// 43 + /// while !cursor.is_end() { 44 + /// match cursor.current() { 45 + /// CursorPosition::Leaf { key, cid } => { 46 + /// println!("Leaf: {} -> {}", key, cid); 47 + /// } 48 + /// CursorPosition::Tree { mst } => { 49 + /// println!("Subtree at layer {}", mst.layer); 50 + /// } 51 + /// CursorPosition::End => break, 52 + /// } 53 + /// cursor.advance().await?; 54 + /// } 55 + /// ``` 56 + #[derive(Debug, Clone)] 57 + pub struct MstCursor<S: BlockStore> { 58 + /// Stack of (node, entries, index) pairs tracking path from root 59 + /// 60 + /// Each entry represents a level we've descended into: 61 + /// - `node`: The MST node at this level 62 + /// - `entries`: Cached entries for this node 63 + /// - `index`: Current position within entries 64 + path: Vec<(Mst<S>, Vec<NodeEntry<S>>, usize)>, 65 + 66 + /// Current position in traversal 67 + current: CursorPosition<S>, 68 + } 69 + 70 + impl<S: BlockStore + Sync + 'static> MstCursor<S> { 71 + /// Create new cursor at the start of a tree 72 + /// 73 + /// Initial position is the root of the tree (which is a Tree position). 74 + /// Call `advance()` to move to the first leaf. 75 + pub fn new(root: Mst<S>) -> Self { 76 + Self { 77 + path: Vec::new(), 78 + current: CursorPosition::Tree { mst: root }, 79 + } 80 + } 81 + 82 + /// Get current position without advancing 83 + pub fn current(&self) -> &CursorPosition<S> { 84 + &self.current 85 + } 86 + 87 + /// Check if cursor is at end 88 + pub fn is_end(&self) -> bool { 89 + matches!(self.current, CursorPosition::End) 90 + } 91 + 92 + /// Get key at current position (if pointing at leaf) 93 + pub fn key(&self) -> Option<&str> { 94 + match &self.current { 95 + CursorPosition::Leaf { key, .. } => Some(key.as_str()), 96 + _ => None, 97 + } 98 + } 99 + 100 + /// Get the layer we're currently walking at 101 + /// 102 + /// Returns the layer of the node we're traversing within. 103 + /// If at the root level (before stepping in), returns root's layer + 1. 104 + pub async fn layer(&self) -> Result<usize> { 105 + if let Some((walking_node, _, _)) = self.path.last() { 106 + // We're inside a node - return its layer 107 + walking_node.get_layer().await 108 + } else { 109 + // At root level (not yet stepped in) - return root's layer + 1 110 + // This matches rsky's walker behavior: being "at" the root (before entering) 111 + // is one layer higher than being "inside" the root 112 + match &self.current { 113 + CursorPosition::Tree { mst } => { 114 + let root_layer = mst.get_layer().await?; 115 + Ok(root_layer + 1) 116 + } 117 + CursorPosition::End => Ok(0), 118 + CursorPosition::Leaf { .. } => { 119 + // Shouldn't happen - root can't be a leaf 120 + Ok(0) 121 + } 122 + } 123 + } 124 + } 125 + 126 + /// Advance to next position in sorted order 127 + /// 128 + /// - If at Leaf: move to next sibling or pop up 129 + /// - If at Tree: descend into it (step into first entry) 130 + /// - If at End: stay at End 131 + pub async fn advance(&mut self) -> Result<()> { 132 + match &self.current { 133 + CursorPosition::End => Ok(()), 134 + 135 + CursorPosition::Leaf { .. } => { 136 + // Move to next sibling 137 + self.step_over().await 138 + } 139 + 140 + CursorPosition::Tree { mst } => { 141 + // Descend into tree 142 + self.step_into(mst.clone()).await 143 + } 144 + } 145 + } 146 + 147 + /// Skip entire subtree at current position 148 + /// 149 + /// If pointing at a Tree, skips all its contents. 150 + /// If pointing at a Leaf, equivalent to `advance()`. 151 + pub async fn skip_subtree(&mut self) -> Result<()> { 152 + self.step_over().await 153 + } 154 + 155 + /// Move to next sibling or pop up 156 + fn step_over<'a>( 157 + &'a mut self, 158 + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 159 + Box::pin(async move { 160 + if let Some((_node, entries, index)) = self.path.last_mut() { 161 + // Try to move to next entry in current node 162 + *index += 1; 163 + 164 + if *index < entries.len() { 165 + // Move to next entry 166 + self.current = match &entries[*index] { 167 + NodeEntry::Leaf { key, value } => CursorPosition::Leaf { 168 + key: key.clone(), 169 + cid: *value, 170 + }, 171 + NodeEntry::Tree(tree) => CursorPosition::Tree { mst: tree.clone() }, 172 + }; 173 + Ok(()) 174 + } else { 175 + // No more entries at this level - pop up 176 + self.path.pop(); 177 + self.step_over().await 178 + } 179 + } else { 180 + // No parent - we're done 181 + self.current = CursorPosition::End; 182 + Ok(()) 183 + } 184 + }) 185 + } 186 + 187 + /// Descend into a tree node 188 + async fn step_into(&mut self, mst: Mst<S>) -> Result<()> { 189 + let entries = mst.get_entries().await?; 190 + 191 + if entries.is_empty() { 192 + // Empty tree - skip it 193 + self.step_over().await 194 + } else { 195 + // Push current level onto stack and move to first entry 196 + self.path.push((mst, entries.clone(), 0)); 197 + 198 + self.current = match &entries[0] { 199 + NodeEntry::Leaf { key, value } => CursorPosition::Leaf { 200 + key: key.clone(), 201 + cid: *value, 202 + }, 203 + NodeEntry::Tree(tree) => CursorPosition::Tree { mst: tree.clone() }, 204 + }; 205 + 206 + Ok(()) 207 + } 208 + } 209 + } 210 + 211 + #[cfg(test)] 212 + mod tests { 213 + use super::*; 214 + use crate::DAG_CBOR_CID_CODEC; 215 + use crate::mst::tree::Mst; 216 + use crate::storage::memory::MemoryBlockStore; 217 + use jacquard_common::types::crypto::SHA2_256; 218 + use std::sync::Arc; 219 + 220 + fn test_cid(n: u8) -> IpldCid { 221 + let data = vec![n; 32]; 222 + let mh = multihash::Multihash::wrap(SHA2_256, &data).unwrap(); 223 + IpldCid::new_v1(DAG_CBOR_CID_CODEC, mh) 224 + } 225 + 226 + #[tokio::test] 227 + async fn test_cursor_empty_tree() { 228 + let storage = Arc::new(MemoryBlockStore::new()); 229 + let tree = Mst::new(storage); 230 + 231 + let mut cursor = MstCursor::new(tree); 232 + 233 + // Should start at root (Tree position) 234 + assert!(matches!(cursor.current(), CursorPosition::Tree { .. })); 235 + 236 + // Advance into empty tree should reach end 237 + cursor.advance().await.unwrap(); 238 + assert!(cursor.is_end()); 239 + } 240 + 241 + #[tokio::test] 242 + async fn test_cursor_single_leaf() { 243 + let storage = Arc::new(MemoryBlockStore::new()); 244 + let tree = Mst::new(storage); 245 + let tree = tree.add("key1", test_cid(1)).await.unwrap(); 246 + 247 + let mut cursor = MstCursor::new(tree); 248 + 249 + // Start at root 250 + assert!(matches!(cursor.current(), CursorPosition::Tree { .. })); 251 + 252 + // Advance to first leaf 253 + cursor.advance().await.unwrap(); 254 + assert_eq!(cursor.key(), Some("key1")); 255 + 256 + // Advance past last leaf 257 + cursor.advance().await.unwrap(); 258 + assert!(cursor.is_end()); 259 + } 260 + 261 + #[tokio::test] 262 + async fn test_cursor_multiple_leaves() { 263 + let storage = Arc::new(MemoryBlockStore::new()); 264 + let tree = Mst::new(storage); 265 + let tree = tree.add("a", test_cid(1)).await.unwrap(); 266 + let tree = tree.add("b", test_cid(2)).await.unwrap(); 267 + let tree = tree.add("c", test_cid(3)).await.unwrap(); 268 + 269 + let mut cursor = MstCursor::new(tree); 270 + 271 + let mut keys = Vec::new(); 272 + 273 + // Skip root 274 + cursor.advance().await.unwrap(); 275 + 276 + while !cursor.is_end() { 277 + if let Some(key) = cursor.key() { 278 + keys.push(key.to_string()); 279 + } 280 + cursor.advance().await.unwrap(); 281 + } 282 + 283 + assert_eq!(keys, vec!["a", "b", "c"]); 284 + } 285 + 286 + #[tokio::test] 287 + async fn test_cursor_skip_subtree() { 288 + let storage = Arc::new(MemoryBlockStore::new()); 289 + let tree = Mst::new(storage); 290 + 291 + // Add enough keys to create subtrees 292 + let tree = tree.add("a", test_cid(1)).await.unwrap(); 293 + let tree = tree.add("b", test_cid(2)).await.unwrap(); 294 + let tree = tree.add("c", test_cid(3)).await.unwrap(); 295 + 296 + let mut cursor = MstCursor::new(tree); 297 + 298 + // Advance to first position 299 + cursor.advance().await.unwrap(); 300 + 301 + // If we hit a tree, skip it 302 + let mut leaf_count = 0; 303 + while !cursor.is_end() { 304 + match cursor.current() { 305 + CursorPosition::Leaf { .. } => { 306 + leaf_count += 1; 307 + cursor.advance().await.unwrap(); 308 + } 309 + CursorPosition::Tree { .. } => { 310 + // Skip entire subtree 311 + cursor.skip_subtree().await.unwrap(); 312 + } 313 + CursorPosition::End => break, 314 + } 315 + } 316 + 317 + // We should have encountered some leaves 318 + assert!(leaf_count > 0); 319 + } 320 + }
+345 -32
crates/jacquard-repo/src/mst/diff.rs
··· 1 1 //! MST diff calculation 2 2 3 + use std::collections::BTreeMap; 4 + 5 + use super::cursor::{CursorPosition, MstCursor}; 3 6 use super::tree::Mst; 4 7 use crate::error::Result; 5 8 use crate::storage::BlockStore; 9 + use bytes::Bytes; 6 10 use cid::Cid as IpldCid; 7 11 use smol_str::SmolStr; 8 - use std::collections::HashMap; 9 12 10 13 /// Diff between two MST states 11 14 /// ··· 21 24 22 25 /// Records deleted (key, old CID) 23 26 pub deletes: Vec<(SmolStr, IpldCid)>, 27 + 28 + /// Record CIDs that are newly referenced (from creates + updates) 29 + /// 30 + /// This includes: 31 + /// - CIDs from created records 32 + /// - New CIDs from updated records 33 + /// 34 + /// These need to be available in storage for the new tree. 35 + pub new_leaf_cids: Vec<IpldCid>, 36 + 37 + /// Record CIDs that are no longer referenced (from deletes + updates) 38 + /// 39 + /// This includes: 40 + /// - CIDs from deleted records 41 + /// - Old CIDs from updated records 42 + /// 43 + /// These can be garbage collected if not referenced elsewhere. 44 + pub removed_cids: Vec<IpldCid>, 45 + 46 + /// MST node blocks that are newly created 47 + /// 48 + /// When modifying a tree, new MST nodes are created along changed paths. 49 + /// This tracks those nodes for persistence/commit inclusion. 50 + pub new_mst_blocks: BTreeMap<IpldCid, Bytes>, 51 + 52 + /// MST node blocks that are no longer needed 53 + /// 54 + /// When modifying a tree, old MST nodes along changed paths become unreachable. 55 + /// This tracks those nodes for garbage collection. 56 + pub removed_mst_blocks: Vec<IpldCid>, 24 57 } 25 58 26 59 use super::tree::VerifiedWriteOp; ··· 32 65 creates: Vec::new(), 33 66 updates: Vec::new(), 34 67 deletes: Vec::new(), 68 + new_leaf_cids: Vec::new(), 69 + removed_cids: Vec::new(), 70 + new_mst_blocks: BTreeMap::new(), 71 + removed_mst_blocks: Vec::new(), 35 72 } 36 73 } 37 74 ··· 94 131 ops 95 132 } 96 133 134 + /// Fetch new record data blocks from storage 135 + /// 136 + /// Returns a map of CID → bytes for all new record data (creates + updates). 137 + /// This is useful for including record data in commits and firehose messages. 138 + pub async fn fetch_new_blocks<S: BlockStore>( 139 + &self, 140 + storage: &S, 141 + ) -> Result<std::collections::BTreeMap<IpldCid, bytes::Bytes>> { 142 + use std::collections::BTreeMap; 143 + 144 + let mut blocks = BTreeMap::new(); 145 + 146 + for cid in &self.new_leaf_cids { 147 + if let Some(block) = storage.get(cid).await? { 148 + blocks.insert(*cid, block); 149 + } 150 + } 151 + 152 + Ok(blocks) 153 + } 154 + 97 155 /// Convert diff to firehose repository operations 98 156 /// 99 157 /// Returns operations in the format used by `com.atproto.sync.subscribeRepos`. ··· 150 208 /// - Creates: keys in `other` but not in `self` 151 209 /// - Updates: keys in both but with different CIDs 152 210 /// - Deletes: keys in `self` but not in `other` 211 + /// 212 + /// Uses an efficient walker-based algorithm that only visits changed subtrees. 213 + /// When two subtrees have the same CID, the entire subtree is skipped. 153 214 pub async fn diff(&self, other: &Mst<S>) -> Result<MstDiff> { 154 - // Collect all leaves from both trees 155 - let self_leaves = self.leaves().await?; 156 - let other_leaves = other.leaves().await?; 215 + let mut diff = MstDiff::new(); 216 + diff_recursive(self, other, &mut diff).await?; 217 + Ok(diff) 218 + } 219 + } 220 + 221 + /// Recursively diff two MST nodes using cursors 222 + fn diff_recursive<'a, S: BlockStore + Sync + 'static>( 223 + old: &'a Mst<S>, 224 + new: &'a Mst<S>, 225 + diff: &'a mut MstDiff, 226 + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 227 + Box::pin(async move { 228 + // If CIDs are equal, trees are identical - skip entire subtree 229 + let old_cid = old.get_pointer().await?; 230 + let new_cid = new.get_pointer().await?; 231 + if old_cid == new_cid { 232 + return Ok(()); 233 + } 234 + 235 + // CIDs differ - use cursors to walk both trees 236 + let mut old_cursor = MstCursor::new(old.clone()); 237 + let mut new_cursor = MstCursor::new(new.clone()); 238 + 239 + // Don't advance yet - let loop handle roots like any other tree comparison 240 + loop { 241 + match (old_cursor.current(), new_cursor.current()) { 242 + (CursorPosition::End, CursorPosition::End) => break, 243 + 244 + // Only new entries remain - all adds 245 + (CursorPosition::End, CursorPosition::Leaf { key, cid }) => { 246 + diff.creates.push((key.clone(), *cid)); 247 + diff.new_leaf_cids.push(*cid); 248 + new_cursor.advance().await?; 249 + } 250 + (CursorPosition::End, CursorPosition::Tree { mst }) => { 251 + track_added_tree(mst, diff).await?; 252 + new_cursor.skip_subtree().await?; 253 + } 254 + 255 + // Only old entries remain - all deletes 256 + (CursorPosition::Leaf { key, cid }, CursorPosition::End) => { 257 + diff.deletes.push((key.clone(), *cid)); 258 + diff.removed_cids.push(*cid); 259 + old_cursor.advance().await?; 260 + } 261 + (CursorPosition::Tree { mst }, CursorPosition::End) => { 262 + track_removed_tree(mst, diff).await?; 263 + old_cursor.skip_subtree().await?; 264 + } 265 + 266 + // Both have entries - compare them 267 + (old_pos, new_pos) => { 268 + // Handle Leaf/Leaf comparison FIRST (before layer checks) 269 + // This matches rsky's logic - key comparison takes precedence 270 + if let ( 271 + CursorPosition::Leaf { 272 + key: old_key, 273 + cid: old_cid, 274 + }, 275 + CursorPosition::Leaf { 276 + key: new_key, 277 + cid: new_cid, 278 + }, 279 + ) = (old_pos, new_pos) 280 + { 281 + match old_key.as_str().cmp(new_key.as_str()) { 282 + std::cmp::Ordering::Equal => { 283 + // Same key - check if value changed 284 + if old_cid != new_cid { 285 + diff.updates.push((old_key.clone(), *new_cid, *old_cid)); 286 + diff.new_leaf_cids.push(*new_cid); 287 + diff.removed_cids.push(*old_cid); 288 + } 289 + old_cursor.advance().await?; 290 + new_cursor.advance().await?; 291 + } 292 + std::cmp::Ordering::Less => { 293 + // Old key < new key - old was deleted 294 + diff.deletes.push((old_key.clone(), *old_cid)); 295 + diff.removed_cids.push(*old_cid); 296 + old_cursor.advance().await?; 297 + } 298 + std::cmp::Ordering::Greater => { 299 + // Old key > new key - new was created 300 + diff.creates.push((new_key.clone(), *new_cid)); 301 + diff.new_leaf_cids.push(*new_cid); 302 + new_cursor.advance().await?; 303 + } 304 + } 305 + continue; 306 + } 307 + 308 + // Now check layers for Tree comparisons 309 + let old_layer = old_cursor.layer().await?; 310 + let new_layer = new_cursor.layer().await?; 311 + 312 + match (old_pos, new_pos) { 313 + // Both trees at same layer - check if CIDs match, skip or recurse 314 + ( 315 + CursorPosition::Tree { mst: old_tree }, 316 + CursorPosition::Tree { mst: new_tree }, 317 + ) if old_layer == new_layer => { 318 + let old_tree_cid = old_tree.get_pointer().await?; 319 + let new_tree_cid = new_tree.get_pointer().await?; 320 + 321 + if old_tree_cid == new_tree_cid { 322 + // Same subtree - skip both 323 + old_cursor.skip_subtree().await?; 324 + new_cursor.skip_subtree().await?; 325 + } else { 326 + // Different subtrees - serialize and track MST blocks, then step in to find leaf diff 327 + serialize_and_track_mst(new_tree, diff).await?; 328 + diff.removed_mst_blocks.push(old_tree_cid); 329 + // Don't track recursively - step in to compare leaves 330 + old_cursor.advance().await?; 331 + new_cursor.advance().await?; 332 + } 333 + } 334 + 335 + // Layer mismatch handling (rsky pattern) 336 + _ if old_layer > new_layer => { 337 + // Old is at higher layer - need to descend or advance appropriately 338 + match old_pos { 339 + CursorPosition::Leaf { .. } => { 340 + // Higher layer leaf - serialize and track new node, advance new to continue comparing 341 + if let CursorPosition::Tree { mst } = new_pos { 342 + serialize_and_track_mst(mst, diff).await?; 343 + } 344 + new_cursor.advance().await?; // Don't blindly add - let loop compare 345 + } 346 + CursorPosition::Tree { mst } => { 347 + // Higher layer tree - track MST block removal, then step into to find leaves 348 + let tree_cid = mst.get_pointer().await?; 349 + diff.removed_mst_blocks.push(tree_cid); 350 + old_cursor.advance().await?; // Step into to continue comparing 351 + } 352 + _ => {} 353 + } 354 + } 157 355 158 - // Build hashmaps for efficient lookup 159 - let self_map: HashMap<SmolStr, IpldCid> = self_leaves.into_iter().collect(); 160 - let other_map: HashMap<SmolStr, IpldCid> = other_leaves.into_iter().collect(); 356 + _ if old_layer < new_layer => { 357 + // New is at higher layer 358 + match new_pos { 359 + CursorPosition::Leaf { .. } => { 360 + // Higher layer leaf - track old node, advance old to continue comparing 361 + if let CursorPosition::Tree { mst } = old_pos { 362 + let tree_cid = mst.get_pointer().await?; 363 + diff.removed_mst_blocks.push(tree_cid); 364 + } 365 + old_cursor.advance().await?; // Don't blindly delete - let loop compare 366 + } 367 + CursorPosition::Tree { mst } => { 368 + // Higher layer tree - serialize and track MST block addition, then step into to find leaves 369 + serialize_and_track_mst(mst, diff).await?; 370 + new_cursor.advance().await?; // Step into to continue comparing 371 + } 372 + _ => {} 373 + } 374 + } 161 375 162 - let mut diff = MstDiff::new(); 376 + // Same layer, mixed Leaf/Tree - step into tree to compare 377 + (CursorPosition::Leaf { .. }, CursorPosition::Tree { mst }) => { 378 + // Old has leaf, new has tree - serialize and track new MST block, step in to compare leaves 379 + serialize_and_track_mst(mst, diff).await?; 380 + new_cursor.advance().await?; 381 + } 163 382 164 - // Find creates and updates 165 - for (key, new_cid) in &other_map { 166 - match self_map.get(key) { 167 - Some(old_cid) => { 168 - // Key exists in both - check if CID changed 169 - if old_cid != new_cid { 170 - diff.updates.push((key.clone(), *new_cid, *old_cid)); 383 + (CursorPosition::Tree { mst }, CursorPosition::Leaf { .. }) => { 384 + // Old has tree, new has leaf - track removed MST block, step in to compare leaves 385 + let tree_cid = mst.get_pointer().await?; 386 + diff.removed_mst_blocks.push(tree_cid); 387 + old_cursor.advance().await?; 388 + } 389 + 390 + _ => {} 171 391 } 172 392 } 173 - None => { 174 - // Key only in other - create 175 - diff.creates.push((key.clone(), *new_cid)); 393 + } 394 + } 395 + 396 + Ok(()) 397 + }) 398 + } 399 + 400 + /// Serialize MST node and add to new_mst_blocks 401 + async fn serialize_and_track_mst<S: BlockStore + Sync + 'static>( 402 + tree: &Mst<S>, 403 + diff: &mut MstDiff, 404 + ) -> Result<()> { 405 + let tree_cid = tree.get_pointer().await?; 406 + 407 + // Serialize the MST node 408 + let entries = tree.get_entries().await?; 409 + let node_data = super::util::serialize_node_data(&entries).await?; 410 + let cbor = serde_ipld_dagcbor::to_vec(&node_data) 411 + .map_err(|e| crate::error::RepoError::serialization(e))?; 412 + 413 + // Track the serialized block 414 + diff.new_mst_blocks.insert(tree_cid, Bytes::from(cbor)); 415 + 416 + Ok(()) 417 + } 418 + 419 + /// Track entire tree as added (all leaves and nodes) 420 + fn track_added_tree<'a, S: BlockStore + Sync + 'static>( 421 + tree: &'a Mst<S>, 422 + diff: &'a mut MstDiff, 423 + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 424 + Box::pin(async move { 425 + use super::node::NodeEntry; 426 + 427 + // Serialize and track this MST node 428 + serialize_and_track_mst(tree, diff).await?; 429 + 430 + let entries = tree.get_entries().await?; 431 + for entry in &entries { 432 + match entry { 433 + NodeEntry::Leaf { key, value } => { 434 + diff.creates.push((key.clone(), *value)); 435 + diff.new_leaf_cids.push(*value); 436 + } 437 + NodeEntry::Tree(subtree) => { 438 + track_added_tree(subtree, diff).await?; 176 439 } 177 440 } 178 441 } 179 442 180 - // Find deletes 181 - for (key, old_cid) in &self_map { 182 - if !other_map.contains_key(key) { 183 - // Key only in self - delete 184 - diff.deletes.push((key.clone(), *old_cid)); 443 + Ok(()) 444 + }) 445 + } 446 + 447 + /// Track entire tree as removed (all leaves and nodes) 448 + fn track_removed_tree<'a, S: BlockStore + Sync + 'static>( 449 + tree: &'a Mst<S>, 450 + diff: &'a mut MstDiff, 451 + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 452 + Box::pin(async move { 453 + use super::node::NodeEntry; 454 + 455 + // Track this MST node as removed 456 + let tree_cid = tree.get_pointer().await?; 457 + diff.removed_mst_blocks.push(tree_cid); 458 + 459 + // Recursively remove all leaves and nodes 460 + let entries = tree.get_entries().await?; 461 + for entry in &entries { 462 + match entry { 463 + NodeEntry::Leaf { key, value } => { 464 + diff.deletes.push((key.clone(), *value)); 465 + diff.removed_cids.push(*value); 466 + } 467 + NodeEntry::Tree(subtree) => { 468 + track_removed_tree(subtree, diff).await?; 469 + } 185 470 } 186 471 } 187 472 188 - Ok(diff) 189 - } 473 + Ok(()) 474 + }) 475 + } 190 476 477 + impl<S: BlockStore + Sync + 'static> Mst<S> { 191 478 /// Compute diff from this tree to empty (all deletes) 192 479 /// 193 480 /// Returns diff representing deletion of all records in this tree. 194 481 pub async fn diff_to_empty(&self) -> Result<MstDiff> { 195 - let leaves = self.leaves().await?; 482 + let mut diff = MstDiff::new(); 483 + track_removed_tree_all(self, &mut diff).await?; 484 + Ok(diff) 485 + } 486 + } 487 + 488 + /// Track entire tree as removed (all nodes and leaves) 489 + fn track_removed_tree_all<'a, S: BlockStore + Sync + 'static>( 490 + tree: &'a Mst<S>, 491 + diff: &'a mut MstDiff, 492 + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 493 + Box::pin(async move { 494 + use super::node::NodeEntry; 495 + 496 + // Track this node as removed 497 + let tree_cid = tree.get_pointer().await?; 498 + diff.removed_mst_blocks.push(tree_cid); 499 + 500 + // Recurse through entries 501 + let entries = tree.get_entries().await?; 502 + for entry in &entries { 503 + match entry { 504 + NodeEntry::Leaf { key, value } => { 505 + diff.deletes.push((key.clone(), *value)); 506 + diff.removed_cids.push(*value); 507 + } 508 + NodeEntry::Tree(subtree) => { 509 + track_removed_tree_all(subtree, diff).await?; 510 + } 511 + } 512 + } 196 513 197 - Ok(MstDiff { 198 - creates: Vec::new(), 199 - updates: Vec::new(), 200 - deletes: leaves, 201 - }) 202 - } 514 + Ok(()) 515 + }) 203 516 } 204 517 205 518 #[cfg(test)]
+3 -1
crates/jacquard-repo/src/mst/mod.rs
··· 4 4 pub mod tree; 5 5 pub mod util; 6 6 pub mod diff; 7 + pub mod cursor; 7 8 8 9 pub use node::{NodeData, NodeEntry, TreeEntry}; 9 - pub use tree::{Mst, WriteOp}; 10 + pub use tree::{Mst, WriteOp, RecordWriteOp, VerifiedWriteOp}; 10 11 pub use diff::MstDiff; 12 + pub use cursor::{MstCursor, CursorPosition};
+14 -1
crates/jacquard-repo/src/mst/node.rs
··· 1 1 //! MST node data structures 2 2 3 + use std::fmt; 4 + 3 5 use bytes::Bytes; 4 6 use cid::Cid as IpldCid; 5 7 use smol_str::SmolStr; ··· 11 13 /// `[Tree, Leaf, Tree, Leaf, Leaf, Tree]` etc. 12 14 /// 13 15 /// The wire format (CBOR) is different - see `NodeData` and `TreeEntry`. 14 - #[derive(Debug, Clone)] 16 + #[derive(Clone)] 15 17 pub enum NodeEntry<S: crate::storage::BlockStore> { 16 18 /// Subtree reference 17 19 /// ··· 25 27 /// CID of the record value 26 28 value: IpldCid, 27 29 }, 30 + } 31 + 32 + impl<S: crate::storage::BlockStore> fmt::Debug for NodeEntry<S> { 33 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 34 + match self { 35 + NodeEntry::Tree(t) => write!(f, "{:?}", t), 36 + NodeEntry::Leaf { key, value } => { 37 + write!(f, "Leaf {{ key: {}, value: {} }}", key, value) 38 + } 39 + } 40 + } 28 41 } 29 42 30 43 impl<S: crate::storage::BlockStore> NodeEntry<S> {
+233 -25
crates/jacquard-repo/src/mst/tree.rs
··· 5 5 use crate::error::{RepoError, Result}; 6 6 use crate::storage::BlockStore; 7 7 use cid::Cid as IpldCid; 8 + use core::fmt; 9 + use jacquard_common::types::recordkey::Rkey; 10 + use jacquard_common::types::string::{Nsid, RecordKey}; 11 + use jacquard_common::types::value::RawData; 8 12 use smol_str::SmolStr; 13 + use std::fmt::{Display, Formatter}; 14 + use std::pin::Pin; 9 15 use std::sync::Arc; 10 16 use tokio::sync::RwLock; 11 17 ··· 46 52 }, 47 53 } 48 54 55 + /// Record write operation with inline data 56 + /// 57 + /// Used for high-level record operations where the actual record data 58 + /// needs to be serialized and stored. The data is a generic IPLD map 59 + /// (similar to rsky's `RepoRecord = BTreeMap<String, Lex>`). 60 + #[derive(Debug, Clone, PartialEq)] 61 + pub enum RecordWriteOp<'a> { 62 + /// Create new record with data 63 + Create { 64 + /// Collection NSID 65 + collection: Nsid<'a>, 66 + /// Record key 67 + rkey: RecordKey<Rkey<'a>>, 68 + /// Record data (will be serialized to DAG-CBOR and CID computed) 69 + record: std::collections::BTreeMap<SmolStr, RawData<'a>>, 70 + }, 71 + 72 + /// Update existing record with new data 73 + Update { 74 + /// Collection NSID 75 + collection: Nsid<'a>, 76 + /// Record key 77 + rkey: RecordKey<Rkey<'a>>, 78 + /// New record data 79 + record: std::collections::BTreeMap<SmolStr, RawData<'a>>, 80 + /// Previous CID (optional for validation) 81 + prev: Option<IpldCid>, 82 + }, 83 + 84 + /// Delete record 85 + Delete { 86 + /// Collection NSID 87 + collection: Nsid<'a>, 88 + /// Record key 89 + rkey: RecordKey<Rkey<'a>>, 90 + /// Previous CID (optional for validation) 91 + prev: Option<IpldCid>, 92 + }, 93 + } 94 + 95 + impl<'a> RecordWriteOp<'a> { 96 + /// Get the collection NSID for this operation 97 + pub fn collection(&self) -> &Nsid<'a> { 98 + match self { 99 + RecordWriteOp::Create { collection, .. } => collection, 100 + RecordWriteOp::Update { collection, .. } => collection, 101 + RecordWriteOp::Delete { collection, .. } => collection, 102 + } 103 + } 104 + 105 + /// Get the record key for this operation 106 + pub fn rkey(&self) -> &RecordKey<Rkey<'a>> { 107 + match self { 108 + RecordWriteOp::Create { rkey, .. } => rkey, 109 + RecordWriteOp::Update { rkey, .. } => rkey, 110 + RecordWriteOp::Delete { rkey, .. } => rkey, 111 + } 112 + } 113 + } 114 + 49 115 /// Verified write operation with required prev fields 50 116 /// 51 117 /// Used for operations where prev CID has been verified against tree state. ··· 101 167 /// - More leading zeros = higher layer (deeper in tree) 102 168 /// - Layer = floor(leading_zeros / 2) for ~4 fanout 103 169 /// - Deterministic and insertion-order independent 104 - #[derive(Debug, Clone)] 170 + #[derive(Clone)] 105 171 pub struct Mst<S: BlockStore> { 106 172 /// Block storage for loading/saving nodes (shared via Arc) 107 173 storage: Arc<S>, ··· 186 252 Ok(Self { 187 253 storage: self.storage.clone(), 188 254 entries: Arc::new(RwLock::new(Some(entries))), 189 - pointer: self.pointer.clone(), 255 + pointer: Arc::new(RwLock::new(self.pointer.read().await.clone())), 190 256 outdated_pointer: Arc::new(RwLock::new(true)), 191 257 layer: self.layer, 192 258 }) 193 259 } 194 260 195 261 /// Get entries (lazy load if needed) 196 - async fn get_entries(&self) -> Result<Vec<NodeEntry<S>>> { 262 + pub(crate) async fn get_entries(&self) -> Result<Vec<NodeEntry<S>>> { 197 263 { 198 264 let entries_guard = self.entries.read().await; 199 265 if let Some(ref entries) = *entries_guard { ··· 227 293 /// 228 294 /// Computes CID from current entries but doesn't persist to storage. 229 295 /// Use `collect_blocks()` to gather blocks for persistence. 230 - pub async fn get_pointer(&self) -> Result<IpldCid> { 231 - let outdated = *self.outdated_pointer.read().await; 232 - if !outdated { 233 - return Ok(*self.pointer.read().await); 234 - } 296 + pub fn get_pointer<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<IpldCid>> + Send + 'a>> { 297 + Box::pin(async move { 298 + let outdated = *self.outdated_pointer.read().await; 299 + if !outdated { 300 + return Ok(*self.pointer.read().await); 301 + } 235 302 236 - // Serialize and compute CID (don't persist yet) 237 - let entries = self.get_entries().await?; 238 - let node_data = util::serialize_node_data(&entries).await?; 239 - let cbor = 240 - serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?; 241 - let cid = util::compute_cid(&cbor)?; 303 + // Check for outdated children and recursively update them first 304 + let mut entries = self.get_entries().await?; 305 + let mut outdated_children = Vec::new(); 242 306 243 - // Update pointer and mark as fresh 244 - { 245 - let mut pointer_guard = self.pointer.write().await; 246 - *pointer_guard = cid; 247 - } 248 - { 249 - let mut outdated_guard = self.outdated_pointer.write().await; 250 - *outdated_guard = false; 251 - } 307 + for entry in &entries { 308 + if let NodeEntry::Tree(mst) = entry { 309 + let is_outdated = *mst.outdated_pointer.read().await; 310 + if is_outdated { 311 + outdated_children.push(mst.clone()); 312 + } 313 + } 314 + } 252 315 253 - Ok(cid) 316 + // Recursively update outdated children 317 + if !outdated_children.is_empty() { 318 + for child in &outdated_children { 319 + let _ = child.get_pointer().await?; 320 + } 321 + // Re-fetch entries with updated child CIDs 322 + entries = self.get_entries().await?; 323 + } 324 + 325 + // Now serialize and compute CID with fresh child CIDs 326 + let node_data = util::serialize_node_data(&entries).await?; 327 + let cbor = 328 + serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?; 329 + let cid = util::compute_cid(&cbor)?; 330 + 331 + // Update pointer and mark as fresh 332 + { 333 + let mut pointer_guard = self.pointer.write().await; 334 + *pointer_guard = cid; 335 + } 336 + { 337 + let mut outdated_guard = self.outdated_pointer.write().await; 338 + *outdated_guard = false; 339 + } 340 + 341 + Ok(cid) 342 + }) 254 343 } 255 344 256 345 /// Get root CID (alias for get_pointer) ··· 269 358 /// 270 359 /// Layer is the maximum layer of any leaf key in this node. 271 360 /// For nodes with no leaves, recursively checks subtrees. 272 - fn get_layer<'a>( 361 + pub(crate) fn get_layer<'a>( 273 362 &'a self, 274 363 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize>> + Send + 'a>> { 275 364 Box::pin(async move { ··· 818 907 }) 819 908 } 820 909 910 + /// Copy tree with same entries (marking pointer as outdated) 911 + /// 912 + /// Internal helper for creating modified tree copies. 913 + pub async fn copy_tree(&self) -> Result<Self> { 914 + let entries = self.get_entries().await?; 915 + self.new_tree(entries).await 916 + } 917 + 821 918 /// Apply batch of verified write operations (returns new tree) 822 919 /// 823 920 /// More efficient than individual operations as it only rebuilds ··· 946 1043 Ok(root_cid) 947 1044 } 948 1045 1046 + /// Collect all MST node CIDs in this tree 1047 + /// 1048 + /// Returns all CIDs for MST nodes (internal nodes), not leaves. 1049 + /// Used for diff calculation to determine which MST blocks are removed. 1050 + pub async fn collect_node_cids(&self) -> Result<Vec<IpldCid>> { 1051 + let mut cids = Vec::new(); 1052 + let pointer = self.get_pointer().await?; 1053 + cids.push(pointer); 1054 + 1055 + let entries = self.get_entries().await?; 1056 + for entry in &entries { 1057 + if let NodeEntry::Tree(subtree) = entry { 1058 + let subtree_cids = subtree.collect_node_cids().await?; 1059 + cids.extend(subtree_cids); 1060 + } 1061 + } 1062 + 1063 + Ok(cids) 1064 + } 1065 + 949 1066 /// Get all CIDs in the merkle path to a key 950 1067 /// 951 1068 /// Returns a list of CIDs representing the proof path from root to the target key: ··· 1070 1187 1071 1188 Ok(()) 1072 1189 }) 1190 + } 1191 + } 1192 + 1193 + impl<S: BlockStore> std::fmt::Debug for Mst<S> { 1194 + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 1195 + f.debug_struct("MST") 1196 + .field("entries", &self.entries.try_read().unwrap()) 1197 + .field("layer", &self.layer) 1198 + .field("pointer", &self.pointer.try_read().unwrap().to_string()) 1199 + .field( 1200 + "outdated_pointer", 1201 + &self.outdated_pointer.try_read().unwrap(), 1202 + ) 1203 + .finish() 1204 + } 1205 + } 1206 + 1207 + /// Format a CID for display (shortens long CIDs) 1208 + /// 1209 + /// Truncates long CIDs to first 7 and last 8 characters with `...` in between. 1210 + pub fn short_cid(cid: &IpldCid) -> String { 1211 + let cid_string = cid.to_string(); 1212 + let len = cid_string.len(); 1213 + if len > 15 { 1214 + let first = &cid_string[0..7]; 1215 + let last = &cid_string[len - 8..]; 1216 + format!("{}...{}", first, last) 1217 + } else { 1218 + cid_string 1219 + } 1220 + } 1221 + 1222 + impl<S: BlockStore> Display for Mst<S> { 1223 + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 1224 + fn pointer_str<S: BlockStore>(mst: &Mst<S>) -> String { 1225 + let cid_guard = mst.pointer.try_read().unwrap(); 1226 + format!("*({})", short_cid(&*cid_guard)) 1227 + } 1228 + 1229 + fn fmt_mst<S: BlockStore>( 1230 + mst: &Mst<S>, 1231 + f: &mut Formatter<'_>, 1232 + prefix: &str, 1233 + is_last: bool, 1234 + ) -> fmt::Result { 1235 + // Print MST pointer using our helper 1236 + writeln!( 1237 + f, 1238 + "{}{}── {}", 1239 + prefix, 1240 + if is_last { "└" } else { "├" }, 1241 + pointer_str(mst), 1242 + )?; 1243 + 1244 + // Prepare the child prefix 1245 + let child_prefix = format!("{}{}", prefix, if is_last { " " } else { "│ " }); 1246 + 1247 + let entries_guard = mst.entries.try_read().unwrap(); 1248 + let entries = match &*entries_guard { 1249 + Some(e) => e, 1250 + None => { 1251 + writeln!(f, "{}(virtual node)", child_prefix)?; 1252 + return Ok(()); 1253 + } 1254 + }; 1255 + 1256 + for (i, entry) in entries.iter().enumerate() { 1257 + let last_child = i == entries.len() - 1; 1258 + match entry { 1259 + NodeEntry::Leaf { key, value } => { 1260 + // Print leaf key and (short) leaf value 1261 + writeln!( 1262 + f, 1263 + "{}{}── {} -> {}", 1264 + child_prefix, 1265 + if last_child { "└" } else { "├" }, 1266 + key, 1267 + short_cid(&value) 1268 + )?; 1269 + } 1270 + NodeEntry::Tree(child_mst) => { 1271 + // Recurse 1272 + fmt_mst(child_mst, f, &child_prefix, last_child)?; 1273 + } 1274 + } 1275 + } 1276 + Ok(()) 1277 + } 1278 + 1279 + // Start with empty prefix for the root 1280 + fmt_mst(self, f, "", true) 1073 1281 } 1074 1282 } 1075 1283
+466 -150
crates/jacquard-repo/src/repo.rs
··· 3 3 //! Optional convenience layer over MST primitives. Provides type-safe record operations, 4 4 //! batch writes, commit creation, and CAR export. 5 5 6 - use crate::MstDiff; 7 6 use crate::commit::Commit; 8 7 use crate::error::Result; 9 - use crate::mst::{Mst, WriteOp}; 8 + use crate::mst::Mst; 10 9 use crate::storage::BlockStore; 11 10 use cid::Cid as IpldCid; 12 11 use jacquard_common::IntoStatic; ··· 40 39 /// Previous MST root CID (for sync v1.1) 41 40 pub prev_data: Option<IpldCid>, 42 41 43 - /// All blocks to persist (MST nodes + commit block) 42 + /// All blocks to persist (MST nodes + record data + commit block) 44 43 /// 45 44 /// Includes: 46 - /// - All new MST node blocks from `mst.collect_blocks()` 45 + /// - All new MST node blocks from `diff.new_mst_blocks` 46 + /// - All new record data blocks (from creates + updates) 47 47 /// - The commit block itself 48 48 pub blocks: BTreeMap<IpldCid, bytes::Bytes>, 49 49 ··· 54 54 /// - MST node blocks along paths for all changed keys 55 55 /// - Includes "adjacent" blocks needed for operation inversion 56 56 pub relevant_blocks: BTreeMap<IpldCid, bytes::Bytes>, 57 + 58 + /// CIDs of blocks to delete from storage 59 + /// 60 + /// Contains CIDs that are no longer referenced by the current tree: 61 + /// - Record CIDs from deleted records 62 + /// - Old record CIDs from updated records 63 + /// 64 + /// **Note:** Actual deletion should consider whether previous commits still 65 + /// reference these CIDs. A proper GC strategy might: 66 + /// - Only delete if previous commits are also being GC'd 67 + /// - Use reference counting across all retained commits 68 + /// - Perform periodic reachability analysis 69 + /// 70 + /// For simple single-commit repos or when old commits are discarded, direct 71 + /// deletion is safe. 72 + pub deleted_cids: Vec<IpldCid>, 57 73 } 58 74 59 75 impl CommitData { ··· 224 240 Ok(old_cid) 225 241 } 226 242 227 - /// Apply write operations individually (validates existence/prev) 228 - pub async fn create_writes(&mut self, ops: &[WriteOp]) -> Result<crate::mst::MstDiff> { 229 - let old_mst = self.mst.clone(); 243 + /// Apply record write operations with inline data 244 + /// 245 + /// Serializes record data to DAG-CBOR, computes CIDs, stores data blocks, 246 + /// then applies write operations to the MST. Returns the diff for inspection. 247 + /// 248 + /// For creating commits with operations, use `create_commit()` instead. 249 + pub async fn apply_record_writes( 250 + &mut self, 251 + ops: &[crate::mst::RecordWriteOp<'_>], 252 + ) -> Result<crate::mst::MstDiff> { 253 + use crate::mst::RecordWriteOp; 254 + use smol_str::format_smolstr; 255 + 256 + let mut updated_tree = self.mst.clone(); 230 257 231 - // Apply operations individually (add/update/delete verify existence) 232 258 for op in ops { 233 - self.mst = match op { 234 - WriteOp::Create { key, cid } => { 235 - // Check doesn't exist 236 - if self.mst.get(key.as_str()).await?.is_some() { 237 - return Err(crate::error::RepoError::already_exists( 238 - "record", 239 - key.as_str(), 240 - )); 241 - } 242 - self.mst.add(key.as_str(), *cid).await? 259 + updated_tree = match op { 260 + RecordWriteOp::Create { 261 + collection, 262 + rkey, 263 + record, 264 + } => { 265 + let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 266 + 267 + // Serialize record to DAG-CBOR 268 + let cbor = serde_ipld_dagcbor::to_vec(record) 269 + .map_err(|e| crate::error::RepoError::serialization(e))?; 270 + 271 + // Compute CID and store data 272 + let cid = self.storage.put(&cbor).await?; 273 + 274 + updated_tree.add(key.as_str(), cid).await? 243 275 } 244 - WriteOp::Update { key, cid, prev } => { 245 - // Check exists 246 - let current = self.mst.get(key.as_str()).await?.ok_or_else(|| { 247 - crate::error::RepoError::not_found("record", key.as_str()) 248 - })?; 276 + RecordWriteOp::Update { 277 + collection, 278 + rkey, 279 + record, 280 + prev, 281 + } => { 282 + let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 283 + 284 + // Serialize record to DAG-CBOR 285 + let cbor = serde_ipld_dagcbor::to_vec(record) 286 + .map_err(|e| crate::error::RepoError::serialization(e))?; 287 + 288 + // Compute CID and store data 289 + let cid = self.storage.put(&cbor).await?; 249 290 250 291 // Validate prev if provided 251 292 if let Some(prev_cid) = prev { 252 - if &current != prev_cid { 293 + if &cid != prev_cid { 253 294 return Err(crate::error::RepoError::invalid(format!( 254 295 "Update prev CID mismatch for key {}: expected {}, got {}", 255 - key, prev_cid, current 296 + key, prev_cid, cid 256 297 ))); 257 298 } 258 299 } 259 300 260 - self.mst.add(key.as_str(), *cid).await? 301 + updated_tree.add(key.as_str(), cid).await? 261 302 } 262 - WriteOp::Delete { key, prev } => { 303 + RecordWriteOp::Delete { 304 + collection, 305 + rkey, 306 + prev, 307 + } => { 308 + let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 309 + 263 310 // Check exists 264 311 let current = self.mst.get(key.as_str()).await?.ok_or_else(|| { 265 312 crate::error::RepoError::not_found("record", key.as_str()) ··· 275 322 } 276 323 } 277 324 278 - self.mst.delete(key.as_str()).await? 325 + updated_tree.delete(key.as_str()).await? 279 326 } 280 327 }; 281 328 } 282 329 283 - old_mst.diff(&self.mst).await 284 - } 330 + // Compute diff before updating 331 + let diff = self.mst.diff(&updated_tree).await?; 332 + 333 + // Update mst 334 + self.mst = updated_tree; 285 335 286 - /// Apply write operations and create a commit 287 - /// 288 - /// Convenience method that calls `create_writes()` and `commit()`. 289 - pub async fn apply_writes<K>(&mut self, ops: &[WriteOp], signing_key: &K) -> Result<MstDiff> 290 - where 291 - K: crate::commit::SigningKey, 292 - { 293 - let did = &self.commit.did.clone(); 294 - let cid = &self.commit_cid.clone(); 295 - let diff = self.create_writes(ops).await?; 296 - self.commit(&did, Some(*cid), signing_key).await?; 297 336 Ok(diff) 298 337 } 299 338 300 - /// Format a commit (create signed commit + collect blocks) 339 + /// Create a commit from record write operations 301 340 /// 302 - /// Creates signed commit and collects blocks for persistence and firehose: 303 - /// - All MST node blocks from `mst.collect_blocks()` 304 - /// - Commit block itself 305 - /// - Relevant blocks for sync v1.1 (walks paths for all changed keys) 341 + /// Applies write operations, creates signed commit, and collects blocks: 342 + /// - Serializes records to DAG-CBOR and stores data blocks 343 + /// - Applies operations to MST and computes diff 344 + /// - Uses `diff.new_mst_blocks` for efficient block tracking 345 + /// - Walks paths for original operations to build relevant_blocks (sync v1.1) 306 346 /// 307 347 /// Returns `(ops, CommitData)` - ops are needed for `to_firehose_commit()`. 308 - pub async fn format_commit<K>( 309 - &self, 348 + pub async fn create_commit<K>( 349 + &mut self, 350 + ops: &[crate::mst::RecordWriteOp<'_>], 310 351 did: &Did<'_>, 311 352 prev: Option<IpldCid>, 312 353 signing_key: &K, ··· 314 355 where 315 356 K: crate::commit::SigningKey, 316 357 { 317 - let rev = Ticker::new().next(Some(self.commit.rev.clone())); 318 - let data = self.mst.root().await?; 319 - let prev_data = *self.commit.data(); 358 + use crate::mst::RecordWriteOp; 359 + use smol_str::format_smolstr; 320 360 321 - // Create signed commit 322 - let commit = Commit::new_unsigned(did.clone().into_static(), data, rev.clone(), prev) 323 - .sign(signing_key)?; 361 + // Step 1: Apply all write operations to build new MST 362 + let mut updated_tree = self.mst.clone(); 324 363 325 - // Load previous MST to compute diff 326 - let prev_mst = Mst::load(self.storage.clone(), prev_data, None); 327 - let diff = prev_mst.diff(&self.mst).await?; 364 + for op in ops { 365 + updated_tree = match op { 366 + RecordWriteOp::Create { 367 + collection, 368 + rkey, 369 + record, 370 + } => { 371 + let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 328 372 329 - // Collect all MST blocks for persistence 330 - let (_root_cid, mut blocks) = self.mst.collect_blocks().await?; 373 + // Serialize record to DAG-CBOR 374 + let cbor = serde_ipld_dagcbor::to_vec(record) 375 + .map_err(|e| crate::error::RepoError::serialization(e))?; 331 376 332 - // Collect relevant blocks for firehose (walk paths for all changed keys) 333 - let mut relevant_blocks = BTreeMap::new(); 377 + // Compute CID and store data 378 + let cid = self.storage.put(&cbor).await?; 334 379 335 - // Walk paths for creates 336 - for (key, _cid) in &diff.creates { 337 - let path_cids = self.mst.cids_for_path(key.as_str()).await?; 338 - for path_cid in path_cids { 339 - if let Some(block) = blocks.get(&path_cid) { 340 - relevant_blocks.insert(path_cid, block.clone()); 341 - } else if let Some(block) = self.storage.get(&path_cid).await? { 342 - relevant_blocks.insert(path_cid, block); 380 + updated_tree.add(key.as_str(), cid).await? 343 381 } 344 - } 345 - } 382 + RecordWriteOp::Update { 383 + collection, 384 + rkey, 385 + record, 386 + prev, 387 + } => { 388 + let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 389 + 390 + // Serialize record to DAG-CBOR 391 + let cbor = serde_ipld_dagcbor::to_vec(record) 392 + .map_err(|e| crate::error::RepoError::serialization(e))?; 393 + 394 + // Compute CID and store data 395 + let cid = self.storage.put(&cbor).await?; 396 + 397 + // Validate prev if provided 398 + if let Some(prev_cid) = prev { 399 + if &cid != prev_cid { 400 + return Err(crate::error::RepoError::invalid(format!( 401 + "Update prev CID mismatch for key {}: expected {}, got {}", 402 + key, prev_cid, cid 403 + ))); 404 + } 405 + } 346 406 347 - // Walk paths for updates 348 - for (key, _new_cid, _old_cid) in &diff.updates { 349 - let path_cids = self.mst.cids_for_path(key.as_str()).await?; 350 - for path_cid in path_cids { 351 - if let Some(block) = blocks.get(&path_cid) { 352 - relevant_blocks.insert(path_cid, block.clone()); 353 - } else if let Some(block) = self.storage.get(&path_cid).await? { 354 - relevant_blocks.insert(path_cid, block); 407 + updated_tree.add(key.as_str(), cid).await? 355 408 } 356 - } 409 + RecordWriteOp::Delete { 410 + collection, 411 + rkey, 412 + prev, 413 + } => { 414 + let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 415 + 416 + // Check exists 417 + let current = self.mst.get(key.as_str()).await?.ok_or_else(|| { 418 + crate::error::RepoError::not_found("record", key.as_str()) 419 + })?; 420 + 421 + // Validate prev if provided 422 + if let Some(prev_cid) = prev { 423 + if &current != prev_cid { 424 + return Err(crate::error::RepoError::invalid(format!( 425 + "Delete prev CID mismatch for key {}: expected {}, got {}", 426 + key, prev_cid, current 427 + ))); 428 + } 429 + } 430 + 431 + updated_tree.delete(key.as_str()).await? 432 + } 433 + }; 357 434 } 358 435 359 - // Walk paths for deletes (path may not exist in new tree, but walk as far as possible) 360 - for (key, _old_cid) in &diff.deletes { 361 - let path_cids = self.mst.cids_for_path(key.as_str()).await?; 436 + // Step 2: Compute diff and get new MST root 437 + let data = updated_tree.root().await?; 438 + let prev_data = *self.commit.data(); 439 + let diff = self.mst.diff(&updated_tree).await?; 440 + 441 + // Step 3: Extract everything we need from diff before moving it 442 + let new_leaf_blocks = diff.fetch_new_blocks(self.storage.as_ref()).await?; 443 + let repo_ops = diff 444 + .to_repo_ops() 445 + .into_iter() 446 + .map(|op| op.into_static()) 447 + .collect(); 448 + let deleted_cids = diff.removed_cids; 449 + 450 + // Step 4: Use diff.new_mst_blocks instead of collect_blocks() 451 + let mut blocks = diff.new_mst_blocks; 452 + 453 + // Step 5: Build relevant_blocks by walking paths for ORIGINAL operations 454 + let mut relevant_blocks = BTreeMap::new(); 455 + for op in ops { 456 + let key = format_smolstr!("{}/{}", op.collection().as_ref(), op.rkey().as_ref()); 457 + let path_cids = updated_tree.cids_for_path(key.as_str()).await?; 458 + 362 459 for path_cid in path_cids { 363 460 if let Some(block) = blocks.get(&path_cid) { 364 461 relevant_blocks.insert(path_cid, block.clone()); ··· 368 465 } 369 466 } 370 467 371 - // Add commit block to both collections 468 + // Step 6: Add new leaf blocks (record data) to both collections 469 + for (cid, block) in new_leaf_blocks { 470 + blocks.insert(cid, block.clone()); 471 + relevant_blocks.insert(cid, block); 472 + } 473 + 474 + // Step 7: Create and sign commit 475 + let rev = Ticker::new().next(Some(self.commit.rev.clone())); 476 + let commit = Commit::new_unsigned(did.clone().into_static(), data, rev.clone(), prev) 477 + .sign(signing_key)?; 478 + 372 479 let commit_cbor = commit.to_cbor()?; 373 480 let commit_cid = crate::mst::util::compute_cid(&commit_cbor)?; 374 481 let commit_bytes = bytes::Bytes::from(commit_cbor); 482 + 483 + // Step 8: Add commit block to both collections 375 484 blocks.insert(commit_cid, commit_bytes.clone()); 376 485 relevant_blocks.insert(commit_cid, commit_bytes); 377 486 378 - // Convert diff to repository operations 379 - let ops = diff 380 - .to_repo_ops() 381 - .into_iter() 382 - .map(|op| op.into_static()) 383 - .collect(); 487 + // Step 9: Update internal MST state 488 + self.mst = updated_tree; 384 489 385 490 Ok(( 386 - ops, 491 + repo_ops, 387 492 CommitData { 388 493 cid: commit_cid, 389 494 rev, ··· 393 498 prev_data: Some(prev_data), 394 499 blocks, 395 500 relevant_blocks, 501 + deleted_cids, 396 502 }, 397 503 )) 398 504 } ··· 400 506 /// Apply a commit (persist blocks to storage) 401 507 /// 402 508 /// Persists all blocks from `CommitData` and updates internal state. 509 + /// Uses `BlockStore::apply_commit()` to perform atomic write+delete operations. 403 510 pub async fn apply_commit(&mut self, commit_data: CommitData) -> Result<IpldCid> { 404 511 let commit_cid = commit_data.cid; 405 512 406 - // Persist all blocks (MST + commit) 407 - self.storage.put_many(commit_data.blocks).await?; 513 + // Apply commit to storage (writes new blocks, deletes garbage) 514 + self.storage.apply_commit(commit_data).await?; 408 515 409 516 // Load and update internal state 410 517 let commit_bytes = self ··· 425 532 426 533 /// Create a commit for the current repository state 427 534 /// 428 - /// Convenience method that calls `format_commit()` and `apply_commit()`. 535 + /// Convenience method that calls `create_commit()` with no additional operations 536 + /// and `apply_commit()`. Use this after manually updating the MST with individual 537 + /// record operations (e.g., `create_record()`, `update_record()`, `delete_record()`). 429 538 pub async fn commit<K>( 430 539 &mut self, 431 540 did: &Did<'_>, ··· 435 544 where 436 545 K: crate::commit::SigningKey, 437 546 { 438 - let (ops, commit_data) = self.format_commit(did, prev, signing_key).await?; 547 + let (ops, commit_data) = self.create_commit(&[], did, prev, signing_key).await?; 439 548 Ok((ops, self.apply_commit(commit_data).await?)) 440 549 } 441 550 ··· 477 586 use super::*; 478 587 use crate::storage::MemoryBlockStore; 479 588 use jacquard_common::types::recordkey::Rkey; 589 + use smol_str::SmolStr; 480 590 481 591 fn make_test_cid(value: u8) -> IpldCid { 482 592 use crate::DAG_CBOR_CID_CODEC; ··· 488 598 IpldCid::new_v1(DAG_CBOR_CID_CODEC, mh) 489 599 } 490 600 601 + fn make_test_record( 602 + n: u32, 603 + ) -> std::collections::BTreeMap<SmolStr, jacquard_common::types::value::RawData<'static>> { 604 + use jacquard_common::types::value::RawData; 605 + use smol_str::SmolStr; 606 + 607 + let mut record = std::collections::BTreeMap::new(); 608 + record.insert( 609 + SmolStr::new("$type"), 610 + RawData::String("app.bsky.feed.post".into()), 611 + ); 612 + record.insert( 613 + SmolStr::new("text"), 614 + RawData::String(format!("Test post #{}", n).into()), 615 + ); 616 + record.insert( 617 + SmolStr::new("createdAt"), 618 + RawData::String("2024-01-01T00:00:00Z".to_string().into()), 619 + ); 620 + record 621 + } 622 + 491 623 async fn create_test_repo(storage: Arc<MemoryBlockStore>) -> Repository<MemoryBlockStore> { 492 624 let did = Did::new("did:plc:test").unwrap(); 493 625 let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); ··· 508 640 509 641 #[tokio::test] 510 642 async fn test_create_and_get_record() { 643 + use crate::mst::RecordWriteOp; 644 + 511 645 let storage = Arc::new(MemoryBlockStore::new()); 512 646 let mut repo = create_test_repo(storage.clone()).await; 513 647 514 648 let collection = Nsid::new("app.bsky.feed.post").unwrap(); 515 649 let rkey = RecordKey(Rkey::new("abc123").unwrap()); 516 - let cid = make_test_cid(1); 517 650 518 - repo.create_record(&collection, &rkey, cid).await.unwrap(); 651 + let ops = vec![RecordWriteOp::Create { 652 + collection: collection.clone().into_static(), 653 + rkey: rkey.clone(), 654 + record: make_test_record(1), 655 + }]; 656 + 657 + let did = Did::new("did:plc:test").unwrap(); 658 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 659 + let (repo_ops, commit_data) = repo 660 + .create_commit( 661 + &ops, 662 + &did, 663 + Some(repo.current_commit_cid().clone()), 664 + &signing_key, 665 + ) 666 + .await 667 + .unwrap(); 668 + 669 + assert_eq!(repo_ops.len(), 1); 670 + assert_eq!(repo_ops[0].action.as_ref(), "create"); 519 671 520 672 let retrieved = repo.get_record(&collection, &rkey).await.unwrap(); 521 - assert_eq!(retrieved, Some(cid)); 673 + assert!(retrieved.is_some()); 674 + 675 + // Verify data is actually in storage (from commit_data blocks) 676 + let cid = retrieved.unwrap(); 677 + assert!(commit_data.blocks.contains_key(&cid)); 522 678 } 523 679 524 680 #[tokio::test] ··· 598 754 599 755 let result = repo.delete_record(&collection, &rkey).await; 600 756 assert!(result.is_err()); 601 - } 602 - 603 - #[tokio::test] 604 - async fn test_apply_writes() { 605 - let storage = Arc::new(MemoryBlockStore::new()); 606 - let mut repo = create_test_repo(storage).await; 607 - 608 - let ops = vec![ 609 - WriteOp::Create { 610 - key: "app.bsky.feed.post/abc123".into(), 611 - cid: make_test_cid(1), 612 - }, 613 - WriteOp::Create { 614 - key: "app.bsky.feed.post/def456".into(), 615 - cid: make_test_cid(2), 616 - }, 617 - ]; 618 - 619 - let diff = repo.create_writes(&ops).await.unwrap(); 620 - assert_eq!(diff.creates.len(), 2); 621 - assert_eq!(diff.updates.len(), 0); 622 - assert_eq!(diff.deletes.len(), 0); 623 757 } 624 758 625 759 #[tokio::test] ··· 827 961 } 828 962 829 963 #[tokio::test] 964 + async fn test_commit_tracks_deleted_cids() { 965 + use crate::mst::RecordWriteOp; 966 + 967 + let storage = Arc::new(MemoryBlockStore::new()); 968 + let mut repo = create_test_repo(storage.clone()).await; 969 + 970 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 971 + let rkey1 = RecordKey(Rkey::new("test1").unwrap()); 972 + let rkey2 = RecordKey(Rkey::new("test2").unwrap()); 973 + 974 + let did = Did::new("did:plc:test").unwrap(); 975 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 976 + 977 + // Create records with actual data 978 + let create_ops = vec![ 979 + RecordWriteOp::Create { 980 + collection: collection.clone(), 981 + rkey: rkey1.clone(), 982 + record: make_test_record(1), 983 + }, 984 + RecordWriteOp::Create { 985 + collection: collection.clone(), 986 + rkey: rkey2.clone(), 987 + record: make_test_record(2), 988 + }, 989 + ]; 990 + 991 + let (_repo_ops, commit_data) = repo 992 + .create_commit( 993 + &create_ops, 994 + &did, 995 + Some(repo.current_commit_cid().clone()), 996 + &signing_key, 997 + ) 998 + .await 999 + .unwrap(); 1000 + 1001 + let cid1 = repo.get_record(&collection, &rkey1).await.unwrap().unwrap(); 1002 + 1003 + repo.apply_commit(commit_data).await.unwrap(); 1004 + 1005 + // Delete one record and format commit (don't apply yet) 1006 + let delete_ops = vec![RecordWriteOp::Delete { 1007 + collection: collection.clone(), 1008 + rkey: rkey1.clone(), 1009 + prev: None, 1010 + }]; 1011 + 1012 + let (_, commit_data) = repo 1013 + .create_commit( 1014 + &delete_ops, 1015 + &did, 1016 + Some(repo.current_commit_cid().clone()), 1017 + &signing_key, 1018 + ) 1019 + .await 1020 + .unwrap(); 1021 + 1022 + // Verify deleted_cids contains the deleted record CID 1023 + assert_eq!(commit_data.deleted_cids.len(), 1); 1024 + assert_eq!(commit_data.deleted_cids[0], cid1); 1025 + } 1026 + 1027 + #[tokio::test] 1028 + async fn test_record_writes_with_commit_includes_data_blocks() { 1029 + use crate::mst::RecordWriteOp; 1030 + 1031 + let storage = Arc::new(MemoryBlockStore::new()); 1032 + let mut repo = create_test_repo(storage.clone()).await; 1033 + 1034 + let collection = Nsid::new("app.bsky.feed.post").unwrap(); 1035 + let rkey1 = RecordKey(Rkey::new("post1").unwrap()); 1036 + let rkey2 = RecordKey(Rkey::new("post2").unwrap()); 1037 + 1038 + // Create records with actual data 1039 + let ops = vec![ 1040 + RecordWriteOp::Create { 1041 + collection: collection.clone(), 1042 + rkey: rkey1.clone(), 1043 + record: make_test_record(1), 1044 + }, 1045 + RecordWriteOp::Create { 1046 + collection: collection.clone(), 1047 + rkey: rkey2.clone(), 1048 + record: make_test_record(2), 1049 + }, 1050 + ]; 1051 + 1052 + // Format commit 1053 + let did = Did::new("did:plc:test").unwrap(); 1054 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 1055 + let (repo_ops, commit_data) = repo 1056 + .create_commit( 1057 + &ops, 1058 + &did, 1059 + Some(repo.current_commit_cid().clone()), 1060 + &signing_key, 1061 + ) 1062 + .await 1063 + .unwrap(); 1064 + 1065 + let cid1 = repo.get_record(&collection, &rkey1).await.unwrap().unwrap(); 1066 + let cid2 = repo.get_record(&collection, &rkey2).await.unwrap().unwrap(); 1067 + 1068 + // Verify commit data includes record data blocks 1069 + assert!( 1070 + commit_data.blocks.contains_key(&cid1), 1071 + "blocks should contain record 1 data" 1072 + ); 1073 + assert!( 1074 + commit_data.blocks.contains_key(&cid2), 1075 + "blocks should contain record 2 data" 1076 + ); 1077 + assert!( 1078 + commit_data.relevant_blocks.contains_key(&cid1), 1079 + "relevant_blocks should contain record 1 data" 1080 + ); 1081 + assert!( 1082 + commit_data.relevant_blocks.contains_key(&cid2), 1083 + "relevant_blocks should contain record 2 data" 1084 + ); 1085 + 1086 + // Verify we can deserialize the record data 1087 + let record1_bytes = commit_data.blocks.get(&cid1).unwrap(); 1088 + let record1: std::collections::BTreeMap<SmolStr, jacquard_common::types::value::RawData> = 1089 + serde_ipld_dagcbor::from_slice(record1_bytes).unwrap(); 1090 + assert_eq!( 1091 + record1.get(&SmolStr::new("text")).unwrap(), 1092 + &jacquard_common::types::value::RawData::String("Test post #1".to_string().into()) 1093 + ); 1094 + 1095 + // Verify firehose ops 1096 + assert_eq!(repo_ops.len(), 2); 1097 + assert_eq!(repo_ops[0].action.as_ref(), "create"); 1098 + assert_eq!(repo_ops[1].action.as_ref(), "create"); 1099 + } 1100 + 1101 + #[tokio::test] 830 1102 async fn test_batch_mixed_operations() { 1103 + use crate::mst::RecordWriteOp; 1104 + 831 1105 let storage = Arc::new(MemoryBlockStore::new()); 832 1106 let mut repo = create_test_repo(storage.clone()).await; 833 1107 ··· 837 1111 let rkey1 = RecordKey(Rkey::new("existing1").unwrap()); 838 1112 let rkey2 = RecordKey(Rkey::new("existing2").unwrap()); 839 1113 let rkey3 = RecordKey(Rkey::new("existing3").unwrap()); 840 - repo.create_record(&collection, &rkey1, make_test_cid(1)) 841 - .await 842 - .unwrap(); 843 - repo.create_record(&collection, &rkey2, make_test_cid(2)) 844 - .await 845 - .unwrap(); 846 - repo.create_record(&collection, &rkey3, make_test_cid(3)) 1114 + 1115 + let did = Did::new("did:plc:test").unwrap(); 1116 + let signing_key = k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng); 1117 + 1118 + let create_ops = vec![ 1119 + RecordWriteOp::Create { 1120 + collection: collection.clone(), 1121 + rkey: rkey1.clone(), 1122 + record: make_test_record(1), 1123 + }, 1124 + RecordWriteOp::Create { 1125 + collection: collection.clone(), 1126 + rkey: rkey2.clone(), 1127 + record: make_test_record(2), 1128 + }, 1129 + RecordWriteOp::Create { 1130 + collection: collection.clone(), 1131 + rkey: rkey3.clone(), 1132 + record: make_test_record(3), 1133 + }, 1134 + ]; 1135 + 1136 + let (_, commit_data) = repo 1137 + .create_commit( 1138 + &create_ops, 1139 + &did, 1140 + Some(repo.current_commit_cid().clone()), 1141 + &signing_key, 1142 + ) 847 1143 .await 848 1144 .unwrap(); 849 1145 1146 + // Get the CID of existing1 so we can verify it changed 1147 + let old_cid1 = repo.get_record(&collection, &rkey1).await.unwrap().unwrap(); 1148 + 1149 + repo.apply_commit(commit_data).await.unwrap(); 1150 + 850 1151 // Batch operation: create new, update existing, delete existing 1152 + let new_rkey = RecordKey(Rkey::new("new1").unwrap()); 851 1153 let ops = vec![ 852 - WriteOp::Create { 853 - key: format!("{}/{}", collection.as_ref(), "new1").into(), 854 - cid: make_test_cid(10), 1154 + RecordWriteOp::Create { 1155 + collection: collection.clone(), 1156 + rkey: new_rkey.clone(), 1157 + record: make_test_record(10), 855 1158 }, 856 - WriteOp::Update { 857 - key: format!("{}/{}", collection.as_ref(), "existing1").into(), 858 - cid: make_test_cid(11), 1159 + RecordWriteOp::Update { 1160 + collection: collection.clone(), 1161 + rkey: rkey1.clone(), 1162 + record: make_test_record(11), 859 1163 prev: None, 860 1164 }, 861 - WriteOp::Delete { 862 - key: format!("{}/{}", collection.as_ref(), "existing2").into(), 1165 + RecordWriteOp::Delete { 1166 + collection: collection.clone(), 1167 + rkey: rkey2.clone(), 863 1168 prev: None, 864 1169 }, 865 1170 ]; 866 1171 867 - let diff = repo.create_writes(&ops).await.unwrap(); 868 - assert_eq!(diff.creates.len(), 1); 869 - assert_eq!(diff.updates.len(), 1); 870 - assert_eq!(diff.deletes.len(), 1); 1172 + let (repo_ops, _commit_data) = repo 1173 + .create_commit( 1174 + &ops, 1175 + &did, 1176 + Some(repo.current_commit_cid().clone()), 1177 + &signing_key, 1178 + ) 1179 + .await 1180 + .unwrap(); 1181 + 1182 + assert_eq!(repo_ops.len(), 3); 871 1183 872 1184 // Verify final state 873 - let new_rkey = RecordKey(Rkey::new("new1").unwrap()); 874 - assert_eq!( 875 - repo.get_record(&collection, &new_rkey).await.unwrap(), 876 - Some(make_test_cid(10)) 877 - ); 878 - assert_eq!( 879 - repo.get_record(&collection, &rkey1).await.unwrap(), 880 - Some(make_test_cid(11)) 1185 + let new_cid = repo.get_record(&collection, &new_rkey).await.unwrap(); 1186 + assert!(new_cid.is_some(), "new record should exist"); 1187 + 1188 + let updated_cid1 = repo.get_record(&collection, &rkey1).await.unwrap(); 1189 + assert!(updated_cid1.is_some(), "updated record should exist"); 1190 + assert_ne!( 1191 + updated_cid1.unwrap(), 1192 + old_cid1, 1193 + "record should have new CID" 881 1194 ); 1195 + 882 1196 assert_eq!(repo.get_record(&collection, &rkey2).await.unwrap(), None); 883 - assert_eq!( 884 - repo.get_record(&collection, &rkey3).await.unwrap(), 885 - Some(make_test_cid(3)) 1197 + assert!( 1198 + repo.get_record(&collection, &rkey3) 1199 + .await 1200 + .unwrap() 1201 + .is_some() 886 1202 ); 887 1203 } 888 1204 }
+17
crates/jacquard-repo/src/storage/file.rs
··· 144 144 } 145 145 Ok(results) 146 146 } 147 + 148 + async fn apply_commit(&self, commit: crate::repo::CommitData) -> Result<()> { 149 + let mut store = self.blocks.write().unwrap(); 150 + 151 + // First, insert all new blocks 152 + for (cid, data) in commit.blocks { 153 + store.insert(cid, data); 154 + } 155 + 156 + // Then, delete all garbage-collected blocks 157 + for cid in commit.deleted_cids { 158 + store.remove(&cid); 159 + } 160 + 161 + *self.dirty.write().unwrap() = true; 162 + Ok(()) 163 + } 147 164 } 148 165 149 166 #[cfg(test)]
+5
crates/jacquard-repo/src/storage/layered.rs
··· 110 110 111 111 Ok(results) 112 112 } 113 + 114 + async fn apply_commit(&self, commit: crate::repo::CommitData) -> Result<()> { 115 + // All operations go to writable layer only (base layer is read-only) 116 + self.writable.apply_commit(commit).await 117 + } 113 118 } 114 119 115 120 #[cfg(test)]
+16
crates/jacquard-repo/src/storage/memory.rs
··· 119 119 } 120 120 Ok(results) 121 121 } 122 + 123 + async fn apply_commit(&self, commit: crate::repo::CommitData) -> Result<()> { 124 + let mut store = self.blocks.write().unwrap(); 125 + 126 + // First, insert all new blocks 127 + for (cid, data) in commit.blocks { 128 + store.insert(cid, data); 129 + } 130 + 131 + // Then, delete all garbage-collected blocks 132 + for cid in commit.deleted_cids { 133 + store.remove(&cid); 134 + } 135 + 136 + Ok(()) 137 + } 122 138 } 123 139 124 140 #[cfg(test)]
+13 -2
crates/jacquard-repo/src/storage/mod.rs
··· 8 8 /// 9 9 /// Provides CID-keyed block storage for MST nodes, commits, and record data. 10 10 /// Implementations might use: 11 - /// - In-memory HashMap ([`MemoryBlockStore`](memory::MemoryBlockStore)) 12 - /// - CAR file ([`FileBlockStore`](file::FileBlockStore)) 11 + /// - In-memory HashMap ([`MemoryBlockStore`]) 12 + /// - CAR file ([`FileBlockStore`]) 13 13 /// - SQLite/RocksDB (user-provided) 14 14 /// - Remote HTTP storage (user-provided) 15 15 /// ··· 77 77 /// 78 78 /// Returns a vec of the same length as the input, with `None` for missing blocks. 79 79 async fn get_many(&self, cids: &[IpldCid]) -> Result<Vec<Option<Bytes>>>; 80 + 81 + /// Apply a commit (atomic write + delete) 82 + /// 83 + /// Performs validated commit operations on the underlying storage: 84 + /// - Persists all blocks from `commit.blocks` 85 + /// - Deletes blocks listed in `commit.deleted_cids` (garbage collection) 86 + /// 87 + /// This should be atomic where possible - either both operations succeed or both fail. 88 + /// For implementations that don't support atomic operations, writes should happen first, 89 + /// then deletes. 90 + async fn apply_commit(&self, commit: crate::repo::CommitData) -> Result<()>; 80 91 } 81 92 82 93 pub mod file;