Streaming Tree ARchive format

height verification

+191 -75
+191 -75
src/parser.rs
··· 6 6 #[derive(Debug)] 7 7 enum State { 8 8 Header, 9 - Body { 9 + Body { 10 10 stack: Vec<StackItem>, 11 11 current_len: Option<usize>, 12 12 }, ··· 17 17 enum StackItem { 18 18 Node { 19 19 expected: Option<Cid>, 20 + expected_height: Option<u32>, // The node at this position must have height exactly == this value (if set) 20 21 }, 21 22 Record { 22 23 key: Vec<u8>, ··· 27 28 node: StarMstNode, 28 29 parent_expected: Option<Cid>, 29 30 pending_records: Vec<(usize, Cid)>, 31 + height: u32, 30 32 }, 31 33 } 32 34 ··· 41 43 } 42 44 } 43 45 44 - /// Parses the input buffer. 45 - /// Returns (bytes_consumed, Option<Item>). 46 46 pub fn parse(&mut self, buf: &[u8]) -> Result<(usize, Option<StarItem>)> { 47 47 let mut consumed = 0; 48 - 49 - // Loop allows state transitions (e.g. Header -> Body) without returning 50 - // but we must be careful to track consumed bytes correctly. 48 + 51 49 loop { 52 - // Check if we need to transition from Body to Done 53 50 let is_body_done = if let State::Body { stack, .. } = &self.state { 54 51 stack.is_empty() 55 52 } else { ··· 61 58 return Ok((consumed, None)); 62 59 } 63 60 64 - // Slice the buffer to the remaining part 65 61 let current_buf = &buf[consumed..]; 66 62 67 63 match &mut self.state { ··· 76 72 } 77 73 State::Body { stack, current_len } => { 78 74 if Self::process_verification(stack)? { 79 - // Verification doesn't consume bytes, but changes state/stack. 80 - // We continue the loop to try reading the next item immediately. 81 75 continue; 82 76 } 83 77 84 - // Try to read length 85 78 let (len_consumed, len) = match Self::read_length(current_buf, current_len)? { 86 79 Some((n, len)) => (n, len), 87 80 None => return Ok((consumed, None)), 88 81 }; 89 - 90 - // Note: read_length advances internal state (current_len) but also returns 91 - // how many bytes of the varint were consumed from current_buf. 92 - // If we have the full length, we now check if we have the body. 93 - 82 + 94 83 let body_buf = &current_buf[len_consumed..]; 95 84 if body_buf.len() < len { 96 - // We read the length varint, but don't have enough bytes for the body. 97 - // We must report the varint itself as consumed so the caller advances, 98 - // and we have stored the in via mutation. 99 85 consumed += len_consumed; 100 86 return Ok((consumed, None)); 101 87 } 102 88 103 - // We have the body. 104 89 let block_bytes = &body_buf[..len]; 105 - 106 - // Reset current_len since we are consuming the block 107 90 *current_len = None; 108 - 91 + 109 92 let item = stack.pop().unwrap(); 110 93 let result_item = match item { 111 - StackItem::Node { expected } => { 112 - Self::process_node(block_bytes, expected, stack)? 113 - }, 114 - StackItem::Record { key, expected, implicit_index } => { 94 + StackItem::Node { 95 + expected, 96 + expected_height, 97 + } => Self::process_node(block_bytes, expected, expected_height, stack)?, 98 + StackItem::Record { 99 + key, 100 + expected, 101 + implicit_index, 102 + } => { 115 103 Self::process_record(block_bytes, key, expected, implicit_index, stack)? 116 - }, 104 + } 117 105 _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 118 106 }; 119 107 ··· 124 112 } 125 113 } 126 114 127 - // Returns Option<(bytes_consumed, Item)> 115 + fn calculate_height(key: &[u8]) -> u32 { 116 + let digest = Sha256::digest(key); 117 + let mut zeros = 0; 118 + for &byte in digest.iter() { 119 + if byte == 0 { 120 + zeros += 8; 121 + } else { 122 + zeros += byte.leading_zeros(); 123 + break; 124 + } 125 + } 126 + zeros / 2 127 + } 128 + 128 129 fn parse_header(&mut self, buf: &[u8]) -> Result<Option<(usize, StarItem)>> { 129 130 if buf.len() < 1 { 130 131 return Ok(None); ··· 134 135 } 135 136 136 137 let slice = &buf[1..]; 137 - 138 + 138 139 let (ver, remaining1) = match unsigned_varint::decode::usize(slice) { 139 140 Ok(res) => res, 140 141 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 141 142 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 142 143 }; 143 - 144 + 144 145 let (len, remaining2) = match unsigned_varint::decode::usize(remaining1) { 145 146 Ok(res) => res, 146 147 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 147 148 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 148 149 }; 149 150 150 - let header_varints_len = buf.len() - 1 - remaining2.len(); 151 - let total_header_len = 1 + header_varints_len; 151 + let header_varints_len = buf.len() - 1 - remaining2.len(); 152 + let total_header_len = 1 + header_varints_len; 152 153 let total_len = total_header_len + len; 153 154 154 155 if buf.len() < total_len { 155 156 return Ok(None); 156 157 } 157 158 158 - // We have the full commit 159 159 let commit_bytes = &buf[total_header_len..total_len]; 160 160 let commit: StarCommit = serde_ipld_dagcbor::from_slice(commit_bytes) 161 161 .map_err(|e| StarError::Cbor(e.to_string()))?; 162 - 163 - let _ = ver; 162 + 163 + let _ = ver; 164 164 165 165 let mut stack = Vec::new(); 166 166 if let Some(root_cid) = commit.data { 167 167 stack.push(StackItem::Node { 168 168 expected: Some(root_cid), 169 + expected_height: None, // Root 169 170 }); 170 171 } 171 172 172 - self.state = State::Body { 173 + self.state = State::Body { 173 174 stack, 174 - current_len: None 175 + current_len: None, 175 176 }; 176 - 177 177 Ok(Some((total_len, StarItem::Commit(commit)))) 178 178 } 179 179 ··· 183 183 mut node, 184 184 parent_expected, 185 185 pending_records, 186 + .. 186 187 }) = stack.pop() 187 188 { 188 189 for (idx, cid) in pending_records { ··· 212 213 Ok(false) 213 214 } 214 215 215 - // Returns Option<(bytes_consumed, length_value)> 216 - // If successful, updates current_len to the read length value 217 216 fn read_length(buf: &[u8], current_len: &mut Option<usize>) -> Result<Option<(usize, usize)>> { 218 - // If we already have a length (from a previous partial read), we consumed 0 bytes *now* to get it 219 217 if let Some(len) = current_len { 220 218 return Ok(Some((0, *len))); 221 219 } ··· 225 223 let consumed = buf.len() - remaining.len(); 226 224 *current_len = Some(l); 227 225 Ok(Some((consumed, l))) 228 - }, 226 + } 229 227 Err(unsigned_varint::decode::Error::Insufficient) => Ok(None), 230 228 Err(e) => Err(StarError::InvalidState(format!("Varint error: {}", e))), 231 229 } 232 230 } 233 231 234 - fn process_node(block_bytes: &[u8], expected: Option<Cid>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 232 + fn process_node( 233 + block_bytes: &[u8], 234 + expected: Option<Cid>, 235 + expected_height: Option<u32>, 236 + stack: &mut Vec<StackItem>, 237 + ) -> Result<Option<StarItem>> { 235 238 let node: StarMstNode = serde_ipld_dagcbor::from_slice(block_bytes) 236 239 .map_err(|e| StarError::Cbor(e.to_string()))?; 237 240 241 + // We run the loop to reconstruct keys and validate their internal consistency first. 242 + let mut prev_key_bytes = Vec::new(); 243 + let mut entry_keys = Vec::new(); 244 + let mut node_height = None; 245 + 246 + for e in &node.e { 247 + let mut key = if e.p as usize <= prev_key_bytes.len() { 248 + prev_key_bytes[..e.p as usize].to_vec() 249 + } else { 250 + // If prefix len > prev key len, invalid compression 251 + // Although spec says "shared prefix bytes", usually <= prev.len() 252 + prev_key_bytes.clone() 253 + }; 254 + key.extend_from_slice(&e.k); 255 + 256 + let h = Self::calculate_height(&key); 257 + 258 + if let Some(existing_h) = node_height { 259 + if h != existing_h { 260 + return Err(StarError::InvalidState(format!( 261 + "Inconsistent key height in node: {} vs {}", 262 + h, existing_h 263 + ))); 264 + } 265 + } else { 266 + node_height = Some(h); 267 + } 268 + 269 + entry_keys.push(key.clone()); 270 + prev_key_bytes = key; 271 + } 272 + 273 + // 2. Validate Height against Expectation 274 + let height = match (node_height, expected_height) { 275 + (Some(h), Some(exp)) => { 276 + if h != exp { 277 + return Err(StarError::InvalidState(format!( 278 + "Invalid MST height: found {}, expected {}", 279 + h, exp 280 + ))); 281 + } 282 + h 283 + } 284 + (Some(h), None) => { 285 + // Root node with keys. 286 + h 287 + } 288 + (None, Some(exp)) => { 289 + // Empty node (intermediate). Must match expectation. 290 + // Constraint: Height 0 cannot be empty. 291 + if exp == 0 { 292 + return Err(StarError::InvalidState( 293 + "MST nodes at depth=0 cannot be empty".into(), 294 + )); 295 + } 296 + exp 297 + } 298 + (None, None) => { 299 + // Root node empty. 300 + return Err(StarError::InvalidState( 301 + "Root node must contain entries".into(), 302 + )); 303 + } 304 + }; 305 + 306 + // 3. Validate Structure based on Height 307 + if height == 0 { 308 + // Must have no children 309 + if node.l.is_some() || node.l_archived.is_some() { 310 + return Err(StarError::InvalidState( 311 + "Height 0 node cannot have left child".into(), 312 + )); 313 + } 314 + for e in &node.e { 315 + if e.t.is_some() || e.t_archived.is_some() { 316 + return Err(StarError::InvalidState( 317 + "Height 0 entries cannot have subtrees".into(), 318 + )); 319 + } 320 + } 321 + } else { 322 + // Height > 0 323 + if node.e.is_empty() && node.l.is_none() { 324 + return Err(StarError::InvalidState( 325 + "Empty intermediate node must have left child".into(), 326 + )); 327 + } 328 + } 329 + 330 + // Check for implicit records 238 331 let mut has_implicit = false; 239 332 for e in &node.e { 240 333 if e.v_archived == Some(true) && e.v.is_none() { ··· 249 342 .map_err(|e| StarError::Cbor(e.to_string()))?; 250 343 251 344 let hash = Sha256::digest(&bytes); 252 - let cid = Cid::new_v1( 253 - 0x71, 254 - cid::multihash::Multihash::wrap(0x12, &hash)?, 255 - ); 345 + let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 256 346 if let Some(exp) = expected { 257 347 if cid != exp { 258 348 return Err(StarError::VerificationFailed { ··· 266 356 node: node.clone(), 267 357 parent_expected: expected, 268 358 pending_records: Vec::new(), 359 + height, 269 360 }); 270 361 } 271 362 272 - let mut prev_key_bytes = Vec::new(); 273 - let mut entry_keys = Vec::new(); 274 - for e in &node.e { 275 - let mut key = if e.p as usize <= prev_key_bytes.len() { 276 - prev_key_bytes[..e.p as usize].to_vec() 277 - } else { 278 - prev_key_bytes.clone() 279 - }; 280 - key.extend_from_slice(&e.k); 281 - entry_keys.push(key.clone()); 282 - prev_key_bytes = key; 283 - } 363 + // Push children in reverse 364 + // Expected height for children is strictly height - 1 365 + let child_expected_height = height.checked_sub(1); 366 + 367 + // If height is 0, we already verified no children exist, so child_expected_height is None/invalid but unused. 368 + // Actually height - 1 would underflow if height 0. But logic ensures no children pushed if height 0. 369 + 370 + if height > 0 { 371 + let next_h = child_expected_height.unwrap(); 372 + 373 + for i in (0..node.e.len()).rev() { 374 + let e = &node.e[i]; 375 + let key = entry_keys[i].clone(); 284 376 285 - for i in (0..node.e.len()).rev() { 286 - let e = &node.e[i]; 287 - let key = entry_keys[i].clone(); 377 + if e.t_archived == Some(true) { 378 + stack.push(StackItem::Node { 379 + expected: e.t, 380 + expected_height: Some(next_h), 381 + }); 382 + } 288 383 289 - if e.t_archived == Some(true) { 290 - stack.push(StackItem::Node { expected: e.t }); 384 + if e.v_archived == Some(true) { 385 + let implicit_index = if e.v.is_none() { Some(i) } else { None }; 386 + stack.push(StackItem::Record { 387 + key, 388 + expected: e.v, 389 + implicit_index, 390 + }); 391 + } 291 392 } 292 393 293 - if e.v_archived == Some(true) { 294 - let implicit_index = if e.v.is_none() { Some(i) } else { None }; 295 - stack.push(StackItem::Record { 296 - key, 297 - expected: e.v, 298 - implicit_index, 394 + if node.l_archived == Some(true) { 395 + stack.push(StackItem::Node { 396 + expected: node.l, 397 + expected_height: Some(next_h), 299 398 }); 300 399 } 301 - } 400 + } else { 401 + // Height 0: Push records only 402 + for i in (0..node.e.len()).rev() { 403 + let e = &node.e[i]; 404 + let key = entry_keys[i].clone(); 302 405 303 - if node.l_archived == Some(true) { 304 - stack.push(StackItem::Node { expected: node.l }); 406 + if e.v_archived == Some(true) { 407 + let implicit_index = if e.v.is_none() { Some(i) } else { None }; 408 + stack.push(StackItem::Record { 409 + key, 410 + expected: e.v, 411 + implicit_index, 412 + }); 413 + } 414 + } 305 415 } 306 416 307 417 Ok(Some(StarItem::Node(node))) 308 418 } 309 419 310 - fn process_record(block_bytes: &[u8], key: Vec<u8>, expected: Option<Cid>, implicit_index: Option<usize>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 420 + fn process_record( 421 + block_bytes: &[u8], 422 + key: Vec<u8>, 423 + expected: Option<Cid>, 424 + implicit_index: Option<usize>, 425 + stack: &mut Vec<StackItem>, 426 + ) -> Result<Option<StarItem>> { 311 427 let hash = Sha256::digest(block_bytes); 312 428 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 313 429