+1
-1
benches/huge-car.rs
+1
-1
benches/huge-car.rs
···
33
33
let reader = tokio::io::BufReader::new(reader);
34
34
35
35
let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() {
36
-
Driver::Memory(_, mem_driver) => mem_driver,
36
+
Driver::Memory(_, _, mem_driver) => mem_driver,
37
37
Driver::Disk(_) => panic!("not doing disk for benchmark"),
38
38
};
39
39
+3
-3
benches/non-huge-cars.rs
+3
-3
benches/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
-
use repo_stream::Driver;
2
+
use repo_stream::{Driver, Step};
3
3
4
4
use criterion::{Criterion, criterion_group, criterion_main};
5
5
···
40
40
41
41
async fn drive_car(bytes: &[u8]) -> usize {
42
42
let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
43
-
Driver::Memory(_, mem_driver) => mem_driver,
43
+
Driver::Memory(_, _, mem_driver) => mem_driver,
44
44
Driver::Disk(_) => panic!("not benching big cars here"),
45
45
};
46
46
47
47
let mut n = 0;
48
-
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
48
+
while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
49
49
n += pairs.len();
50
50
}
51
51
n
+10
-7
examples/disk-read-file/main.rs
+10
-7
examples/disk-read-file/main.rs
···
9
9
static GLOBAL: MiMalloc = MiMalloc;
10
10
11
11
use clap::Parser;
12
-
use repo_stream::{DiskBuilder, Driver, DriverBuilder};
12
+
use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step};
13
13
use std::path::PathBuf;
14
14
use std::time::Instant;
15
15
···
42
42
.load_car(reader)
43
43
.await?
44
44
{
45
-
Driver::Memory(_, _) => panic!("try this on a bigger car"),
45
+
Driver::Memory(_, _, _) => panic!("try this on a bigger car"),
46
46
Driver::Disk(big_stuff) => {
47
47
// we reach here if the repo was too big and needs to be spilled to
48
48
// disk to continue
···
51
51
let disk_store = DiskBuilder::new().open(tmpfile).await?;
52
52
53
53
// do the spilling, get back a (similar) driver
54
-
let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
54
+
let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?;
55
55
56
56
// at this point you might want to fetch the account's signing key
57
57
// via the DID from the commit, and then verify the signature.
···
74
74
// this example uses the disk driver's channel mode: the tree walking is
75
75
// spawned onto a blocking thread, and we get chunks of rkey+blocks back
76
76
let (mut rx, join) = driver.to_channel(512);
77
-
while let Some(r) = rx.recv().await {
78
-
let pairs = r?;
77
+
while let Some(step) = rx.recv().await {
78
+
let step = step?;
79
+
let Step::Value(outputs) = step else {
80
+
break;
81
+
};
79
82
80
83
// keep a count of the total number of blocks seen
81
-
n += pairs.len();
84
+
n += outputs.len();
82
85
83
-
for output in pairs {
86
+
for output in outputs {
84
87
// for each block, count how many bytes are equal to '0'
85
88
// (this is just an example, you probably want to do something more
86
89
// interesting)
+3
-3
examples/read-file/main.rs
+3
-3
examples/read-file/main.rs
···
4
4
5
5
extern crate repo_stream;
6
6
use clap::Parser;
7
-
use repo_stream::{Driver, DriverBuilder};
7
+
use repo_stream::{Driver, DriverBuilder, Step};
8
8
use std::path::PathBuf;
9
9
10
10
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
28
28
.load_car(reader)
29
29
.await?
30
30
{
31
-
Driver::Memory(commit, mem_driver) => (commit, mem_driver),
31
+
Driver::Memory(commit, _, mem_driver) => (commit, mem_driver),
32
32
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33
33
};
34
34
35
35
log::info!("got commit: {commit:?}");
36
36
37
37
let mut n = 0;
38
-
while let Some(pairs) = driver.next_chunk(256).await? {
38
+
while let Step::Value(pairs) = driver.next_chunk(256).await? {
39
39
n += pairs.len();
40
40
// log::info!("got {rkey:?}");
41
41
}
+7
-7
readme.md
+7
-7
readme.md
···
11
11
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12
12
13
13
```rust no_run
14
-
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};
14
+
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step};
15
15
16
16
#[tokio::main]
17
17
async fn main() -> Result<(), Box<dyn std::error::Error>> {
···
31
31
{
32
32
33
33
// if all blocks fit within memory
34
-
Driver::Memory(_commit, mut driver) => {
35
-
while let Some(chunk) = driver.next_chunk(256).await? {
34
+
Driver::Memory(_commit, _prev_rkey, mut driver) => {
35
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
36
36
for Output { rkey: _, cid: _, data } in chunk {
37
37
let size = usize::from_ne_bytes(data.try_into().unwrap());
38
38
total_size += size;
···
45
45
// set up a disk store we can spill to
46
46
let store = DiskBuilder::new().open("some/path.db".into()).await?;
47
47
// do the spilling, get back a (similar) driver
48
-
let (_commit, mut driver) = paused.finish_loading(store).await?;
48
+
let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
49
49
50
-
while let Some(chunk) = driver.next_chunk(256).await? {
50
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
51
51
for Output { rkey: _, cid: _, data } in chunk {
52
52
let size = usize::from_ne_bytes(data.try_into().unwrap());
53
53
total_size += size;
···
62
62
63
63
more recent todo
64
64
- [ ] add a zero-copy rkyv process function example
65
-
- [ ] repo car slices
66
-
- [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling)
65
+
- [ ] car slices
66
+
- [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed)
67
67
- [x] get an *emtpy* car for the test suite
68
68
- [x] implement a max size on disk limit
69
69
+26
-23
src/drive.rs
+26
-23
src/drive.rs
···
1
1
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
2
3
3
use crate::{
4
-
Bytes, HashMap,
4
+
Bytes, HashMap, Rkey, Step,
5
5
disk::{DiskError, DiskStore},
6
6
mst::MstNode,
7
7
walk::Output,
···
107
107
///
108
108
/// You probably want to check the commit's signature. You can go ahead and
109
109
/// walk the MST right away.
110
-
Memory(Commit, MemDriver),
110
+
Memory(Commit, Option<Rkey>, MemDriver),
111
111
/// Blocks exceed the memory limit
112
112
///
113
113
/// You'll need to provide a disk storage to continue. The commit will be
···
237
237
238
238
Ok(Driver::Memory(
239
239
commit,
240
+
None,
240
241
MemDriver {
241
242
blocks: mem_blocks,
242
243
walker,
···
268
269
269
270
impl MemDriver {
270
271
/// Step through the record outputs, in rkey order
271
-
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
272
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> {
272
273
let mut out = Vec::with_capacity(n);
273
274
for _ in 0..n {
274
275
// walk as far as we can until we run out of blocks or find a record
275
-
let Some(output) = self.walker.step(&mut self.blocks, self.process)? else {
276
+
let Step::Value(output) = self.walker.step(&mut self.blocks, self.process)? else {
276
277
break;
277
278
};
278
279
out.push(output);
279
280
}
280
281
if out.is_empty() {
281
-
Ok(None)
282
+
Ok(Step::End(None))
282
283
} else {
283
-
Ok(Some(out))
284
+
Ok(Step::Value(out))
284
285
}
285
286
}
286
287
}
···
299
300
pub async fn finish_loading(
300
301
mut self,
301
302
mut store: DiskStore,
302
-
) -> Result<(Commit, DiskDriver), DriveError> {
303
+
) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> {
303
304
// move store in and back out so we can manage lifetimes
304
305
// dump mem blocks into the store
305
306
store = tokio::task::spawn(async move {
···
385
386
386
387
Ok((
387
388
commit,
389
+
None,
388
390
DiskDriver {
389
391
process: self.process,
390
392
state: Some(BigState { store, walker }),
···
417
419
/// Walk the MST returning up to `n` rkey + record pairs
418
420
///
419
421
/// ```no_run
420
-
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
422
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
421
423
/// # #[tokio::main]
422
424
/// # async fn main() -> Result<(), DriveError> {
423
425
/// # let mut disk_driver = _get_fake_disk_driver();
424
-
/// while let Some(pairs) = disk_driver.next_chunk(256).await? {
425
-
/// for output in pairs {
426
+
/// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? {
427
+
/// for output in outputs {
426
428
/// println!("{}: size={}", output.rkey, output.data.len());
427
429
/// }
428
430
/// }
429
431
/// # Ok(())
430
432
/// # }
431
433
/// ```
432
-
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
434
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> {
433
435
let process = self.process;
434
436
435
437
// state should only *ever* be None transiently while inside here
···
450
452
return (state, Err(e.into()));
451
453
}
452
454
};
453
-
let Some(output) = step else {
455
+
let Step::Value(output) = step else {
454
456
break;
455
457
};
456
458
out.push(output);
···
466
468
let out = res?;
467
469
468
470
if out.is_empty() {
469
-
Ok(None)
471
+
Ok(Step::End(None))
470
472
} else {
471
-
Ok(Some(out))
473
+
Ok(Step::Value(out))
472
474
}
473
475
}
474
476
475
477
fn read_tx_blocking(
476
478
&mut self,
477
479
n: usize,
478
-
tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
479
-
) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
480
+
tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>,
481
+
) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> {
480
482
let BigState { store, walker } = self.state.as_mut().expect("valid state");
481
483
482
484
loop {
···
490
492
Err(e) => return tx.blocking_send(Err(e.into())),
491
493
};
492
494
493
-
let Some(output) = step else {
495
+
let Step::Value(output) = step else {
494
496
break;
495
497
};
496
498
out.push(output);
···
499
501
if out.is_empty() {
500
502
break;
501
503
}
502
-
tx.blocking_send(Ok(out))?;
504
+
tx.blocking_send(Ok(Step::Value(out)))?;
503
505
}
504
506
505
507
Ok(())
···
516
518
/// benefit over just using `.next_chunk(n)`.
517
519
///
518
520
/// ```no_run
519
-
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
521
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
520
522
/// # #[tokio::main]
521
523
/// # async fn main() -> Result<(), DriveError> {
522
524
/// # let mut disk_driver = _get_fake_disk_driver();
523
525
/// let (mut rx, join) = disk_driver.to_channel(512);
524
526
/// while let Some(recvd) = rx.recv().await {
525
-
/// let pairs = recvd?;
526
-
/// for output in pairs {
527
+
/// let outputs = recvd?;
528
+
/// let Step::Value(outputs) = outputs else { break; };
529
+
/// for output in outputs {
527
530
/// println!("{}: size={}", output.rkey, output.data.len());
528
531
/// }
529
532
///
···
535
538
mut self,
536
539
n: usize,
537
540
) -> (
538
-
mpsc::Receiver<Result<BlockChunk, DriveError>>,
541
+
mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>,
539
542
tokio::task::JoinHandle<Self>,
540
543
) {
541
-
let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);
544
+
let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1);
542
545
543
546
// sketch: this worker is going to be allowed to execute without a join handle
544
547
let chan_task = tokio::task::spawn_blocking(move || {
+8
-6
src/lib.rs
+8
-6
src/lib.rs
···
18
18
`iroh_car` additionally applies a block size limit of `2MiB`.
19
19
20
20
```
21
-
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
21
+
use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step};
22
22
23
23
# #[tokio::main]
24
24
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
···
35
35
{
36
36
37
37
// if all blocks fit within memory
38
-
Driver::Memory(_commit, mut driver) => {
39
-
while let Some(chunk) = driver.next_chunk(256).await? {
38
+
Driver::Memory(_commit, _prev_rkey, mut driver) => {
39
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
40
40
for output in chunk {
41
41
let size = usize::from_ne_bytes(output.data.try_into().unwrap());
42
42
···
50
50
// set up a disk store we can spill to
51
51
let store = DiskBuilder::new().open("some/path.db".into()).await?;
52
52
// do the spilling, get back a (similar) driver
53
-
let (_commit, mut driver) = paused.finish_loading(store).await?;
53
+
let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
54
54
55
-
while let Some(chunk) = driver.next_chunk(256).await? {
55
+
while let Step::Value(chunk) = driver.next_chunk(256).await? {
56
56
for output in chunk {
57
57
let size = usize::from_ne_bytes(output.data.try_into().unwrap());
58
58
···
86
86
pub use disk::{DiskBuilder, DiskError, DiskStore};
87
87
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
88
88
pub use mst::Commit;
89
-
pub use walk::Output;
89
+
pub use walk::{Output, Step};
90
90
91
91
pub type Bytes = Vec<u8>;
92
+
93
+
pub type Rkey = String;
92
94
93
95
pub(crate) use hashbrown::HashMap;
94
96
+4
-3
src/mst.rs
+4
-3
src/mst.rs
···
3
3
//! The primary aim is to work through the **tree** structure. Non-node blocks
4
4
//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
5
5
6
+
use crate::Rkey;
6
7
use cid::Cid;
7
8
use serde::Deserialize;
8
9
use sha2::{Digest, Sha256};
···
54
55
pub things: Vec<NodeThing>,
55
56
}
56
57
57
-
#[derive(Debug)]
58
+
#[derive(Debug, Clone)]
58
59
pub(crate) struct NodeThing {
59
60
pub(crate) cid: Cid,
60
61
pub(crate) kind: ThingKind,
61
62
}
62
63
63
-
#[derive(Debug)]
64
+
#[derive(Debug, Clone)]
64
65
pub(crate) enum ThingKind {
65
66
Tree,
66
-
Value { rkey: String },
67
+
Value { rkey: Rkey },
67
68
}
68
69
69
70
impl<'de> Deserialize<'de> for MstNode {
+60
-11
src/walk.rs
+60
-11
src/walk.rs
···
1
1
//! Depth-first MST traversal
2
2
3
3
use crate::mst::{Depth, MstNode, NodeThing, ThingKind};
4
-
use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock};
4
+
use crate::{Bytes, HashMap, Rkey, noop, disk::DiskStore, drive::MaybeProcessedBlock};
5
5
use cid::Cid;
6
6
use std::convert::Infallible;
7
7
···
30
30
#[error("MST depth underflow: depth-0 node with child trees")]
31
31
DepthUnderflow,
32
32
#[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")]
33
-
RkeyOutOfOrder { prev: String, rkey: String },
33
+
RkeyOutOfOrder { prev: Rkey, rkey: Rkey },
34
34
}
35
35
36
36
/// Walker outputs
37
37
#[derive(Debug, PartialEq)]
38
38
pub struct Output {
39
-
pub rkey: String,
39
+
pub rkey: Rkey,
40
40
pub cid: Cid,
41
41
pub data: Bytes,
42
42
}
43
43
44
+
#[derive(Debug, PartialEq)]
45
+
pub enum Step<T = Output> {
46
+
Value(T),
47
+
End(Option<Rkey>),
48
+
}
49
+
44
50
/// Traverser of an atproto MST
45
51
///
46
52
/// Walks the tree from left-to-right in depth-first order
47
-
#[derive(Debug)]
53
+
#[derive(Debug, Clone)]
48
54
pub struct Walker {
49
-
prev_rkey: String,
55
+
prev_rkey: Rkey,
50
56
root_depth: Depth,
51
57
todo: Vec<Vec<NodeThing>>,
52
58
}
···
134
140
&mut self,
135
141
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
136
142
process: impl Fn(Bytes) -> Bytes,
137
-
) -> Result<Option<Output>, WalkError> {
143
+
) -> Result<Step, WalkError> {
138
144
while let Some(NodeThing { cid, kind }) = self.next_todo() {
139
145
let Some(mpb) = blocks.get(&cid) else {
140
146
return Err(WalkError::MissingBlock(cid));
141
147
};
142
148
if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
143
-
return Ok(Some(out));
149
+
return Ok(Step::Value(out));
150
+
}
151
+
}
152
+
Ok(Step::End(None))
153
+
}
154
+
155
+
pub fn step_to_slice_edge(
156
+
&mut self,
157
+
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
158
+
) -> Result<Option<Rkey>, WalkError> {
159
+
let mut ant = self.clone();
160
+
let mut ant_prev;
161
+
let mut rkey_prev = None;
162
+
163
+
loop {
164
+
ant_prev = ant.clone();
165
+
ant = ant.clone();
166
+
167
+
let Some(NodeThing { cid, kind }) = ant.next_todo() else {
168
+
return Ok(None);
169
+
};
170
+
171
+
let maybe_mpb = blocks.get(&cid);
172
+
173
+
match (&kind, maybe_mpb) {
174
+
(ThingKind::Value { rkey: _ }, Some(_)) => {
175
+
// oops we took a step too far
176
+
*self = ant_prev;
177
+
return Ok(rkey_prev);
178
+
}
179
+
(ThingKind::Value { rkey }, None) => {
180
+
if let Some(p) = rkey_prev && *rkey <= p {
181
+
return Err(WalkError::MstError(MstError::RkeyOutOfOrder {
182
+
rkey: rkey.clone(),
183
+
prev: p,
184
+
}));
185
+
}
186
+
rkey_prev = Some(rkey.clone());
187
+
}
188
+
(ThingKind::Tree, Some(mpb)) => {
189
+
ant.mpb_step(kind, cid, mpb, noop)?;
190
+
}
191
+
(ThingKind::Tree, None) => {
192
+
return Err(WalkError::MissingBlock(cid));
193
+
}
144
194
}
145
195
}
146
-
Ok(None)
147
196
}
148
197
149
198
/// blocking!!!!!!
···
151
200
&mut self,
152
201
blocks: &mut DiskStore,
153
202
process: impl Fn(Bytes) -> Bytes,
154
-
) -> Result<Option<Output>, WalkError> {
203
+
) -> Result<Step, WalkError> {
155
204
while let Some(NodeThing { cid, kind }) = self.next_todo() {
156
205
let Some(block_slice) = blocks.get(&cid.to_bytes())? else {
157
206
return Err(WalkError::MissingBlock(cid));
158
207
};
159
208
let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec());
160
209
if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? {
161
-
return Ok(Some(out));
210
+
return Ok(Step::Value(out));
162
211
}
163
212
}
164
-
Ok(None)
213
+
Ok(Step::End(None))
165
214
}
166
215
}
+3
-4
tests/non-huge-cars.rs
+3
-4
tests/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
-
use repo_stream::Driver;
3
-
use repo_stream::Output;
2
+
use repo_stream::{Driver, Output, Step};
4
3
5
4
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
6
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
···
21
20
.await
22
21
.unwrap()
23
22
{
24
-
Driver::Memory(_commit, mem_driver) => mem_driver,
23
+
Driver::Memory(_commit, _, mem_driver) => mem_driver,
25
24
Driver::Disk(_) => panic!("too big"),
26
25
};
27
26
···
30
29
let mut found_bsky_profile = false;
31
30
let mut prev_rkey = "".to_string();
32
31
33
-
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
32
+
while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
34
33
for Output { rkey, cid: _, data } in pairs {
35
34
records += 1;
36
35
History
1 round
0 comments
bad-example.com
submitted
#0
3 commits
expand
collapse
type alias for rkey finally
update api for car slice handling
...car slice handling not yet actually implemented
wonder if this works
no conflicts, ready to merge