//! Fast and robust atproto CAR file processing in rust
//!
1//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
3use crate::{
4 Bytes, HashMap,
5 disk::{DiskError, DiskStore},
6 mst::MstNode,
7 walk::Output,
8};
9use cid::Cid;
10use iroh_car::CarReader;
11use multihash_codetable::{Code, MultihashDigest};
12use std::convert::Infallible;
13use tokio::{io::AsyncRead, sync::mpsc};
14
15use crate::mst::Commit;
16use crate::walk::{WalkError, Walker};
17
18/// Errors that can happen while consuming and emitting blocks and records
19#[derive(Debug, thiserror::Error)]
20pub enum DriveError {
21 #[error("Error from iroh_car: {0}")]
22 CarReader(#[from] iroh_car::Error),
23 #[error("Block did not match its CID")]
24 BadCID,
25 #[error("Failed to decode commit block: {0}")]
26 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
27 #[error("The Commit block reference by the root was not found")]
28 MissingCommit,
29 #[error("Failed to walk the mst tree: {0}")]
30 WalkError(#[from] WalkError),
31 #[error("CAR file had no roots")]
32 MissingRoot,
33 #[error("Storage error")]
34 StorageError(#[from] DiskError),
35 #[error("Tried to send on a closed channel")]
36 ChannelSendError, // SendError takes <T> which we don't need
37 #[error("Failed to join a task: {0}")]
38 JoinError(#[from] tokio::task::JoinError),
39}
40
/// An in-order chunk of Rkey + CID + (processed) Block
///
/// Produced by the drivers' `next_chunk` methods; elements arrive in rkey
/// order as the MST walk yields them.
pub type BlockChunk = Vec<Output>;
43
#[derive(Debug, Clone)]
pub(crate) enum MaybeProcessedBlock {
    /// A block that's *probably* a Node (but we can't know yet)
    ///
    /// It *can be* a record that suspiciously looks a lot like a node, so we
    /// cannot eagerly turn it into a Node. We only know for sure what it is
    /// when we actually walk down the MST
    Raw(Bytes),
    /// A processed record from a block that was definitely not a Node
    ///
    /// Processing has to be fallible because the CAR can have totally-unused
    /// blocks, which can just be garbage. Since we're eagerly trying to process
    /// record blocks without knowing for sure that they *are* records, we
    /// discard any definitely-not-nodes that fail processing and keep their
    /// error in the buffer for them. If we later try to retrieve them as a
    /// record, then we can surface the error.
    ///
    /// If we _never_ needed this block, then we may have wasted a bit of effort
    /// trying to process it. Oh well.
    ///
    /// There's an alternative here, which would be to kick unprocessable blocks
    /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
    /// surface the typed error later if needed by trying to reprocess.
    Processed(Bytes),
}
69
70impl MaybeProcessedBlock {
71 pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
72 if MstNode::could_be(&data) {
73 MaybeProcessedBlock::Raw(data)
74 } else {
75 MaybeProcessedBlock::Processed(process(data))
76 }
77 }
78 pub(crate) fn len(&self) -> usize {
79 match self {
80 MaybeProcessedBlock::Raw(b) => b.len(),
81 MaybeProcessedBlock::Processed(b) => b.len(),
82 }
83 }
84 pub(crate) fn into_bytes(self) -> Bytes {
85 match self {
86 MaybeProcessedBlock::Raw(mut b) => {
87 b.push(0x00);
88 b
89 }
90 MaybeProcessedBlock::Processed(mut b) => {
91 b.push(0x01);
92 b
93 }
94 }
95 }
96 pub(crate) fn from_bytes(mut b: Bytes) -> Self {
97 // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc
98 let suffix = b.pop().unwrap();
99 if suffix == 0x00 {
100 MaybeProcessedBlock::Raw(b)
101 } else {
102 MaybeProcessedBlock::Processed(b)
103 }
104 }
105}
106
/// Read a CAR file, buffering blocks in memory or to disk
///
/// Returned by `DriverBuilder::load_car` / `Driver::load_car`; which variant
/// you get depends on whether the blocks fit under the memory limit.
pub enum Driver<R: AsyncRead + Unpin> {
    /// All blocks fit within the memory limit
    ///
    /// You probably want to check the commit's signature. You can go ahead and
    /// walk the MST right away.
    Memory(Commit, MemDriver),
    /// Blocks exceed the memory limit
    ///
    /// You'll need to provide a disk storage to continue. The commit will be
    /// returned and can be validated only once all blocks are loaded.
    Disk(NeedDisk<R>),
}
120
121/// Processor that just returns the raw blocks
122#[inline]
123pub fn noop(block: Bytes) -> Bytes {
124 block
125}
126
127// iroh-car doesn't verify CIDs!!!!!!
128#[inline(always)]
129fn verify_block(given: Cid, block: &[u8]) -> bool {
130 Cid::new_v1(0x71, Code::Sha2_256.digest(block)) == given
131}
132
/// Builder-style driver setup
#[derive(Debug, Clone)]
pub struct DriverBuilder {
    /// In-memory buffer limit, in MiB, before spilling to disk
    pub mem_limit_mb: usize,
    /// Function applied to each (probable) record block as it's loaded
    pub block_processor: fn(Bytes) -> Bytes,
}
139
140impl Default for DriverBuilder {
141 fn default() -> Self {
142 Self {
143 mem_limit_mb: 16,
144 block_processor: noop,
145 }
146 }
147}
148
149impl DriverBuilder {
150 /// Begin configuring the driver with defaults
151 pub fn new() -> Self {
152 Default::default()
153 }
154 /// Set the in-memory size limit, in MiB
155 ///
156 /// Default: 16 MiB
157 pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
158 self.mem_limit_mb = new_limit;
159 self
160 }
161
162 /// Set the block processor
163 ///
164 /// Default: noop, raw blocks will be emitted
165 pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder {
166 self.block_processor = new_processor;
167 self
168 }
169
170 /// Begin processing an atproto MST from a CAR file
171 pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> {
172 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
173 }
174}
175
impl<R: AsyncRead + Unpin> Driver<R> {
    /// Begin processing an atproto MST from a CAR file
    ///
    /// Blocks will be loaded, processed, and buffered in memory. If the entire
    /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
    /// will be returned along with a `Commit` ready for validation.
    ///
    /// If the `mem_limit_mb` limit is reached before loading all blocks, the
    /// partial state will be returned as `Driver::Disk(needed)`, which can be
    /// resumed by providing a `SqliteStorage` for on-disk block storage.
    pub async fn load_car(
        reader: R,
        process: fn(Bytes) -> Bytes,
        mem_limit_mb: usize,
    ) -> Result<Driver<R>, DriveError> {
        // MiB -> bytes
        let max_size = mem_limit_mb * 2_usize.pow(20);
        let mut mem_blocks = HashMap::new();

        let mut car = CarReader::new(reader).await?;

        // only the first root is used; it should point at the signed commit
        let root = *car
            .header()
            .roots()
            .first()
            .ok_or(DriveError::MissingRoot)?;
        log::debug!("root: {root:?}");

        let mut commit = None;

        // try to load all the blocks into memory
        let mut mem_size = 0;
        while let Some((cid, data)) = car.next_block().await? {
            // iroh-car does not verify blocks against their CIDs, so we must
            if !verify_block(cid, &data) {
                return Err(DriveError::BadCID);
            }

            // the root commit is a Special Third Kind of block that we need to make
            // sure not to optimistically send to the processing function
            if cid == root {
                let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
                commit = Some(c);
                continue;
            }

            // remaining possible types: node, record, other. optimistically process
            let maybe_processed = MaybeProcessedBlock::maybe(process, data);

            // stash (maybe processed) blocks in memory as long as we have room
            mem_size += maybe_processed.len();
            mem_blocks.insert(cid, maybe_processed);
            if mem_size >= max_size {
                // memory budget exhausted: hand back all partial state so the
                // caller can resume with disk-backed storage
                return Ok(Driver::Disk(NeedDisk {
                    car,
                    root,
                    process,
                    max_size,
                    mem_blocks,
                    commit,
                }));
            }
        }

        // all blocks loaded and we fit in memory! hopefully we found the commit...
        let commit = commit.ok_or(DriveError::MissingCommit)?;

        // the commit always must point to a Node; empty node => empty MST special case
        let root_node: MstNode = match mem_blocks
            .get(&commit.data)
            .ok_or(DriveError::MissingCommit)?
        {
            // an eagerly-processed block here means the root failed the node
            // fingerprint check earlier: the tree is malformed
            MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
            MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
        };
        let walker = Walker::new(root_node);

        Ok(Driver::Memory(
            commit,
            MemDriver {
                blocks: mem_blocks,
                walker,
                process,
            },
        ))
    }
}
262
/// The core driver between the block stream and MST walker
///
/// In the future, PDSs will export CARs in a stream-friendly order that will
/// enable processing them with tiny memory overhead. But that future is not
/// here yet.
///
/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
/// optimistic stream features: we load all blocks first, then walk the MST.
///
/// This makes things much simpler: we only need to worry about spilling to disk
/// in one place, and we always have a reasonable expectation about how much
/// work the init function will do. We can drop the CAR reader before walking,
/// so the sync/async boundaries become a little easier to work around.
#[derive(Debug)]
pub struct MemDriver {
    // all (maybe-processed) blocks, fully loaded, keyed by CID
    blocks: HashMap<Cid, MaybeProcessedBlock>,
    // MST walk state, seeded from the commit's root node
    walker: Walker,
    // block processor carried over from loading
    process: fn(Bytes) -> Bytes,
}
282
283impl MemDriver {
284 /// Step through the record outputs, in rkey order
285 pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
286 let mut out = Vec::with_capacity(n);
287 for _ in 0..n {
288 // walk as far as we can until we run out of blocks or find a record
289 let Some(output) = self.walker.step(&mut self.blocks, self.process)? else {
290 break;
291 };
292 out.push(output);
293 }
294 if out.is_empty() {
295 Ok(None)
296 } else {
297 Ok(Some(out))
298 }
299 }
300}
301
/// A partially memory-loaded car file that needs disk spillover to continue
pub struct NeedDisk<R: AsyncRead + Unpin> {
    // the reader, paused mid-stream where the memory limit was hit
    car: CarReader<R>,
    // CID of the root commit block (its block may not have been seen yet)
    root: Cid,
    // block processor carried over from loading
    process: fn(Bytes) -> Bytes,
    // memory limit in bytes; also used to size the disk-write chunks
    max_size: usize,
    // blocks buffered before the limit was reached
    mem_blocks: HashMap<Cid, MaybeProcessedBlock>,
    /// The commit, if its block already appeared in the stream
    pub commit: Option<Commit>,
}
311
impl<R: AsyncRead + Unpin> NeedDisk<R> {
    /// Finish loading by spilling all blocks into `store`
    ///
    /// The blocks already buffered in memory are written first; the rest of
    /// the CAR stream is then read and written to disk in chunks by a blocking
    /// worker task. Returns the commit (ready for validation) and a
    /// disk-backed driver for walking the MST.
    pub async fn finish_loading(
        mut self,
        mut store: DiskStore,
    ) -> Result<(Commit, DiskDriver), DriveError> {
        // move store in and back out so we can manage lifetimes
        // dump mem blocks into the store
        store = tokio::task::spawn(async move {
            let kvs = self
                .mem_blocks
                .into_iter()
                .map(|(k, v)| (k.to_bytes(), v.into_bytes()));

            store.put_many(kvs)?;
            Ok::<_, DriveError>(store)
        })
        .await??;

        // capacity 1: at most one chunk queued while another is being written
        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1);

        // blocking worker that serializes and writes each chunk as it arrives
        let store_worker = tokio::task::spawn_blocking(move || {
            while let Some(chunk) = rx.blocking_recv() {
                let kvs = chunk
                    .into_iter()
                    .map(|(k, v)| (k.to_bytes(), v.into_bytes()));
                store.put_many(kvs)?;
            }
            Ok::<_, DriveError>(store)
        }); // await later

        // dump the rest to disk (in chunks)
        log::debug!("dumping the rest of the stream...");
        loop {
            let mut mem_size = 0;
            let mut chunk = vec![];
            loop {
                let Some((cid, data)) = self.car.next_block().await? else {
                    break;
                };

                // iroh-car does not verify blocks against their CIDs, so we must
                if !verify_block(cid, &data) {
                    return Err(DriveError::BadCID);
                }

                // we still gotta keep checking for the root since we might not have it
                if cid == self.root {
                    let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
                    self.commit = Some(c);
                    continue;
                }

                let data = Bytes::from(data);

                // remaining possible types: node, record, other. optimistically process
                // TODO: get the actual in-memory size to compute disk spill
                let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
                mem_size += maybe_processed.len();
                chunk.push((cid, maybe_processed));
                if mem_size >= (self.max_size / 2) {
                    // NOTE(review): with chunks of up to max_size/2 here plus one
                    // queued in the channel and one being written, peak memory can
                    // be a small multiple of max_size — worth tightening later
                    break;
                }
            }
            if chunk.is_empty() {
                break;
            }
            tx.send(chunk)
                .await
                .map_err(|_| DriveError::ChannelSendError)?;
        }
        // closing the channel lets the worker's recv loop end
        drop(tx);
        log::debug!("done. waiting for worker to finish...");

        store = store_worker.await??;

        log::debug!("worker finished.");

        let commit = self.commit.ok_or(DriveError::MissingCommit)?;

        // the commit always must point to a Node; empty node => empty MST special case
        let db_bytes = store
            .get(&commit.data.to_bytes())
            .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
            .ok_or(DriveError::MissingCommit)?;

        let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) {
            // an eagerly-processed block here means the root failed the node
            // fingerprint check earlier: the tree is malformed
            MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
            MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?,
        };
        let walker = Walker::new(node);

        Ok((
            commit,
            DiskDriver {
                process: self.process,
                state: Some(BigState { store, walker }),
            },
        ))
    }
}
415
/// Disk-backed walking state: the block store plus the MST walk position
struct BigState {
    store: DiskStore,
    walker: Walker,
}
420
/// MST walker that reads from disk instead of an in-memory hashmap
pub struct DiskDriver {
    // block processor carried over from loading
    process: fn(Bytes) -> Bytes,
    // Some(..) except transiently while a blocking task owns it (or in the
    // doctest fake); methods expect() it back into place
    state: Option<BigState>,
}
426
427// for doctests only
428#[doc(hidden)]
429pub fn _get_fake_disk_driver() -> DiskDriver {
430 DiskDriver {
431 process: noop,
432 state: None,
433 }
434}
435
impl DiskDriver {
    /// Walk the MST returning up to `n` rkey + record pairs
    ///
    /// ```no_run
    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DriveError> {
    /// # let mut disk_driver = _get_fake_disk_driver();
    /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
    ///     for output in pairs {
    ///         println!("{}: size={}", output.rkey, output.data.len());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
        // copy the fn pointer so the closure doesn't capture self
        let process = self.process;

        // state should only *ever* be None transiently while inside here
        let mut state = self.state.take().expect("DiskDriver must have Some(state)");

        // the big pain here is that we don't want to leave self.state in an
        // invalid state (None), so all the error paths have to make sure it
        // comes out again.
        let (state, res) =
            tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) {
                let mut out = Vec::with_capacity(n);

                for _ in 0..n {
                    // walk as far as we can until we run out of blocks or find a record
                    let step = match state.walker.disk_step(&mut state.store, process) {
                        Ok(s) => s,
                        Err(e) => {
                            // return state alongside the error so it's restored
                            return (state, Err(e.into()));
                        }
                    };
                    let Some(output) = step else {
                        break;
                    };
                    out.push(output);
                }

                (state, Ok::<_, DriveError>(out))
            })
            .await?; // on tokio JoinError, we'll be left with invalid state :(

        // *must* restore state before dealing with the actual result
        self.state = Some(state);

        let out = res?;

        if out.is_empty() {
            Ok(None)
        } else {
            Ok(Some(out))
        }
    }

    // Blocking worker loop: walk the MST and push chunks of up to `n` outputs
    // into `tx`. Returns Err only when the receiving side has been dropped.
    fn read_tx_blocking(
        &mut self,
        n: usize,
        tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
    ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
        let BigState { store, walker } = self.state.as_mut().expect("valid state");

        loop {
            let mut out: BlockChunk = Vec::with_capacity(n);

            for _ in 0..n {
                // walk as far as we can until we run out of blocks or find a record

                let step = match walker.disk_step(store, self.process) {
                    Ok(s) => s,
                    // forward walk errors to the consumer and stop walking
                    Err(e) => return tx.blocking_send(Err(e.into())),
                };

                let Some(output) = step else {
                    break;
                };
                out.push(output);
            }

            if out.is_empty() {
                break;
            }
            tx.blocking_send(Ok(out))?;
        }

        Ok(())
    }

    /// Spawn the disk reading task into a tokio blocking thread
    ///
    /// The idea is to avoid so much sending back and forth to the blocking
    /// thread, letting a blocking task do all the disk reading work and sending
    /// records and rkeys back through an `mpsc` channel instead.
    ///
    /// This might also allow the disk work to continue while processing the
    /// records. It's still not yet clear if this method actually has much
    /// benefit over just using `.next_chunk(n)`.
    ///
    /// ```no_run
    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DriveError> {
    /// # let mut disk_driver = _get_fake_disk_driver();
    /// let (mut rx, join) = disk_driver.to_channel(512);
    /// while let Some(recvd) = rx.recv().await {
    ///     let pairs = recvd?;
    ///     for output in pairs {
    ///         println!("{}: size={}", output.rkey, output.data.len());
    ///     }
    ///
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn to_channel(
        mut self,
        n: usize,
    ) -> (
        mpsc::Receiver<Result<BlockChunk, DriveError>>,
        tokio::task::JoinHandle<Self>,
    ) {
        let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);

        // the worker owns `self` for its whole run; the returned JoinHandle
        // yields it back so the store can be reclaimed (e.g. via reset_store)
        let chan_task = tokio::task::spawn_blocking(move || {
            if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
                // not an error for us: the consumer just stopped listening
                log::debug!("big car reader exited early due to dropped receiver channel");
            }
            self
        });

        (rx, chan_task)
    }

    /// Reset the disk storage so it can be reused.
    ///
    /// The store is returned, so it can be reused for another `DiskDriver`.
    pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
        // consume the driver; only the store survives
        let BigState { store, .. } = self.state.take().expect("valid state");
        store.reset().await?;
        Ok(store)
    }
}