Streaming Tree ARchive format
at rust-impl 228 lines 7.0 kB view raw
1use crate::error::Result; 2use crate::error::StarError; 3use crate::types::{StarCommit, StarItem, StarMstNode}; 4use crate::validation::validate_node_structure; 5use std::io::Write; 6 7pub struct StarEncoder; 8 9impl StarEncoder { 10 fn write_varint<W: Write>(val: usize, dst: &mut W) -> std::io::Result<()> { 11 let mut buf = unsigned_varint::encode::usize_buffer(); 12 let encoded = unsigned_varint::encode::usize(val, &mut buf); 13 dst.write_all(encoded) 14 } 15 16 pub fn write_header<W: Write>(commit: &StarCommit, dst: &mut W) -> Result<()> { 17 dst.write_all(&[0x2A])?; 18 19 Self::write_varint(1, dst)?; 20 21 let commit_bytes = serde_ipld_dagcbor::to_vec(commit) 22 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 23 24 Self::write_varint(commit_bytes.len(), dst)?; 25 dst.write_all(&commit_bytes)?; 26 27 Ok(()) 28 } 29 30 pub fn write_node<W: Write>(node: &StarMstNode, dst: &mut W) -> Result<()> { 31 let node_bytes = serde_ipld_dagcbor::to_vec(node) 32 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 33 34 Self::write_varint(node_bytes.len(), dst)?; 35 dst.write_all(&node_bytes)?; 36 37 Ok(()) 38 } 39 40 pub fn write_record<W: Write>(record_bytes: &[u8], dst: &mut W) -> Result<()> { 41 Self::write_varint(record_bytes.len(), dst)?; 42 dst.write_all(record_bytes)?; 43 44 Ok(()) 45 } 46} 47 48#[cfg(feature = "async")] 49impl tokio_util::codec::Encoder<StarItem> for StarEncoder { 50 type Error = crate::error::StarError; 51 52 fn encode(&mut self, item: StarItem, dst: &mut bytes::BytesMut) -> Result<()> { 53 use bytes::BufMut; 54 // BytesMut::writer() returns an impl Write 55 let mut writer = dst.writer(); 56 57 match item { 58 StarItem::Commit(c) => Self::write_header(&c, &mut writer), 59 StarItem::Node(n) => Self::write_node(&n, &mut writer), 60 StarItem::Record { content, .. } => { 61 if let Some(bytes) = content { 62 Self::write_record(&bytes, &mut writer) 63 } else { 64 Err(crate::error::StarError::InvalidState( 65 "Cannot serialize record without content".into(), 66 )) 67 } 68 } 69 } 70 } 71} 72 73/// A serializer that enforces strict STAR format compliance. 74pub struct StarSerializer<W> { 75 writer: W, 76 validator: StarValidator, 77} 78 79impl<W: Write> StarSerializer<W> { 80 pub fn new(writer: W) -> Self { 81 Self { 82 writer, 83 validator: StarValidator::new(), 84 } 85 } 86 87 pub fn write_header(&mut self, commit: &StarCommit) -> Result<()> { 88 self.validator.accept_header(commit)?; 89 StarEncoder::write_header(commit, &mut self.writer) 90 } 91 92 pub fn write_node(&mut self, node: &StarMstNode) -> Result<()> { 93 self.validator.accept_node(node)?; 94 StarEncoder::write_node(node, &mut self.writer) 95 } 96 97 pub fn write_record(&mut self, record_bytes: &[u8]) -> Result<()> { 98 self.validator.accept_record(record_bytes)?; 99 StarEncoder::write_record(record_bytes, &mut self.writer) 100 } 101 102 pub fn finish(self) -> Result<W> { 103 if !self.validator.is_done() { 104 return Err(crate::error::StarError::InvalidState( 105 "Incomplete tree".into(), 106 )); 107 } 108 Ok(self.writer) 109 } 110} 111 112// Validator State Machine (Moved from validation.rs) 113 114#[derive(Debug, Default)] 115pub struct StarValidator { 116 state: ValidatorState, 117} 118 119#[derive(Debug, Default)] 120enum ValidatorState { 121 #[default] 122 Header, 123 Body { 124 stack: Vec<Expectation>, 125 }, 126} 127 128#[derive(Debug)] 129enum Expectation { 130 Root, 131 Node { height: u32 }, 132 Record, 133} 134 135impl StarValidator { 136 pub fn new() -> Self { 137 Self { 138 state: ValidatorState::Header, 139 } 140 } 141 142 pub fn accept_header(&mut self, commit: &StarCommit) -> Result<()> { 143 match &self.state { 144 ValidatorState::Header => { 145 let stack = if commit.data.is_some() { 146 vec![Expectation::Root] 147 } else { 148 Vec::new() // Empty tree 149 }; 150 self.state = ValidatorState::Body { stack }; 151 Ok(()) 152 } 153 _ => Err(StarError::InvalidState( 154 "Header already written or invalid state".into(), 155 )), 156 } 157 } 158 159 pub fn accept_node(&mut self, node: &StarMstNode) -> Result<()> { 160 match &mut self.state { 161 ValidatorState::Body { stack } => { 162 if stack.is_empty() { 163 return Err(StarError::InvalidState( 164 "Unexpected node: tree is complete".into(), 165 )); 166 } 167 168 let expectation = stack.pop().unwrap(); 169 let expected_height = match expectation { 170 Expectation::Record => { 171 return Err(StarError::InvalidState("Expected record, got node".into())); 172 } 173 Expectation::Root => None, 174 Expectation::Node { height } => Some(height), 175 }; 176 177 // Use the shared validation logic 178 let (height, _) = validate_node_structure(node, expected_height)?; 179 180 let child_height = if height > 0 { height - 1 } else { 0 }; 181 182 for e in node.e.iter().rev() { 183 if e.t_archived == Some(true) { 184 stack.push(Expectation::Node { 185 height: child_height, 186 }); 187 } 188 if e.v_archived == Some(true) { 189 stack.push(Expectation::Record); 190 } 191 } 192 193 if node.l_archived == Some(true) { 194 stack.push(Expectation::Node { 195 height: child_height, 196 }); 197 } 198 199 Ok(()) 200 } 201 _ => Err(StarError::InvalidState("Invalid state for node".into())), 202 } 203 } 204 205 pub fn accept_record(&mut self, _bytes: &[u8]) -> Result<()> { 206 match &mut self.state { 207 ValidatorState::Body { stack } => { 208 if stack.is_empty() { 209 return Err(StarError::InvalidState( 210 "Unexpected record: tree is complete".into(), 211 )); 212 } 213 match stack.pop().unwrap() { 214 Expectation::Record => Ok(()), 215 _ => Err(StarError::InvalidState("Expected node, got record".into())), 216 } 217 } 218 _ => Err(StarError::InvalidState("Invalid state for record".into())), 219 } 220 } 221 222 pub fn is_done(&self) -> bool { 223 match &self.state { 224 ValidatorState::Body { stack } => stack.is_empty(), 225 _ => false, 226 } 227 } 228}