Fast and robust atproto CAR file processing in rust
at revision `hash-cost` · 582 lines · 20 kB (raw view)
1//! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 3use crate::{ 4 Bytes, HashMap, 5 disk::{DiskError, DiskStore}, 6 mst::MstNode, 7 walk::Output, 8}; 9use cid::Cid; 10use iroh_car::CarReader; 11use multihash_codetable::{Code, MultihashDigest}; 12use std::convert::Infallible; 13use tokio::{io::AsyncRead, sync::mpsc}; 14 15use crate::mst::Commit; 16use crate::walk::{WalkError, Walker}; 17 18/// Errors that can happen while consuming and emitting blocks and records 19#[derive(Debug, thiserror::Error)] 20pub enum DriveError { 21 #[error("Error from iroh_car: {0}")] 22 CarReader(#[from] iroh_car::Error), 23 #[error("Block did not match its CID")] 24 BadCID, 25 #[error("Failed to decode commit block: {0}")] 26 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 27 #[error("The Commit block reference by the root was not found")] 28 MissingCommit, 29 #[error("Failed to walk the mst tree: {0}")] 30 WalkError(#[from] WalkError), 31 #[error("CAR file had no roots")] 32 MissingRoot, 33 #[error("Storage error")] 34 StorageError(#[from] DiskError), 35 #[error("Tried to send on a closed channel")] 36 ChannelSendError, // SendError takes <T> which we don't need 37 #[error("Failed to join a task: {0}")] 38 JoinError(#[from] tokio::task::JoinError), 39} 40 41/// An in-order chunk of Rkey + CID + (processed) Block 42pub type BlockChunk = Vec<Output>; 43 44#[derive(Debug, Clone)] 45pub(crate) enum MaybeProcessedBlock { 46 /// A block that's *probably* a Node (but we can't know yet) 47 /// 48 /// It *can be* a record that suspiciously looks a lot like a node, so we 49 /// cannot eagerly turn it into a Node. We only know for sure what it is 50 /// when we actually walk down the MST 51 Raw(Bytes), 52 /// A processed record from a block that was definitely not a Node 53 /// 54 /// Processing has to be fallible because the CAR can have totally-unused 55 /// blocks, which can just be garbage. 
since we're eagerly trying to process 56 /// record blocks without knowing for sure that they *are* records, we 57 /// discard any definitely-not-nodes that fail processing and keep their 58 /// error in the buffer for them. if we later try to retreive them as a 59 /// record, then we can surface the error. 60 /// 61 /// If we _never_ needed this block, then we may have wasted a bit of effort 62 /// trying to process it. Oh well. 63 /// 64 /// There's an alternative here, which would be to kick unprocessable blocks 65 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 66 /// surface the typed error later if needed by trying to reprocess. 67 Processed(Bytes), 68} 69 70impl MaybeProcessedBlock { 71 pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 72 if MstNode::could_be(&data) { 73 MaybeProcessedBlock::Raw(data) 74 } else { 75 MaybeProcessedBlock::Processed(process(data)) 76 } 77 } 78 pub(crate) fn len(&self) -> usize { 79 match self { 80 MaybeProcessedBlock::Raw(b) => b.len(), 81 MaybeProcessedBlock::Processed(b) => b.len(), 82 } 83 } 84 pub(crate) fn into_bytes(self) -> Bytes { 85 match self { 86 MaybeProcessedBlock::Raw(mut b) => { 87 b.push(0x00); 88 b 89 } 90 MaybeProcessedBlock::Processed(mut b) => { 91 b.push(0x01); 92 b 93 } 94 } 95 } 96 pub(crate) fn from_bytes(mut b: Bytes) -> Self { 97 // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 98 let suffix = b.pop().unwrap(); 99 if suffix == 0x00 { 100 MaybeProcessedBlock::Raw(b) 101 } else { 102 MaybeProcessedBlock::Processed(b) 103 } 104 } 105} 106 107/// Read a CAR file, buffering blocks in memory or to disk 108pub enum Driver<R: AsyncRead + Unpin> { 109 /// All blocks fit within the memory limit 110 /// 111 /// You probably want to check the commit's signature. You can go ahead and 112 /// walk the MST right away. 
113 Memory(Commit, MemDriver), 114 /// Blocks exceed the memory limit 115 /// 116 /// You'll need to provide a disk storage to continue. The commit will be 117 /// returned and can be validated only once all blocks are loaded. 118 Disk(NeedDisk<R>), 119} 120 121/// Processor that just returns the raw blocks 122#[inline] 123pub fn noop(block: Bytes) -> Bytes { 124 block 125} 126 127// iroh-car doesn't verify CIDs!!!!!! 128#[inline(always)] 129fn verify_block(given: Cid, block: &[u8]) -> bool { 130 Cid::new_v1(0x71, Code::Sha2_256.digest(block)) == given 131} 132 133/// Builder-style driver setup 134#[derive(Debug, Clone)] 135pub struct DriverBuilder { 136 pub mem_limit_mb: usize, 137 pub block_processor: fn(Bytes) -> Bytes, 138} 139 140impl Default for DriverBuilder { 141 fn default() -> Self { 142 Self { 143 mem_limit_mb: 16, 144 block_processor: noop, 145 } 146 } 147} 148 149impl DriverBuilder { 150 /// Begin configuring the driver with defaults 151 pub fn new() -> Self { 152 Default::default() 153 } 154 /// Set the in-memory size limit, in MiB 155 /// 156 /// Default: 16 MiB 157 pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 158 self.mem_limit_mb = new_limit; 159 self 160 } 161 162 /// Set the block processor 163 /// 164 /// Default: noop, raw blocks will be emitted 165 pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder { 166 self.block_processor = new_processor; 167 self 168 } 169 170 /// Begin processing an atproto MST from a CAR file 171 pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 172 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 173 } 174} 175 176impl<R: AsyncRead + Unpin> Driver<R> { 177 /// Begin processing an atproto MST from a CAR file 178 /// 179 /// Blocks will be loaded, processed, and buffered in memory. 
If the entire 180 /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 181 /// will be returned along with a `Commit` ready for validation. 182 /// 183 /// If the `mem_limit_mb` limit is reached before loading all blocks, the 184 /// partial state will be returned as `Driver::Disk(needed)`, which can be 185 /// resumed by providing a `SqliteStorage` for on-disk block storage. 186 pub async fn load_car( 187 reader: R, 188 process: fn(Bytes) -> Bytes, 189 mem_limit_mb: usize, 190 ) -> Result<Driver<R>, DriveError> { 191 let max_size = mem_limit_mb * 2_usize.pow(20); 192 let mut mem_blocks = HashMap::new(); 193 194 let mut car = CarReader::new(reader).await?; 195 196 let root = *car 197 .header() 198 .roots() 199 .first() 200 .ok_or(DriveError::MissingRoot)?; 201 log::debug!("root: {root:?}"); 202 203 let mut commit = None; 204 205 // try to load all the blocks into memory 206 let mut mem_size = 0; 207 while let Some((cid, data)) = car.next_block().await? { 208 // lkasdjflkajdsflkajsfdlkjasdf 209 if !verify_block(cid, &data) { 210 return Err(DriveError::BadCID); 211 } 212 213 // the root commit is a Special Third Kind of block that we need to make 214 // sure not to optimistically send to the processing function 215 if cid == root { 216 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 217 commit = Some(c); 218 continue; 219 } 220 221 // remaining possible types: node, record, other. optimistically process 222 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 223 224 // stash (maybe processed) blocks in memory as long as we have room 225 mem_size += maybe_processed.len(); 226 mem_blocks.insert(cid, maybe_processed); 227 if mem_size >= max_size { 228 return Ok(Driver::Disk(NeedDisk { 229 car, 230 root, 231 process, 232 max_size, 233 mem_blocks, 234 commit, 235 })); 236 } 237 } 238 239 // all blocks loaded and we fit in memory! hopefully we found the commit... 
240 let commit = commit.ok_or(DriveError::MissingCommit)?; 241 242 // the commit always must point to a Node; empty node => empty MST special case 243 let root_node: MstNode = match mem_blocks 244 .get(&commit.data) 245 .ok_or(DriveError::MissingCommit)? 246 { 247 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 248 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 249 }; 250 let walker = Walker::new(root_node); 251 252 Ok(Driver::Memory( 253 commit, 254 MemDriver { 255 blocks: mem_blocks, 256 walker, 257 process, 258 }, 259 )) 260 } 261} 262 263/// The core driver between the block stream and MST walker 264/// 265/// In the future, PDSs will export CARs in a stream-friendly order that will 266/// enable processing them with tiny memory overhead. But that future is not 267/// here yet. 268/// 269/// CARs are almost always in a stream-unfriendly order, so I'm reverting the 270/// optimistic stream features: we load all block first, then walk the MST. 271/// 272/// This makes things much simpler: we only need to worry about spilling to disk 273/// in one place, and we always have a reasonable expecatation about how much 274/// work the init function will do. We can drop the CAR reader before walking, 275/// so the sync/async boundaries become a little easier to work around. 276#[derive(Debug)] 277pub struct MemDriver { 278 blocks: HashMap<Cid, MaybeProcessedBlock>, 279 walker: Walker, 280 process: fn(Bytes) -> Bytes, 281} 282 283impl MemDriver { 284 /// Step through the record outputs, in rkey order 285 pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 286 let mut out = Vec::with_capacity(n); 287 for _ in 0..n { 288 // walk as far as we can until we run out of blocks or find a record 289 let Some(output) = self.walker.step(&mut self.blocks, self.process)? 
else { 290 break; 291 }; 292 out.push(output); 293 } 294 if out.is_empty() { 295 Ok(None) 296 } else { 297 Ok(Some(out)) 298 } 299 } 300} 301 302/// A partially memory-loaded car file that needs disk spillover to continue 303pub struct NeedDisk<R: AsyncRead + Unpin> { 304 car: CarReader<R>, 305 root: Cid, 306 process: fn(Bytes) -> Bytes, 307 max_size: usize, 308 mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 309 pub commit: Option<Commit>, 310} 311 312impl<R: AsyncRead + Unpin> NeedDisk<R> { 313 pub async fn finish_loading( 314 mut self, 315 mut store: DiskStore, 316 ) -> Result<(Commit, DiskDriver), DriveError> { 317 // move store in and back out so we can manage lifetimes 318 // dump mem blocks into the store 319 store = tokio::task::spawn(async move { 320 let kvs = self 321 .mem_blocks 322 .into_iter() 323 .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 324 325 store.put_many(kvs)?; 326 Ok::<_, DriveError>(store) 327 }) 328 .await??; 329 330 let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 331 332 let store_worker = tokio::task::spawn_blocking(move || { 333 while let Some(chunk) = rx.blocking_recv() { 334 let kvs = chunk 335 .into_iter() 336 .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 337 store.put_many(kvs)?; 338 } 339 Ok::<_, DriveError>(store) 340 }); // await later 341 342 // dump the rest to disk (in chunks) 343 log::debug!("dumping the rest of the stream..."); 344 loop { 345 let mut mem_size = 0; 346 let mut chunk = vec![]; 347 loop { 348 let Some((cid, data)) = self.car.next_block().await? 
else { 349 break; 350 }; 351 352 // lkasdjflkajdsflkajsfdlkjasdf 353 if !verify_block(cid, &data) { 354 return Err(DriveError::BadCID); 355 } 356 357 // we still gotta keep checking for the root since we might not have it 358 if cid == self.root { 359 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 360 self.commit = Some(c); 361 continue; 362 } 363 364 let data = Bytes::from(data); 365 366 // remaining possible types: node, record, other. optimistically process 367 // TODO: get the actual in-memory size to compute disk spill 368 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 369 mem_size += maybe_processed.len(); 370 chunk.push((cid, maybe_processed)); 371 if mem_size >= (self.max_size / 2) { 372 // soooooo if we're setting the db cache to max_size and then letting 373 // multiple chunks in the queue that are >= max_size, then at any time 374 // we might be using some multiple of max_size? 375 break; 376 } 377 } 378 if chunk.is_empty() { 379 break; 380 } 381 tx.send(chunk) 382 .await 383 .map_err(|_| DriveError::ChannelSendError)?; 384 } 385 drop(tx); 386 log::debug!("done. waiting for worker to finish..."); 387 388 store = store_worker.await??; 389 390 log::debug!("worker finished."); 391 392 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 393 394 // the commit always must point to a Node; empty node => empty MST special case 395 let db_bytes = store 396 .get(&commit.data.to_bytes()) 397 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 
398 .ok_or(DriveError::MissingCommit)?; 399 400 let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 401 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 402 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 403 }; 404 let walker = Walker::new(node); 405 406 Ok(( 407 commit, 408 DiskDriver { 409 process: self.process, 410 state: Some(BigState { store, walker }), 411 }, 412 )) 413 } 414} 415 416struct BigState { 417 store: DiskStore, 418 walker: Walker, 419} 420 421/// MST walker that reads from disk instead of an in-memory hashmap 422pub struct DiskDriver { 423 process: fn(Bytes) -> Bytes, 424 state: Option<BigState>, 425} 426 427// for doctests only 428#[doc(hidden)] 429pub fn _get_fake_disk_driver() -> DiskDriver { 430 DiskDriver { 431 process: noop, 432 state: None, 433 } 434} 435 436impl DiskDriver { 437 /// Walk the MST returning up to `n` rkey + record pairs 438 /// 439 /// ```no_run 440 /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 441 /// # #[tokio::main] 442 /// # async fn main() -> Result<(), DriveError> { 443 /// # let mut disk_driver = _get_fake_disk_driver(); 444 /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 445 /// for output in pairs { 446 /// println!("{}: size={}", output.rkey, output.data.len()); 447 /// } 448 /// } 449 /// # Ok(()) 450 /// # } 451 /// ``` 452 pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 453 let process = self.process; 454 455 // state should only *ever* be None transiently while inside here 456 let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 457 458 // the big pain here is that we don't want to leave self.state in an 459 // invalid state (None), so all the error paths have to make sure it 460 // comes out again. 
461 let (state, res) = 462 tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 463 let mut out = Vec::with_capacity(n); 464 465 for _ in 0..n { 466 // walk as far as we can until we run out of blocks or find a record 467 let step = match state.walker.disk_step(&mut state.store, process) { 468 Ok(s) => s, 469 Err(e) => { 470 return (state, Err(e.into())); 471 } 472 }; 473 let Some(output) = step else { 474 break; 475 }; 476 out.push(output); 477 } 478 479 (state, Ok::<_, DriveError>(out)) 480 }) 481 .await?; // on tokio JoinError, we'll be left with invalid state :( 482 483 // *must* restore state before dealing with the actual result 484 self.state = Some(state); 485 486 let out = res?; 487 488 if out.is_empty() { 489 Ok(None) 490 } else { 491 Ok(Some(out)) 492 } 493 } 494 495 fn read_tx_blocking( 496 &mut self, 497 n: usize, 498 tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 499 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 500 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 501 502 loop { 503 let mut out: BlockChunk = Vec::with_capacity(n); 504 505 for _ in 0..n { 506 // walk as far as we can until we run out of blocks or find a record 507 508 let step = match walker.disk_step(store, self.process) { 509 Ok(s) => s, 510 Err(e) => return tx.blocking_send(Err(e.into())), 511 }; 512 513 let Some(output) = step else { 514 break; 515 }; 516 out.push(output); 517 } 518 519 if out.is_empty() { 520 break; 521 } 522 tx.blocking_send(Ok(out))?; 523 } 524 525 Ok(()) 526 } 527 528 /// Spawn the disk reading task into a tokio blocking thread 529 /// 530 /// The idea is to avoid so much sending back and forth to the blocking 531 /// thread, letting a blocking task do all the disk reading work and sending 532 /// records and rkeys back through an `mpsc` channel instead. 533 /// 534 /// This might also allow the disk work to continue while processing the 535 /// records. 
It's still not yet clear if this method actually has much 536 /// benefit over just using `.next_chunk(n)`. 537 /// 538 /// ```no_run 539 /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 540 /// # #[tokio::main] 541 /// # async fn main() -> Result<(), DriveError> { 542 /// # let mut disk_driver = _get_fake_disk_driver(); 543 /// let (mut rx, join) = disk_driver.to_channel(512); 544 /// while let Some(recvd) = rx.recv().await { 545 /// let pairs = recvd?; 546 /// for output in pairs { 547 /// println!("{}: size={}", output.rkey, output.data.len()); 548 /// } 549 /// 550 /// } 551 /// # Ok(()) 552 /// # } 553 /// ``` 554 pub fn to_channel( 555 mut self, 556 n: usize, 557 ) -> ( 558 mpsc::Receiver<Result<BlockChunk, DriveError>>, 559 tokio::task::JoinHandle<Self>, 560 ) { 561 let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 562 563 // sketch: this worker is going to be allowed to execute without a join handle 564 let chan_task = tokio::task::spawn_blocking(move || { 565 if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 566 log::debug!("big car reader exited early due to dropped receiver channel"); 567 } 568 self 569 }); 570 571 (rx, chan_task) 572 } 573 574 /// Reset the disk storage so it can be reused. 575 /// 576 /// The store is returned, so it can be reused for another `DiskDriver`. 577 pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 578 let BigState { store, .. } = self.state.take().expect("valid state"); 579 store.reset().await?; 580 Ok(store) 581 } 582}