Fast and robust atproto CAR file processing in rust
1/*!
Storage for blocks on disk
3
Currently this uses fjall. (This note previously named sqlite: in testing
sqlite wasn't the fastest, but it seemed to be the best behaved in terms of
both on-disk space usage and memory usage.)
6
7```no_run
8# use repo_stream::{DiskBuilder, DiskError};
9# #[tokio::main]
10# async fn main() -> Result<(), DiskError> {
11let store = DiskBuilder::new()
12 .with_cache_size_mb(32)
13 .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
14 .open("/some/path.db".into()).await?;
15# Ok(())
16# }
17```
18*/
19
20use crate::{Bytes, drive::DriveError};
21use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
22use std::path::PathBuf;
23
24#[derive(Debug, thiserror::Error)]
25pub enum DiskError {
26 /// A wrapped database error
27 ///
28 /// (The wrapped err should probably be obscured to remove public-facing
29 /// sqlite bits)
30 #[error(transparent)]
31 DbError(#[from] FjallError),
32 /// A tokio blocking task failed to join
33 #[error("Failed to join a tokio blocking task: {0}")]
34 JoinError(#[from] tokio::task::JoinError),
35 /// The total size of stored blocks exceeded the allowed size
36 ///
37 /// If you need to process *really* big CARs, you can configure a higher
38 /// limit.
39 #[error("Maximum disk size reached")]
40 MaxSizeExceeded,
41}
42
/// Builder-style disk store setup
///
/// Both settings are expressed in MiB. Configure with the `with_*` methods
/// (or set the public fields directly), then call `open`.
#[derive(Debug, Clone)]
pub struct DiskBuilder {
    /// Database in-memory cache allowance, in MiB
    ///
    /// Default: 64 MiB
    pub cache_size_mb: usize,
    /// Database stored block size limit, in MiB
    ///
    /// Default: 10 GiB
    ///
    /// Note: actual size on disk may be more, but should approximately scale
    /// with this limit
    pub max_stored_mb: usize,
}
58
59impl Default for DiskBuilder {
60 fn default() -> Self {
61 Self {
62 cache_size_mb: 64,
63 max_stored_mb: 10 * 1024, // 10 GiB
64 }
65 }
66}
67
68impl DiskBuilder {
69 /// Begin configuring the storage with defaults
70 pub fn new() -> Self {
71 Default::default()
72 }
73 /// Set the in-memory cache allowance for the database
74 ///
75 /// Default: 64 MiB
76 pub fn with_cache_size_mb(mut self, size: usize) -> Self {
77 self.cache_size_mb = size;
78 self
79 }
80 /// Set the approximate stored block size limit
81 ///
82 /// Default: 10 GiB
83 pub fn with_max_stored_mb(mut self, max: usize) -> Self {
84 self.max_stored_mb = max;
85 self
86 }
87 /// Open and initialize the actual disk storage
88 pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
89 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
90 }
91}
92
93/// On-disk block storage
94pub struct DiskStore {
95 #[allow(unused)]
96 db: Database,
97 keyspace: Keyspace,
98 max_stored: usize,
99 stored: usize,
100}
101
102impl DiskStore {
103 /// Initialize a new disk store
104 pub async fn new(
105 path: PathBuf,
106 cache_mb: usize,
107 max_stored_mb: usize,
108 ) -> Result<Self, DiskError> {
109 let max_stored = max_stored_mb * 2_usize.pow(20);
110 let (db, keyspace) = tokio::task::spawn_blocking(move || {
111 let db = Database::builder(path)
112 .manual_journal_persist(true)
113 .worker_threads(1)
114 .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2)
115 .temporary(true)
116 .open()?;
117 let opts = KeyspaceCreateOptions::default()
118 .expect_point_read_hits(true)
119 .max_memtable_size(16 * 2_u64.pow(20));
120 let keyspace = db.keyspace("z", || opts)?;
121
122 Ok::<_, DiskError>((db, keyspace))
123 })
124 .await??;
125
126 Ok(Self {
127 db,
128 keyspace,
129 max_stored,
130 stored: 0,
131 })
132 }
133
134 pub(crate) fn put_many(
135 &mut self,
136 kv: impl Iterator<Item = (Vec<u8>, Bytes)>,
137 ) -> Result<(), DriveError> {
138 let mut batch = self.db.batch();
139 for (k, v) in kv {
140 self.stored += v.len();
141 if self.stored > self.max_stored {
142 return Err(DiskError::MaxSizeExceeded.into());
143 }
144 batch.insert(&self.keyspace, k, v);
145 }
146 batch.commit().map_err(DiskError::DbError)?;
147 Ok(())
148 }
149
150 #[inline]
151 pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
152 self.keyspace.get(key)
153 }
154
155 /// Drop and recreate the kv table
156 pub async fn reset(&self) -> Result<(), DiskError> {
157 let keyspace = self.keyspace.clone();
158 Ok(tokio::task::spawn_blocking(move || keyspace.clear()).await??)
159 }
160}