Fast and robust atproto CAR file processing in rust

car slices #2

open opened by bad-example.com targeting main from car-slice
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mcini6mmze22
+125 -68
Diff #0
+1 -1
benches/huge-car.rs
··· 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 35 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 - Driver::Memory(_, mem_driver) => mem_driver, 36 + Driver::Memory(_, _, mem_driver) => mem_driver, 37 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 38 }; 39 39
+3 -3
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 2 + use repo_stream::{Driver, Step}; 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 ··· 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - Driver::Memory(_, mem_driver) => mem_driver, 43 + Driver::Memory(_, _, mem_driver) => mem_driver, 44 44 Driver::Disk(_) => panic!("not benching big cars here"), 45 45 }; 46 46 47 47 let mut n = 0; 48 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 48 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 49 49 n += pairs.len(); 50 50 } 51 51 n
+10 -7
examples/disk-read-file/main.rs
··· 9 9 static GLOBAL: MiMalloc = MiMalloc; 10 10 11 11 use clap::Parser; 12 - use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 12 + use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step}; 13 13 use std::path::PathBuf; 14 14 use std::time::Instant; 15 15 ··· 42 42 .load_car(reader) 43 43 .await? 44 44 { 45 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 45 + Driver::Memory(_, _, _) => panic!("try this on a bigger car"), 46 46 Driver::Disk(big_stuff) => { 47 47 // we reach here if the repo was too big and needs to be spilled to 48 48 // disk to continue ··· 51 51 let disk_store = DiskBuilder::new().open(tmpfile).await?; 52 52 53 53 // do the spilling, get back a (similar) driver 54 - let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 54 + let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?; 55 55 56 56 // at this point you might want to fetch the account's signing key 57 57 // via the DID from the commit, and then verify the signature. ··· 74 74 // this example uses the disk driver's channel mode: the tree walking is 75 75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back 76 76 let (mut rx, join) = driver.to_channel(512); 77 - while let Some(r) = rx.recv().await { 78 - let pairs = r?; 77 + while let Some(step) = rx.recv().await { 78 + let step = step?; 79 + let Step::Value(outputs) = step else { 80 + break; 81 + }; 79 82 80 83 // keep a count of the total number of blocks seen 81 - n += pairs.len(); 84 + n += outputs.len(); 82 85 83 - for output in pairs { 86 + for output in outputs { 84 87 // for each block, count how many bytes are equal to '0' 85 88 // (this is just an example, you probably want to do something more 86 89 // interesting)
+3 -3
examples/read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{Driver, DriverBuilder}; 7 + use repo_stream::{Driver, DriverBuilder, Step}; 8 8 use std::path::PathBuf; 9 9 10 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 28 28 .load_car(reader) 29 29 .await? 30 30 { 31 - Driver::Memory(commit, mem_driver) => (commit, mem_driver), 31 + Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 32 32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 33 }; 34 34 35 35 log::info!("got commit: {commit:?}"); 36 36 37 37 let mut n = 0; 38 - while let Some(pairs) = driver.next_chunk(256).await? { 38 + while let Step::Value(pairs) = driver.next_chunk(256).await? { 39 39 n += pairs.len(); 40 40 // log::info!("got {rkey:?}"); 41 41 }
+7 -7
readme.md
··· 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 13 ```rust no_run 14 - use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output}; 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step}; 15 15 16 16 #[tokio::main] 17 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 31 { 32 32 33 33 // if all blocks fit within memory 34 - Driver::Memory(_commit, mut driver) => { 35 - while let Some(chunk) = driver.next_chunk(256).await? { 34 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 35 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 36 36 for Output { rkey: _, cid: _, data } in chunk { 37 37 let size = usize::from_ne_bytes(data.try_into().unwrap()); 38 38 total_size += size; ··· 45 45 // set up a disk store we can spill to 46 46 let store = DiskBuilder::new().open("some/path.db".into()).await?; 47 47 // do the spilling, get back a (similar) driver 48 - let (_commit, mut driver) = paused.finish_loading(store).await?; 48 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 49 49 50 - while let Some(chunk) = driver.next_chunk(256).await? { 50 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 51 51 for Output { rkey: _, cid: _, data } in chunk { 52 52 let size = usize::from_ne_bytes(data.try_into().unwrap()); 53 53 total_size += size; ··· 62 62 63 63 more recent todo 64 64 - [ ] add a zero-copy rkyv process function example 65 - - [ ] repo car slices 66 - - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 65 + - [ ] car slices 66 + - [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed) 67 67 - [x] get an *emtpy* car for the test suite 68 68 - [x] implement a max size on disk limit 69 69
+26 -23
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 3 use crate::{ 4 - Bytes, HashMap, 4 + Bytes, HashMap, Rkey, Step, 5 5 disk::{DiskError, DiskStore}, 6 6 mst::MstNode, 7 7 walk::Output, ··· 107 107 /// 108 108 /// You probably want to check the commit's signature. You can go ahead and 109 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver), 110 + Memory(Commit, Option<Rkey>, MemDriver), 111 111 /// Blocks exceed the memory limit 112 112 /// 113 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 237 237 238 238 Ok(Driver::Memory( 239 239 commit, 240 + None, 240 241 MemDriver { 241 242 blocks: mem_blocks, 242 243 walker, ··· 268 269 269 270 impl MemDriver { 270 271 /// Step through the record outputs, in rkey order 271 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 272 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 272 273 let mut out = Vec::with_capacity(n); 273 274 for _ in 0..n { 274 275 // walk as far as we can until we run out of blocks or find a record 275 - let Some(output) = self.walker.step(&mut self.blocks, self.process)? else { 276 + let Step::Value(output) = self.walker.step(&mut self.blocks, self.process)? else { 276 277 break; 277 278 }; 278 279 out.push(output); 279 280 } 280 281 if out.is_empty() { 281 - Ok(None) 282 + Ok(Step::End(None)) 282 283 } else { 283 - Ok(Some(out)) 284 + Ok(Step::Value(out)) 284 285 } 285 286 } 286 287 } ··· 299 300 pub async fn finish_loading( 300 301 mut self, 301 302 mut store: DiskStore, 302 - ) -> Result<(Commit, DiskDriver), DriveError> { 303 + ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> { 303 304 // move store in and back out so we can manage lifetimes 304 305 // dump mem blocks into the store 305 306 store = tokio::task::spawn(async move { ··· 385 386 386 387 Ok(( 387 388 commit, 389 + None, 388 390 DiskDriver { 389 391 process: self.process, 390 392 state: Some(BigState { store, walker }), ··· 417 419 /// Walk the MST returning up to `n` rkey + record pairs 418 420 /// 419 421 /// ```no_run 420 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 422 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 421 423 /// # #[tokio::main] 422 424 /// # async fn main() -> Result<(), DriveError> { 423 425 /// # let mut disk_driver = _get_fake_disk_driver(); 424 - /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 425 - /// for output in pairs { 426 + /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? { 427 + /// for output in outputs { 426 428 /// println!("{}: size={}", output.rkey, output.data.len()); 427 429 /// } 428 430 /// } 429 431 /// # Ok(()) 430 432 /// # } 431 433 /// ``` 432 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 434 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 433 435 let process = self.process; 434 436 435 437 // state should only *ever* be None transiently while inside here ··· 450 452 return (state, Err(e.into())); 451 453 } 452 454 }; 453 - let Some(output) = step else { 455 + let Step::Value(output) = step else { 454 456 break; 455 457 }; 456 458 out.push(output); ··· 466 468 let out = res?; 467 469 468 470 if out.is_empty() { 469 - Ok(None) 471 + Ok(Step::End(None)) 470 472 } else { 471 - Ok(Some(out)) 473 + Ok(Step::Value(out)) 472 474 } 473 475 } 474 476 475 477 fn read_tx_blocking( 476 478 &mut self, 477 479 n: usize, 478 - tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 479 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 480 + tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>, 481 + ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> { 480 482 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 481 483 482 484 loop { ··· 490 492 Err(e) => return tx.blocking_send(Err(e.into())), 491 493 }; 492 494 493 - let Some(output) = step else { 495 + let Step::Value(output) = step else { 494 496 break; 495 497 }; 496 498 out.push(output); ··· 499 501 if out.is_empty() { 500 502 break; 501 503 } 502 - tx.blocking_send(Ok(out))?; 504 + tx.blocking_send(Ok(Step::Value(out)))?; 503 505 } 504 506 505 507 Ok(()) ··· 516 518 /// benefit over just using `.next_chunk(n)`. 517 519 /// 518 520 /// ```no_run 519 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 521 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 520 522 /// # #[tokio::main] 521 523 /// # async fn main() -> Result<(), DriveError> { 522 524 /// # let mut disk_driver = _get_fake_disk_driver(); 523 525 /// let (mut rx, join) = disk_driver.to_channel(512); 524 526 /// while let Some(recvd) = rx.recv().await { 525 - /// let pairs = recvd?; 526 - /// for output in pairs { 527 + /// let outputs = recvd?; 528 + /// let Step::Value(outputs) = outputs else { break; }; 529 + /// for output in outputs { 527 530 /// println!("{}: size={}", output.rkey, output.data.len()); 528 531 /// } 529 532 /// ··· 535 538 mut self, 536 539 n: usize, 537 540 ) -> ( 538 - mpsc::Receiver<Result<BlockChunk, DriveError>>, 541 + mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>, 539 542 tokio::task::JoinHandle<Self>, 540 543 ) { 541 - let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 544 + let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1); 542 545 543 546 // sketch: this worker is going to be allowed to execute without a join handle 544 547 let chan_task = tokio::task::spawn_blocking(move || {
+8 -6
src/lib.rs
··· 18 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 19 20 20 ``` 21 - use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step}; 22 22 23 23 # #[tokio::main] 24 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 35 35 { 36 36 37 37 // if all blocks fit within memory 38 - Driver::Memory(_commit, mut driver) => { 39 - while let Some(chunk) = driver.next_chunk(256).await? { 38 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 39 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 40 40 for output in chunk { 41 41 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 42 ··· 50 50 // set up a disk store we can spill to 51 51 let store = DiskBuilder::new().open("some/path.db".into()).await?; 52 52 // do the spilling, get back a (similar) driver 53 - let (_commit, mut driver) = paused.finish_loading(store).await?; 53 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 54 54 55 - while let Some(chunk) = driver.next_chunk(256).await? { 55 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 56 56 for output in chunk { 57 57 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 58 ··· 86 86 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 87 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 88 88 pub use mst::Commit; 89 - pub use walk::Output; 89 + pub use walk::{Output, Step}; 90 90 91 91 pub type Bytes = Vec<u8>; 92 + 93 + pub type Rkey = String; 92 94 93 95 pub(crate) use hashbrown::HashMap; 94 96
+4 -3
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 + use crate::Rkey; 6 7 use cid::Cid; 7 8 use serde::Deserialize; 8 9 use sha2::{Digest, Sha256}; ··· 54 55 pub things: Vec<NodeThing>, 55 56 } 56 57 57 - #[derive(Debug)] 58 + #[derive(Debug, Clone)] 58 59 pub(crate) struct NodeThing { 59 60 pub(crate) cid: Cid, 60 61 pub(crate) kind: ThingKind, 61 62 } 62 63 63 - #[derive(Debug)] 64 + #[derive(Debug, Clone)] 64 65 pub(crate) enum ThingKind { 65 66 Tree, 66 - Value { rkey: String }, 67 + Value { rkey: Rkey }, 67 68 } 68 69 69 70 impl<'de> Deserialize<'de> for MstNode {
+60 -11
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 3 use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 - use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 4 + use crate::{Bytes, HashMap, Rkey, noop, disk::DiskStore, drive::MaybeProcessedBlock}; 5 5 use cid::Cid; 6 6 use std::convert::Infallible; 7 7 ··· 30 30 #[error("MST depth underflow: depth-0 node with child trees")] 31 31 DepthUnderflow, 32 32 #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 - RkeyOutOfOrder { prev: String, rkey: String }, 33 + RkeyOutOfOrder { prev: Rkey, rkey: Rkey }, 34 34 } 35 35 36 36 /// Walker outputs 37 37 #[derive(Debug, PartialEq)] 38 38 pub struct Output { 39 - pub rkey: String, 39 + pub rkey: Rkey, 40 40 pub cid: Cid, 41 41 pub data: Bytes, 42 42 } 43 43 44 + #[derive(Debug, PartialEq)] 45 + pub enum Step<T = Output> { 46 + Value(T), 47 + End(Option<Rkey>), 48 + } 49 + 44 50 /// Traverser of an atproto MST 45 51 /// 46 52 /// Walks the tree from left-to-right in depth-first order 47 - #[derive(Debug)] 53 + #[derive(Debug, Clone)] 48 54 pub struct Walker { 49 - prev_rkey: String, 55 + prev_rkey: Rkey, 50 56 root_depth: Depth, 51 57 todo: Vec<Vec<NodeThing>>, 52 58 } ··· 134 140 &mut self, 135 141 blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 136 142 process: impl Fn(Bytes) -> Bytes, 137 - ) -> Result<Option<Output>, WalkError> { 143 + ) -> Result<Step, WalkError> { 138 144 while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 145 let Some(mpb) = blocks.get(&cid) else { 140 146 return Err(WalkError::MissingBlock(cid)); 141 147 }; 142 148 if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 143 - return Ok(Some(out)); 149 + return Ok(Step::Value(out)); 150 + } 151 + } 152 + Ok(Step::End(None)) 153 + } 154 + 155 + pub fn step_to_slice_edge( 156 + &mut self, 157 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 158 + ) -> Result<Option<Rkey>, WalkError> { 159 + let mut ant = self.clone(); 160 + let mut ant_prev; 161 + let mut rkey_prev = None; 162 + 163 + loop { 164 + ant_prev = ant.clone(); 165 + ant = ant.clone(); 166 + 167 + let Some(NodeThing { cid, kind }) = ant.next_todo() else { 168 + return Ok(None); 169 + }; 170 + 171 + let maybe_mpb = blocks.get(&cid); 172 + 173 + match (&kind, maybe_mpb) { 174 + (ThingKind::Value { rkey: _ }, Some(_)) => { 175 + // oops we took a step too far 176 + *self = ant_prev; 177 + return Ok(rkey_prev); 178 + } 179 + (ThingKind::Value { rkey }, None) => { 180 + if let Some(p) = rkey_prev && *rkey <= p { 181 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 182 + rkey: rkey.clone(), 183 + prev: p, 184 + })); 185 + } 186 + rkey_prev = Some(rkey.clone()); 187 + } 188 + (ThingKind::Tree, Some(mpb)) => { 189 + ant.mpb_step(kind, cid, mpb, noop)?; 190 + } 191 + (ThingKind::Tree, None) => { 192 + return Err(WalkError::MissingBlock(cid)); 193 + } 144 194 } 145 195 } 146 - Ok(None) 147 196 } 148 197 149 198 /// blocking!!!!!! ··· 151 200 &mut self, 152 201 blocks: &mut DiskStore, 153 202 process: impl Fn(Bytes) -> Bytes, 154 - ) -> Result<Option<Output>, WalkError> { 203 + ) -> Result<Step, WalkError> { 155 204 while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 205 let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 206 return Err(WalkError::MissingBlock(cid)); 158 207 }; 159 208 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 209 if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 - return Ok(Some(out)); 210 + return Ok(Step::Value(out)); 162 211 } 163 212 } 164 - Ok(None) 213 + Ok(Step::End(None)) 165 214 } 166 215 }
+3 -4
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 3 - use repo_stream::Output; 2 + use repo_stream::{Driver, Output, Step}; 4 3 5 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 21 20 .await 22 21 .unwrap() 23 22 { 24 - Driver::Memory(_commit, mem_driver) => mem_driver, 23 + Driver::Memory(_commit, _, mem_driver) => mem_driver, 25 24 Driver::Disk(_) => panic!("too big"), 26 25 }; 27 26 ··· 30 29 let mut found_bsky_profile = false; 31 30 let mut prev_rkey = "".to_string(); 32 31 33 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 32 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 34 33 for Output { rkey, cid: _, data } in pairs { 35 34 records += 1; 36 35

History

1 round 0 comments
sign up or login to add to the discussion
bad-example.com submitted #0
3 commits
expand
type alias for rkey finally
update api for car slice handling
wonder if this works
no conflicts, ready to merge
expand 0 comments