//! Streaming Tree ARchive (STAR) format parser.
//!
//! Source: `rust-impl` (365 lines, 12 kB).
1use crate::error::{Result, StarError, VerificationKind}; 2use crate::types::{StarCommit, StarItem, StarMstNode}; 3use crate::validation::validate_node_structure; 4use cid::Cid; 5use sha2::{Digest, Sha256}; 6 7#[derive(Debug, Default)] 8enum State { 9 #[default] 10 Header, 11 Body { 12 stack: Vec<StackItem>, 13 current_len: Option<usize>, 14 }, 15 Done, 16} 17 18#[derive(Debug)] 19enum StackItem { 20 Node { 21 expected: Option<Cid>, 22 expected_height: Option<u32>, 23 }, 24 Record { 25 key: Vec<u8>, 26 expected: Option<Cid>, 27 implicit_index: Option<usize>, 28 }, 29 VerifyLayer0 { 30 node: StarMstNode, 31 parent_expected: Option<Cid>, 32 pending_records: Vec<(usize, Cid)>, 33 }, 34} 35 36#[derive(Default)] 37pub struct StarParser { 38 state: State, 39} 40 41impl StarParser { 42 pub fn new() -> Self { 43 Self { 44 state: State::Header, 45 } 46 } 47 48 pub fn parse(&mut self, buf: &[u8]) -> Result<(usize, Option<StarItem>)> { 49 let mut consumed = 0; 50 51 loop { 52 let is_body_done = if let State::Body { stack, .. } = &self.state { 53 stack.is_empty() 54 } else { 55 false 56 }; 57 58 if is_body_done { 59 self.state = State::Done; 60 return Ok((consumed, None)); 61 } 62 63 let current_buf = &buf[consumed..]; 64 65 match &mut self.state { 66 State::Done => return Ok((consumed, None)), 67 State::Header => { 68 let (n, item) = match self.parse_header(current_buf)? { 69 Some((n, item)) => (n, item), 70 None => return Ok((consumed, None)), 71 }; 72 consumed += n; 73 return Ok((consumed, Some(item))); 74 } 75 State::Body { stack, current_len } => { 76 if Self::process_verification(stack)? { 77 continue; 78 } 79 80 let (len_consumed, len) = match Self::read_length(current_buf, current_len)? 
{ 81 Some((n, len)) => (n, len), 82 None => return Ok((consumed, None)), 83 }; 84 85 let body_buf = &current_buf[len_consumed..]; 86 if body_buf.len() < len { 87 consumed += len_consumed; 88 return Ok((consumed, None)); 89 } 90 91 let block_bytes = &body_buf[..len]; 92 *current_len = None; 93 94 let item = stack.pop().unwrap(); 95 let result_item = match item { 96 StackItem::Node { 97 expected, 98 expected_height, 99 } => Self::process_node(block_bytes, expected, expected_height, stack)?, 100 StackItem::Record { 101 key, 102 expected, 103 implicit_index, 104 } => { 105 Self::process_record(block_bytes, key, expected, implicit_index, stack)? 106 } 107 _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 108 }; 109 110 consumed += len_consumed + len; 111 return Ok((consumed, result_item)); 112 } 113 } 114 } 115 } 116 117 fn parse_header(&mut self, buf: &[u8]) -> Result<Option<(usize, StarItem)>> { 118 if buf.is_empty() { 119 return Ok(None); 120 } 121 if buf[0] != 0x2A { 122 return Err(StarError::InvalidHeader); 123 } 124 125 let slice = &buf[1..]; 126 127 let (ver, remaining1) = match unsigned_varint::decode::usize(slice) { 128 Ok(res) => res, 129 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 130 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 131 }; 132 133 let (len, remaining2) = match unsigned_varint::decode::usize(remaining1) { 134 Ok(res) => res, 135 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 136 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 137 }; 138 139 let header_varints_len = buf.len() - 1 - remaining2.len(); 140 let total_header_len = 1 + header_varints_len; 141 let total_len = total_header_len + len; 142 143 if buf.len() < total_len { 144 return Ok(None); 145 } 146 147 let commit_bytes = &buf[total_header_len..total_len]; 148 let commit: StarCommit = serde_ipld_dagcbor::from_slice(commit_bytes) 149 .map_err(|e| 
StarError::Cbor(e.to_string()))?; 150 151 let _ = ver; 152 153 let mut stack = Vec::new(); 154 if let Some(root_cid) = commit.data { 155 stack.push(StackItem::Node { 156 expected: Some(root_cid), 157 expected_height: None, // Root 158 }); 159 } 160 161 self.state = State::Body { 162 stack, 163 current_len: None, 164 }; 165 Ok(Some((total_len, StarItem::Commit(commit)))) 166 } 167 168 fn process_verification(stack: &mut Vec<StackItem>) -> Result<bool> { 169 if let Some(StackItem::VerifyLayer0 { .. }) = stack.last() 170 && let Some(StackItem::VerifyLayer0 { 171 mut node, 172 parent_expected, 173 pending_records, 174 .. 175 }) = stack.pop() 176 { 177 for (idx, cid) in pending_records { 178 if idx < node.e.len() { 179 node.e[idx].v = Some(cid); 180 } 181 } 182 183 let repo_node = node.to_repo()?; 184 let bytes = serde_ipld_dagcbor::to_vec(&repo_node) 185 .map_err(|e| StarError::Cbor(e.to_string()))?; 186 187 let hash = Sha256::digest(&bytes); 188 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 189 190 if let Some(expected) = parent_expected 191 && cid != expected 192 { 193 return Err(StarError::VerificationFailed { 194 kind: VerificationKind::Node, 195 expected: Box::new(expected), 196 computed: Box::new(cid), 197 }); 198 } 199 return Ok(true); 200 } 201 Ok(false) 202 } 203 204 fn read_length(buf: &[u8], current_len: &mut Option<usize>) -> Result<Option<(usize, usize)>> { 205 if let Some(len) = current_len { 206 return Ok(Some((0, *len))); 207 } 208 209 match unsigned_varint::decode::usize(buf) { 210 Ok((l, remaining)) => { 211 let consumed = buf.len() - remaining.len(); 212 *current_len = Some(l); 213 Ok(Some((consumed, l))) 214 } 215 Err(unsigned_varint::decode::Error::Insufficient) => Ok(None), 216 Err(e) => Err(StarError::InvalidState(format!("Varint error: {}", e))), 217 } 218 } 219 220 fn process_node( 221 block_bytes: &[u8], 222 expected: Option<Cid>, 223 expected_height: Option<u32>, 224 stack: &mut Vec<StackItem>, 225 ) -> 
Result<Option<StarItem>> { 226 let node: StarMstNode = serde_ipld_dagcbor::from_slice(block_bytes) 227 .map_err(|e| StarError::Cbor(e.to_string()))?; 228 229 // Use shared validation logic 230 let (height, entry_keys) = validate_node_structure(&node, expected_height)?; 231 232 // Check for implicit records (needed for VerifyLayer0 logic) 233 let mut has_implicit = false; 234 if height == 0 { 235 for e in &node.e { 236 if e.v_archived == Some(true) { 237 has_implicit = true; 238 break; 239 } 240 } 241 } 242 243 if !has_implicit { 244 let repo_node = node.to_repo()?; 245 let bytes = serde_ipld_dagcbor::to_vec(&repo_node) 246 .map_err(|e| StarError::Cbor(e.to_string()))?; 247 248 let hash = Sha256::digest(&bytes); 249 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 250 if let Some(exp) = expected 251 && cid != exp 252 { 253 return Err(StarError::VerificationFailed { 254 kind: VerificationKind::Node, 255 expected: Box::new(exp), 256 computed: Box::new(cid), 257 }); 258 } 259 } else { 260 stack.push(StackItem::VerifyLayer0 { 261 node: node.clone(), 262 parent_expected: expected, 263 pending_records: Vec::new(), 264 }); 265 } 266 267 // Push children in reverse 268 let child_expected_height = height.checked_sub(1); 269 270 if height > 0 { 271 let next_h = child_expected_height.unwrap(); 272 273 for i in (0..node.e.len()).rev() { 274 let e = &node.e[i]; 275 let key = entry_keys[i].clone(); 276 277 if e.t_archived == Some(true) { 278 stack.push(StackItem::Node { 279 expected: e.t, 280 expected_height: Some(next_h), 281 }); 282 } 283 284 if e.v_archived == Some(true) { 285 let implicit_index = if e.v.is_none() { Some(i) } else { None }; 286 stack.push(StackItem::Record { 287 key, 288 expected: e.v, 289 implicit_index, 290 }); 291 } 292 } 293 294 if node.l_archived == Some(true) { 295 stack.push(StackItem::Node { 296 expected: node.l, 297 expected_height: Some(next_h), 298 }); 299 } 300 } else { 301 // Height 0: Push records only 302 for i in 
(0..node.e.len()).rev() { 303 let e = &node.e[i]; 304 let key = entry_keys[i].clone(); 305 306 if e.v_archived == Some(true) { 307 let implicit_index = if e.v.is_none() { Some(i) } else { None }; 308 stack.push(StackItem::Record { 309 key, 310 expected: e.v, 311 implicit_index, 312 }); 313 } 314 } 315 } 316 317 Ok(Some(StarItem::Node(node))) 318 } 319 320 fn process_record( 321 block_bytes: &[u8], 322 key: Vec<u8>, 323 expected: Option<Cid>, 324 implicit_index: Option<usize>, 325 stack: &mut [StackItem], 326 ) -> Result<Option<StarItem>> { 327 let hash = Sha256::digest(block_bytes); 328 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 329 330 if let Some(exp) = expected 331 && cid != exp 332 { 333 return Err(StarError::VerificationFailed { 334 kind: VerificationKind::Record { key: key.clone() }, 335 expected: Box::new(exp), 336 computed: Box::new(cid), 337 }); 338 } 339 340 if let Some(idx) = implicit_index { 341 let mut found = false; 342 for item in stack.iter_mut().rev() { 343 if let StackItem::VerifyLayer0 { 344 pending_records, .. 345 } = item 346 { 347 pending_records.push((idx, cid)); 348 found = true; 349 break; 350 } 351 } 352 if !found { 353 return Err(StarError::InvalidState( 354 "Implicit record verification context missing".into(), 355 )); 356 } 357 } 358 359 Ok(Some(StarItem::Record { 360 key, 361 cid, 362 content: Some(block_bytes.to_vec()), 363 })) 364 } 365}