Fast and robust atproto CAR file processing in rust
/*!
A robust CAR file -> MST walker for atproto

Small CARs have their blocks buffered in memory. If a configurable memory limit
is reached while reading blocks, CAR reading is suspended, and can be continued
by providing disk storage to buffer the CAR blocks instead.

A `process` function can be provided for tasks where records are transformed
into a smaller representation, to save memory (and disk) during block reading.

Once blocks are loaded, the MST is walked and emitted as chunks of
`(rkey, processed_block)` pairs, in order (depth first, left-to-right).

Some MST validations are applied:
- Keys must appear in order
- Keys must be at the correct MST tree depth

`iroh_car` additionally applies a block size limit of `2MiB`.

```
use repo_stream::{Driver, DriverBuilder, DiskBuilder};

# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    .with_block_processor(
        |rec| rec.len().to_ne_bytes().to_vec()
    ) // block processing: just extract the raw record size
    .load_car(reader)
    .await?
{

    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for output in chunk {
                let size = usize::from_ne_bytes(output.data.try_into().unwrap());

                total_size += size;
            }
        }
    },

    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;

        while let Some(chunk) = driver.next_chunk(256).await? {
            for output in chunk {
                let size = usize::from_ne_bytes(output.data.try_into().unwrap());

                total_size += size;
            }
        }
    }
};
println!("sum of size of all records: {total_size}");
# Ok(())
# }
```

Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
ahead and eagerly using disk I/O. This means you have to write a bit more code
to handle both cases, but it allows you to have finer control over resource
usage. For example, you can drive a number of parallel memory CAR workers, and
separately have a different number of disk workers picking up suspended disk
tasks from a queue.

Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).

*/

pub mod mst;
mod walk;

pub mod disk;
pub mod drive;

pub use disk::{DiskBuilder, DiskError, DiskStore};
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
pub use mst::Commit;
pub use walk::Output;

pub type Bytes = Vec<u8>;

pub(crate) use hashbrown::HashMap;

#[doc = include_str!("../readme.md")]
#[cfg(doctest)]
pub struct ReadmeDoctests;
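
The "keys must appear in order" validation listed in the module docs can be illustrated with a small standalone sketch. This is not the crate's actual implementation: `OrderChecker` and `OutOfOrder` are hypothetical names invented here, and the sketch assumes keys are compared byte-wise and that duplicate keys count as out of order.

```rust
use std::fmt;

/// Hypothetical error type for this sketch (not part of repo-stream).
#[derive(Debug, PartialEq, Eq)]
pub struct OutOfOrder {
    pub prev: String,
    pub next: String,
}

impl fmt::Display for OutOfOrder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "key {:?} does not sort after {:?}", self.next, self.prev)
    }
}

impl std::error::Error for OutOfOrder {}

/// Hypothetical helper: remembers the last accepted key and rejects any key
/// that does not sort strictly after it.
#[derive(Default)]
pub struct OrderChecker {
    last: Option<String>,
}

impl OrderChecker {
    /// Accepts `key` only if it is strictly greater than the previously
    /// accepted key. Assumption: ordering is a byte-wise comparison.
    pub fn check(&mut self, key: &str) -> Result<(), OutOfOrder> {
        if let Some(prev) = &self.last {
            if key.as_bytes() <= prev.as_bytes() {
                return Err(OutOfOrder {
                    prev: prev.clone(),
                    next: key.to_owned(),
                });
            }
        }
        // Only accepted keys advance the checker; a rejected key leaves
        // the last accepted key unchanged.
        self.last = Some(key.to_owned());
        Ok(())
    }
}
```

A walker could call `check` once per emitted key and stop at the first error. Keeping only the previous key means the check needs constant memory no matter how large the repository is.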