Fast and robust atproto CAR file processing in rust

car slices #2

Open — opened by bad-example.com, targeting `main` from `car-slice`
Labels

None yet.

Assignee

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mcini6mmze22
+125 -68
Diff #0
+1 -1
benches/huge-car.rs
··· 33 let reader = tokio::io::BufReader::new(reader); 34 35 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 - Driver::Memory(_, mem_driver) => mem_driver, 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 }; 39
··· 33 let reader = tokio::io::BufReader::new(reader); 34 35 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 + Driver::Memory(_, _, mem_driver) => mem_driver, 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 }; 39
+3 -3
benches/non-huge-cars.rs
··· 1 extern crate repo_stream; 2 - use repo_stream::Driver; 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 ··· 40 41 async fn drive_car(bytes: &[u8]) -> usize { 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - Driver::Memory(_, mem_driver) => mem_driver, 44 Driver::Disk(_) => panic!("not benching big cars here"), 45 }; 46 47 let mut n = 0; 48 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 49 n += pairs.len(); 50 } 51 n
··· 1 extern crate repo_stream; 2 + use repo_stream::{Driver, Step}; 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 ··· 40 41 async fn drive_car(bytes: &[u8]) -> usize { 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 + Driver::Memory(_, _, mem_driver) => mem_driver, 44 Driver::Disk(_) => panic!("not benching big cars here"), 45 }; 46 47 let mut n = 0; 48 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 49 n += pairs.len(); 50 } 51 n
+10 -7
examples/disk-read-file/main.rs
··· 9 static GLOBAL: MiMalloc = MiMalloc; 10 11 use clap::Parser; 12 - use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 13 use std::path::PathBuf; 14 use std::time::Instant; 15 ··· 42 .load_car(reader) 43 .await? 44 { 45 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 46 Driver::Disk(big_stuff) => { 47 // we reach here if the repo was too big and needs to be spilled to 48 // disk to continue ··· 51 let disk_store = DiskBuilder::new().open(tmpfile).await?; 52 53 // do the spilling, get back a (similar) driver 54 - let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 55 56 // at this point you might want to fetch the account's signing key 57 // via the DID from the commit, and then verify the signature. ··· 74 // this example uses the disk driver's channel mode: the tree walking is 75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back 76 let (mut rx, join) = driver.to_channel(512); 77 - while let Some(r) = rx.recv().await { 78 - let pairs = r?; 79 80 // keep a count of the total number of blocks seen 81 - n += pairs.len(); 82 83 - for output in pairs { 84 // for each block, count how many bytes are equal to '0' 85 // (this is just an example, you probably want to do something more 86 // interesting)
··· 9 static GLOBAL: MiMalloc = MiMalloc; 10 11 use clap::Parser; 12 + use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step}; 13 use std::path::PathBuf; 14 use std::time::Instant; 15 ··· 42 .load_car(reader) 43 .await? 44 { 45 + Driver::Memory(_, _, _) => panic!("try this on a bigger car"), 46 Driver::Disk(big_stuff) => { 47 // we reach here if the repo was too big and needs to be spilled to 48 // disk to continue ··· 51 let disk_store = DiskBuilder::new().open(tmpfile).await?; 52 53 // do the spilling, get back a (similar) driver 54 + let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?; 55 56 // at this point you might want to fetch the account's signing key 57 // via the DID from the commit, and then verify the signature. ··· 74 // this example uses the disk driver's channel mode: the tree walking is 75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back 76 let (mut rx, join) = driver.to_channel(512); 77 + while let Some(step) = rx.recv().await { 78 + let step = step?; 79 + let Step::Value(outputs) = step else { 80 + break; 81 + }; 82 83 // keep a count of the total number of blocks seen 84 + n += outputs.len(); 85 86 + for output in outputs { 87 // for each block, count how many bytes are equal to '0' 88 // (this is just an example, you probably want to do something more 89 // interesting)
+3 -3
examples/read-file/main.rs
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 - use repo_stream::{Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 28 .load_car(reader) 29 .await? 30 { 31 - Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 }; 34 35 log::info!("got commit: {commit:?}"); 36 37 let mut n = 0; 38 - while let Some(pairs) = driver.next_chunk(256).await? { 39 n += pairs.len(); 40 // log::info!("got {rkey:?}"); 41 }
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 + use repo_stream::{Driver, DriverBuilder, Step}; 8 use std::path::PathBuf; 9 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 28 .load_car(reader) 29 .await? 30 { 31 + Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 }; 34 35 log::info!("got commit: {commit:?}"); 36 37 let mut n = 0; 38 + while let Step::Value(pairs) = driver.next_chunk(256).await? { 39 n += pairs.len(); 40 // log::info!("got {rkey:?}"); 41 }
+7 -7
readme.md
··· 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 13 ```rust no_run 14 - use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output}; 15 16 #[tokio::main] 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 { 32 33 // if all blocks fit within memory 34 - Driver::Memory(_commit, mut driver) => { 35 - while let Some(chunk) = driver.next_chunk(256).await? { 36 for Output { rkey: _, cid: _, data } in chunk { 37 let size = usize::from_ne_bytes(data.try_into().unwrap()); 38 total_size += size; ··· 45 // set up a disk store we can spill to 46 let store = DiskBuilder::new().open("some/path.db".into()).await?; 47 // do the spilling, get back a (similar) driver 48 - let (_commit, mut driver) = paused.finish_loading(store).await?; 49 50 - while let Some(chunk) = driver.next_chunk(256).await? { 51 for Output { rkey: _, cid: _, data } in chunk { 52 let size = usize::from_ne_bytes(data.try_into().unwrap()); 53 total_size += size; ··· 62 63 more recent todo 64 - [ ] add a zero-copy rkyv process function example 65 - - [ ] repo car slices 66 - - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 67 - [x] get an *emtpy* car for the test suite 68 - [x] implement a max size on disk limit 69
··· 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 13 ```rust no_run 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step}; 15 16 #[tokio::main] 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 { 32 33 // if all blocks fit within memory 34 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 35 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 36 for Output { rkey: _, cid: _, data } in chunk { 37 let size = usize::from_ne_bytes(data.try_into().unwrap()); 38 total_size += size; ··· 45 // set up a disk store we can spill to 46 let store = DiskBuilder::new().open("some/path.db".into()).await?; 47 // do the spilling, get back a (similar) driver 48 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 49 50 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 51 for Output { rkey: _, cid: _, data } in chunk { 52 let size = usize::from_ne_bytes(data.try_into().unwrap()); 53 total_size += size; ··· 62 63 more recent todo 64 - [ ] add a zero-copy rkyv process function example 65 + - [ ] car slices 66 + - [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed) 67 - [x] get an *emtpy* car for the test suite 68 - [x] implement a max size on disk limit 69
+26 -23
src/drive.rs
··· 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 3 use crate::{ 4 - Bytes, HashMap, 5 disk::{DiskError, DiskStore}, 6 mst::MstNode, 7 walk::Output, ··· 107 /// 108 /// You probably want to check the commit's signature. You can go ahead and 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver), 111 /// Blocks exceed the memory limit 112 /// 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 237 238 Ok(Driver::Memory( 239 commit, 240 MemDriver { 241 blocks: mem_blocks, 242 walker, ··· 268 269 impl MemDriver { 270 /// Step through the record outputs, in rkey order 271 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 272 let mut out = Vec::with_capacity(n); 273 for _ in 0..n { 274 // walk as far as we can until we run out of blocks or find a record 275 - let Some(output) = self.walker.step(&mut self.blocks, self.process)? else { 276 break; 277 }; 278 out.push(output); 279 } 280 if out.is_empty() { 281 - Ok(None) 282 } else { 283 - Ok(Some(out)) 284 } 285 } 286 } ··· 299 pub async fn finish_loading( 300 mut self, 301 mut store: DiskStore, 302 - ) -> Result<(Commit, DiskDriver), DriveError> { 303 // move store in and back out so we can manage lifetimes 304 // dump mem blocks into the store 305 store = tokio::task::spawn(async move { ··· 385 386 Ok(( 387 commit, 388 DiskDriver { 389 process: self.process, 390 state: Some(BigState { store, walker }), ··· 417 /// Walk the MST returning up to `n` rkey + record pairs 418 /// 419 /// ```no_run 420 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 421 /// # #[tokio::main] 422 /// # async fn main() -> Result<(), DriveError> { 423 /// # let mut disk_driver = _get_fake_disk_driver(); 424 - /// while let Some(pairs) = disk_driver.next_chunk(256).await? 
{ 425 - /// for output in pairs { 426 /// println!("{}: size={}", output.rkey, output.data.len()); 427 /// } 428 /// } 429 /// # Ok(()) 430 /// # } 431 /// ``` 432 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 433 let process = self.process; 434 435 // state should only *ever* be None transiently while inside here ··· 450 return (state, Err(e.into())); 451 } 452 }; 453 - let Some(output) = step else { 454 break; 455 }; 456 out.push(output); ··· 466 let out = res?; 467 468 if out.is_empty() { 469 - Ok(None) 470 } else { 471 - Ok(Some(out)) 472 } 473 } 474 475 fn read_tx_blocking( 476 &mut self, 477 n: usize, 478 - tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 479 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 480 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 481 482 loop { ··· 490 Err(e) => return tx.blocking_send(Err(e.into())), 491 }; 492 493 - let Some(output) = step else { 494 break; 495 }; 496 out.push(output); ··· 499 if out.is_empty() { 500 break; 501 } 502 - tx.blocking_send(Ok(out))?; 503 } 504 505 Ok(()) ··· 516 /// benefit over just using `.next_chunk(n)`. 
517 /// 518 /// ```no_run 519 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 520 /// # #[tokio::main] 521 /// # async fn main() -> Result<(), DriveError> { 522 /// # let mut disk_driver = _get_fake_disk_driver(); 523 /// let (mut rx, join) = disk_driver.to_channel(512); 524 /// while let Some(recvd) = rx.recv().await { 525 - /// let pairs = recvd?; 526 - /// for output in pairs { 527 /// println!("{}: size={}", output.rkey, output.data.len()); 528 /// } 529 /// ··· 535 mut self, 536 n: usize, 537 ) -> ( 538 - mpsc::Receiver<Result<BlockChunk, DriveError>>, 539 tokio::task::JoinHandle<Self>, 540 ) { 541 - let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 542 543 // sketch: this worker is going to be allowed to execute without a join handle 544 let chan_task = tokio::task::spawn_blocking(move || {
··· 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 3 use crate::{ 4 + Bytes, HashMap, Rkey, Step, 5 disk::{DiskError, DiskStore}, 6 mst::MstNode, 7 walk::Output, ··· 107 /// 108 /// You probably want to check the commit's signature. You can go ahead and 109 /// walk the MST right away. 110 + Memory(Commit, Option<Rkey>, MemDriver), 111 /// Blocks exceed the memory limit 112 /// 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 237 238 Ok(Driver::Memory( 239 commit, 240 + None, 241 MemDriver { 242 blocks: mem_blocks, 243 walker, ··· 269 270 impl MemDriver { 271 /// Step through the record outputs, in rkey order 272 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 273 let mut out = Vec::with_capacity(n); 274 for _ in 0..n { 275 // walk as far as we can until we run out of blocks or find a record 276 + let Step::Value(output) = self.walker.step(&mut self.blocks, self.process)? else { 277 break; 278 }; 279 out.push(output); 280 } 281 if out.is_empty() { 282 + Ok(Step::End(None)) 283 } else { 284 + Ok(Step::Value(out)) 285 } 286 } 287 } ··· 300 pub async fn finish_loading( 301 mut self, 302 mut store: DiskStore, 303 + ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> { 304 // move store in and back out so we can manage lifetimes 305 // dump mem blocks into the store 306 store = tokio::task::spawn(async move { ··· 386 387 Ok(( 388 commit, 389 + None, 390 DiskDriver { 391 process: self.process, 392 state: Some(BigState { store, walker }), ··· 419 /// Walk the MST returning up to `n` rkey + record pairs 420 /// 421 /// ```no_run 422 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 423 /// # #[tokio::main] 424 /// # async fn main() -> Result<(), DriveError> { 425 /// # let mut disk_driver = _get_fake_disk_driver(); 426 + /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? 
{ 427 + /// for output in outputs { 428 /// println!("{}: size={}", output.rkey, output.data.len()); 429 /// } 430 /// } 431 /// # Ok(()) 432 /// # } 433 /// ``` 434 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 435 let process = self.process; 436 437 // state should only *ever* be None transiently while inside here ··· 452 return (state, Err(e.into())); 453 } 454 }; 455 + let Step::Value(output) = step else { 456 break; 457 }; 458 out.push(output); ··· 468 let out = res?; 469 470 if out.is_empty() { 471 + Ok(Step::End(None)) 472 } else { 473 + Ok(Step::Value(out)) 474 } 475 } 476 477 fn read_tx_blocking( 478 &mut self, 479 n: usize, 480 + tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>, 481 + ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> { 482 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 483 484 loop { ··· 492 Err(e) => return tx.blocking_send(Err(e.into())), 493 }; 494 495 + let Step::Value(output) = step else { 496 break; 497 }; 498 out.push(output); ··· 501 if out.is_empty() { 502 break; 503 } 504 + tx.blocking_send(Ok(Step::Value(out)))?; 505 } 506 507 Ok(()) ··· 518 /// benefit over just using `.next_chunk(n)`. 
519 /// 520 /// ```no_run 521 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 522 /// # #[tokio::main] 523 /// # async fn main() -> Result<(), DriveError> { 524 /// # let mut disk_driver = _get_fake_disk_driver(); 525 /// let (mut rx, join) = disk_driver.to_channel(512); 526 /// while let Some(recvd) = rx.recv().await { 527 + /// let outputs = recvd?; 528 + /// let Step::Value(outputs) = outputs else { break; }; 529 + /// for output in outputs { 530 /// println!("{}: size={}", output.rkey, output.data.len()); 531 /// } 532 /// ··· 538 mut self, 539 n: usize, 540 ) -> ( 541 + mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>, 542 tokio::task::JoinHandle<Self>, 543 ) { 544 + let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1); 545 546 // sketch: this worker is going to be allowed to execute without a join handle 547 let chan_task = tokio::task::spawn_blocking(move || {
+8 -6
src/lib.rs
··· 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 20 ``` 21 - use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 23 # #[tokio::main] 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 35 { 36 37 // if all blocks fit within memory 38 - Driver::Memory(_commit, mut driver) => { 39 - while let Some(chunk) = driver.next_chunk(256).await? { 40 for output in chunk { 41 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 ··· 50 // set up a disk store we can spill to 51 let store = DiskBuilder::new().open("some/path.db".into()).await?; 52 // do the spilling, get back a (similar) driver 53 - let (_commit, mut driver) = paused.finish_loading(store).await?; 54 55 - while let Some(chunk) = driver.next_chunk(256).await? { 56 for output in chunk { 57 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 ··· 86 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 88 pub use mst::Commit; 89 - pub use walk::Output; 90 91 pub type Bytes = Vec<u8>; 92 93 pub(crate) use hashbrown::HashMap; 94
··· 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 20 ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step}; 22 23 # #[tokio::main] 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 35 { 36 37 // if all blocks fit within memory 38 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 39 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 40 for output in chunk { 41 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 ··· 50 // set up a disk store we can spill to 51 let store = DiskBuilder::new().open("some/path.db".into()).await?; 52 // do the spilling, get back a (similar) driver 53 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 54 55 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 56 for output in chunk { 57 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 ··· 86 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 88 pub use mst::Commit; 89 + pub use walk::{Output, Step}; 90 91 pub type Bytes = Vec<u8>; 92 + 93 + pub type Rkey = String; 94 95 pub(crate) use hashbrown::HashMap; 96
+4 -3
src/mst.rs
··· 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6 use cid::Cid; 7 use serde::Deserialize; 8 use sha2::{Digest, Sha256}; ··· 54 pub things: Vec<NodeThing>, 55 } 56 57 - #[derive(Debug)] 58 pub(crate) struct NodeThing { 59 pub(crate) cid: Cid, 60 pub(crate) kind: ThingKind, 61 } 62 63 - #[derive(Debug)] 64 pub(crate) enum ThingKind { 65 Tree, 66 - Value { rkey: String }, 67 } 68 69 impl<'de> Deserialize<'de> for MstNode {
··· 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6 + use crate::Rkey; 7 use cid::Cid; 8 use serde::Deserialize; 9 use sha2::{Digest, Sha256}; ··· 55 pub things: Vec<NodeThing>, 56 } 57 58 + #[derive(Debug, Clone)] 59 pub(crate) struct NodeThing { 60 pub(crate) cid: Cid, 61 pub(crate) kind: ThingKind, 62 } 63 64 + #[derive(Debug, Clone)] 65 pub(crate) enum ThingKind { 66 Tree, 67 + Value { rkey: Rkey }, 68 } 69 70 impl<'de> Deserialize<'de> for MstNode {
+60 -11
src/walk.rs
··· 1 //! Depth-first MST traversal 2 3 use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 - use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 5 use cid::Cid; 6 use std::convert::Infallible; 7 ··· 30 #[error("MST depth underflow: depth-0 node with child trees")] 31 DepthUnderflow, 32 #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 - RkeyOutOfOrder { prev: String, rkey: String }, 34 } 35 36 /// Walker outputs 37 #[derive(Debug, PartialEq)] 38 pub struct Output { 39 - pub rkey: String, 40 pub cid: Cid, 41 pub data: Bytes, 42 } 43 44 /// Traverser of an atproto MST 45 /// 46 /// Walks the tree from left-to-right in depth-first order 47 - #[derive(Debug)] 48 pub struct Walker { 49 - prev_rkey: String, 50 root_depth: Depth, 51 todo: Vec<Vec<NodeThing>>, 52 } ··· 134 &mut self, 135 blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 136 process: impl Fn(Bytes) -> Bytes, 137 - ) -> Result<Option<Output>, WalkError> { 138 while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 let Some(mpb) = blocks.get(&cid) else { 140 return Err(WalkError::MissingBlock(cid)); 141 }; 142 if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 143 - return Ok(Some(out)); 144 } 145 } 146 - Ok(None) 147 } 148 149 /// blocking!!!!!! ··· 151 &mut self, 152 blocks: &mut DiskStore, 153 process: impl Fn(Bytes) -> Bytes, 154 - ) -> Result<Option<Output>, WalkError> { 155 while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 return Err(WalkError::MissingBlock(cid)); 158 }; 159 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 - return Ok(Some(out)); 162 } 163 } 164 - Ok(None) 165 } 166 }
··· 1 //! Depth-first MST traversal 2 3 use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 + use crate::{Bytes, HashMap, Rkey, noop, disk::DiskStore, drive::MaybeProcessedBlock}; 5 use cid::Cid; 6 use std::convert::Infallible; 7 ··· 30 #[error("MST depth underflow: depth-0 node with child trees")] 31 DepthUnderflow, 32 #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 + RkeyOutOfOrder { prev: Rkey, rkey: Rkey }, 34 } 35 36 /// Walker outputs 37 #[derive(Debug, PartialEq)] 38 pub struct Output { 39 + pub rkey: Rkey, 40 pub cid: Cid, 41 pub data: Bytes, 42 } 43 44 + #[derive(Debug, PartialEq)] 45 + pub enum Step<T = Output> { 46 + Value(T), 47 + End(Option<Rkey>), 48 + } 49 + 50 /// Traverser of an atproto MST 51 /// 52 /// Walks the tree from left-to-right in depth-first order 53 + #[derive(Debug, Clone)] 54 pub struct Walker { 55 + prev_rkey: Rkey, 56 root_depth: Depth, 57 todo: Vec<Vec<NodeThing>>, 58 } ··· 140 &mut self, 141 blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 142 process: impl Fn(Bytes) -> Bytes, 143 + ) -> Result<Step, WalkError> { 144 while let Some(NodeThing { cid, kind }) = self.next_todo() { 145 let Some(mpb) = blocks.get(&cid) else { 146 return Err(WalkError::MissingBlock(cid)); 147 }; 148 if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? 
{ 149 + return Ok(Step::Value(out)); 150 + } 151 + } 152 + Ok(Step::End(None)) 153 + } 154 + 155 + pub fn step_to_slice_edge( 156 + &mut self, 157 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 158 + ) -> Result<Option<Rkey>, WalkError> { 159 + let mut ant = self.clone(); 160 + let mut ant_prev; 161 + let mut rkey_prev = None; 162 + 163 + loop { 164 + ant_prev = ant.clone(); 165 + ant = ant.clone(); 166 + 167 + let Some(NodeThing { cid, kind }) = ant.next_todo() else { 168 + return Ok(None); 169 + }; 170 + 171 + let maybe_mpb = blocks.get(&cid); 172 + 173 + match (&kind, maybe_mpb) { 174 + (ThingKind::Value { rkey: _ }, Some(_)) => { 175 + // oops we took a step too far 176 + *self = ant_prev; 177 + return Ok(rkey_prev); 178 + } 179 + (ThingKind::Value { rkey }, None) => { 180 + if let Some(p) = rkey_prev && *rkey <= p { 181 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 182 + rkey: rkey.clone(), 183 + prev: p, 184 + })); 185 + } 186 + rkey_prev = Some(rkey.clone()); 187 + } 188 + (ThingKind::Tree, Some(mpb)) => { 189 + ant.mpb_step(kind, cid, mpb, noop)?; 190 + } 191 + (ThingKind::Tree, None) => { 192 + return Err(WalkError::MissingBlock(cid)); 193 + } 194 } 195 } 196 } 197 198 /// blocking!!!!!! ··· 200 &mut self, 201 blocks: &mut DiskStore, 202 process: impl Fn(Bytes) -> Bytes, 203 + ) -> Result<Step, WalkError> { 204 while let Some(NodeThing { cid, kind }) = self.next_todo() { 205 let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 206 return Err(WalkError::MissingBlock(cid)); 207 }; 208 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 209 if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 210 + return Ok(Step::Value(out)); 211 } 212 } 213 + Ok(Step::End(None)) 214 } 215 }
+3 -4
tests/non-huge-cars.rs
··· 1 extern crate repo_stream; 2 - use repo_stream::Driver; 3 - use repo_stream::Output; 4 5 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 21 .await 22 .unwrap() 23 { 24 - Driver::Memory(_commit, mem_driver) => mem_driver, 25 Driver::Disk(_) => panic!("too big"), 26 }; 27 ··· 30 let mut found_bsky_profile = false; 31 let mut prev_rkey = "".to_string(); 32 33 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 34 for Output { rkey, cid: _, data } in pairs { 35 records += 1; 36
··· 1 extern crate repo_stream; 2 + use repo_stream::{Driver, Output, Step}; 3 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 20 .await 21 .unwrap() 22 { 23 + Driver::Memory(_commit, _, mem_driver) => mem_driver, 24 Driver::Disk(_) => panic!("too big"), 25 }; 26 ··· 29 let mut found_bsky_profile = false; 30 let mut prev_rkey = "".to_string(); 31 32 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 33 for Output { rkey, cid: _, data } in pairs { 34 records += 1; 35

History

1 round · 0 comments
sign up or login to add to the discussion
bad-example.com submitted #0
3 commits
expand
type alias for rkey finally
update api for car slice handling
wonder if this works
no conflicts, ready to merge
expand 0 comments