Fast and robust atproto CAR file processing in rust

box up things to mitigate large Err

+71 -133
+2 -2
src/drive.rs
··· 234 234 235 235 // the commit always must point to a Node; empty node => empty MST special case 236 236 let root_node: MstNode = match mem_blocks 237 - .get(&commit.data_link()?) 237 + .get(&commit.data) 238 238 .ok_or(DriveError::MissingCommit)? 239 239 { 240 240 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, ··· 381 381 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 382 382 383 383 let db_bytes = store 384 - .get(&commit.data_link()?.to_bytes()) 384 + .get(&commit.data.to_bytes()) 385 385 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 386 386 .ok_or(DriveError::MissingCommit)?; 387 387
+18 -72
src/link.rs
··· 1 1 use cid::Cid; 2 - use thiserror::Error; 3 2 4 - #[derive(Debug, Error, PartialEq)] 5 - #[error("The CID is not a valid strict atproto SHA256 CID")] 6 - pub struct NotStrictError; 7 - 8 - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 9 - pub struct CidStrict32([u8; 32]); 10 - 11 - pub const ATPROTO_HASHLINK_PREFIX: [u8; 4] = [0x01, 0x71, 0x12, 0x20]; 12 - 13 - impl TryFrom<&[u8]> for CidStrict32 { 14 - type Error = NotStrictError; 15 - fn try_from(raw: &[u8]) -> Result<CidStrict32, Self::Error> { 16 - let (pre, sha) = raw.split_first_chunk::<4>().ok_or(NotStrictError)?; 17 - if pre != &ATPROTO_HASHLINK_PREFIX { 18 - return Err(NotStrictError); 19 - } 20 - let inner = sha.try_into().map_err(|_| NotStrictError)?; 21 - Ok(CidStrict32(inner)) 22 - } 23 - } 24 - 25 - impl TryFrom<Cid> for CidStrict32 { 26 - type Error = NotStrictError; 27 - fn try_from(cid: Cid) -> Result<CidStrict32, Self::Error> { 28 - cid.to_bytes().as_slice().try_into() 29 - } 30 - } 31 - 32 - impl From<CidStrict32> for Cid { 33 - fn from(CidStrict32(sha): CidStrict32) -> Cid { 34 - let mut bytes = Vec::from(ATPROTO_HASHLINK_PREFIX); 35 - bytes.extend_from_slice(&sha); 36 - bytes.try_into().unwrap() // this prefix + sha is always a valid Cid 37 - } 38 - } 39 - 40 - #[derive(Debug, Clone, PartialEq, Eq, Hash)] 41 - pub enum ObjectLink { 42 - Should(CidStrict32), 43 - Allowed(Box<Cid>), 44 - } 3 + #[derive(Debug, serde::Deserialize, Clone, PartialEq, Eq, Hash)] 4 + pub struct ObjectLink(Cid); 45 5 46 6 impl ObjectLink { 47 7 pub fn to_bytes(&self) -> Vec<u8> { 48 - match self { 49 - ObjectLink::Should(CidStrict32(sha)) => { 50 - let mut bytes = vec![0xFF]; // prefix that's never valid for CID 51 - bytes.extend_from_slice(sha); 52 - bytes 53 - } 54 - ObjectLink::Allowed(cid) => cid.to_bytes(), 55 - } 56 - } 57 - } 58 - 59 - impl From<&CidStrict32> for ObjectLink { 60 - fn from(strict: &CidStrict32) -> ObjectLink { 61 - ObjectLink::Should(*strict) 8 + self.0.to_bytes() 62 9 } 63 10 } 64 11 65 12 impl From<Cid> for ObjectLink { 66 13 fn from(cid: Cid) -> ObjectLink { 67 - if let Ok(strict) = cid.try_into() { 68 - ObjectLink::Should(strict) 69 - } else { 70 - ObjectLink::Allowed(cid.into()) 71 - } 14 + ObjectLink(cid) 72 15 } 73 16 } 74 17 75 18 impl From<ObjectLink> for Cid { 76 19 fn from(link: ObjectLink) -> Cid { 77 - match link { 78 - ObjectLink::Should(strict) => strict.into(), 79 - ObjectLink::Allowed(boxed) => *boxed, 80 - } 20 + link.0 81 21 } 82 22 } 83 23 84 24 #[derive(Debug, Clone)] 85 - pub enum NodeThing { 86 - ChildNode(CidStrict32), 87 - Record(crate::Rkey, ObjectLink), 25 + pub struct NodeThing { 26 + pub link: ObjectLink, 27 + pub kind: ThingKind, 88 28 } 89 29 90 30 impl NodeThing { 91 - pub fn link(&self) -> ObjectLink { 92 - match self { 93 - Self::ChildNode(strict) => strict.into(), 94 - Self::Record(_, link) => link.clone(), 31 + pub fn is_record(&self) -> bool { 32 + match self.kind { 33 + ThingKind::ChildNode => false, 34 + ThingKind::Record(_) => true, 95 35 } 96 36 } 97 37 } 38 + 39 + #[derive(Debug, Clone)] 40 + pub enum ThingKind { 41 + ChildNode, 42 + Record(crate::Rkey), 43 + }
+18 -32
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use crate::{ 7 - link::{NodeThing, ObjectLink}, 8 - walk::MstError, 9 - }; 6 + use crate::link::{NodeThing, ObjectLink, ThingKind}; 10 7 use cid::Cid; 11 8 use serde::Deserialize; 12 9 use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; ··· 25 22 /// fixed value of 3 for this repo format version 26 23 pub version: u64, 27 24 /// pointer to the top of the repo contents tree structure (MST) 28 - pub data: Cid, 25 + pub data: ObjectLink, 29 26 /// revision of the repo, used as a logical clock. 30 27 /// 31 28 /// TID format. Must increase monotonically. Recommend using current ··· 39 36 /// exist in the CBOR object, but is virtually always null. NOTE: previously 40 37 /// specified as nullable and optional, but this caused interoperability 41 38 /// issues. 42 - pub prev: Option<Cid>, 39 + pub prev: Option<ObjectLink>, 43 40 /// cryptographic signature of this commit, as raw bytes 44 - #[serde(with = "serde_bytes")] 45 41 pub sig: serde_bytes::ByteBuf, 46 42 } 47 43 48 - impl Commit { 49 - pub fn data_link(&self) -> Result<ObjectLink, MstError> { 50 - let strict = self.data.try_into().map_err(MstError::MissingRootNode)?; 51 - Ok(ObjectLink::Should(strict)) 52 - } 53 - } 54 - 55 44 #[inline(always)] 56 45 pub fn atproto_mst_depth(key: &str) -> Depth { 57 46 // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 ··· 94 83 return Err(de::Error::duplicate_field("l")); 95 84 } 96 85 found_left = true; 97 - if let Some(cid) = map.next_value()? { 98 - let Ok(strict) = Cid::try_into(cid) else { 99 - return Err(de::Error::invalid_value( 100 - Unexpected::Bytes(&cid.to_bytes()), 101 - &"a strict atproto sha256 CID link", 102 - )); 103 - }; 104 - left = Some(NodeThing::ChildNode(strict)); 86 + if let Some(link) = map.next_value()? { 87 + left = Some(NodeThing { 88 + link, 89 + kind: ThingKind::ChildNode, 90 + }); 105 91 } 106 92 } 107 93 "e" => { ··· 142 128 )); 143 129 } 144 130 145 - things.push(NodeThing::Record(rkey_s, entry.value.into())); 131 + things.push(NodeThing { 132 + link: entry.value.into(), 133 + kind: ThingKind::Record(rkey_s), 134 + }); 146 135 147 - if let Some(cid) = entry.tree { 148 - let Ok(strict) = cid.try_into() else { 149 - return Err(de::Error::invalid_value( 150 - Unexpected::Bytes(&cid.to_bytes()), 151 - &"a strict atproto sha256 CID link", 152 - )); 153 - }; 154 - things.push(NodeThing::ChildNode(strict)); 136 + if let Some(link) = entry.tree { 137 + things.push(NodeThing { 138 + link, 139 + kind: ThingKind::ChildNode, 140 + }); 155 141 } 156 142 157 143 prefix = rkey; ··· 230 216 /// the lower level must have keys sorting after this TreeEntry's key (to 231 217 /// the "right"), but before the next TreeEntry's key in this Node (if any) 232 218 #[serde(rename = "t")] 233 - pub tree: Option<Cid>, 219 + pub tree: Option<ObjectLink>, 234 220 }
+33 -27
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::link::{NodeThing, ObjectLink}; 3 + use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::mst::{Depth, MstNode}; 5 5 use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop}; 6 6 use cid::Cid; ··· 18 18 #[error("storage error: {0}")] 19 19 StorageError(#[from] fjall::Error), 20 20 #[error("block not found: {0:?}")] 21 - MissingBlock(NodeThing), 21 + MissingBlock(Box<NodeThing>), 22 22 } 23 23 24 24 /// Errors from invalid Rkeys 25 25 #[derive(Debug, PartialEq, thiserror::Error)] 26 26 pub enum MstError { 27 - #[error("Bad MST root node: {0}")] 28 - MissingRootNode(crate::link::NotStrictError), 29 27 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 30 28 EmptyNode, 31 29 #[error("Expected node to be at depth {expected}, but it was at {depth}")] ··· 75 73 mpb: &MaybeProcessedBlock, 76 74 process: impl Fn(Bytes) -> Bytes, 77 75 ) -> Result<Option<Output>, WalkError> { 78 - match thing { 79 - NodeThing::Record(rkey, link) => { 76 + match thing.kind { 77 + ThingKind::Record(rkey) => { 80 78 let data = match mpb { 81 79 MaybeProcessedBlock::Raw(data) => process(data.clone()), 82 80 MaybeProcessedBlock::Processed(t) => t.clone(), ··· 93 91 log::trace!("val @ {rkey}"); 94 92 Ok(Some(Output { 95 93 rkey, 96 - cid: link.into(), 94 + cid: thing.link.into(), 97 95 data, 98 96 })) 99 97 } 100 - NodeThing::ChildNode(_) => { 98 + ThingKind::ChildNode => { 101 99 let MaybeProcessedBlock::Raw(data) = mpb else { 102 100 return Err(WalkError::BadCommitFingerprint); 103 101 }; ··· 147 145 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 148 146 process: impl Fn(Bytes) -> Bytes, 149 147 ) -> Result<Step, WalkError> { 150 - while let Some(thing) = self.next_todo() { 151 - let Some(mpb) = blocks.get(&thing.link()) else { 152 - return Err(WalkError::MissingBlock(thing)); 148 + while let Some(NodeThing { link, kind }) = self.next_todo() { 149 + let Some(mpb) = blocks.get(&link) else { 150 + return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 153 151 }; 154 - if let Some(out) = self.mpb_step(thing, mpb, &process)? { 152 + if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, &process)? { 155 153 return Ok(Step::Value(out)); 156 154 } 157 155 } 158 156 Ok(Step::End(None)) 159 157 } 160 158 159 + #[allow(dead_code)] 161 160 pub fn step_to_edge( 162 161 &mut self, 163 162 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, ··· 166 165 let mut rkey_prev = None; 167 166 loop { 168 167 match ant.step(blocks, noop) { 169 - Err(WalkError::MissingBlock(NodeThing::Record(rkey, _))) => { 170 - // missing record is our game here! keep going,, 171 - // (after checking rkey order bc the error path that got us here skips it) 172 - if let Some(prev) = rkey_prev 173 - && rkey <= prev 174 - { 175 - return Err(WalkError::MstError(MstError::RkeyOutOfOrder { rkey, prev })); 168 + Err(WalkError::MissingBlock(thing)) => match *thing { 169 + NodeThing { 170 + kind: ThingKind::Record(rkey), 171 + .. 172 + } => { 173 + if let Some(prev) = rkey_prev 174 + && rkey <= prev 175 + { 176 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 177 + rkey, 178 + prev, 179 + })); 180 + } 181 + *self = ant; 182 + ant = self.clone(); 183 + rkey_prev = Some(rkey); 176 184 } 177 - *self = ant; 178 - ant = self.clone(); 179 - rkey_prev = Some(rkey); 180 - } 185 + _ => return Err(WalkError::MissingBlock(thing)), 186 + }, 181 187 Err(anyother) => return Err(anyother), 182 188 Ok(_) => return Ok(rkey_prev), // oop real record, mutant went too far 183 189 } ··· 190 196 blocks: &DiskStore, 191 197 process: impl Fn(Bytes) -> Bytes, 192 198 ) -> Result<Step, WalkError> { 193 - while let Some(thing) = self.next_todo() { 194 - let Some(block_slice) = blocks.get(&thing.link().to_bytes())? else { 195 - return Err(WalkError::MissingBlock(thing)); 199 + while let Some(NodeThing { link, kind }) = self.next_todo() { 200 + let Some(block_slice) = blocks.get(&link.to_bytes())? else { 201 + return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 196 202 }; 197 203 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 198 - if let Some(out) = self.mpb_step(thing, &mpb, &process)? { 204 + if let Some(out) = self.mpb_step(NodeThing { link, kind }, &mpb, &process)? { 199 205 return Ok(Step::Value(out)); 200 206 } 201 207 }