tangled
alpha
login
or
join now
microcosm.blue
/
repo-stream
14
fork
atom
Fast and robust atproto CAR file processing in rust
14
fork
atom
overview
issues
pulls
1
pipelines
remove old code
bad-example.com
2 months ago
de3395b4
43c5105c
+62
-161
4 changed files
expand all
collapse all
unified
split
readme.md
src
drive.rs
mst.rs
walk.rs
+1
-1
readme.md
···
75
- 5.0MiB: `6.8ms`
76
- 279KiB: `170us`
77
- 3.4KiB: `5.2us`
78
-
- empty: `710ns`
79
80
it's a little faster with `mimalloc`
81
···
75
- 5.0MiB: `6.8ms`
76
- 279KiB: `170us`
77
- 3.4KiB: `5.2us`
78
+
- empty: `670ns`
79
80
it's a little faster with `mimalloc`
81
+2
-2
src/drive.rs
···
4
use crate::Bytes;
5
use crate::HashMap;
6
use crate::disk::{DiskError, DiskStore};
7
-
use crate::mst::{Node, MstNode};
8
use cid::Cid;
9
use iroh_car::CarReader;
10
use std::convert::Infallible;
···
65
66
impl MaybeProcessedBlock {
67
pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
68
-
if Node::could_be(&data) {
69
MaybeProcessedBlock::Raw(data)
70
} else {
71
MaybeProcessedBlock::Processed(process(data))
···
4
use crate::Bytes;
5
use crate::HashMap;
6
use crate::disk::{DiskError, DiskStore};
7
+
use crate::mst::MstNode;
8
use cid::Cid;
9
use iroh_car::CarReader;
10
use std::convert::Infallible;
···
65
66
impl MaybeProcessedBlock {
67
pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
68
+
if MstNode::could_be(&data) {
69
MaybeProcessedBlock::Raw(data)
70
} else {
71
MaybeProcessedBlock::Processed(process(data))
+47
-105
src/mst.rs
···
37
pub sig: serde_bytes::ByteBuf,
38
}
39
40
-
use serde::de::{self, Deserializer, Visitor, MapAccess, SeqAccess, Unexpected};
41
use std::fmt;
42
43
pub type Depth = u32;
···
66
Value { rkey: String },
67
}
68
69
-
pub(crate) struct Entries(Vec<NodeThing>, Option<Depth>);
70
-
71
-
impl<'de> Deserialize<'de> for Entries {
72
-
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
73
-
where
74
-
D: Deserializer<'de>,
75
-
{
76
-
struct EntriesVisitor;
77
-
impl<'de> Visitor<'de> for EntriesVisitor {
78
-
type Value = Entries;
79
-
80
-
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
81
-
formatter.write_str("seq MstEntries")
82
-
}
83
-
84
-
fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error>
85
-
where
86
-
S: SeqAccess<'de>,
87
-
{
88
-
let mut children: Vec<NodeThing> = Vec::with_capacity(seq.size_hint().unwrap_or(5));
89
-
let mut prefix: Vec<u8> = vec![];
90
-
let mut depth = None;
91
-
while let Some(entry) = seq.next_element::<Entry>()? {
92
-
let mut rkey: Vec<u8> = vec![];
93
-
let pre_checked = prefix
94
-
.get(..entry.prefix_len)
95
-
.ok_or_else(|| de::Error::invalid_value(
96
-
Unexpected::Bytes(&prefix),
97
-
&"a prefix at least as long as the prefix_len",
98
-
))?;
99
-
100
-
rkey.extend_from_slice(pre_checked);
101
-
rkey.extend_from_slice(&entry.keysuffix);
102
-
103
-
let rkey_s = String::from_utf8(rkey.clone())
104
-
.map_err(|_| de::Error::invalid_value(
105
-
Unexpected::Bytes(&rkey),
106
-
&"a valid utf-8 rkey",
107
-
))?;
108
-
109
-
let key_depth = atproto_mst_depth(&rkey_s);
110
-
if depth.is_none() {
111
-
depth = Some(key_depth);
112
-
} else if Some(key_depth) != depth {
113
-
return Err(de::Error::invalid_value(
114
-
Unexpected::Bytes(&prefix),
115
-
&"all rkeys to have equal MST depth",
116
-
));
117
-
}
118
-
119
-
children.push(NodeThing {
120
-
cid: entry.value,
121
-
kind: ThingKind::Value { rkey: rkey_s },
122
-
});
123
-
124
-
if let Some(cid) = entry.tree {
125
-
children.push(NodeThing {
126
-
cid,
127
-
kind: ThingKind::Tree,
128
-
});
129
-
}
130
-
131
-
prefix = rkey;
132
-
}
133
-
134
-
Ok(Entries(children, depth))
135
-
}
136
-
}
137
-
deserializer.deserialize_seq(EntriesVisitor)
138
-
}
139
-
}
140
-
141
impl<'de> Deserialize<'de> for MstNode {
142
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
143
where
···
158
let mut found_left = false;
159
let mut left = None;
160
let mut found_entries = false;
161
-
let mut things = Vec::with_capacity(4); // "fanout of 4" so does this make sense????
162
let mut depth = None;
163
164
while let Some(key) = map.next_key()? {
···
177
return Err(de::Error::duplicate_field("e"));
178
}
179
found_entries = true;
180
-
let Entries(mut child_entries, d) = map.next_value()?;
181
-
things.append(&mut child_entries);
182
-
depth = d;
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
183
},
184
f => return Err(de::Error::unknown_field(f, NODE_FIELDS))
185
}
···
209
pub(crate) fn is_empty(&self) -> bool {
210
self.things.is_empty()
211
}
212
-
}
213
-
214
-
/// MST node data schema
215
-
#[derive(Debug, Deserialize, PartialEq)]
216
-
#[serde(deny_unknown_fields)]
217
-
pub(crate) struct Node {
218
-
/// link to sub-tree Node on a lower level and with all keys sorting before
219
-
/// keys at this node
220
-
#[serde(rename = "l")]
221
-
pub left: Option<Cid>,
222
-
/// ordered list of TreeEntry objects
223
-
///
224
-
/// atproto MSTs have a fanout of 4, so there can be max 4 entries.
225
-
#[serde(rename = "e")]
226
-
pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]?
227
-
}
228
-
229
-
impl Node {
230
/// test if a block could possibly be a node
231
///
232
/// we can't eagerly decode records except where we're *sure* they cannot be
···
252
.map(|b| b & 0b1110_0000 == 0x80)
253
.unwrap_or(false)
254
}
255
-
256
-
// /// Check if a node has any entries
257
-
// ///
258
-
// /// An empty repository with no records is represented as a single MST node
259
-
// /// with an empty array of entries. This is the only situation in which a
260
-
// /// tree may contain an empty leaf node which does not either contain keys
261
-
// /// ("entries") or point to a sub-tree containing entries.
262
-
// pub(crate) fn is_empty(&self) -> bool {
263
-
// self.left.is_none() && self.entries.is_empty()
264
-
// }
265
}
266
267
/// TreeEntry object
···
37
pub sig: serde_bytes::ByteBuf,
38
}
39
40
+
use serde::de::{self, Deserializer, Visitor, MapAccess, Unexpected};
41
use std::fmt;
42
43
pub type Depth = u32;
···
66
Value { rkey: String },
67
}
68
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
69
impl<'de> Deserialize<'de> for MstNode {
70
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
71
where
···
86
let mut found_left = false;
87
let mut left = None;
88
let mut found_entries = false;
89
+
let mut things = Vec::new();
90
let mut depth = None;
91
92
while let Some(key) = map.next_key()? {
···
105
return Err(de::Error::duplicate_field("e"));
106
}
107
found_entries = true;
108
+
109
+
let mut prefix: Vec<u8> = vec![];
110
+
111
+
for entry in map.next_value::<Vec<Entry>>()? {
112
+
let mut rkey: Vec<u8> = vec![];
113
+
let pre_checked = prefix
114
+
.get(..entry.prefix_len)
115
+
.ok_or_else(|| de::Error::invalid_value(
116
+
Unexpected::Bytes(&prefix),
117
+
&"a prefix at least as long as the prefix_len",
118
+
))?;
119
+
120
+
rkey.extend_from_slice(pre_checked);
121
+
rkey.extend_from_slice(&entry.keysuffix);
122
+
123
+
let rkey_s = String::from_utf8(rkey.clone())
124
+
.map_err(|_| de::Error::invalid_value(
125
+
Unexpected::Bytes(&rkey),
126
+
&"a valid utf-8 rkey",
127
+
))?;
128
+
129
+
let key_depth = atproto_mst_depth(&rkey_s);
130
+
if depth.is_none() {
131
+
depth = Some(key_depth);
132
+
} else if Some(key_depth) != depth {
133
+
return Err(de::Error::invalid_value(
134
+
Unexpected::Bytes(&prefix),
135
+
&"all rkeys to have equal MST depth",
136
+
));
137
+
}
138
+
139
+
things.push(NodeThing {
140
+
cid: entry.value,
141
+
kind: ThingKind::Value { rkey: rkey_s },
142
+
});
143
+
144
+
if let Some(cid) = entry.tree {
145
+
things.push(NodeThing {
146
+
cid,
147
+
kind: ThingKind::Tree,
148
+
});
149
+
}
150
+
151
+
prefix = rkey;
152
+
}
153
},
154
f => return Err(de::Error::unknown_field(f, NODE_FIELDS))
155
}
···
179
pub(crate) fn is_empty(&self) -> bool {
180
self.things.is_empty()
181
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
182
/// test if a block could possibly be a node
183
///
184
/// we can't eagerly decode records except where we're *sure* they cannot be
···
204
.map(|b| b & 0b1110_0000 == 0x80)
205
.unwrap_or(false)
206
}
0
0
0
0
0
0
0
0
0
0
207
}
208
209
/// TreeEntry object
+12
-53
src/walk.rs
···
70
})
71
}
72
73
-
fn next_todo(&mut self) -> Option<NodeThing> {
74
-
while let Some(last) = self.todo.last_mut() {
75
-
let Some(thing) = last.pop() else {
76
-
self.todo.pop();
77
-
continue;
78
-
};
79
-
return Some(thing);
80
-
}
81
-
None
82
-
}
83
-
84
fn mpb_step(
85
&mut self,
86
kind: ThingKind,
···
140
}
141
}
142
0
0
0
0
0
0
0
0
0
0
0
0
143
/// Advance through nodes until we find a record or can't go further
144
pub fn step(
145
&mut self,
146
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
147
process: impl Fn(Bytes) -> Bytes,
148
) -> Result<Option<Output>, WalkError> {
149
-
150
while let Some(NodeThing { cid, kind }) = self.next_todo() {
151
let Some(mpb) = blocks.get(&cid) else {
152
return Err(WalkError::MissingBlock(cid));
153
};
154
-
155
if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
156
return Ok(Some(out));
157
}
···
177
Ok(None)
178
}
179
}
180
-
181
-
#[cfg(test)]
182
-
mod test {
183
-
use super::*;
184
-
185
-
// fn cid1() -> Cid {
186
-
// "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
187
-
// .parse()
188
-
// .unwrap()
189
-
// }
190
-
191
-
// #[test]
192
-
// fn test_push_empty_fails() {
193
-
// let empty_node = Node {
194
-
// left: None,
195
-
// entries: vec![],
196
-
// };
197
-
// let mut stack = vec![];
198
-
// let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
199
-
// assert_eq!(err, Err(MstError::EmptyNode));
200
-
// }
201
-
202
-
// #[test]
203
-
// fn test_push_one_node() {
204
-
// let node = Node {
205
-
// left: Some(cid1()),
206
-
// entries: vec![],
207
-
// };
208
-
// let mut stack = vec![];
209
-
// push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
210
-
// assert_eq!(
211
-
// stack.last(),
212
-
// Some(Need::Node {
213
-
// depth: Depth::Depth(3),
214
-
// cid: cid1()
215
-
// })
216
-
// .as_ref()
217
-
// );
218
-
// }
219
-
}
···
70
})
71
}
72
0
0
0
0
0
0
0
0
0
0
0
73
fn mpb_step(
74
&mut self,
75
kind: ThingKind,
···
129
}
130
}
131
132
+
#[inline(always)]
133
+
fn next_todo(&mut self) -> Option<NodeThing> {
134
+
while let Some(last) = self.todo.last_mut() {
135
+
let Some(thing) = last.pop() else {
136
+
self.todo.pop();
137
+
continue;
138
+
};
139
+
return Some(thing);
140
+
}
141
+
None
142
+
}
143
+
144
/// Advance through nodes until we find a record or can't go further
145
pub fn step(
146
&mut self,
147
blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
148
process: impl Fn(Bytes) -> Bytes,
149
) -> Result<Option<Output>, WalkError> {
0
150
while let Some(NodeThing { cid, kind }) = self.next_todo() {
151
let Some(mpb) = blocks.get(&cid) else {
152
return Err(WalkError::MissingBlock(cid));
153
};
0
154
if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
155
return Ok(Some(out));
156
}
···
176
Ok(None)
177
}
178
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0