Fast and robust atproto CAR file processing in rust

bytes

and hashbrowns

+192 -303
+23 -38
Cargo.lock
··· 27 27 ] 28 28 29 29 [[package]] 30 + name = "allocator-api2" 31 + version = "0.2.21" 32 + source = "registry+https://github.com/rust-lang/crates.io-index" 33 + checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" 34 + 35 + [[package]] 30 36 name = "anes" 31 37 version = "0.1.6" 32 38 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 126 132 ] 127 133 128 134 [[package]] 129 - name = "bincode" 130 - version = "2.0.1" 131 - source = "registry+https://github.com/rust-lang/crates.io-index" 132 - checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" 133 - dependencies = [ 134 - "bincode_derive", 135 - "serde", 136 - "unty", 137 - ] 138 - 139 - [[package]] 140 - name = "bincode_derive" 141 - version = "2.0.1" 142 - source = "registry+https://github.com/rust-lang/crates.io-index" 143 - checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" 144 - dependencies = [ 145 - "virtue", 146 - ] 147 - 148 - [[package]] 149 135 name = "bitflags" 150 136 version = "2.9.4" 151 137 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 174 160 175 161 [[package]] 176 162 name = "bytes" 177 - version = "1.10.1" 163 + version = "1.11.0" 178 164 source = "registry+https://github.com/rust-lang/crates.io-index" 179 - checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 165 + checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" 180 166 181 167 [[package]] 182 168 name = "byteview" ··· 562 548 ] 563 549 564 550 [[package]] 551 + name = "foldhash" 552 + version = "0.2.0" 553 + source = "registry+https://github.com/rust-lang/crates.io-index" 554 + checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" 555 + 556 + [[package]] 565 557 name = "futures" 566 558 version = "0.3.31" 567 559 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 700 692 version = "0.16.1" 701 693 source = "registry+https://github.com/rust-lang/crates.io-index" 702 694 checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 695 + dependencies = [ 696 + "allocator-api2", 697 + "equivalent", 698 + "foldhash", 699 + ] 703 700 704 701 [[package]] 705 702 name = "heck" ··· 853 850 checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 854 851 dependencies = [ 855 852 "byteorder-lite", 853 + "bytes", 856 854 "byteview", 857 855 "crossbeam-skiplist", 858 856 "enum_dispatch", ··· 1146 1144 name = "repo-stream" 1147 1145 version = "0.2.2" 1148 1146 dependencies = [ 1149 - "bincode", 1147 + "bytes", 1148 + "cid", 1150 1149 "clap", 1151 1150 "criterion", 1152 1151 "env_logger", 1153 1152 "fjall", 1154 - "futures", 1155 - "futures-core", 1156 - "ipld-core", 1153 + "hashbrown 0.16.1", 1157 1154 "iroh-car", 1158 1155 "log", 1159 1156 "mimalloc", ··· 1505 1502 checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" 1506 1503 1507 1504 [[package]] 1508 - name = "unty" 1509 - version = "0.0.4" 1510 - source = "registry+https://github.com/rust-lang/crates.io-index" 1511 - checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" 1512 - 1513 - [[package]] 1514 1505 name = "utf8parse" 1515 1506 version = "0.2.2" 1516 1507 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1527 1518 version = "0.9.5" 1528 1519 source = "registry+https://github.com/rust-lang/crates.io-index" 1529 1520 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1530 - 1531 - [[package]] 1532 - name = "virtue" 1533 - version = "0.0.18" 1534 - source = "registry+https://github.com/rust-lang/crates.io-index" 1535 - checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" 1536 1521 1537 1522 [[package]] 1538 1523 name = "walkdir"
+4 -6
Cargo.toml
··· 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 - bincode = { version = "2.0.1", features = ["serde"] } 11 - fjall = { version = "3.0.1", default-features = false } 12 - futures = "0.3.31" 13 - futures-core = "0.3.31" 14 - ipld-core = { version = "0.4.2", features = ["serde"] } 10 + bytes = "1.11.0" 11 + fjall = { version = "3.0.1", default-features = false, features = ["bytes_1"] } 12 + hashbrown = "0.16.1" 13 + cid = { version = "0.11.1", features = ["serde"] } 15 14 iroh-car = "0.5.1" 16 15 log = "0.4.28" 17 - multibase = "0.9.2" 18 16 serde = { version = "1.0.228", features = ["derive"] } 19 17 serde_bytes = "0.11.19" 20 18 serde_ipld_dagcbor = "0.6.4"
+7 -3
benches/huge-car.rs
··· 22 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 23 23 let reader = tokio::io::BufReader::new(reader); 24 24 25 - let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 26 - .await 27 - .unwrap() 25 + let mut driver = match Driver::load_car( 26 + reader, 27 + |block| block.len().to_le_bytes().to_vec().into(), 28 + 1024, 29 + ) 30 + .await 31 + .unwrap() 28 32 { 29 33 Driver::Memory(_, mem_driver) => mem_driver, 30 34 Driver::Disk(_) => panic!("not doing disk for benchmark"),
+8 -7
benches/non-huge-cars.rs
··· 29 29 } 30 30 31 31 async fn drive_car(bytes: &[u8]) -> usize { 32 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 33 - .await 34 - .unwrap() 35 - { 36 - Driver::Memory(_, mem_driver) => mem_driver, 37 - Driver::Disk(_) => panic!("not benching big cars here"), 38 - }; 32 + let mut driver = 33 + match Driver::load_car(bytes, |block| block.len().to_le_bytes().to_vec().into(), 32) 34 + .await 35 + .unwrap() 36 + { 37 + Driver::Memory(_, mem_driver) => mem_driver, 38 + Driver::Disk(_) => panic!("not benching big cars here"), 39 + }; 39 40 40 41 let mut n = 0; 41 42 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+1 -1
examples/read-file/main.rs
··· 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 26 let (commit, mut driver) = match DriverBuilder::new() 27 - .with_block_processor(|block| block.len()) 27 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec().into()) 28 28 .load_car(reader) 29 29 .await? 30 30 {
+3 -3
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 + use bytes::Bytes; 21 22 use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 22 23 use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 23 24 use std::path::PathBuf; ··· 140 141 141 142 pub(crate) fn put_many( 142 143 &mut self, 143 - kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 144 + kv: impl Iterator<Item = (Vec<u8>, Bytes)>, 144 145 ) -> Result<(), DriveError> { 145 146 let mut batch = self.db.batch(); 146 - for pair in kv { 147 - let (k, v) = pair?; 147 + for (k, v) in kv { 148 148 self.stored += v.len(); 149 149 if self.stored > self.max_stored { 150 150 return Err(DiskError::MaxSizeExceeded.into());
+99 -106
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::HashMap; 3 4 use crate::disk::{DiskError, DiskStore}; 4 - use crate::process::Processable; 5 - use ipld_core::cid::Cid; 5 + use crate::mst::Node; 6 + use bytes::Bytes; 7 + use cid::Cid; 6 8 use iroh_car::CarReader; 7 - use serde::{Deserialize, Serialize}; 8 - use std::collections::HashMap; 9 9 use std::convert::Infallible; 10 10 use tokio::{io::AsyncRead, sync::mpsc}; 11 11 12 - use crate::mst::{Commit, Node}; 12 + use crate::mst::Commit; 13 13 use crate::walk::{Step, WalkError, Walker}; 14 14 15 15 /// Errors that can happen while consuming and emitting blocks and records ··· 29 29 MissingRoot, 30 30 #[error("Storage error")] 31 31 StorageError(#[from] DiskError), 32 - #[error("Encode error: {0}")] 33 - BincodeEncodeError(#[from] bincode::error::EncodeError), 34 32 #[error("Tried to send on a closed channel")] 35 33 ChannelSendError, // SendError takes <T> which we don't need 36 34 #[error("Failed to join a task: {0}")] 37 35 JoinError(#[from] tokio::task::JoinError), 38 36 } 39 37 40 - #[derive(Debug, thiserror::Error)] 41 - pub enum DecodeError { 42 - #[error(transparent)] 43 - BincodeDecodeError(#[from] bincode::error::DecodeError), 44 - #[error("extra bytes remained after decoding")] 45 - ExtraGarbage, 46 - } 47 - 48 38 /// An in-order chunk of Rkey + (processed) Block pairs 49 - pub type BlockChunk<T> = Vec<(String, T)>; 39 + pub type BlockChunk = Vec<(String, Bytes)>; 50 40 51 - #[derive(Debug, Clone, Serialize, Deserialize)] 52 - pub(crate) enum MaybeProcessedBlock<T> { 41 + #[derive(Debug, Clone)] 42 + pub(crate) enum MaybeProcessedBlock { 53 43 /// A block that's *probably* a Node (but we can't know yet) 54 44 /// 55 45 /// It *can be* a record that suspiciously looks a lot like a node, so we 56 46 /// cannot eagerly turn it into a Node. We only know for sure what it is 57 47 /// when we actually walk down the MST 58 - Raw(Vec<u8>), 48 + Raw(Bytes), 59 49 /// A processed record from a block that was definitely not a Node 60 50 /// 61 51 /// Processing has to be fallible because the CAR can have totally-unused ··· 71 61 /// There's an alternative here, which would be to kick unprocessable blocks 72 62 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 73 63 /// surface the typed error later if needed by trying to reprocess. 74 - Processed(T), 75 - } 76 - 77 - impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 - /// TODO this is probably a little broken 79 - fn get_size(&self) -> usize { 80 - use std::{cmp::max, mem::size_of}; 81 - 82 - // enum is always as big as its biggest member? 83 - let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 - 85 - let extra = match self { 86 - Self::Raw(bytes) => bytes.len(), 87 - Self::Processed(t) => t.get_size(), 88 - }; 89 - 90 - base_size + extra 91 - } 64 + Processed(Bytes), 92 65 } 93 66 94 - impl<T> MaybeProcessedBlock<T> { 95 - fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 67 + impl MaybeProcessedBlock { 68 + pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 96 69 if Node::could_be(&data) { 97 70 MaybeProcessedBlock::Raw(data) 98 71 } else { 99 72 MaybeProcessedBlock::Processed(process(data)) 100 73 } 101 74 } 75 + pub(crate) fn len(&self) -> usize { 76 + match self { 77 + MaybeProcessedBlock::Raw(b) => b.len(), 78 + MaybeProcessedBlock::Processed(b) => b.len(), 79 + } 80 + } 81 + pub(crate) fn into_bytes(self) -> Bytes { 82 + match self { 83 + MaybeProcessedBlock::Raw(b) => { 84 + let mut owned = b.try_into_mut().unwrap(); 85 + owned.extend_from_slice(&[0x00]); 86 + owned.into() 87 + } 88 + MaybeProcessedBlock::Processed(b) => { 89 + let mut owned = b.try_into_mut().unwrap(); 90 + owned.extend_from_slice(&[0x01]); 91 + owned.into() 92 + } 93 + } 94 + } 95 + pub(crate) fn from_bytes(mut b: Bytes) -> Self { 96 + // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 97 + let suffix = b.split_off(b.len() - 1); 98 + if *suffix == [0x00] { 99 + MaybeProcessedBlock::Raw(b) 100 + } else { 101 + MaybeProcessedBlock::Processed(b) 102 + } 103 + } 102 104 } 103 105 104 106 /// Read a CAR file, buffering blocks in memory or to disk 105 - pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 107 + pub enum Driver<R: AsyncRead + Unpin> { 106 108 /// All blocks fit within the memory limit 107 109 /// 108 110 /// You probably want to check the commit's signature. You can go ahead and 109 111 /// walk the MST right away. 110 - Memory(Commit, MemDriver<T>), 112 + Memory(Commit, MemDriver), 111 113 /// Blocks exceed the memory limit 112 114 /// 113 115 /// You'll need to provide a disk storage to continue. The commit will be 114 116 /// returned and can be validated only once all blocks are loaded. 115 - Disk(NeedDisk<R, T>), 117 + Disk(NeedDisk<R>), 116 118 } 117 119 118 120 /// Builder-style driver setup ··· 127 129 } 128 130 } 129 131 132 + /// Processor that just returns the raw blocks 133 + #[inline] 134 + pub fn noop(block: Bytes) -> Bytes { 135 + block 136 + } 137 + 130 138 impl DriverBuilder { 131 139 /// Begin configuring the driver with defaults 132 140 pub fn new() -> Self { ··· 143 151 /// Set the block processor 144 152 /// 145 153 /// Default: noop, raw blocks will be emitted 146 - pub fn with_block_processor<T: Processable>( 154 + pub fn with_block_processor( 147 155 self, 148 - p: fn(Vec<u8>) -> T, 149 - ) -> DriverBuilderWithProcessor<T> { 156 + block_processor: fn(Bytes) -> Bytes, 157 + ) -> DriverBuilderWithProcessor { 150 158 DriverBuilderWithProcessor { 151 159 mem_limit_mb: self.mem_limit_mb, 152 - block_processor: p, 160 + block_processor, 153 161 } 154 162 } 155 163 /// Begin processing an atproto MST from a CAR file 156 - pub async fn load_car<R: AsyncRead + Unpin>( 157 - &self, 158 - reader: R, 159 - ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 - Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 164 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 165 + Driver::load_car(reader, noop, self.mem_limit_mb).await 161 166 } 162 167 } 163 168 ··· 165 170 /// 166 171 /// start from `DriverBuilder` 167 172 #[derive(Debug, Clone)] 168 - pub struct DriverBuilderWithProcessor<T: Processable> { 173 + pub struct DriverBuilderWithProcessor { 169 174 pub mem_limit_mb: usize, 170 - pub block_processor: fn(Vec<u8>) -> T, 175 + pub block_processor: fn(Bytes) -> Bytes, 171 176 } 172 177 173 - impl<T: Processable> DriverBuilderWithProcessor<T> { 178 + impl DriverBuilderWithProcessor { 174 179 /// Set the in-memory size limit, in MiB 175 180 /// 176 181 /// Default: 16 MiB ··· 179 184 self 180 185 } 181 186 /// Begin processing an atproto MST from a CAR file 182 - pub async fn load_car<R: AsyncRead + Unpin>( 183 - &self, 184 - reader: R, 185 - ) -> Result<Driver<R, T>, DriveError> { 187 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 186 188 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 189 } 188 190 } 189 191 190 - impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 192 + impl<R: AsyncRead + Unpin> Driver<R> { 191 193 /// Begin processing an atproto MST from a CAR file 192 194 /// 193 195 /// Blocks will be loaded, processed, and buffered in memory. If the entire ··· 199 201 /// resumed by providing a `SqliteStorage` for on-disk block storage. 200 202 pub async fn load_car( 201 203 reader: R, 202 - process: fn(Vec<u8>) -> T, 204 + process: fn(Bytes) -> Bytes, 203 205 mem_limit_mb: usize, 204 - ) -> Result<Driver<R, T>, DriveError> { 206 + ) -> Result<Driver<R>, DriveError> { 205 207 let max_size = mem_limit_mb * 2_usize.pow(20); 206 208 let mut mem_blocks = HashMap::new(); 207 209 ··· 227 229 continue; 228 230 } 229 231 232 + let data = Bytes::from(data); 233 + 230 234 // remaining possible types: node, record, other. optimistically process 231 235 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 236 233 237 // stash (maybe processed) blocks in memory as long as we have room 234 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 238 + mem_size += maybe_processed.len(); 235 239 mem_blocks.insert(cid, maybe_processed); 236 240 if mem_size >= max_size { 237 241 return Ok(Driver::Disk(NeedDisk { ··· 275 279 /// work the init function will do. We can drop the CAR reader before walking, 276 280 /// so the sync/async boundaries become a little easier to work around. 277 281 #[derive(Debug)] 278 - pub struct MemDriver<T: Processable> { 279 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 282 + pub struct MemDriver { 283 + blocks: HashMap<Cid, MaybeProcessedBlock>, 280 284 walker: Walker, 281 - process: fn(Vec<u8>) -> T, 285 + process: fn(Bytes) -> Bytes, 282 286 } 283 287 284 - impl<T: Processable> MemDriver<T> { 288 + impl MemDriver { 285 289 /// Step through the record outputs, in rkey order 286 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 290 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 287 291 let mut out = Vec::with_capacity(n); 288 292 for _ in 0..n { 289 293 // walk as far as we can until we run out of blocks or find a record ··· 306 310 } 307 311 308 312 /// A partially memory-loaded car file that needs disk spillover to continue 309 - pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 313 + pub struct NeedDisk<R: AsyncRead + Unpin> { 310 314 car: CarReader<R>, 311 315 root: Cid, 312 - process: fn(Vec<u8>) -> T, 316 + process: fn(Bytes) -> Bytes, 313 317 max_size: usize, 314 - mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 318 + mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 315 319 pub commit: Option<Commit>, 316 320 } 317 321 318 - fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 319 - bincode::serde::encode_to_vec(v, bincode::config::standard()) 320 - } 321 - 322 - pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 323 - let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 324 - if n != bytes.len() { 325 - return Err(DecodeError::ExtraGarbage); 326 - } 327 - Ok(t) 328 - } 329 - 330 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 322 + impl<R: AsyncRead + Unpin> NeedDisk<R> { 331 323 pub async fn finish_loading( 332 324 mut self, 333 325 mut store: DiskStore, 334 - ) -> Result<(Commit, DiskDriver<T>), DriveError> { 326 + ) -> Result<(Commit, DiskDriver), DriveError> { 335 327 // move store in and back out so we can manage lifetimes 336 328 // dump mem blocks into the store 337 329 store = tokio::task::spawn(async move { 338 330 let kvs = self 339 331 .mem_blocks 340 332 .into_iter() 341 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 333 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 342 334 343 335 store.put_many(kvs)?; 344 336 Ok::<_, DriveError>(store) 345 337 }) 346 338 .await??; 347 339 348 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 340 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 349 341 350 342 let store_worker = tokio::task::spawn_blocking(move || { 351 343 while let Some(chunk) = rx.blocking_recv() { 352 344 let kvs = chunk 353 345 .into_iter() 354 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 346 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 355 347 store.put_many(kvs)?; 356 348 } 357 349 Ok::<_, DriveError>(store) ··· 372 364 self.commit = Some(c); 373 365 continue; 374 366 } 367 + 368 + let data = Bytes::from(data); 369 + 375 370 // remaining possible types: node, record, other. optimistically process 376 371 // TODO: get the actual in-memory size to compute disk spill 377 372 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 378 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 373 + mem_size += maybe_processed.len(); 379 374 chunk.push((cid, maybe_processed)); 380 375 if mem_size >= self.max_size { 381 376 // soooooo if we're setting the db cache to max_size and then letting ··· 418 413 } 419 414 420 415 /// MST walker that reads from disk instead of an in-memory hashmap 421 - pub struct DiskDriver<T: Clone> { 422 - process: fn(Vec<u8>) -> T, 416 + pub struct DiskDriver { 417 + process: fn(Bytes) -> Bytes, 423 418 state: Option<BigState>, 424 419 } 425 420 426 421 // for doctests only 427 422 #[doc(hidden)] 428 - pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 429 - use crate::process::noop; 423 + pub fn _get_fake_disk_driver() -> DiskDriver { 430 424 DiskDriver { 431 425 process: noop, 432 426 state: None, 433 427 } 434 428 } 435 429 436 - impl<T: Processable + Send + 'static> DiskDriver<T> { 430 + impl DiskDriver { 437 431 /// Walk the MST returning up to `n` rkey + record pairs 438 432 /// 439 433 /// ```no_run 440 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 434 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 441 435 /// # #[tokio::main] 442 436 /// # async fn main() -> Result<(), DriveError> { 443 437 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 449 443 /// # Ok(()) 450 444 /// # } 451 445 /// ``` 452 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 446 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 453 447 let process = self.process; 454 448 455 449 // state should only *ever* be None transiently while inside here ··· 458 452 // the big pain here is that we don't want to leave self.state in an 459 453 // invalid state (None), so all the error paths have to make sure it 460 454 // comes out again. 461 - let (state, res) = tokio::task::spawn_blocking( 462 - move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 455 + let (state, res) = 456 + tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 463 457 let mut out = Vec::with_capacity(n); 464 458 465 459 for _ in 0..n { ··· 480 474 } 481 475 482 476 (state, Ok::<_, DriveError>(out)) 483 - }, 484 - ) 485 - .await?; // on tokio JoinError, we'll be left with invalid state :( 477 + }) 478 + .await?; // on tokio JoinError, we'll be left with invalid state :( 486 479 487 480 // *must* restore state before dealing with the actual result 488 481 self.state = Some(state); ··· 499 492 fn read_tx_blocking( 500 493 &mut self, 501 494 n: usize, 502 - tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 503 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 495 + tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 496 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 504 497 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 505 498 506 499 loop { 507 - let mut out: BlockChunk<T> = Vec::with_capacity(n); 500 + let mut out: BlockChunk = Vec::with_capacity(n); 508 501 509 502 for _ in 0..n { 510 503 // walk as far as we can until we run out of blocks or find a record ··· 546 539 /// benefit over just using `.next_chunk(n)`. 547 540 /// 548 541 /// ```no_run 549 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 542 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 550 543 /// # #[tokio::main] 551 544 /// # async fn main() -> Result<(), DriveError> { 552 545 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 565 558 mut self, 566 559 n: usize, 567 560 ) -> ( 568 - mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 561 + mpsc::Receiver<Result<BlockChunk, DriveError>>, 569 562 tokio::task::JoinHandle<Self>, 570 563 ) { 571 - let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 564 + let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 572 565 573 566 // sketch: this worker is going to be allowed to execute without a join handle 574 567 let chan_task = tokio::task::spawn_blocking(move || {
+16 -6
src/lib.rs
··· 27 27 28 28 match DriverBuilder::new() 29 29 .with_mem_limit_mb(10) 30 - .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 30 + .with_block_processor( 31 + |rec| rec.len().to_ne_bytes().to_vec().into() 32 + ) // block processing: just extract the raw record size 31 33 .load_car(reader) 32 34 .await? 33 35 { ··· 35 37 // if all blocks fit within memory 36 38 Driver::Memory(_commit, mut driver) => { 37 39 while let Some(chunk) = driver.next_chunk(256).await? { 38 - for (_rkey, size) in chunk { 40 + for (_rkey, bytes) in chunk { 41 + 42 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 43 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 + 39 45 total_size += size; 40 46 } 41 47 } ··· 49 55 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 56 51 57 while let Some(chunk) = driver.next_chunk(256).await? { 52 - for (_rkey, size) in chunk { 58 + for (_rkey, bytes) in chunk { 59 + 60 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 61 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 62 + 53 63 total_size += size; 54 64 } 55 65 } ··· 76 86 77 87 pub mod disk; 78 88 pub mod drive; 79 - pub mod process; 80 89 81 90 pub use disk::{DiskBuilder, DiskError, DiskStore}; 82 - pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 91 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 83 92 pub use mst::Commit; 84 - pub use process::Processable; 93 + 94 + pub(crate) use hashbrown::HashMap;
+1 -1
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use ipld_core::cid::Cid; 6 + use cid::Cid; 7 7 use serde::Deserialize; 8 8 9 9 /// The top-level data object in a repository's tree is a signed commit.
-108
src/process.rs
··· 1 - /*! 2 - Record processor function output trait 3 - 4 - The return type must satisfy the `Processable` trait, which requires: 5 - 6 - - `Clone` because two rkeys can refer to the same record by CID, which may 7 - only appear once in the CAR file. 8 - - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 - 10 - One required function must be implemented, `get_size()`: this should return the 11 - approximate total off-stack size of the type. (the on-stack size will be added 12 - automatically via `std::mem::get_size`). 13 - 14 - Note that it is **not guaranteed** that the `process` function will run on a 15 - block before storing it in memory or on disk: it's not possible to know if a 16 - block is a record without actually walking the MST, so the best we can do is 17 - apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 - store the raw block bytes. 19 - 20 - Here's a silly processing function that just collects 'eyy's found in the raw 21 - record bytes 22 - 23 - ``` 24 - # use repo_stream::Processable; 25 - # use serde::{Serialize, Deserialize}; 26 - #[derive(Debug, Clone, Serialize, Deserialize)] 27 - struct Eyy(usize, String); 28 - 29 - impl Processable for Eyy { 30 - fn get_size(&self) -> usize { 31 - // don't need to compute the usize, it's on the stack 32 - self.1.capacity() // in-mem size from the string's capacity, in bytes 33 - } 34 - } 35 - 36 - fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 - let mut out = Vec::new(); 38 - let to_find = "eyy".as_bytes(); 39 - for i in 0..(raw.len() - 3) { 40 - if &raw[i..(i+3)] == to_find { 41 - out.push(Eyy(i, "eyy".to_string())); 42 - } 43 - } 44 - out 45 - } 46 - ``` 47 - 48 - The memory sizing stuff is a little sketch but probably at least approximately 49 - works. 50 - */ 51 - 52 - use serde::{Serialize, de::DeserializeOwned}; 53 - 54 - /// Output trait for record processing 55 - pub trait Processable: Clone + Serialize + DeserializeOwned { 56 - /// Any additional in-memory size taken by the processed type 57 - /// 58 - /// Do not include stack size (`std::mem::size_of`) 59 - fn get_size(&self) -> usize; 60 - } 61 - 62 - /// Processor that just returns the raw blocks 63 - #[inline] 64 - pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 - block 66 - } 67 - 68 - impl Processable for u8 { 69 - fn get_size(&self) -> usize { 70 - 0 71 - } 72 - } 73 - 74 - impl Processable for usize { 75 - fn get_size(&self) -> usize { 76 - 0 // no additional space taken, just its stack size (newtype is free) 77 - } 78 - } 79 - 80 - impl Processable for String { 81 - fn get_size(&self) -> usize { 82 - self.capacity() 83 - } 84 - } 85 - 86 - impl<Item: Sized + Processable> Processable for Vec<Item> { 87 - fn get_size(&self) -> usize { 88 - let slot_size = std::mem::size_of::<Item>(); 89 - let direct_size = slot_size * self.capacity(); 90 - let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 - direct_size + items_referenced_size 92 - } 93 - } 94 - 95 - impl<Item: Processable> Processable for Option<Item> { 96 - fn get_size(&self) -> usize { 97 - self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 - } 99 - } 100 - 101 - impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 - fn get_size(&self) -> usize { 103 - match self { 104 - Ok(item) => item.get_size(), 105 - Err(err) => err.get_size(), 106 - } 107 - } 108 - }
+18 -20
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use crate::HashMap; 3 4 use crate::disk::DiskStore; 4 - use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 + use crate::drive::MaybeProcessedBlock; 5 6 use crate::mst::Node; 6 - use crate::process::Processable; 7 - use ipld_core::cid::Cid; 7 + use bytes::Bytes; 8 + use cid::Cid; 8 9 use sha2::{Digest, Sha256}; 9 - use std::collections::HashMap; 10 10 use std::convert::Infallible; 11 11 12 12 /// Errors that can happen while walking ··· 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 22 StorageError(#[from] fjall::Error), 23 - #[error("Decode error: {0}")] 24 - DecodeError(#[from] DecodeError), 25 23 } 26 24 27 25 /// Errors from invalid Rkeys ··· 45 43 46 44 /// Walker outputs 47 45 #[derive(Debug)] 48 - pub enum Step<T> { 46 + pub enum Step { 49 47 /// We needed this CID but it's not in the block store 50 48 Missing(Cid), 51 49 /// Reached the end of the MST! yay! 52 50 Finish, 53 51 /// A record was found! 54 - Found { rkey: String, data: T }, 52 + Found { rkey: String, data: Bytes }, 55 53 } 56 54 57 55 #[derive(Debug, Clone, PartialEq)] ··· 176 174 } 177 175 178 176 /// Advance through nodes until we find a record or can't go further 179 - pub fn step<T: Processable>( 177 + pub fn step( 180 178 &mut self, 181 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 182 - process: impl Fn(Vec<u8>) -> T, 183 - ) -> Result<Step<T>, WalkError> { 179 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 180 + process: impl Fn(Bytes) -> Bytes, 181 + ) -> Result<Step, WalkError> { 184 182 loop { 185 183 let Some(need) = self.stack.last_mut() else { 186 184 log::trace!("tried to walk but we're actually done."); ··· 216 214 }; 217 215 let rkey = rkey.clone(); 218 216 let data = match data { 219 - MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 217 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 220 218 MaybeProcessedBlock::Processed(t) => t.clone(), 221 219 }; 222 220 ··· 237 235 } 238 236 239 237 /// blocking!!!!!! 240 - pub fn disk_step<T: Processable>( 238 + pub fn disk_step( 241 239 &mut self, 242 240 reader: &mut DiskStore, 243 - process: impl Fn(Vec<u8>) -> T, 244 - ) -> Result<Step<T>, WalkError> { 241 + process: impl Fn(Bytes) -> Bytes, 242 + ) -> Result<Step, WalkError> { 245 243 loop { 246 244 let Some(need) = self.stack.last_mut() else { 247 245 log::trace!("tried to walk but we're actually done."); ··· 252 250 &mut Need::Node { depth, cid } => { 253 251 let cid_bytes = cid.to_bytes(); 254 252 log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(&cid_bytes)? else { 253 + let Some(block_slice) = reader.get(&cid_bytes)? else { 256 254 log::trace!("node not found, resting"); 257 255 return Ok(Step::Missing(cid)); 258 256 }; 259 257 260 - let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 258 + let block = MaybeProcessedBlock::from_bytes(block_slice.into()); // TODO shouldn't fjalls slice already be bytes 261 259 262 260 let MaybeProcessedBlock::Raw(data) = block else { 263 261 return Err(WalkError::BadCommitFingerprint); ··· 274 272 Need::Record { rkey, cid } => { 275 273 log::trace!("need record {cid:?}"); 276 274 let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(&cid_bytes)? else { 275 + let Some(data_slice) = reader.get(&cid_bytes)? else { 278 276 log::trace!("record block not found, resting"); 279 277 return Ok(Step::Missing(*cid)); 280 278 }; 281 - let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 279 + let data = MaybeProcessedBlock::from_bytes(data_slice.into()); 282 280 let rkey = rkey.clone(); 283 281 let data = match data { 284 282 MaybeProcessedBlock::Raw(data) => process(data),
+12 -4
tests/non-huge-cars.rs
··· 12 12 expected_sum: usize, 13 13 expect_profile: bool, 14 14 ) { 15 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 - .await 17 - .unwrap() 15 + let mut driver = match Driver::load_car( 16 + bytes, 17 + |block| block.len().to_ne_bytes().to_vec().into(), 18 + 10, /* MiB */ 19 + ) 20 + .await 21 + .unwrap() 18 22 { 19 23 Driver::Memory(_commit, mem_driver) => mem_driver, 20 24 Driver::Disk(_) => panic!("too big"), ··· 26 30 let mut prev_rkey = "".to_string(); 27 31 28 32 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 29 - for (rkey, size) in pairs { 33 + for (rkey, bytes) in pairs { 30 34 records += 1; 35 + 36 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 37 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 38 + 31 39 sum += size; 32 40 if rkey == "app.bsky.actor.profile/self" { 33 41 found_bsky_profile = true;