Fast and robust atproto CAR file processing in Rust
/*!
A robust CAR file -> MST walker for atproto

Small CARs have their blocks buffered in memory. If a configurable memory limit
is reached while reading blocks, CAR reading is suspended, and can be continued
by providing disk storage to buffer the CAR blocks instead.

A `process` function can be provided for tasks where records are transformed
into a smaller representation, to save memory (and disk) during block reading.

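As a rough sketch of what such a `process` function might look like, the helper
below shrinks a record to its byte length, mirroring the closure used in the
example further down. The names `record_size` and `decode_size` are
hypothetical, and the `Vec<u8>` in, `Vec<u8>` out shape is an assumption based
on the crate's `Bytes` alias, not a statement of the exact signature that
`with_block_processor` expects.

```rust
/// Hypothetical block processor: keep only the record's length,
/// encoded as native-endian bytes.
fn record_size(rec: Vec<u8>) -> Vec<u8> {
    rec.len().to_ne_bytes().to_vec()
}

/// Hypothetical inverse, for use when consuming the walker's output.
/// Returns `None` if the data is not exactly `size_of::<usize>()` bytes long.
fn decode_size(data: Vec<u8>) -> Option<usize> {
    let bytes: [u8; std::mem::size_of::<usize>()] = data.try_into().ok()?;
    Some(usize::from_ne_bytes(bytes))
}
```
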
Once blocks are loaded, the MST is walked and emitted as chunks of
`(rkey, processed_block)` pairs, in order (depth first, left-to-right).

Some MST validations are applied:
- Keys must appear in order
- Keys must be at the correct MST tree depth

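The two checks above can be sketched with the standard library alone. This is
an illustration, not this crate's implementation: `keys_in_order` and
`depth_from_digest` are hypothetical names, the ordering is assumed to be
strict byte-wise lexicographic order, and the depth rule follows the atproto
repository spec (leading zero bits of the key's SHA-256 digest, counted in
2-bit chunks). Computing the digest itself is left to the caller.

```rust
/// Returns true if every key is strictly greater than the one before it,
/// comparing keys byte-wise (lexicographically).
fn keys_in_order<K: AsRef<[u8]>>(keys: &[K]) -> bool {
    keys.windows(2).all(|w| w[0].as_ref() < w[1].as_ref())
}

/// Depth implied by a key's SHA-256 digest (passed in as raw bytes):
/// count the leading zero bits, then divide by two, rounding down.
fn depth_from_digest(digest: &[u8]) -> u32 {
    let mut zero_bits = 0;
    for byte in digest {
        zero_bits += byte.leading_zeros();
        if *byte != 0 {
            // the first non-zero byte ends the run of leading zeros
            break;
        }
    }
    zero_bits / 2
}
```
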
`iroh_car` additionally applies a block size limit of `2MiB`.

```
use repo_stream::{Driver, DriverBuilder, DiskBuilder};

# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    .with_block_processor(
        |rec| rec.len().to_ne_bytes().to_vec()
    ) // block processing: just extract the raw record size
    .load_car(reader)
    .await?
{

    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for output in chunk {
                let size = usize::from_ne_bytes(output.data.try_into().unwrap());

                total_size += size;
            }
        }
    },

    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;

        while let Some(chunk) = driver.next_chunk(256).await? {
            for output in chunk {
                let size = usize::from_ne_bytes(output.data.try_into().unwrap());

                total_size += size;
            }
        }
    }
};
println!("sum of size of all records: {total_size}");
# Ok(())
# }
```

Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
ahead and eagerly using disk I/O. This means you have to write a bit more code
to handle both cases, but it allows you to have finer control over resource
usage. For example, you can drive a number of parallel memory CAR workers, and
separately have a different number of disk workers picking up suspended disk
tasks from a queue.

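One possible shape for that worker split, sketched with only `std` threads and
channels. Everything here is hypothetical: `Loaded`, `load`, and `run` are
stand-ins invented for illustration, job sizes replace real CAR data, and a
real setup would use this crate's `Driver` variants and an async runtime
instead.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for the two outcomes of loading a CAR (hypothetical type).
enum Loaded {
    /// Everything fit within the memory limit.
    Memory,
    /// The memory limit was hit; the job must be resumed with disk storage.
    NeedsDisk(usize),
}

/// Hypothetical loader: jobs larger than `mem_limit` are suspended.
fn load(job_size: usize, mem_limit: usize) -> Loaded {
    if job_size <= mem_limit {
        Loaded::Memory
    } else {
        Loaded::NeedsDisk(job_size)
    }
}

/// Runs `mem_workers` memory workers over `jobs`, queueing suspended jobs
/// for a single disk worker. Returns `(finished_in_memory, finished_on_disk)`.
fn run(jobs: Vec<usize>, mem_workers: usize, mem_limit: usize) -> (usize, usize) {
    let (disk_tx, disk_rx) = mpsc::channel::<usize>();

    // Disk worker: drains the queue until every sender has been dropped.
    let disk = thread::spawn(move || disk_rx.iter().count());

    // Split the jobs round-robin between the memory workers.
    let n = mem_workers.max(1);
    let mut shares: Vec<Vec<usize>> = vec![Vec::new(); n];
    for (i, job) in jobs.into_iter().enumerate() {
        shares[i % n].push(job);
    }

    let handles: Vec<_> = shares
        .into_iter()
        .map(|share| {
            let disk_tx = disk_tx.clone();
            thread::spawn(move || {
                let mut done = 0usize;
                for job in share {
                    match load(job, mem_limit) {
                        Loaded::Memory => done += 1,
                        // hand the suspended job over instead of touching disk here
                        Loaded::NeedsDisk(size) => {
                            disk_tx.send(size).expect("disk worker is running")
                        }
                    }
                }
                done
            })
        })
        .collect();

    // Drop the original sender so the queue closes once the workers finish.
    drop(disk_tx);

    let mem_done: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    let disk_done = disk.join().unwrap();
    (mem_done, disk_done)
}
```
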
Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).

*/

pub mod mst;
mod walk;

pub mod disk;
pub mod drive;

pub use disk::{DiskBuilder, DiskError, DiskStore};
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
pub use mst::Commit;
pub use walk::Output;

pub type Bytes = Vec<u8>;

pub(crate) use hashbrown::HashMap;

#[doc = include_str!("../readme.md")]
#[cfg(doctest)]
pub struct ReadmeDoctests;