Fast and robust atproto CAR file processing in rust
at hash-cost 160 lines 4.6 kB view raw
1/*! 2Disk storage for blocks on disk 3 4Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5to be the best behaved in terms of both on-disk space usage and memory usage. 6 7```no_run 8# use repo_stream::{DiskBuilder, DiskError}; 9# #[tokio::main] 10# async fn main() -> Result<(), DiskError> { 11let store = DiskBuilder::new() 12 .with_cache_size_mb(32) 13 .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 .open("/some/path.db".into()).await?; 15# Ok(()) 16# } 17``` 18*/ 19 20use crate::{Bytes, drive::DriveError}; 21use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22use std::path::PathBuf; 23 24#[derive(Debug, thiserror::Error)] 25pub enum DiskError { 26 /// A wrapped database error 27 /// 28 /// (The wrapped err should probably be obscured to remove public-facing 29 /// sqlite bits) 30 #[error(transparent)] 31 DbError(#[from] FjallError), 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), 35 /// The total size of stored blocks exceeded the allowed size 36 /// 37 /// If you need to process *really* big CARs, you can configure a higher 38 /// limit. 39 #[error("Maximum disk size reached")] 40 MaxSizeExceeded, 41} 42 43/// Builder-style disk store setup 44#[derive(Debug, Clone)] 45pub struct DiskBuilder { 46 /// Database in-memory cache allowance 47 /// 48 /// Default: 32 MiB 49 pub cache_size_mb: usize, 50 /// Database stored block size limit 51 /// 52 /// Default: 10 GiB 53 /// 54 /// Note: actual size on disk may be more, but should approximately scale 55 /// with this limit 56 pub max_stored_mb: usize, 57} 58 59impl Default for DiskBuilder { 60 fn default() -> Self { 61 Self { 62 cache_size_mb: 64, 63 max_stored_mb: 10 * 1024, // 10 GiB 64 } 65 } 66} 67 68impl DiskBuilder { 69 /// Begin configuring the storage with defaults 70 pub fn new() -> Self { 71 Default::default() 72 } 73 /// Set the in-memory cache allowance for the database 74 /// 75 /// Default: 64 MiB 76 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 77 self.cache_size_mb = size; 78 self 79 } 80 /// Set the approximate stored block size limit 81 /// 82 /// Default: 10 GiB 83 pub fn with_max_stored_mb(mut self, max: usize) -> Self { 84 self.max_stored_mb = max; 85 self 86 } 87 /// Open and initialize the actual disk storage 88 pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 89 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 90 } 91} 92 93/// On-disk block storage 94pub struct DiskStore { 95 #[allow(unused)] 96 db: Database, 97 keyspace: Keyspace, 98 max_stored: usize, 99 stored: usize, 100} 101 102impl DiskStore { 103 /// Initialize a new disk store 104 pub async fn new( 105 path: PathBuf, 106 cache_mb: usize, 107 max_stored_mb: usize, 108 ) -> Result<Self, DiskError> { 109 let max_stored = max_stored_mb * 2_usize.pow(20); 110 let (db, keyspace) = tokio::task::spawn_blocking(move || { 111 let db = Database::builder(path) 112 .manual_journal_persist(true) 113 .worker_threads(1) 114 .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2) 115 .temporary(true) 116 .open()?; 117 let opts = KeyspaceCreateOptions::default() 118 .expect_point_read_hits(true) 119 .max_memtable_size(16 * 2_u64.pow(20)); 120 let keyspace = db.keyspace("z", || opts)?; 121 122 Ok::<_, DiskError>((db, keyspace)) 123 }) 124 .await??; 125 126 Ok(Self { 127 db, 128 keyspace, 129 max_stored, 130 stored: 0, 131 }) 132 } 133 134 pub(crate) fn put_many( 135 &mut self, 136 kv: impl Iterator<Item = (Vec<u8>, Bytes)>, 137 ) -> Result<(), DriveError> { 138 let mut batch = self.db.batch(); 139 for (k, v) in kv { 140 self.stored += v.len(); 141 if self.stored > self.max_stored { 142 return Err(DiskError::MaxSizeExceeded.into()); 143 } 144 batch.insert(&self.keyspace, k, v); 145 } 146 batch.commit().map_err(DiskError::DbError)?; 147 Ok(()) 148 } 149 150 #[inline] 151 pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 152 self.keyspace.get(key) 153 } 154 155 /// Drop and recreate the kv table 156 pub async fn reset(&self) -> Result<(), DiskError> { 157 let keyspace = self.keyspace.clone(); 158 Ok(tokio::task::spawn_blocking(move || keyspace.clear()).await??) 159 } 160}