+1
-1
benches/huge-car.rs
+1
-1
benches/huge-car.rs
+3
-3
benches/non-huge-cars.rs
+3
-3
benches/non-huge-cars.rs
···
1
extern crate repo_stream;
2
-
use repo_stream::Driver;
3
4
use criterion::{Criterion, criterion_group, criterion_main};
5
···
40
41
async fn drive_car(bytes: &[u8]) -> usize {
42
let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
43
-
Driver::Memory(_, mem_driver) => mem_driver,
44
Driver::Disk(_) => panic!("not benching big cars here"),
45
};
46
47
let mut n = 0;
48
-
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
49
n += pairs.len();
50
}
51
n
···
1
extern crate repo_stream;
2
+
use repo_stream::{Driver, Step};
3
4
use criterion::{Criterion, criterion_group, criterion_main};
5
···
40
41
async fn drive_car(bytes: &[u8]) -> usize {
42
let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
43
+
Driver::Memory(_, _, mem_driver) => mem_driver,
44
Driver::Disk(_) => panic!("not benching big cars here"),
45
};
46
47
let mut n = 0;
48
+
while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
49
n += pairs.len();
50
}
51
n
+10
-7
examples/disk-read-file/main.rs
+10
-7
examples/disk-read-file/main.rs
···
9
static GLOBAL: MiMalloc = MiMalloc;
10
11
use clap::Parser;
12
-
use repo_stream::{DiskBuilder, Driver, DriverBuilder};
13
use std::path::PathBuf;
14
use std::time::Instant;
15
···
42
.load_car(reader)
43
.await?
44
{
45
-
Driver::Memory(_, _) => panic!("try this on a bigger car"),
46
Driver::Disk(big_stuff) => {
47
// we reach here if the repo was too big and needs to be spilled to
48
// disk to continue
···
51
let disk_store = DiskBuilder::new().open(tmpfile).await?;
52
53
// do the spilling, get back a (similar) driver
54
-
let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
55
56
// at this point you might want to fetch the account's signing key
57
// via the DID from the commit, and then verify the signature.
···
74
// this example uses the disk driver's channel mode: the tree walking is
75
// spawned onto a blocking thread, and we get chunks of rkey+blocks back
76
let (mut rx, join) = driver.to_channel(512);
77
-
while let Some(r) = rx.recv().await {
78
-
let pairs = r?;
79
80
// keep a count of the total number of blocks seen
81
-
n += pairs.len();
82
83
-
for output in pairs {
84
// for each block, count how many bytes are equal to '0'
85
// (this is just an example, you probably want to do something more
86
// interesting)
···
9
static GLOBAL: MiMalloc = MiMalloc;
10
11
use clap::Parser;
12
+
use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step};
13
use std::path::PathBuf;
14
use std::time::Instant;
15
···
42
.load_car(reader)
43
.await?
44
{
45
+
Driver::Memory(_, _, _) => panic!("try this on a bigger car"),
46
Driver::Disk(big_stuff) => {
47
// we reach here if the repo was too big and needs to be spilled to
48
// disk to continue
···
51
let disk_store = DiskBuilder::new().open(tmpfile).await?;
52
53
// do the spilling, get back a (similar) driver
54
+
let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?;
55
56
// at this point you might want to fetch the account's signing key
57
// via the DID from the commit, and then verify the signature.
···
74
// this example uses the disk driver's channel mode: the tree walking is
75
// spawned onto a blocking thread, and we get chunks of rkey+blocks back
76
let (mut rx, join) = driver.to_channel(512);
77
+
while let Some(step) = rx.recv().await {
78
+
let step = step?;
79
+
let Step::Value(outputs) = step else {
80
+
break;
81
+
};
82
83
// keep a count of the total number of blocks seen
84
+
n += outputs.len();
85
86
+
for output in outputs {
87
// for each block, count how many bytes are equal to '0'
88
// (this is just an example, you probably want to do something more
89
// interesting)
+3
-3
examples/read-file/main.rs
+3
-3
examples/read-file/main.rs
···
4
5
extern crate repo_stream;
6
use clap::Parser;
7
-
use repo_stream::{Driver, DriverBuilder};
8
use std::path::PathBuf;
9
10
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
28
.load_car(reader)
29
.await?
30
{
31
-
Driver::Memory(commit, mem_driver) => (commit, mem_driver),
32
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33
};
34
35
log::info!("got commit: {commit:?}");
36
37
let mut n = 0;
38
-
while let Some(pairs) = driver.next_chunk(256).await? {
39
n += pairs.len();
40
// log::info!("got {rkey:?}");
41
}
···
4
5
extern crate repo_stream;
6
use clap::Parser;
7
+
use repo_stream::{Driver, DriverBuilder, Step};
8
use std::path::PathBuf;
9
10
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
28
.load_car(reader)
29
.await?
30
{
31
+
Driver::Memory(commit, _, mem_driver) => (commit, mem_driver),
32
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33
};
34
35
log::info!("got commit: {commit:?}");
36
37
let mut n = 0;
38
+
while let Step::Value(pairs) = driver.next_chunk(256).await? {
39
n += pairs.len();
40
// log::info!("got {rkey:?}");
41
}
+7
-7
readme.md
+7
-7
readme.md
···
11
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12
13
```rust no_run
14
-
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};
15
16
#[tokio::main]
17
async fn main() -> Result<(), Box<dyn std::error::Error>> {
···
31
{
32
33
// if all blocks fit within memory
34
-
Driver::Memory(_commit, mut driver) => {
35
-
while let Some(chunk) = driver.next_chunk(256).await? {
36
for Output { rkey: _, cid: _, data } in chunk {
37
let size = usize::from_ne_bytes(data.try_into().unwrap());
38
total_size += size;
···
45
// set up a disk store we can spill to
46
let store = DiskBuilder::new().open("some/path.db".into()).await?;
47
// do the spilling, get back a (similar) driver
48
-
let (_commit, mut driver) = paused.finish_loading(store).await?;
49
50
-
while let Some(chunk) = driver.next_chunk(256).await? {
51
for Output { rkey: _, cid: _, data } in chunk {
52
let size = usize::from_ne_bytes(data.try_into().unwrap());
53
total_size += size;
···
62
63
more recent todo
64
- [ ] add a zero-copy rkyv process function example
65
-
- [ ] repo car slices
66
-
- [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling)
67
- [x] get an *empty* car for the test suite
68
- [x] implement a max size on disk limit
69
···
11
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12
13
```rust no_run
14
+
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step};
15
16
#[tokio::main]
17
async fn main() -> Result<(), Box<dyn std::error::Error>> {
···
31
{
32
33
// if all blocks fit within memory
34
+
Driver::Memory(_commit, _prev_rkey, mut driver) => {
35
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
36
for Output { rkey: _, cid: _, data } in chunk {
37
let size = usize::from_ne_bytes(data.try_into().unwrap());
38
total_size += size;
···
45
// set up a disk store we can spill to
46
let store = DiskBuilder::new().open("some/path.db".into()).await?;
47
// do the spilling, get back a (similar) driver
48
+
let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
49
50
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
51
for Output { rkey: _, cid: _, data } in chunk {
52
let size = usize::from_ne_bytes(data.try_into().unwrap());
53
total_size += size;
···
62
63
more recent todo
64
- [ ] add a zero-copy rkyv process function example
65
+
- [ ] car slices
66
+
- [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed)
67
- [x] get an *empty* car for the test suite
68
- [x] implement a max size on disk limit
69
+26
-23
src/drive.rs
+26
-23
src/drive.rs
···
1
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
3
use crate::{
4
-
Bytes, HashMap,
5
disk::{DiskError, DiskStore},
6
mst::MstNode,
7
walk::Output,
···
107
///
108
/// You probably want to check the commit's signature. You can go ahead and
109
/// walk the MST right away.
110
-
Memory(Commit, MemDriver),
111
/// Blocks exceed the memory limit
112
///
113
/// You'll need to provide a disk storage to continue. The commit will be
···
237
238
Ok(Driver::Memory(
239
commit,
240
MemDriver {
241
blocks: mem_blocks,
242
walker,
···
268
269
impl MemDriver {
270
/// Step through the record outputs, in rkey order
271
-
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
272
let mut out = Vec::with_capacity(n);
273
for _ in 0..n {
274
// walk as far as we can until we run out of blocks or find a record
275
-
let Some(output) = self.walker.step(&mut self.blocks, self.process)? else {
276
break;
277
};
278
out.push(output);
279
}
280
if out.is_empty() {
281
-
Ok(None)
282
} else {
283
-
Ok(Some(out))
284
}
285
}
286
}
···
299
pub async fn finish_loading(
300
mut self,
301
mut store: DiskStore,
302
-
) -> Result<(Commit, DiskDriver), DriveError> {
303
// move store in and back out so we can manage lifetimes
304
// dump mem blocks into the store
305
store = tokio::task::spawn(async move {
···
385
386
Ok((
387
commit,
388
DiskDriver {
389
process: self.process,
390
state: Some(BigState { store, walker }),
···
417
/// Walk the MST returning up to `n` rkey + record pairs
418
///
419
/// ```no_run
420
-
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
421
/// # #[tokio::main]
422
/// # async fn main() -> Result<(), DriveError> {
423
/// # let mut disk_driver = _get_fake_disk_driver();
424
-
/// while let Some(pairs) = disk_driver.next_chunk(256).await? {
425
-
/// for output in pairs {
426
/// println!("{}: size={}", output.rkey, output.data.len());
427
/// }
428
/// }
429
/// # Ok(())
430
/// # }
431
/// ```
432
-
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
433
let process = self.process;
434
435
// state should only *ever* be None transiently while inside here
···
450
return (state, Err(e.into()));
451
}
452
};
453
-
let Some(output) = step else {
454
break;
455
};
456
out.push(output);
···
466
let out = res?;
467
468
if out.is_empty() {
469
-
Ok(None)
470
} else {
471
-
Ok(Some(out))
472
}
473
}
474
475
fn read_tx_blocking(
476
&mut self,
477
n: usize,
478
-
tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
479
-
) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
480
let BigState { store, walker } = self.state.as_mut().expect("valid state");
481
482
loop {
···
490
Err(e) => return tx.blocking_send(Err(e.into())),
491
};
492
493
-
let Some(output) = step else {
494
break;
495
};
496
out.push(output);
···
499
if out.is_empty() {
500
break;
501
}
502
-
tx.blocking_send(Ok(out))?;
503
}
504
505
Ok(())
···
516
/// benefit over just using `.next_chunk(n)`.
517
///
518
/// ```no_run
519
-
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
520
/// # #[tokio::main]
521
/// # async fn main() -> Result<(), DriveError> {
522
/// # let mut disk_driver = _get_fake_disk_driver();
523
/// let (mut rx, join) = disk_driver.to_channel(512);
524
/// while let Some(recvd) = rx.recv().await {
525
-
/// let pairs = recvd?;
526
-
/// for output in pairs {
527
/// println!("{}: size={}", output.rkey, output.data.len());
528
/// }
529
///
···
535
mut self,
536
n: usize,
537
) -> (
538
-
mpsc::Receiver<Result<BlockChunk, DriveError>>,
539
tokio::task::JoinHandle<Self>,
540
) {
541
-
let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);
542
543
// sketch: this worker is going to be allowed to execute without a join handle
544
let chan_task = tokio::task::spawn_blocking(move || {
···
1
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
3
use crate::{
4
+
Bytes, HashMap, Rkey, Step,
5
disk::{DiskError, DiskStore},
6
mst::MstNode,
7
walk::Output,
···
107
///
108
/// You probably want to check the commit's signature. You can go ahead and
109
/// walk the MST right away.
110
+
Memory(Commit, Option<Rkey>, MemDriver),
111
/// Blocks exceed the memory limit
112
///
113
/// You'll need to provide a disk storage to continue. The commit will be
···
237
238
Ok(Driver::Memory(
239
commit,
240
+
None,
241
MemDriver {
242
blocks: mem_blocks,
243
walker,
···
269
270
impl MemDriver {
271
/// Step through the record outputs, in rkey order
272
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> {
273
let mut out = Vec::with_capacity(n);
274
for _ in 0..n {
275
// walk as far as we can until we run out of blocks or find a record
276
+
let Step::Value(output) = self.walker.step(&mut self.blocks, self.process)? else {
277
break;
278
};
279
out.push(output);
280
}
281
if out.is_empty() {
282
+
Ok(Step::End(None))
283
} else {
284
+
Ok(Step::Value(out))
285
}
286
}
287
}
···
300
pub async fn finish_loading(
301
mut self,
302
mut store: DiskStore,
303
+
) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> {
304
// move store in and back out so we can manage lifetimes
305
// dump mem blocks into the store
306
store = tokio::task::spawn(async move {
···
386
387
Ok((
388
commit,
389
+
None,
390
DiskDriver {
391
process: self.process,
392
state: Some(BigState { store, walker }),
···
419
/// Walk the MST returning up to `n` rkey + record pairs
420
///
421
/// ```no_run
422
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
423
/// # #[tokio::main]
424
/// # async fn main() -> Result<(), DriveError> {
425
/// # let mut disk_driver = _get_fake_disk_driver();
426
+
/// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? {
427
+
/// for output in outputs {
428
/// println!("{}: size={}", output.rkey, output.data.len());
429
/// }
430
/// }
431
/// # Ok(())
432
/// # }
433
/// ```
434
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> {
435
let process = self.process;
436
437
// state should only *ever* be None transiently while inside here
···
452
return (state, Err(e.into()));
453
}
454
};
455
+
let Step::Value(output) = step else {
456
break;
457
};
458
out.push(output);
···
468
let out = res?;
469
470
if out.is_empty() {
471
+
Ok(Step::End(None))
472
} else {
473
+
Ok(Step::Value(out))
474
}
475
}
476
477
fn read_tx_blocking(
478
&mut self,
479
n: usize,
480
+
tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>,
481
+
) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> {
482
let BigState { store, walker } = self.state.as_mut().expect("valid state");
483
484
loop {
···
492
Err(e) => return tx.blocking_send(Err(e.into())),
493
};
494
495
+
let Step::Value(output) = step else {
496
break;
497
};
498
out.push(output);
···
501
if out.is_empty() {
502
break;
503
}
504
+
tx.blocking_send(Ok(Step::Value(out)))?;
505
}
506
507
Ok(())
···
518
/// benefit over just using `.next_chunk(n)`.
519
///
520
/// ```no_run
521
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
522
/// # #[tokio::main]
523
/// # async fn main() -> Result<(), DriveError> {
524
/// # let mut disk_driver = _get_fake_disk_driver();
525
/// let (mut rx, join) = disk_driver.to_channel(512);
526
/// while let Some(recvd) = rx.recv().await {
527
+
/// let outputs = recvd?;
528
+
/// let Step::Value(outputs) = outputs else { break; };
529
+
/// for output in outputs {
530
/// println!("{}: size={}", output.rkey, output.data.len());
531
/// }
532
///
···
538
mut self,
539
n: usize,
540
) -> (
541
+
mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>,
542
tokio::task::JoinHandle<Self>,
543
) {
544
+
let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1);
545
546
// sketch: this worker is going to be allowed to execute without a join handle
547
let chan_task = tokio::task::spawn_blocking(move || {
+8
-6
src/lib.rs
+8
-6
src/lib.rs
···
18
`iroh_car` additionally applies a block size limit of `2MiB`.
19
20
```
21
-
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
22
23
# #[tokio::main]
24
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
···
35
{
36
37
// if all blocks fit within memory
38
-
Driver::Memory(_commit, mut driver) => {
39
-
while let Some(chunk) = driver.next_chunk(256).await? {
40
for output in chunk {
41
let size = usize::from_ne_bytes(output.data.try_into().unwrap());
42
···
50
// set up a disk store we can spill to
51
let store = DiskBuilder::new().open("some/path.db".into()).await?;
52
// do the spilling, get back a (similar) driver
53
-
let (_commit, mut driver) = paused.finish_loading(store).await?;
54
55
-
while let Some(chunk) = driver.next_chunk(256).await? {
56
for output in chunk {
57
let size = usize::from_ne_bytes(output.data.try_into().unwrap());
58
···
86
pub use disk::{DiskBuilder, DiskError, DiskStore};
87
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
88
pub use mst::Commit;
89
-
pub use walk::Output;
90
91
pub type Bytes = Vec<u8>;
92
93
pub(crate) use hashbrown::HashMap;
94
···
18
`iroh_car` additionally applies a block size limit of `2MiB`.
19
20
```
21
+
use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step};
22
23
# #[tokio::main]
24
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
···
35
{
36
37
// if all blocks fit within memory
38
+
Driver::Memory(_commit, _prev_rkey, mut driver) => {
39
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
40
for output in chunk {
41
let size = usize::from_ne_bytes(output.data.try_into().unwrap());
42
···
50
// set up a disk store we can spill to
51
let store = DiskBuilder::new().open("some/path.db".into()).await?;
52
// do the spilling, get back a (similar) driver
53
+
let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
54
55
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
56
for output in chunk {
57
let size = usize::from_ne_bytes(output.data.try_into().unwrap());
58
···
86
pub use disk::{DiskBuilder, DiskError, DiskStore};
87
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
88
pub use mst::Commit;
89
+
pub use walk::{Output, Step};
90
91
pub type Bytes = Vec<u8>;
92
+
93
+
pub type Rkey = String;
94
95
pub(crate) use hashbrown::HashMap;
96
+4
-3
src/mst.rs
+4
-3
src/mst.rs
···
3
//! The primary aim is to work through the **tree** structure. Non-node blocks
4
//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
5
6
use cid::Cid;
7
use serde::Deserialize;
8
use sha2::{Digest, Sha256};
···
54
pub things: Vec<NodeThing>,
55
}
56
57
-
#[derive(Debug)]
58
pub(crate) struct NodeThing {
59
pub(crate) cid: Cid,
60
pub(crate) kind: ThingKind,
61
}
62
63
-
#[derive(Debug)]
64
pub(crate) enum ThingKind {
65
Tree,
66
-
Value { rkey: String },
67
}
68
69
impl<'de> Deserialize<'de> for MstNode {
···
3
//! The primary aim is to work through the **tree** structure. Non-node blocks
4
//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
5
6
+
use crate::Rkey;
7
use cid::Cid;
8
use serde::Deserialize;
9
use sha2::{Digest, Sha256};
···
55
pub things: Vec<NodeThing>,
56
}
57
58
+
#[derive(Debug, Clone)]
59
pub(crate) struct NodeThing {
60
pub(crate) cid: Cid,
61
pub(crate) kind: ThingKind,
62
}
63
64
+
#[derive(Debug, Clone)]
65
pub(crate) enum ThingKind {
66
Tree,
67
+
Value { rkey: Rkey },
68
}
69
70
impl<'de> Deserialize<'de> for MstNode {
+60
-11
src/walk.rs
+60
-11
src/walk.rs
···
1
//! Depth-first MST traversal
2
3
use crate::mst::{Depth, MstNode, NodeThing, ThingKind};
4
-
use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock};
5
use cid::Cid;
6
use std::convert::Infallible;
7
···
30
#[error("MST depth underflow: depth-0 node with child trees")]
31
DepthUnderflow,
32
#[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")]
33
-
RkeyOutOfOrder { prev: String, rkey: String },
34
}
35
36
/// Walker outputs
37
#[derive(Debug, PartialEq)]
38
pub struct Output {
39
-
pub rkey: String,
40
pub cid: Cid,
41
pub data: Bytes,
42
}
43
44
/// Traverser of an atproto MST
45
///
46
/// Walks the tree from left-to-right in depth-first order
47
-
#[derive(Debug)]
48
pub struct Walker {
49
-
prev_rkey: String,
50
root_depth: Depth,
51
todo: Vec<Vec<NodeThing>>,
52
}
···
134
&mut self,
135
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
136
process: impl Fn(Bytes) -> Bytes,
137
-
) -> Result<Option<Output>, WalkError> {
138
while let Some(NodeThing { cid, kind }) = self.next_todo() {
139
let Some(mpb) = blocks.get(&cid) else {
140
return Err(WalkError::MissingBlock(cid));
141
};
142
if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
143
-
return Ok(Some(out));
144
}
145
}
146
-
Ok(None)
147
}
148
149
/// blocking!!!!!!
···
151
&mut self,
152
blocks: &mut DiskStore,
153
process: impl Fn(Bytes) -> Bytes,
154
-
) -> Result<Option<Output>, WalkError> {
155
while let Some(NodeThing { cid, kind }) = self.next_todo() {
156
let Some(block_slice) = blocks.get(&cid.to_bytes())? else {
157
return Err(WalkError::MissingBlock(cid));
158
};
159
let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec());
160
if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? {
161
-
return Ok(Some(out));
162
}
163
}
164
-
Ok(None)
165
}
166
}
···
1
//! Depth-first MST traversal
2
3
use crate::mst::{Depth, MstNode, NodeThing, ThingKind};
4
+
use crate::{Bytes, HashMap, Rkey, noop, disk::DiskStore, drive::MaybeProcessedBlock};
5
use cid::Cid;
6
use std::convert::Infallible;
7
···
30
#[error("MST depth underflow: depth-0 node with child trees")]
31
DepthUnderflow,
32
#[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")]
33
+
RkeyOutOfOrder { prev: Rkey, rkey: Rkey },
34
}
35
36
/// Walker outputs
37
#[derive(Debug, PartialEq)]
38
pub struct Output {
39
+
pub rkey: Rkey,
40
pub cid: Cid,
41
pub data: Bytes,
42
}
43
44
+
#[derive(Debug, PartialEq)]
45
+
pub enum Step<T = Output> {
46
+
Value(T),
47
+
End(Option<Rkey>),
48
+
}
49
+
50
/// Traverser of an atproto MST
51
///
52
/// Walks the tree from left-to-right in depth-first order
53
+
#[derive(Debug, Clone)]
54
pub struct Walker {
55
+
prev_rkey: Rkey,
56
root_depth: Depth,
57
todo: Vec<Vec<NodeThing>>,
58
}
···
140
&mut self,
141
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
142
process: impl Fn(Bytes) -> Bytes,
143
+
) -> Result<Step, WalkError> {
144
while let Some(NodeThing { cid, kind }) = self.next_todo() {
145
let Some(mpb) = blocks.get(&cid) else {
146
return Err(WalkError::MissingBlock(cid));
147
};
148
if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
149
+
return Ok(Step::Value(out));
150
+
}
151
+
}
152
+
Ok(Step::End(None))
153
+
}
154
+
155
+
pub fn step_to_slice_edge(
156
+
&mut self,
157
+
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
158
+
) -> Result<Option<Rkey>, WalkError> {
159
+
let mut ant = self.clone();
160
+
let mut ant_prev;
161
+
let mut rkey_prev = None;
162
+
163
+
loop {
164
+
ant_prev = ant.clone();
165
+
ant = ant.clone();
166
+
167
+
let Some(NodeThing { cid, kind }) = ant.next_todo() else {
168
+
return Ok(None);
169
+
};
170
+
171
+
let maybe_mpb = blocks.get(&cid);
172
+
173
+
match (&kind, maybe_mpb) {
174
+
(ThingKind::Value { rkey: _ }, Some(_)) => {
175
+
// oops we took a step too far
176
+
*self = ant_prev;
177
+
return Ok(rkey_prev);
178
+
}
179
+
(ThingKind::Value { rkey }, None) => {
180
+
if let Some(p) = rkey_prev && *rkey <= p {
181
+
return Err(WalkError::MstError(MstError::RkeyOutOfOrder {
182
+
rkey: rkey.clone(),
183
+
prev: p,
184
+
}));
185
+
}
186
+
rkey_prev = Some(rkey.clone());
187
+
}
188
+
(ThingKind::Tree, Some(mpb)) => {
189
+
ant.mpb_step(kind, cid, mpb, noop)?;
190
+
}
191
+
(ThingKind::Tree, None) => {
192
+
return Err(WalkError::MissingBlock(cid));
193
+
}
194
}
195
}
196
}
197
198
/// blocking!!!!!!
···
200
&mut self,
201
blocks: &mut DiskStore,
202
process: impl Fn(Bytes) -> Bytes,
203
+
) -> Result<Step, WalkError> {
204
while let Some(NodeThing { cid, kind }) = self.next_todo() {
205
let Some(block_slice) = blocks.get(&cid.to_bytes())? else {
206
return Err(WalkError::MissingBlock(cid));
207
};
208
let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec());
209
if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? {
210
+
return Ok(Step::Value(out));
211
}
212
}
213
+
Ok(Step::End(None))
214
}
215
}
+3
-4
tests/non-huge-cars.rs
+3
-4
tests/non-huge-cars.rs
···
1
extern crate repo_stream;
2
-
use repo_stream::Driver;
3
-
use repo_stream::Output;
4
5
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
6
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
···
21
.await
22
.unwrap()
23
{
24
-
Driver::Memory(_commit, mem_driver) => mem_driver,
25
Driver::Disk(_) => panic!("too big"),
26
};
27
···
30
let mut found_bsky_profile = false;
31
let mut prev_rkey = "".to_string();
32
33
-
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
34
for Output { rkey, cid: _, data } in pairs {
35
records += 1;
36
···
1
extern crate repo_stream;
2
+
use repo_stream::{Driver, Output, Step};
3
4
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
···
20
.await
21
.unwrap()
22
{
23
+
Driver::Memory(_commit, _, mem_driver) => mem_driver,
24
Driver::Disk(_) => panic!("too big"),
25
};
26
···
29
let mut found_bsky_profile = false;
30
let mut prev_rkey = "".to_string();
31
32
+
while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
33
for Output { rkey, cid: _, data } in pairs {
34
records += 1;
35
History
1 round
0 comments
bad-example.com
submitted
#0
3 commits
expand
collapse
type alias for rkey finally
update api for car slice handling
...car slice handling not yet actually implemented
wonder if this works
no conflicts, ready to merge