//! Forked from ptr.pet/hydrant.
//! Similar in spirit to tap, but different — and written in Rust.
1use crate::types::{BroadcastEvent, RepoState};
2use fjall::config::BlockSizePolicy;
3use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice};
4use jacquard::IntoStatic;
5use jacquard_common::types::string::Did;
6use miette::{Context, IntoDiagnostic, Result};
7use scc::HashMap;
8use smol_str::SmolStr;
9
10use std::sync::Arc;
11
12pub mod filter;
13pub mod keys;
14pub mod types;
15
16use std::sync::atomic::AtomicU64;
17use tokio::sync::broadcast;
18use tracing::error;
19
20fn default_opts() -> KeyspaceCreateOptions {
21 KeyspaceCreateOptions::default()
22}
23
/// Handle bundling the fjall database, every keyspace, and in-memory state
/// shared across the application.
pub struct Db {
    /// Underlying fjall database handle.
    pub inner: Arc<Database>,
    /// Per-repo state, keyed via `keys::repo_key(did)` (see `update_repo_state`).
    pub repos: Keyspace,
    /// Record data; read by point lookups (e.g. getRecord) and counted per collection.
    pub records: Keyspace,
    /// Block data consumed by streaming; point-read heavy.
    pub blocks: Keyspace,
    /// Cursor storage (e.g. the firehose cursor under `keys::CURSOR_KEY`).
    pub cursors: Keyspace,
    /// Work queued for processing; iterated as a queue per the tuning comment in `open`.
    pub pending: Keyspace,
    /// Resync bookkeeping; most repos are absent here, so misses are common.
    pub resync: Keyspace,
    /// Buffered data consumed during backfill (iterated, not point-read).
    pub resync_buffer: Keyspace,
    /// Event log keyed by big-endian u64 ids; `next_event_id` is seeded from its tail.
    pub events: Keyspace,
    /// Persisted counters (keyspace-level and per-(DID, collection) counts).
    pub counts: Keyspace,
    /// Explicit DID include/exclude filter checked against the firehose.
    pub filter: Keyspace,
    /// Broadcast channel for fan-out of events to subscribers.
    pub event_tx: broadcast::Sender<BroadcastEvent>,
    /// Next event id to assign; initialized to (last stored event id + 1).
    pub next_event_id: Arc<AtomicU64>,
    /// In-memory counters, loaded from `counts` at open and written back by
    /// `persist_counts`.
    pub counts_map: HashMap<SmolStr, u64>,
}
40
/// Shared body for `Db::update_gauge_diff` (sync) and
/// `Db::update_gauge_diff_async` (async).
///
/// Applies the counter deltas implied by a `GaugeState` transition: the
/// `pending` gauge, the `resync` gauge, and one per-error-kind gauge.
/// The optional trailing `await` token makes the same expansion work for
/// both call styles: `$(.$await)?` appends `.await` only when the caller
/// passed the extra token.
macro_rules! update_gauge_diff_impl {
    ($self:ident, $old:ident, $new:ident, $update_method:ident $(, $await:tt)?) => {{
        use crate::types::GaugeState;

        // Identical states imply no gauge movement at all.
        if $old == $new {
            return;
        }

        // pending
        match ($old, $new) {
            (GaugeState::Pending, GaugeState::Pending) => {}
            (GaugeState::Pending, _) => $self.$update_method("pending", -1) $(.$await)?,
            (_, GaugeState::Pending) => $self.$update_method("pending", 1) $(.$await)?,
            _ => {}
        }

        // resync
        let old_resync = $old.is_resync();
        let new_resync = $new.is_resync();
        match (old_resync, new_resync) {
            (true, false) => $self.$update_method("resync", -1) $(.$await)?,
            (false, true) => $self.$update_method("resync", 1) $(.$await)?,
            _ => {}
        }

        // error kinds: decrement the gauge for the error we are leaving...
        if let GaugeState::Resync(Some(kind)) = $old {
            let key = match kind {
                crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
                crate::types::ResyncErrorKind::Transport => "error_transport",
                crate::types::ResyncErrorKind::Generic => "error_generic",
            };
            $self.$update_method(key, -1) $(.$await)?;
        }

        // ...and increment the gauge for the error we are entering.
        if let GaugeState::Resync(Some(kind)) = $new {
            let key = match kind {
                crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
                crate::types::ResyncErrorKind::Transport => "error_transport",
                crate::types::ResyncErrorKind::Generic => "error_generic",
            };
            $self.$update_method(key, 1) $(.$await)?;
        }
    }};
}
86
impl Db {
    /// Opens the database at `cfg.database_path`, opens every keyspace with
    /// its per-workload tuning, seeds `next_event_id` from the highest stored
    /// event key, and loads the persisted counters into `counts_map`.
    pub fn open(cfg: &crate::config::Config) -> Result<Self> {
        // Kilobytes -> bytes helper for the block-size policies below.
        const fn kb(v: u32) -> u32 {
            v * 1024
        }

        let db = Database::builder(&cfg.database_path)
            // NOTE(review): cache_size appears to be configured in MiB
            // (scaled by 2^20) and then halved — confirm the intent of `/ 2`.
            .cache_size(cfg.cache_size * 2_u64.pow(20) / 2)
            // Journal persistence is driven explicitly via `Db::persist`.
            .manual_journal_persist(true)
            .journal_compression(
                cfg.disable_lz4_compression
                    .then_some(fjall::CompressionType::None)
                    .unwrap_or(fjall::CompressionType::Lz4),
            )
            .worker_threads(cfg.db_worker_threads)
            .max_journaling_size(cfg.db_max_journaling_size_mb * 1024 * 1024)
            .open()
            .into_diagnostic()?;
        let db = Arc::new(db);

        let opts = default_opts;
        // Small helper so each keyspace below reads as name + tuning.
        let open_ks = |name: &str, opts: KeyspaceCreateOptions| {
            db.keyspace(name, move || opts).into_diagnostic()
        };

        let repos = open_ks(
            "repos",
            opts()
                // most lookups hit since repo must exist after discovery
                // we don't hit here if it's not tracked anyway (that happens in filter)
                .expect_point_read_hits(true)
                .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024)
                .data_block_size_policy(BlockSizePolicy::all(kb(4))),
        )?;
        let blocks = open_ks(
            "blocks",
            opts()
                // point reads are used a lot by stream
                .expect_point_read_hits(true)
                .max_memtable_size(cfg.db_blocks_memtable_size_mb * 1024 * 1024)
                // 32 - 64 kb is probably fine, as the newer blocks will be in the first levels
                // and any consumers will probably be streaming the newer events...
                .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32), kb(64)])),
        )?;
        let records = open_ks(
            "records",
            // point reads might miss when using getRecord
            // but we assume thats not going to be used much... (todo: should be a config option maybe?)
            // since this keyspace is big, turning off bloom filters will help a lot
            opts()
                .expect_point_read_hits(true)
                .max_memtable_size(cfg.db_records_memtable_size_mb * 1024 * 1024)
                .data_block_size_policy(BlockSizePolicy::all(kb(8))),
        )?;
        let cursors = open_ks(
            "cursors",
            opts()
                // cursor point reads hit almost 100% of the time
                .expect_point_read_hits(true)
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;
        let pending = open_ks(
            "pending",
            opts()
                // iterated over as a queue, no point reads are used so bloom filters are disabled
                .expect_point_read_hits(true)
                .max_memtable_size(cfg.db_pending_memtable_size_mb * 1024 * 1024)
                .data_block_size_policy(BlockSizePolicy::all(kb(4))),
        )?;
        // resync point reads often miss (because most repos aren't resyncing), so keeping the bloom filter helps avoid disk hits
        let resync = open_ks(
            "resync",
            opts().data_block_size_policy(BlockSizePolicy::all(kb(8))),
        )?;
        let resync_buffer = open_ks(
            "resync_buffer",
            opts()
                // iterated during backfill, no point reads
                .expect_point_read_hits(true)
                .data_block_size_policy(BlockSizePolicy::all(kb(32))),
        )?;
        let events = open_ks(
            "events",
            opts()
                // only iterators are used here, no point reads
                .expect_point_read_hits(true)
                .max_memtable_size(cfg.db_events_memtable_size_mb * 1024 * 1024)
                .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])),
        )?;
        let counts = open_ks(
            "counts",
            opts()
                // count increments hit because counters are mostly pre-initialized
                .expect_point_read_hits(true)
                // the data is very small
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        // filter handles high-volume point reads (checking explicit DID includes and excludes from firehose)
        // so it needs the bloom filter
        let filter = open_ks(
            "filter",
            // this can be pretty small since the DIDs wont be compressed that well anyhow
            opts().data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        // Recover the last assigned event id (event keys are big-endian u64)
        // so the sequence continues across restarts.
        let mut last_id = 0;
        if let Some(guard) = events.iter().next_back() {
            let k = guard.key().into_diagnostic()?;
            last_id = u64::from_be_bytes(
                k.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be id (8 bytes)")?,
            );
        }

        // load counts into memory
        let counts_map = HashMap::new();
        for guard in counts.prefix(keys::COUNT_KS_PREFIX) {
            let (k, v) = guard.into_inner().into_diagnostic()?;
            // Counter name is the key with the keyspace-count prefix stripped.
            let name = std::str::from_utf8(&k[keys::COUNT_KS_PREFIX.len()..])
                .into_diagnostic()
                .wrap_err("expected valid utf8 for ks count key")?;
            // NOTE(review): the value decode unwraps on a malformed
            // (non-8-byte) counter value — consider surfacing an error the
            // way the key path above does.
            let _ = counts_map.insert_sync(
                SmolStr::new(name),
                u64::from_be_bytes(v.as_ref().try_into().unwrap()),
            );
        }

        let (event_tx, _) = broadcast::channel(10000);

        Ok(Self {
            inner: db,
            repos,
            records,
            blocks,
            cursors,
            pending,
            resync,
            resync_buffer,
            events,
            counts,
            filter,
            event_tx,
            counts_map,
            // Continue numbering after the last stored event.
            next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
        })
    }

    /// Flushes and syncs the journal (we run in manual-persist mode).
    pub fn persist(&self) -> Result<()> {
        self.inner.persist(PersistMode::SyncAll).into_diagnostic()?;
        Ok(())
    }

    /// Point read on `ks`, offloaded to the blocking pool because fjall I/O
    /// is synchronous.
    pub async fn get(ks: Keyspace, key: impl AsRef<[u8]>) -> Result<Option<Slice>> {
        // Own the key so it can move into the 'static blocking closure.
        let key = key.as_ref().to_vec();
        tokio::task::spawn_blocking(move || ks.get(key).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Single-key insert on `ks`, offloaded to the blocking pool.
    #[allow(dead_code)]
    pub async fn insert(
        ks: Keyspace,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<()> {
        let key = key.as_ref().to_vec();
        let value = value.as_ref().to_vec();
        tokio::task::spawn_blocking(move || ks.insert(key, value).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Single-key removal on `ks`, offloaded to the blocking pool.
    #[allow(dead_code)]
    pub async fn remove(ks: Keyspace, key: impl AsRef<[u8]>) -> Result<()> {
        let key = key.as_ref().to_vec();
        tokio::task::spawn_blocking(move || ks.remove(key).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Existence check on `ks`, offloaded to the blocking pool.
    pub async fn contains_key(ks: Keyspace, key: impl AsRef<[u8]>) -> Result<bool> {
        let key = key.as_ref().to_vec();
        tokio::task::spawn_blocking(move || ks.contains_key(key).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Adjusts the in-memory counter `key` by `delta` (sync path).
    // NOTE(review): a negative result wraps through the `as u64` cast
    // (e.g. 0 + -1 => u64::MAX); `u64::saturating_add_signed` would clamp
    // at 0 instead — confirm which is intended.
    pub fn update_count(&self, key: &str, delta: i64) {
        let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0);
        *entry = (*entry as i64).saturating_add(delta) as u64;
    }

    /// Adjusts the in-memory counter `key` by `delta` (async path).
    // NOTE(review): same negative-wrap caveat as `update_count`.
    pub async fn update_count_async(&self, key: &str, delta: i64) {
        let mut entry = self
            .counts_map
            .entry_async(SmolStr::new(key))
            .await
            .or_insert(0);
        *entry = (*entry as i64).saturating_add(delta) as u64;
    }

    /// Reads counter `key`, returning 0 when it was never set.
    pub async fn get_count(&self, key: &str) -> u64 {
        self.counts_map
            .read_async(key, |_, v| *v)
            .await
            .unwrap_or(0)
    }

    /// Applies the gauge deltas for an `old -> new` state transition
    /// (sync variant; body shared via `update_gauge_diff_impl!`).
    pub fn update_gauge_diff(
        &self,
        old: &crate::types::GaugeState,
        new: &crate::types::GaugeState,
    ) {
        update_gauge_diff_impl!(self, old, new, update_count);
    }

    /// Async variant of [`Db::update_gauge_diff`]; the extra `await` token
    /// makes the macro emit `.await` after each counter update.
    pub async fn update_gauge_diff_async(
        &self,
        old: &crate::types::GaugeState,
        new: &crate::types::GaugeState,
    ) {
        update_gauge_diff_impl!(self, old, new, update_count_async, await);
    }

    /// Reads the repo state for `did`, applies `f`, and stages the updated
    /// state into `batch` when `f` reports a change (its `bool`).
    ///
    /// Returns `Ok(None)` when the repo key does not exist. `f` also
    /// receives the raw repo key and the batch so it can stage related
    /// writes atomically with the state update. Note this only stages into
    /// `batch`; committing is the caller's responsibility.
    pub fn update_repo_state<F, T>(
        batch: &mut OwnedWriteBatch,
        repos: &Keyspace,
        did: &Did<'_>,
        f: F,
    ) -> Result<Option<(RepoState<'static>, T)>>
    where
        F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>,
    {
        let key = keys::repo_key(did);
        if let Some(bytes) = repos.get(&key).into_diagnostic()? {
            // Decode borrows from `bytes`; into_static detaches the lifetime.
            let mut state: RepoState = deser_repo_state(bytes.as_ref())?.into_static();
            let (changed, result) = f(&mut state, (key.as_slice(), batch))?;
            if changed {
                batch.insert(repos, key, ser_repo_state(&state)?);
            }
            Ok(Some((state, result)))
        } else {
            Ok(None)
        }
    }

    /// Async wrapper around [`Db::update_repo_state`]: creates a batch,
    /// runs the read-modify-write on the blocking pool, and commits the
    /// batch when the repo existed.
    pub async fn update_repo_state_async<F, T>(
        &self,
        did: &Did<'_>,
        f: F,
    ) -> Result<Option<(RepoState<'static>, T)>>
    where
        F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>
            + Send
            + 'static,
        T: Send + 'static,
    {
        let mut batch = self.inner.batch();
        // Clone what the 'static blocking closure needs to own.
        let repos = self.repos.clone();
        let did = did.clone().into_static();

        tokio::task::spawn_blocking(move || {
            let Some((state, t)) = Self::update_repo_state(&mut batch, &repos, &did, f)? else {
                // Repo absent: nothing staged, nothing to commit.
                return Ok(None);
            };
            batch.commit().into_diagnostic()?;
            Ok(Some((state, t)))
        })
        .await
        .into_diagnostic()?
    }
}
362
363pub fn set_firehose_cursor(db: &Db, cursor: i64) -> Result<()> {
364 db.cursors
365 .insert(keys::CURSOR_KEY, cursor.to_be_bytes())
366 .into_diagnostic()
367}
368
369pub async fn get_firehose_cursor(db: &Db) -> Result<Option<i64>> {
370 Db::get(db.cursors.clone(), keys::CURSOR_KEY)
371 .await?
372 .map(|v| {
373 Ok(i64::from_be_bytes(
374 v.as_ref()
375 .try_into()
376 .into_diagnostic()
377 .wrap_err("cursor is not 8 bytes")?,
378 ))
379 })
380 .transpose()
381}
382
383pub fn ser_repo_state(state: &RepoState) -> Result<Vec<u8>> {
384 rmp_serde::to_vec(&state).into_diagnostic()
385}
386
387pub fn deser_repo_state<'b>(bytes: &'b [u8]) -> Result<RepoState<'b>> {
388 rmp_serde::from_slice(bytes).into_diagnostic()
389}
390
391pub fn check_poisoned(e: &fjall::Error) {
392 if matches!(e, fjall::Error::Poisoned) {
393 error!("!!! DATABASE POISONED !!! exiting");
394 std::process::exit(10);
395 }
396}
397
398pub fn check_poisoned_report(e: &miette::Report) {
399 let Some(err) = e.downcast_ref::<fjall::Error>() else {
400 return;
401 };
402 self::check_poisoned(err);
403}
404
405pub fn set_ks_count(batch: &mut OwnedWriteBatch, db: &Db, name: &str, count: u64) {
406 let key = keys::count_keyspace_key(name);
407 batch.insert(&db.counts, key, count.to_be_bytes());
408}
409
410pub fn persist_counts(db: &Db) -> Result<()> {
411 let mut batch = db.inner.batch();
412 db.counts_map.iter_sync(|k, v| {
413 set_ks_count(&mut batch, db, k, *v);
414 true
415 });
416 batch.commit().into_diagnostic()
417}
418
419pub fn set_record_count(
420 batch: &mut OwnedWriteBatch,
421 db: &Db,
422 did: &Did<'_>,
423 collection: &str,
424 count: u64,
425) {
426 let key = keys::count_collection_key(did, collection);
427 batch.insert(&db.counts, key, count.to_be_bytes());
428}
429
430pub fn update_record_count(
431 batch: &mut OwnedWriteBatch,
432 db: &Db,
433 did: &Did<'_>,
434 collection: &str,
435 delta: i64,
436) -> Result<()> {
437 let key = keys::count_collection_key(did, collection);
438 let count = db
439 .counts
440 .get(&key)
441 .into_diagnostic()?
442 .map(|v| -> Result<_> {
443 Ok(u64::from_be_bytes(
444 v.as_ref()
445 .try_into()
446 .into_diagnostic()
447 .wrap_err("expected to be count (8 bytes)")?,
448 ))
449 })
450 .transpose()?
451 .unwrap_or(0);
452 let new_count = (count as i64).saturating_add(delta) as u64;
453 batch.insert(&db.counts, key, new_count.to_be_bytes());
454 Ok(())
455}
456
457pub fn get_record_count(db: &Db, did: &Did<'_>, collection: &str) -> Result<u64> {
458 let key = keys::count_collection_key(did, collection);
459 let count = db
460 .counts
461 .get(&key)
462 .into_diagnostic()?
463 .map(|v| -> Result<_> {
464 Ok(u64::from_be_bytes(
465 v.as_ref()
466 .try_into()
467 .into_diagnostic()
468 .wrap_err("expected to be count (8 bytes)")?,
469 ))
470 })
471 .transpose()?;
472 Ok(count.unwrap_or(0))
473}