//! Streaming Tree ARchive (STAR) format: incremental parser.
1use crate::error::{Result, StarError, VerificationKind};
2use crate::types::{StarCommit, StarItem, StarMstNode};
3use crate::validation::validate_node_structure;
4use cid::Cid;
5use sha2::{Digest, Sha256};
6
7#[derive(Debug, Default)]
8enum State {
9 #[default]
10 Header,
11 Body {
12 stack: Vec<StackItem>,
13 current_len: Option<usize>,
14 },
15 Done,
16}
17
18#[derive(Debug)]
19enum StackItem {
20 Node {
21 expected: Option<Cid>,
22 expected_height: Option<u32>,
23 },
24 Record {
25 key: Vec<u8>,
26 expected: Option<Cid>,
27 implicit_index: Option<usize>,
28 },
29 VerifyLayer0 {
30 node: StarMstNode,
31 parent_expected: Option<Cid>,
32 pending_records: Vec<(usize, Cid)>,
33 },
34}
35
36#[derive(Default)]
37pub struct StarParser {
38 state: State,
39}
40
41impl StarParser {
42 pub fn new() -> Self {
43 Self {
44 state: State::Header,
45 }
46 }
47
48 pub fn parse(&mut self, buf: &[u8]) -> Result<(usize, Option<StarItem>)> {
49 let mut consumed = 0;
50
51 loop {
52 let is_body_done = if let State::Body { stack, .. } = &self.state {
53 stack.is_empty()
54 } else {
55 false
56 };
57
58 if is_body_done {
59 self.state = State::Done;
60 return Ok((consumed, None));
61 }
62
63 let current_buf = &buf[consumed..];
64
65 match &mut self.state {
66 State::Done => return Ok((consumed, None)),
67 State::Header => {
68 let (n, item) = match self.parse_header(current_buf)? {
69 Some((n, item)) => (n, item),
70 None => return Ok((consumed, None)),
71 };
72 consumed += n;
73 return Ok((consumed, Some(item)));
74 }
75 State::Body { stack, current_len } => {
76 if Self::process_verification(stack)? {
77 continue;
78 }
79
80 let (len_consumed, len) = match Self::read_length(current_buf, current_len)? {
81 Some((n, len)) => (n, len),
82 None => return Ok((consumed, None)),
83 };
84
85 let body_buf = ¤t_buf[len_consumed..];
86 if body_buf.len() < len {
87 consumed += len_consumed;
88 return Ok((consumed, None));
89 }
90
91 let block_bytes = &body_buf[..len];
92 *current_len = None;
93
94 let item = stack.pop().unwrap();
95 let result_item = match item {
96 StackItem::Node {
97 expected,
98 expected_height,
99 } => Self::process_node(block_bytes, expected, expected_height, stack)?,
100 StackItem::Record {
101 key,
102 expected,
103 implicit_index,
104 } => {
105 Self::process_record(block_bytes, key, expected, implicit_index, stack)?
106 }
107 _ => return Err(StarError::InvalidState("Unexpected stack item".into())),
108 };
109
110 consumed += len_consumed + len;
111 return Ok((consumed, result_item));
112 }
113 }
114 }
115 }
116
117 fn parse_header(&mut self, buf: &[u8]) -> Result<Option<(usize, StarItem)>> {
118 if buf.is_empty() {
119 return Ok(None);
120 }
121 if buf[0] != 0x2A {
122 return Err(StarError::InvalidHeader);
123 }
124
125 let slice = &buf[1..];
126
127 let (ver, remaining1) = match unsigned_varint::decode::usize(slice) {
128 Ok(res) => res,
129 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None),
130 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))),
131 };
132
133 let (len, remaining2) = match unsigned_varint::decode::usize(remaining1) {
134 Ok(res) => res,
135 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None),
136 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))),
137 };
138
139 let header_varints_len = buf.len() - 1 - remaining2.len();
140 let total_header_len = 1 + header_varints_len;
141 let total_len = total_header_len + len;
142
143 if buf.len() < total_len {
144 return Ok(None);
145 }
146
147 let commit_bytes = &buf[total_header_len..total_len];
148 let commit: StarCommit = serde_ipld_dagcbor::from_slice(commit_bytes)
149 .map_err(|e| StarError::Cbor(e.to_string()))?;
150
151 let _ = ver;
152
153 let mut stack = Vec::new();
154 if let Some(root_cid) = commit.data {
155 stack.push(StackItem::Node {
156 expected: Some(root_cid),
157 expected_height: None, // Root
158 });
159 }
160
161 self.state = State::Body {
162 stack,
163 current_len: None,
164 };
165 Ok(Some((total_len, StarItem::Commit(commit))))
166 }
167
168 fn process_verification(stack: &mut Vec<StackItem>) -> Result<bool> {
169 if let Some(StackItem::VerifyLayer0 { .. }) = stack.last()
170 && let Some(StackItem::VerifyLayer0 {
171 mut node,
172 parent_expected,
173 pending_records,
174 ..
175 }) = stack.pop()
176 {
177 for (idx, cid) in pending_records {
178 if idx < node.e.len() {
179 node.e[idx].v = Some(cid);
180 }
181 }
182
183 let repo_node = node.to_repo()?;
184 let bytes = serde_ipld_dagcbor::to_vec(&repo_node)
185 .map_err(|e| StarError::Cbor(e.to_string()))?;
186
187 let hash = Sha256::digest(&bytes);
188 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?);
189
190 if let Some(expected) = parent_expected
191 && cid != expected
192 {
193 return Err(StarError::VerificationFailed {
194 kind: VerificationKind::Node,
195 expected: Box::new(expected),
196 computed: Box::new(cid),
197 });
198 }
199 return Ok(true);
200 }
201 Ok(false)
202 }
203
204 fn read_length(buf: &[u8], current_len: &mut Option<usize>) -> Result<Option<(usize, usize)>> {
205 if let Some(len) = current_len {
206 return Ok(Some((0, *len)));
207 }
208
209 match unsigned_varint::decode::usize(buf) {
210 Ok((l, remaining)) => {
211 let consumed = buf.len() - remaining.len();
212 *current_len = Some(l);
213 Ok(Some((consumed, l)))
214 }
215 Err(unsigned_varint::decode::Error::Insufficient) => Ok(None),
216 Err(e) => Err(StarError::InvalidState(format!("Varint error: {}", e))),
217 }
218 }
219
220 fn process_node(
221 block_bytes: &[u8],
222 expected: Option<Cid>,
223 expected_height: Option<u32>,
224 stack: &mut Vec<StackItem>,
225 ) -> Result<Option<StarItem>> {
226 let node: StarMstNode = serde_ipld_dagcbor::from_slice(block_bytes)
227 .map_err(|e| StarError::Cbor(e.to_string()))?;
228
229 // Use shared validation logic
230 let (height, entry_keys) = validate_node_structure(&node, expected_height)?;
231
232 // Check for implicit records (needed for VerifyLayer0 logic)
233 let mut has_implicit = false;
234 if height == 0 {
235 for e in &node.e {
236 if e.v_archived == Some(true) {
237 has_implicit = true;
238 break;
239 }
240 }
241 }
242
243 if !has_implicit {
244 let repo_node = node.to_repo()?;
245 let bytes = serde_ipld_dagcbor::to_vec(&repo_node)
246 .map_err(|e| StarError::Cbor(e.to_string()))?;
247
248 let hash = Sha256::digest(&bytes);
249 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?);
250 if let Some(exp) = expected
251 && cid != exp
252 {
253 return Err(StarError::VerificationFailed {
254 kind: VerificationKind::Node,
255 expected: Box::new(exp),
256 computed: Box::new(cid),
257 });
258 }
259 } else {
260 stack.push(StackItem::VerifyLayer0 {
261 node: node.clone(),
262 parent_expected: expected,
263 pending_records: Vec::new(),
264 });
265 }
266
267 // Push children in reverse
268 let child_expected_height = height.checked_sub(1);
269
270 if height > 0 {
271 let next_h = child_expected_height.unwrap();
272
273 for i in (0..node.e.len()).rev() {
274 let e = &node.e[i];
275 let key = entry_keys[i].clone();
276
277 if e.t_archived == Some(true) {
278 stack.push(StackItem::Node {
279 expected: e.t,
280 expected_height: Some(next_h),
281 });
282 }
283
284 if e.v_archived == Some(true) {
285 let implicit_index = if e.v.is_none() { Some(i) } else { None };
286 stack.push(StackItem::Record {
287 key,
288 expected: e.v,
289 implicit_index,
290 });
291 }
292 }
293
294 if node.l_archived == Some(true) {
295 stack.push(StackItem::Node {
296 expected: node.l,
297 expected_height: Some(next_h),
298 });
299 }
300 } else {
301 // Height 0: Push records only
302 for i in (0..node.e.len()).rev() {
303 let e = &node.e[i];
304 let key = entry_keys[i].clone();
305
306 if e.v_archived == Some(true) {
307 let implicit_index = if e.v.is_none() { Some(i) } else { None };
308 stack.push(StackItem::Record {
309 key,
310 expected: e.v,
311 implicit_index,
312 });
313 }
314 }
315 }
316
317 Ok(Some(StarItem::Node(node)))
318 }
319
320 fn process_record(
321 block_bytes: &[u8],
322 key: Vec<u8>,
323 expected: Option<Cid>,
324 implicit_index: Option<usize>,
325 stack: &mut [StackItem],
326 ) -> Result<Option<StarItem>> {
327 let hash = Sha256::digest(block_bytes);
328 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?);
329
330 if let Some(exp) = expected
331 && cid != exp
332 {
333 return Err(StarError::VerificationFailed {
334 kind: VerificationKind::Record { key: key.clone() },
335 expected: Box::new(exp),
336 computed: Box::new(cid),
337 });
338 }
339
340 if let Some(idx) = implicit_index {
341 let mut found = false;
342 for item in stack.iter_mut().rev() {
343 if let StackItem::VerifyLayer0 {
344 pending_records, ..
345 } = item
346 {
347 pending_records.push((idx, cid));
348 found = true;
349 break;
350 }
351 }
352 if !found {
353 return Err(StarError::InvalidState(
354 "Implicit record verification context missing".into(),
355 ));
356 }
357 }
358
359 Ok(Some(StarItem::Record {
360 key,
361 cid,
362 content: Some(block_bytes.to_vec()),
363 }))
364 }
365}