Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use super::{
2 ActionableEvent, LinkReader, LinkStorage, ManyToManyCursor, Order, PagedAppendingCollection,
3 PagedOrderedCollection, StorageStats,
4};
5use crate::{CountsByCount, Did, ManyToManyItem, RecordId};
6
7use anyhow::{anyhow, bail, Result};
8use bincode::Options as BincodeOptions;
9use links::CollectedLink;
10use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
11use ratelimit::Ratelimiter;
12use rocksdb::backup::{BackupEngine, BackupEngineOptions};
13use rocksdb::{
14 AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
15 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
16};
17use serde::{Deserialize, Serialize};
18use tokio_util::sync::CancellationToken;
19
20use std::collections::{BTreeMap, HashMap, HashSet};
21use std::io::Read;
22use std::marker::PhantomData;
23use std::path::{Path, PathBuf};
24use std::sync::{
25 atomic::{AtomicU64, Ordering},
26 Arc,
27};
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
31static DID_IDS_CF: &str = "did_ids";
32static TARGET_IDS_CF: &str = "target_ids";
33static TARGET_LINKERS_CF: &str = "target_links";
34static LINK_TARGETS_CF: &str = "link_targets";
35
36static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
37static STARTED_AT_KEY: &str = "jetstream_first_cursor";
38// add reverse mappings for targets if this db was running before that was a thing
39static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
40
41static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
42
/// Progress marker for the one-off background repair that backfills the
/// reverse (id -> target key) mappings in the target-id table.
/// Persisted in the default column family under TARGET_ID_REPAIR_STATE_KEY;
/// written and resumed by `RocksStorage::run_repair`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetIdRepairState {
    /// start time for repair, microseconds timestamp
    current_us_started_at: u64,
    /// id table's latest id when repair started
    id_when_started: u64,
    /// number of table entries already stepped over by the repair iterator
    /// (an iteration position, used to skip ahead on resume)
    latest_repaired_i: u64,
}
// marker impls: (de)serialization goes through the shared rocks value helpers
impl AsRocksValue for TargetIdRepairState {}
impl ValueFromRocks for TargetIdRepairState {}
54
55// todo: actually understand and set these options probably better
56fn rocks_opts_base() -> Options {
57 let mut opts = Options::default();
58 opts.set_level_compaction_dynamic_level_bytes(true);
59 opts.create_if_missing(true);
60 opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
61 opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
62 // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
63 opts
64}
65fn get_db_opts() -> Options {
66 let mut opts = rocks_opts_base();
67 opts.create_missing_column_families(true);
68 opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
69 // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
70 opts
71}
/// Options for read-only handles: tuned for point gets rather than scans.
fn get_db_read_opts() -> Options {
    let mut opts = Options::default();
    opts.optimize_for_point_lookup(16_384); // mb (run this on big machines)
    opts
}
77
/// RocksDB-backed link store.
///
/// Cloning is cheap: the db handle and the id tables' sequence counters are
/// shared via `Arc`, so clones all see the same underlying database.
#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
    // id lookup tables: did <-> DidIdValue and target key <-> TargetId
    did_id_table: IdTable<Did, DidIdValue>,
    target_id_table: IdTable<TargetKey, TargetId>,
    // writers flush + join the backup task on drop; read handles skip cleanup
    is_writer: bool,
    // one-off or recurring backup thread, joined on drop (see start_backup)
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}
86
/// A value stored in an `IdTable`: wraps a sequentially-assigned u64 id.
trait IdTableValue: ValueFromRocks + Clone {
    /// Wrap a freshly-assigned sequence number.
    fn new(v: u64) -> Self;
    /// The raw numeric id.
    fn id(&self) -> u64;
}
/// Pre-init handle for an id table, produced by `IdTable::setup`.
/// Provides the column-family descriptor needed to open the db, and is
/// turned into a usable `IdTable` by `init()` once the db is open.
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    // column family name; also mixed into the seq key in the default cf
    name: String,
    // shared, publicly-readable copy of the id sequence (0 means uninit)
    id_seq: Arc<AtomicU64>,
}
impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    /// Descriptor for this table's column family; must be passed to the db
    /// open call before `init` is invoked.
    fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
        ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
    }
    /// Load the persisted id sequence (from the default cf) and produce the
    /// usable table. Fails if the db wasn't opened with our cf_descriptor().
    fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
        if db.cf_handle(&self.name).is_none() {
            bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
        }
        // resume one past the stored last-used seq, or start fresh at 1
        let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? {
            if seq_bytes.len() != 8 {
                bail!(
                    "reading bytes for u64 id seq {:?}: found the wrong number of bytes",
                    self.seq_key()
                );
            }
            let mut buf: [u8; 8] = [0; 8];
            seq_bytes.as_slice().read_exact(&mut buf)?;
            let last_seq = u64::from_le_bytes(buf);
            last_seq + 1
        } else {
            1
        };
        // publish the starting seq so estimate_count works immediately
        self.id_seq.store(priv_id_seq, Ordering::SeqCst);
        Ok(IdTable {
            base: self,
            priv_id_seq,
        })
    }
    /// Key in the default cf under which this table's id sequence persists.
    fn seq_key(&self) -> Vec<u8> {
        let mut k = b"__id_seq_key_plz_be_unique:".to_vec();
        k.extend(self.name.as_bytes());
        k
    }
}
/// Bidirectional mapping between an original key type `Orig` and a compact
/// sequential u64 id (wrapped in `IdVal`), backed by one column family.
#[derive(Debug, Clone)]
struct IdTable<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    base: IdTableBase<Orig, IdVal>,
    // writer-private view of the sequence; kept in lockstep with base.id_seq
    priv_id_seq: u64,
}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Create the pre-init handle; finish with `.cf_descriptor()` + `.init(db)`.
    #[must_use]
    fn setup(name: &str) -> IdTableBase<Orig, IdVal> {
        IdTableBase::<Orig, IdVal> {
            _key_marker: PhantomData,
            _val_marker: PhantomData,
            name: name.into(),
            id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
        }
    }
    /// Look up the id value for an original key, if one has been assigned.
    fn get_id_val(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        orig: &Orig,
    ) -> Result<Option<IdVal>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? {
            Ok(Some(_vr(&_id_bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Get the existing id value or assign the next sequence number, batching
    /// the new forward mapping and the persisted seq into `batch`.
    /// Requires &mut self: only one writer may advance the sequence.
    fn __get_or_create_id_val<CF>(
        &mut self,
        cf: &CF,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal>
    where
        CF: AsColumnFamilyRef,
    {
        Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| {
            let prev_priv_seq = self.priv_id_seq;
            self.priv_id_seq += 1;
            // publish the new seq; the swap doubles as a single-writer check
            let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst);
            assert_eq!(
                prev_public_seq, prev_priv_seq,
                "public seq may have been modified??"
            );
            let id_value = IdVal::new(self.priv_id_seq);
            batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes());
            batch.put_cf(cf, _rk(orig), _rv(&id_value));
            id_value
        }))
    }

    /// Approximate number of assigned ids (reads the shared atomic seq).
    fn estimate_count(&self) -> u64 {
        self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
    }

    /// Like `__get_or_create_id_val`, but also (re)writes the reverse
    /// (id -> original) mapping into the same column family, keyed by the
    /// big-endian id bytes so 8-byte keys can be told apart from originals.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?;
        // TODO: assert that the original is never a u64 that could collide
        batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/
        Ok(id_val)
    }

    /// Reverse lookup: recover the original key from a numeric id, using the
    /// reverse mapping written by `get_or_create_id_val`.
    fn get_val_from_id(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        id: u64,
    ) -> Result<Option<Orig>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())? {
            // HACK ish: the stored bytes are a serialized *key*, decoded here
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}
232
233impl IdTableValue for DidIdValue {
234 fn new(v: u64) -> Self {
235 DidIdValue(DidId(v), true)
236 }
237 fn id(&self) -> u64 {
238 self.0 .0
239 }
240}
241impl IdTableValue for TargetId {
242 fn new(v: u64) -> Self {
243 TargetId(v)
244 }
245 fn id(&self) -> u64 {
246 self.0
247 }
248}
249
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// # Panics
/// Only if the system clock reports a time before the epoch, which indicates
/// a badly misconfigured host (previously a bare `unwrap`; the `expect`
/// message now states the invariant).
fn now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock must not be set before the unix epoch")
        .as_micros() as u64 // truncation is fine: u64 of micros lasts ~584k years
}
256
257impl RocksStorage {
258 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
259 Self::describe_metrics();
260 let me = RocksStorage::open_readmode(path, false)?;
261 me.global_init()?;
262 Ok(me)
263 }
264
265 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
266 RocksStorage::open_readmode(path, true)
267 }
268
    /// Shared open path for both the writer and read-only handles: sets up
    /// all column families, opens the db, then initializes the id tables.
    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::setup(DID_IDS_CF);
        let target_id_table = IdTable::setup(TARGET_IDS_CF);

        // note: global stuff like jetstream cursor goes in the default cf
        // these are bonus extra cfs
        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                // linker lists are appended via a merge operator instead of
                // read-modify-write, so link creates stay cheap
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            // final arg: don't error if the writer has a newer manifest
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
        } else {
            DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)?
        };

        let db = Arc::new(db);
        // init after open: reads each table's persisted id sequence
        let did_id_table = did_id_table.init(&db)?;
        let target_id_table = target_id_table.init(&db)?;
        Ok(Self {
            db,
            did_id_table,
            target_id_table,
            is_writer: !readonly,
            backup_task: None.into(),
        })
    }
309
310 fn global_init(&self) -> Result<()> {
311 let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
312 if first_run {
313 self.db.put(STARTED_AT_KEY, _rv(now()))?;
314
315 // hack / temporary: if we're a new db, put in a completed repair
316 // state so we don't run repairs (repairs are for old-code dbs)
317 let completed = TargetIdRepairState {
318 id_when_started: 0,
319 current_us_started_at: 0,
320 latest_repaired_i: 0,
321 };
322 self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
323 }
324 Ok(())
325 }
326
    /// Backfill the reverse (id -> target key) mappings for dbs created
    /// before `get_or_create_id_val` wrote them. Progress is checkpointed in
    /// TARGET_ID_REPAIR_STATE_KEY so the repair can resume after a restart.
    /// Runs until the iterator is exhausted or `stay_alive` is cancelled.
    pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
        // resume saved state, or start a fresh repair from position 0
        let mut state = match self
            .db
            .get(TARGET_ID_REPAIR_STATE_KEY)?
            .map(|s| _vr(&s))
            .transpose()?
        {
            Some(s) => s,
            None => TargetIdRepairState {
                // NOTE(review): this snapshots the *did* id table's seq while
                // repairing the *target* id table -- looks like it should be
                // target_id_table.priv_id_seq; confirm before relying on it
                id_when_started: self.did_id_table.priv_id_seq,
                current_us_started_at: now(),
                latest_repaired_i: 0,
            },
        };

        eprintln!("initial repair state: {state:?}");

        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();

        let mut iter = self.db.raw_iterator_cf(&cf);
        iter.seek_to_first();

        eprintln!("repair iterator sent to first key");

        // skip ahead if we're done some, or take a single first step
        for _ in 0..state.latest_repaired_i {
            iter.next();
        }

        eprintln!(
            "repair iterator skipped to {}th key",
            state.latest_repaired_i
        );

        let mut maybe_done = false;

        // repair writes are reconstructible, so skip sync + WAL for speed
        let mut write_fast = rocksdb::WriteOptions::default();
        write_fast.set_sync(false);
        write_fast.disable_wal(true);

        while !stay_alive.is_cancelled() && !maybe_done {
            // let mut batch = WriteBatch::default();

            let mut any_written = false;

            // repair in chunks of 1000, checkpointing between chunks
            for _ in 0..1000 {
                if state.latest_repaired_i % 1_000_000 == 0 {
                    eprintln!("target iter at {}", state.latest_repaired_i);
                }
                state.latest_repaired_i += 1;

                if !iter.valid() {
                    eprintln!("invalid iter, are we done repairing?");
                    maybe_done = true;
                    break;
                };

                // eprintln!("iterator seems to be valid! getting the key...");
                let raw_key = iter.key().unwrap();
                if raw_key.len() == 8 {
                    // 8-byte keys are the reverse (id -> key) entries we are
                    // creating; skip them so we only process forward entries
                    // eprintln!("found an 8-byte key, skipping it since it's probably an id...");
                    iter.next();
                    continue;
                }
                let target: TargetKey = _kr::<TargetKey>(raw_key)?;
                let target_id: TargetId = _vr(iter.value().unwrap())?;

                // write the missing reverse mapping: big-endian id -> key
                self.db
                    .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
                any_written = true;
                iter.next();
            }

            if any_written {
                // checkpoint progress, then pause to limit db pressure
                self.db
                    .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
                std::thread::sleep(breather);
            }
        }

        eprintln!("repair iterator done.");

        // NOTE(review): returns false even when maybe_done is true -- confirm
        // callers don't rely on the bool to detect completion
        Ok(false)
    }
411
412 pub fn start_backup(
413 &mut self,
414 path: PathBuf,
415 auto: Option<(u64, Option<usize>)>,
416 stay_alive: CancellationToken,
417 ) -> Result<()> {
418 let task = if let Some((interval_hrs, copies)) = auto {
419 eprintln!("backups: starting background task...");
420 self.backup_task(path, interval_hrs, copies, stay_alive)
421 } else {
422 eprintln!("backups: starting a one-off backup...");
423 thread::spawn({
424 let db = self.db.clone();
425 move || Self::do_backup(db, path)
426 })
427 };
428 self.backup_task = Arc::new(Some(task));
429 Ok(())
430 }
431
432 fn backup_task(
433 &self,
434 path: PathBuf,
435 interval_hrs: u64,
436 copies: Option<usize>,
437 stay_alive: CancellationToken,
438 ) -> std::thread::JoinHandle<Result<()>> {
439 let db = self.db.clone();
440 thread::spawn(move || {
441 let limit =
442 Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?;
443 let minimum_sleep = Duration::from_secs(1);
444
445 'quit: loop {
446 if let Err(sleep) = limit.try_wait() {
447 eprintln!("backups: background: next backup scheduled in {sleep:?}");
448 let waiting = Instant::now();
449 loop {
450 let remaining = sleep - waiting.elapsed();
451 if stay_alive.is_cancelled() {
452 break 'quit;
453 } else if remaining <= Duration::ZERO {
454 break;
455 } else if remaining < minimum_sleep {
456 thread::sleep(remaining);
457 break;
458 } else {
459 thread::sleep(minimum_sleep);
460 }
461 }
462 }
463 eprintln!("backups: background: starting backup...");
464 if let Err(e) = Self::do_backup(db.clone(), &path) {
465 eprintln!("backups: background: backup failed: {e:?}");
466 // todo: metrics
467 } else {
468 eprintln!("backups: background: backup succeeded yay");
469 }
470 if let Some(copies) = copies {
471 eprintln!("backups: background: trimming to {copies} saved backups...");
472 if let Err(e) = Self::trim_backups(copies, &path) {
473 eprintln!("backups: background: failed to trim backups: {e:?}");
474 } else {
475 eprintln!("backups: background: trimming worked!")
476 }
477 }
478 }
479
480 Ok(())
481 })
482 }
483
484 fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
485 let mut engine =
486 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
487 eprintln!("backups: starting backup...");
488 let t0 = Instant::now();
489 if let Err(e) = engine.create_new_backup(&db) {
490 eprintln!("backups: oh no, backup failed: {e:?}");
491 } else {
492 eprintln!("backups: yay, backup worked?");
493 }
494 eprintln!(
495 "backups: backup finished after {:.2}s",
496 t0.elapsed().as_secs_f32()
497 );
498 Ok(())
499 }
500
501 fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
502 let mut engine =
503 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
504 engine.purge_old_backups(num_backups_to_keep)?;
505 Ok(())
506 }
507
    /// Register descriptions/units for the metrics emitted by `push` and
    /// `delete_account`; called once from `new`.
    fn describe_metrics() {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }
530
    /// Associative merge operator for TARGET_LINKERS_CF: concatenates the
    /// (DidId, RKey) lists of all operands onto the existing linker list.
    /// Must never return None -- that would drop the key's data -- so decode
    /// failures are logged and the bad bytes skipped instead.
    fn merge_op_extend_did_ids(
        key: &[u8],
        existing: Option<&[u8]>,
        operands: &MergeOperands,
    ) -> Option<Vec<u8>> {
        // start from the existing linker list, if it decodes
        let mut linkers: Vec<_> = if let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        // append every operand's linkers in order
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }
570
    /// Iterate all (key, value) pairs in `cf` whose key starts with `pre`,
    /// decoding both sides.
    /// Note: map_while stops the iteration at the first entry that fails to
    /// decode (or the first iterator error), rather than skipping it.
    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }
589
    /// Read-modify-write a did's DidIdValue into `batch`.
    /// Returns Ok(false) if the did is unknown or `update` returns None
    /// (nothing batched); Ok(true) once the new value is batched.
    fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
    where
        F: FnOnce(DidIdValue) -> Option<DidIdValue>,
    {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(false);
        };
        let Some(new_did_id_value) = update(did_id_value) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
        Ok(true)
    }
    /// Batch deletion of the did -> DidIdValue forward mapping.
    /// (The reverse id -> did mapping is not touched here.)
    fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        batch.delete_cf(&cf, _rk(did));
    }
608
    /// Fetch the full linker list for a target id; missing key decodes to an
    /// empty list.
    fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
            return Ok(TargetLinkers::default());
        };
        _vr(&linkers_bytes)
    }
    /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
    fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let mut seen = HashSet::new();
        let mut linkers = self.get_target_linkers(target_id)?;
        for (did_id, _) in linkers.0.iter_mut() {
            if seen.contains(did_id) {
                did_id.0 = 0; // 0 is the "deleted/empty" sentinel did id
            } else {
                seen.insert(*did_id);
            }
        }
        Ok(linkers)
    }
    /// Append one (did, rkey) linker to a target via the cf's merge operator
    /// (see merge_op_extend_did_ids) -- no read required.
    fn merge_target_linker(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        linker_did_id: &DidId,
        linker_rkey: &RKey,
    ) {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        batch.merge_cf(
            &cf,
            _rk(target_id),
            _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
        );
    }
    /// Read-modify-write a target's linker list into `batch`; Ok(false) when
    /// `update` declines (returns None), Ok(true) once batched.
    fn update_target_linkers<F>(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        update: F,
    ) -> Result<bool>
    where
        F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
    {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let existing_linkers = self.get_target_linkers(target_id)?;
        let Some(new_linkers) = update(existing_linkers) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
        Ok(true)
    }
660
    /// Blind-write the forward (record -> targets) entry for a record.
    fn put_link_targets(
        &self,
        batch: &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
    /// Fetch the forward (record -> targets) entry, if the record has links.
    fn get_record_link_targets(
        &self,
        record_link_key: &RecordLinkKey,
    ) -> Result<Option<RecordLinkTargets>> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
            Ok(Some(_vr(&bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Batch deletion of a record's forward entry.
    fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.delete_cf(&cf, _rk(record_link_key));
    }
    /// All forward entries created by one did (prefix scan on the did id).
    fn iter_links_for_did_id(
        &self,
        did_id: &DidId,
    ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
    }
    /// All (target key, target id) entries for a bare target, across every
    /// (collection, path) it was stored under.
    fn iter_targets_for_target(
        &self,
        target: &Target,
    ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
        self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
    }
706
707 //
708 // higher-level event action handlers
709 //
710
    /// Handle a record create: assign ids as needed, merge each link into the
    /// reverse (target -> linkers) index, and blind-write the forward
    /// (record -> targets) entry. All writes go into `batch`.
    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            // targets are keyed by (target, collection, path) triples
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch, &target_key)?;
            // reverse index: target -> (did, rkey), via merge op
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            // forward index entry accumulated for the single put below
            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }
745
    /// Handle a record delete: remove the record's linker entries from each
    /// target's reverse index, then drop the forward entry. No-op when the
    /// did or the record is unknown.
    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }
779
    /// Handle account (de)activation: flip the active flag on the existing
    /// DidIdValue. Dids we have never seen are deliberately not created.
    fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
        // this needs to be read-modify-write since the did_id needs to stay the same,
        // which has a benefit of allowing to avoid adding entries for dids we don't
        // need. reading on dids needs to be cheap anyway for the current design, and
        // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
        self.update_did_id_value(batch, did, |current_value| {
            Some(DidIdValue(current_value.did_id(), active))
        })?;
        Ok(())
    }
790
    /// Handle a full account deletion: scrub the did from every target it
    /// linked, delete its forward entries in chunked writes, and batch the
    /// removal of its did entry into the caller's `batch`.
    /// Returns the number of ops written via the chunked mini-batches.
    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(total_batched_ops); // ignore updates for dids we don't know about
        };
        self.delete_did_id_value(batch, did);
        // TODO: also delete the reverse!!

        // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size.
        // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared.
        // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully
        // TODO: queue a background delete task or whatever
        // TODO: test delete account with more links than chunk size
        let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect();
        for chunk in stuff.chunks(1024) {
            let mut mini_batch = WriteBatch::default();

            for (record_link_key, links) in chunk {
                self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better

                // scrub this did from each target's reverse linker list
                for RecordLinkTarget(_, target_link_id) in links.0.iter() {
                    self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| {
                        if !linkers.remove_linker(&did_id, &record_link_key.2) {
                            eprintln!("bug? could not find linker when removing links while deleting an account");
                        }
                        Some(linkers)
                    })?;
                }
            }
            total_batched_ops += mini_batch.len();
            self.db.write(mini_batch)?; // todo
        }
        Ok(total_batched_ops)
    }
825}
826
impl Drop for RocksStorage {
    /// Writer shutdown: flush WAL and memtables, wait for any backup thread,
    /// then stop rocksdb background work. Read handles skip all of this.
    fn drop(&mut self) {
        if self.is_writer {
            println!("rocksdb writer: cleaning up for shutdown...");
            if let Err(e) = self.db.flush_wal(true) {
                eprintln!("rocks: flushing wal failed: {e:?}");
            }
            // flush memtables and wait for the flush to finish
            if let Err(e) = self.db.flush_opt(&{
                let mut opt = rocksdb::FlushOptions::default();
                opt.set_wait(true);
                opt
            }) {
                eprintln!("rocks: flushing memtables failed: {e:?}");
            }
            // NOTE(review): Arc::get_mut only succeeds when this is the last
            // clone of backup_task -- with live clones (e.g. from
            // to_readable) we hit the None branch and skip the join; confirm
            // that's the intended shutdown order
            match Arc::get_mut(&mut self.backup_task) {
                Some(maybe_task) => {
                    if let Some(task) = maybe_task.take() {
                        eprintln!("waiting for backup task to complete...");
                        if let Err(e) = task.join() {
                            eprintln!("failed to join backup task: {e:?}");
                        }
                    }
                }
                None => eprintln!("rocks: failed to get backup task, likely a bug."),
            }
            self.db.cancel_all_background_work(true);
        }
    }
}
856
// u64 values (cursors, timestamps) round-trip through the shared helpers
impl AsRocksValue for u64 {}
impl ValueFromRocks for u64 {}
859
impl LinkStorage for RocksStorage {
    /// Last-persisted jetstream cursor, if any events have been consumed.
    fn get_cursor(&mut self) -> Result<Option<u64>> {
        self.db
            .get(JETSTREAM_CURSOR_KEY)?
            .map(|b| _vr(&b))
            .transpose()
    }

    /// Apply one firehose event plus its cursor in a single atomic batch,
    /// recording read/write timing and batch-size metrics per action type.
    /// Account deletion is handled separately because it can be huge and is
    /// written in chunks (see delete_account).
    fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> {
        // normal ops
        let mut batch = WriteBatch::default();
        let t0 = Instant::now();
        // the Some(..) value doubles as the metric label for this action
        if let Some(action) = match event {
            ActionableEvent::CreateLinks { record_id, links } => {
                self.add_links(record_id, links, &mut batch)?;
                Some("create_links")
            }
            ActionableEvent::UpdateLinks {
                record_id,
                new_links,
            } => {
                // update = remove old then add new, in the same batch
                self.remove_links(record_id, &mut batch)?;
                self.add_links(record_id, new_links, &mut batch)?;
                Some("update_links")
            }
            ActionableEvent::DeleteRecord(record_id) => {
                self.remove_links(record_id, &mut batch)?;
                Some("delete_record")
            }
            ActionableEvent::ActivateAccount(did) => {
                self.set_account(did, true, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeactivateAccount(did) => {
                self.set_account(did, false, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially
        } {
            let t_read = t0.elapsed();
            // the cursor rides in the same batch so event + cursor are atomic
            batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
            let batch_ops = batch.len();
            self.db.write(batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_read_seconds", "action" => action)
                .record(t_read.as_secs_f64());
            histogram!("storage_rocksdb_action_seconds", "action" => action)
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => action)
                .increment(batch_ops as u64);
        }

        // special metrics for account deletion which can be arbitrarily expensive
        let mut outer_batch = WriteBatch::default();
        let t0 = Instant::now();
        if let ActionableEvent::DeleteAccount(did) = event {
            // note: most deletion work is written in mini-batches inside
            // delete_account; outer_batch holds only the final did removal
            let inner_batch_ops = self.delete_account(did, &mut outer_batch)?;
            let total_batch_ops = inner_batch_ops + outer_batch.len();
            self.db.write(outer_batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_action_seconds", "action" => "delete_account")
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account")
                .increment(total_batch_ops as u64);
            histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64);
        }

        Ok(())
    }

    /// Cheap read view: a clone of this handle with writer cleanup disabled.
    fn to_readable(&mut self) -> impl LinkReader {
        let mut readable = self.clone();
        readable.is_writer = false;
        readable
    }
}
938
939impl LinkReader for RocksStorage {
    /// Group the linkers of `target` (at `collection`/`path`) by each linking
    /// record's forward target at `path_to_other`, returning one
    /// `(other_target, link_count, distinct_did_count)` per group, paged in
    /// ascending `TargetId` order.
    ///
    /// `after` is an exclusive cursor: the stringified `TargetId` of the last
    /// group on the previous page. `filter_link_dids` / `filter_to_targets`
    /// restrict which linking DIDs and which forward targets are counted
    /// (empty sets mean no filtering).
    fn get_many_to_many_counts(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        path_to_other: &str,
        limit: u64,
        after: Option<String>,
        filter_link_dids: &HashSet<Did>,
        filter_to_targets: &HashSet<String>,
    ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
        let collection = Collection(collection.to_string());
        let path = RPath(path.to_string());

        let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());

        // unfortunately the cursor is a, uh, stringified number.
        // this was easier for the memstore (plain target, not target id), and
        // making it generic is a bit awful.
        // so... parse the number out of a string here :(
        // TODO: this should bubble up to a BAD_REQUEST response
        let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;

        // unknown target: nothing links to it, so the result is empty
        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedOrderedCollection::empty());
        };

        // resolve the did filter to did-ids up front; the bool is the did's
        // active flag from DidIdValue
        let filter_did_ids: HashMap<DidId, bool> = filter_link_dids
            .iter()
            .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
            .collect::<Result<Vec<DidIdValue>>>()?
            .into_iter()
            .map(|DidIdValue(id, active)| (id, active))
            .collect();

        // stored targets are keyed by triples of (target, collection, path).
        // target filtering only considers the target itself, so we actually
        // need to do a prefix iteration of all target ids for this target and
        // keep them all.
        // i *think* the number of keys at a target prefix should usually be
        // pretty small, so this is hopefully fine. but if it turns out to be
        // large, we can push this filtering back into the main links loop and
        // do forward db queries per backlink to get the raw target back out.
        let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
        for t in filter_to_targets {
            for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
                filter_to_target_ids.insert(target_id);
            }
        }

        let linkers = self.get_target_linkers(&target_id)?;

        // BTreeMap keeps groups ordered by TargetId, which is also the page
        // order; value is (link count, distinct linking did-ids)
        let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();

        for (did_id, rkey) in linkers.0 {
            // zeroed did_id is a tombstone for a deleted link
            if did_id.is_empty() {
                continue;
            }

            // did filter: must be present *and* active
            if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
                continue;
            }

            // forward lookup: what does this linking record itself link to?
            let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
            let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
                continue;
            };

            // first link at path_to_other that passes the target filter
            let Some(fwd_target) = targets
                .0
                .into_iter()
                .filter_map(|RecordLinkTarget(rpath, target_id)| {
                    if rpath.0 == path_to_other
                        && (filter_to_target_ids.is_empty()
                            || filter_to_target_ids.contains(&target_id))
                    {
                        Some(target_id)
                    } else {
                        None
                    }
                })
                .take(1)
                .next()
            else {
                continue;
            };

            // small relief: we page over target ids, so we can already bail
            // reprocessing previous pages here
            if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
                continue;
            }

            // aand we can skip target ids that must be on future pages
            // (this check continues after the did-lookup, which we have to do)
            // note: we keep up to limit+1 groups so the overflow signals a
            // next page below
            let page_is_full = grouped_counts.len() as u64 > limit;
            if page_is_full {
                let current_max = grouped_counts.keys().next_back().unwrap();
                if fwd_target > *current_max {
                    continue;
                }
            }

            // bit painful: 2-step lookup to make sure this did is active
            let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
                eprintln!("failed to look up did from did_id {did_id:?}");
                continue;
            };
            let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
                eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                continue;
            };
            if !active {
                continue;
            }

            // page-management, continued
            // if we have a full page, and we're inserting a *new* key less than
            // the current max, then we can evict the current max
            let mut should_evict = false;
            let entry = grouped_counts.entry(fwd_target).or_insert_with(|| {
                // this is a *new* key, so kick the max if we're full
                should_evict = page_is_full;
                Default::default()
            });
            entry.0 += 1;
            entry.1.insert(did_id);

            if should_evict {
                grouped_counts.pop_last();
            }
        }

        // If we accumulated more than limit groups, there's another page.
        // Pop the extra before building items so it doesn't appear in results.
        let next = if grouped_counts.len() as u64 > limit {
            grouped_counts.pop_last();
            grouped_counts
                .keys()
                .next_back()
                .map(|k| format!("{}", k.0))
        } else {
            None
        };

        // finally resolve each group's TargetId back to its target string
        let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
        for (target_id, (n, dids)) in &grouped_counts {
            let Some(target) = self
                .target_id_table
                .get_val_from_id(&self.db, target_id.0)?
            else {
                eprintln!("failed to look up target from target_id {target_id:?}");
                continue;
            };
            items.push((target.0 .0, *n, dids.len() as u64));
        }

        Ok(PagedOrderedCollection { items, next })
    }
1099
1100 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
1101 let target_key = TargetKey(
1102 Target(target.to_string()),
1103 Collection(collection.to_string()),
1104 RPath(path.to_string()),
1105 );
1106 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
1107 let (alive, _) = self.get_target_linkers(&target_id)?.count();
1108 Ok(alive)
1109 } else {
1110 Ok(0)
1111 }
1112 }
1113
1114 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
1115 let target_key = TargetKey(
1116 Target(target.to_string()),
1117 Collection(collection.to_string()),
1118 RPath(path.to_string()),
1119 );
1120 if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
1121 Ok(self.get_target_linkers(&target_id)?.count_distinct_dids())
1122 } else {
1123 Ok(0)
1124 }
1125 }
1126
    /// Enumerate the individual many-to-many link records: each record that
    /// links to `target` at (`collection`, `path`), paired with every forward
    /// target that same record links to at `path_to_other`. Items are emitted
    /// in backlink order, then forward-link order within a record.
    ///
    /// `after` is a composite cursor `"{backlink_idx},{other_link_idx}"`
    /// produced by a previous page. Empty filter sets mean no filtering.
    fn get_many_to_many(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        path_to_other: &str,
        limit: u64,
        after: Option<String>,
        filter_link_dids: &HashSet<Did>,
        filter_to_targets: &HashSet<String>,
    ) -> Result<PagedOrderedCollection<ManyToManyItem, String>> {
        // helper to resolve dids (None if unknown or inactive; logs on
        // consistency problems)
        let resolve_active_did = |did_id: &DidId| -> Result<Option<Did>> {
            let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
                eprintln!("failed to look up did from did_id {did_id:?}");
                return Ok(None);
            };
            let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
                eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                return Ok(None);
            };
            Ok(active.then_some(did))
        };

        // setup variables that we need later
        let collection = Collection(collection.to_string());
        let path = RPath(path.to_string());

        // extract parts from composite cursor ("{backlink_idx},{other_link_idx}")
        let cursor = match after {
            Some(a) => {
                let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?;
                let backlink_idx = b
                    .parse::<u64>()
                    .map_err(|e| anyhow!("invalid cursor.0: {e}"))?;
                let other_link_idx = f
                    .parse::<u64>()
                    .map_err(|e| anyhow!("invalid cursor.1: {e}"))?;
                Some(ManyToManyCursor {
                    backlink_idx,
                    other_link_idx,
                })
            }
            None => None,
        };

        // (__active__) did ids and filter targets
        let filter_did_ids: HashMap<DidId, bool> = filter_link_dids
            .iter()
            .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
            .collect::<Result<Vec<DidIdValue>>>()?
            .into_iter()
            .map(|DidIdValue(id, active)| (id, active))
            .collect();
        // all (target, collection, path) ids sharing a filtered target string
        let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
        for t in filter_to_targets {
            for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
                filter_to_target_ids.insert(target_id);
            }
        }

        let target_key = TargetKey(Target(target.to_string()), collection.clone(), path);
        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            eprintln!("Target not found for {target_key:?}");
            return Ok(PagedOrderedCollection::empty());
        };
        let linkers = self.get_target_linkers(&target_id)?;

        // (backlink_idx, other_link_idx, item) — indices kept for the cursor
        let mut items: Vec<(usize, usize, ManyToManyItem)> = Vec::new();

        // iterate backlinks (who linked to the target?)
        for (backlink_idx, (did_id, rkey)) in
            linkers
                .0
                .iter()
                .enumerate()
                .skip_while(|(backlink_idx, _)| {
                    cursor.is_some_and(|c| *backlink_idx < c.backlink_idx as usize)
                })
        {
            if did_id.is_empty()
                || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id))
            {
                continue;
            }

            let Some(links) = self.get_record_link_targets(&RecordLinkKey(
                *did_id,
                collection.clone(),
                rkey.clone(),
            ))?
            else {
                continue;
            };

            // iterate fwd links (which of these links point to the "other" target?)
            for (other_link_idx, RecordLinkTarget(_, fwd_target_id)) in links
                .0
                .into_iter()
                .enumerate()
                .filter(|(_, RecordLinkTarget(rpath, target_id))| {
                    rpath.0 == path_to_other
                        && (filter_to_target_ids.is_empty()
                            || filter_to_target_ids.contains(target_id))
                })
                .skip_while(|(other_link_idx, _)| {
                    // resume mid-record: at the cursor's backlink, skip the
                    // forward links already served (inclusive)
                    cursor.is_some_and(|c| {
                        backlink_idx == c.backlink_idx as usize
                            && *other_link_idx <= c.other_link_idx as usize
                    })
                })
                .take(limit as usize + 1 - items.len())
            {
                // extract forward target did (target that links to the __other__ target)
                let Some(did) = resolve_active_did(did_id)? else {
                    continue;
                };
                // resolve to target string
                let Some(fwd_target_key) = self
                    .target_id_table
                    .get_val_from_id(&self.db, fwd_target_id.0)?
                else {
                    continue;
                };

                // link to be added
                let record_id = RecordId {
                    did,
                    collection: collection.0.clone(),
                    rkey: rkey.0.clone(),
                };
                let item = ManyToManyItem {
                    link_record: record_id,
                    other_subject: fwd_target_key.0 .0,
                };
                items.push((backlink_idx, other_link_idx, item));
            }

            // page full - eject
            if items.len() > limit as usize {
                break;
            }
        }

        // We collect up to limit + 1 fully-resolved items. If we got more than
        // limit, there are more results beyond this page. We truncate to limit
        // items (the actual page) and build a composite cursor from the last
        // item on the page — a comma-separated pair of (backlink_vec_idx,
        // forward_link_idx). On the next request, skip_while advances past
        // this position: backlinks before backlink_vec_idx are skipped entirely,
        // and at backlink_vec_idx itself, forward links at or before
        // forward_link_idx are skipped. This correctly resumes mid-record when
        // a single backlinker has multiple forward links at path_to_other.
        let next = (items.len() > limit as usize).then(|| {
            let (b, o, _) = items[limit as usize - 1];
            format!("{b},{o}")
        });

        let items = items
            .into_iter()
            .take(limit as usize)
            .map(|(_, _, item)| item)
            .collect();

        Ok(PagedOrderedCollection { items, next })
    }
1293
1294 fn get_links(
1295 &self,
1296 target: &str,
1297 collection: &str,
1298 path: &str,
1299 order: Order,
1300 limit: u64,
1301 until: Option<u64>,
1302 filter_dids: &HashSet<Did>,
1303 ) -> Result<PagedAppendingCollection<RecordId>> {
1304 let target_key = TargetKey(
1305 Target(target.to_string()),
1306 Collection(collection.to_string()),
1307 RPath(path.to_string()),
1308 );
1309
1310 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
1311 return Ok(PagedAppendingCollection::empty());
1312 };
1313
1314 let mut linkers = self.get_target_linkers(&target_id)?;
1315 if !filter_dids.is_empty() {
1316 let mut did_filter = HashSet::new();
1317 for did in filter_dids {
1318 let Some(DidIdValue(did_id, active)) =
1319 self.did_id_table.get_id_val(&self.db, did)?
1320 else {
1321 eprintln!("failed to find a did_id for {did:?}");
1322 continue;
1323 };
1324 if !active {
1325 eprintln!("excluding inactive did from filtered results");
1326 continue;
1327 }
1328 did_filter.insert(did_id);
1329 }
1330 linkers.0.retain(|linker| did_filter.contains(&linker.0));
1331 }
1332
1333 let (alive, gone) = linkers.count();
1334 let total = alive + gone;
1335
1336 let (start, take, next_until) = match order {
1337 // OldestToNewest: start from the beginning, paginate forward
1338 Order::OldestToNewest => {
1339 let start = until.unwrap_or(0);
1340 let next = start + limit + 1;
1341 let next_until = if next < total { Some(next) } else { None };
1342 (start, limit, next_until)
1343 }
1344 // NewestToOldest: start from the end, paginate backward
1345 Order::NewestToOldest => {
1346 let until = until.unwrap_or(total);
1347 match until.checked_sub(limit) {
1348 Some(s) if s > 0 => (s, limit, Some(s)),
1349 Some(s) => (s, limit, None),
1350 None => (0, until, None),
1351 }
1352 }
1353 };
1354
1355 let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize);
1356 let did_id_rkeys: Vec<_> = match order {
1357 Order::OldestToNewest => did_id_rkeys.collect(),
1358 Order::NewestToOldest => did_id_rkeys.rev().collect(),
1359 };
1360
1361 let mut items = Vec::with_capacity(did_id_rkeys.len());
1362 // TODO: use get-many (or multi-get or whatever it's called)
1363 for (did_id, rkey) in did_id_rkeys {
1364 if did_id.is_empty() {
1365 continue;
1366 }
1367 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
1368 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
1369 else {
1370 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1371 continue;
1372 };
1373 if !active {
1374 continue;
1375 }
1376 items.push(RecordId {
1377 did,
1378 collection: collection.to_string(),
1379 rkey: rkey.0.clone(),
1380 });
1381 } else {
1382 eprintln!("failed to look up did from did_id {did_id:?}");
1383 }
1384 }
1385
1386 Ok(PagedAppendingCollection {
1387 version: (total, gone),
1388 items,
1389 next: next_until,
1390 total: alive,
1391 })
1392 }
1393
1394 fn get_distinct_dids(
1395 &self,
1396 target: &str,
1397 collection: &str,
1398 path: &str,
1399 limit: u64,
1400 until: Option<u64>,
1401 ) -> Result<PagedAppendingCollection<Did>> {
1402 let target_key = TargetKey(
1403 Target(target.to_string()),
1404 Collection(collection.to_string()),
1405 RPath(path.to_string()),
1406 );
1407
1408 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
1409 return Ok(PagedAppendingCollection::empty());
1410 };
1411
1412 let linkers = self.get_distinct_target_linkers(&target_id)?;
1413
1414 let (alive, gone) = linkers.count();
1415 let total = alive + gone;
1416
1417 let until = until.unwrap_or(total);
1418 let (start, take, next_until) = match until.checked_sub(limit) {
1419 Some(s) if s > 0 => (s, limit, Some(s)),
1420 Some(s) => (s, limit, None),
1421 None => (0, until, None),
1422 };
1423
1424 let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize);
1425 let did_id_rkeys: Vec<_> = did_id_rkeys.rev().collect();
1426
1427 let mut items = Vec::with_capacity(did_id_rkeys.len());
1428 // TODO: use get-many (or multi-get or whatever it's called)
1429 for (did_id, _) in did_id_rkeys {
1430 if did_id.is_empty() {
1431 continue;
1432 }
1433 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
1434 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
1435 else {
1436 eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1437 continue;
1438 };
1439 if !active {
1440 continue;
1441 }
1442 items.push(did);
1443 } else {
1444 eprintln!("failed to look up did from did_id {did_id:?}");
1445 }
1446 }
1447
1448 Ok(PagedAppendingCollection {
1449 version: (total, gone),
1450 items,
1451 next: next_until,
1452 total: alive,
1453 })
1454 }
1455
1456 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> {
1457 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
1458 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
1459 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
1460 let (count, _) = self.get_target_linkers(&target_id)?.count();
1461 out.entry(collection.into())
1462 .or_default()
1463 .insert(path.clone(), count);
1464 }
1465 Ok(out)
1466 }
1467
1468 fn get_all_counts(
1469 &self,
1470 target: &str,
1471 ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> {
1472 let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new();
1473 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
1474 let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
1475 let target_linkers = self.get_target_linkers(&target_id)?;
1476 let (records, _) = target_linkers.count();
1477 let distinct_dids = target_linkers.count_distinct_dids();
1478 out.entry(collection.into()).or_default().insert(
1479 path.clone(),
1480 CountsByCount {
1481 records,
1482 distinct_dids,
1483 },
1484 );
1485 }
1486 Ok(out)
1487 }
1488
1489 fn get_stats(&self) -> Result<StorageStats> {
1490 let dids = self.did_id_table.estimate_count();
1491 let targetables = self.target_id_table.estimate_count();
1492 let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
1493 let linking_records = self
1494 .db
1495 .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)?
1496 .map(|s| s.parse::<u64>())
1497 .transpose()?
1498 .unwrap_or(0);
1499 let started_at = self
1500 .db
1501 .get(STARTED_AT_KEY)?
1502 .map(|c| _vr(&c))
1503 .transpose()?
1504 .unwrap_or(COZY_FIRST_CURSOR);
1505
1506 let other_data = self
1507 .db
1508 .get(TARGET_ID_REPAIR_STATE_KEY)?
1509 .map(|s| _vr(&s))
1510 .transpose()?
1511 .map(
1512 |TargetIdRepairState {
1513 current_us_started_at,
1514 id_when_started,
1515 latest_repaired_i,
1516 }| {
1517 HashMap::from([
1518 ("current_us_started_at".to_string(), current_us_started_at),
1519 ("id_when_started".to_string(), id_when_started),
1520 ("latest_repaired_i".to_string(), latest_repaired_i),
1521 ])
1522 },
1523 )
1524 .unwrap_or(HashMap::default());
1525
1526 Ok(StorageStats {
1527 dids,
1528 targetables,
1529 linking_records,
1530 started_at: Some(started_at),
1531 other_data,
1532 })
1533 }
1534}
1535
// Marker traits tying domain types to their rocksdb (de)serialization roles.
// The `_rk`/`_rkp`/`_rv`/`_kr`/`_vr` helpers below are bounded on these, so
// only types explicitly opted in can be written to / read from each position.
trait AsRocksKey: Serialize {}
// a prefix type whose serialized bytes prefix serialized `K` keys
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}
1541
// Per-table opt-ins for the marker traits above: each section lists the key,
// prefix, and value types allowed for one column family.

// did_id table
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp
// NOTE(review): reverse did lookup (id -> did); "temp" suggests migration
// support — confirm whether still needed
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// temp?
// NOTE(review): reverse target lookup (id -> key); presumably for the
// target-id repair path — confirm
impl KeyFromRocks for TargetId {}
impl AsRocksValue for &TargetKey {}

// target_links table
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}
1572impl ValueFromRocks for RecordLinkTargets {}
1573
/// Shared bincode configuration for all rocksdb keys and values.
pub fn _bincode_opts() -> impl BincodeOptions {
    bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm
}
/// Serialize a key. `unwrap` is safe: serializing an in-memory value with
/// derived `Serialize` impls does not fail.
fn _rk(k: impl AsRocksKey) -> Vec<u8> {
    _bincode_opts().serialize(&k).unwrap()
}
/// Serialize a key *prefix* for prefix iteration over `K`-keyed entries.
fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
    _bincode_opts().serialize(&kp).unwrap()
}
/// Serialize a value.
fn _rv(v: impl AsRocksValue) -> Vec<u8> {
    _bincode_opts().serialize(&v).unwrap()
}
/// Deserialize a key; errors on malformed bytes bubble up.
fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
/// Deserialize a value; errors on malformed bytes bubble up.
fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
1592
// newtype: an atproto collection NSID (e.g. "app.bsky.feed.like")
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

// newtype: a record path within a collection's records (e.g. ".uri")
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

// newtype: a record key within a collection
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    // empty rkey is used as the tombstone marker alongside DidId::empty()
    fn empty() -> Self {
        RKey("".to_string())
    }
}
1607
// did ids
// compact u64 handle for a Did; 0 is reserved as the tombstone sentinel
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);

impl DidId {
    // the reserved sentinel: marks a deleted linker slot
    fn empty() -> Self {
        DidId(0)
    }
    // true for tombstoned (deleted) entries
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

// value stored in the did_ids table: the assigned id plus whether the
// account is currently active
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not

impl DidIdValue {
    // accessor for the id half of the pair
    fn did_id(&self) -> DidId {
        let Self(id, _) = self;
        *id
    }
}
1630
// target ids
// compact u64 handle for a TargetKey; Ord so pages can run in id order
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
struct TargetId(u64); // key

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Target(pub String); // the actual target/uri

// targets (uris, dids, etc.): the reverse index
// full identity of a linkable position: (target, collection, record path)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetKey(pub Target, pub Collection, pub RPath);

// append-only list of (linker did, rkey); deleted links are tombstoned
// in place (see remove_linker) so offsets stay stable for cursors
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TargetLinkers(pub Vec<(DidId, RKey)>);
1644
1645impl TargetLinkers {
1646 fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool {
1647 if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) {
1648 *entry = (DidId::empty(), RKey::empty());
1649 true
1650 } else {
1651 false
1652 }
1653 }
1654 pub fn count(&self) -> (u64, u64) {
1655 // (linkers, deleted links)
1656 let total = self.0.len() as u64;
1657 let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64;
1658 let gone = total - alive;
1659 (alive, gone)
1660 }
1661 fn count_distinct_dids(&self) -> u64 {
1662 self.0
1663 .iter()
1664 .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) })
1665 .collect::<HashSet<_>>()
1666 .len() as u64
1667 }
1668}
1669
// forward links to targets so we can delete links
// key: which record (did + collection + rkey) created the links
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKey(DidId, Collection, RKey);

// does this even work????
// prefix over RecordLinkKey for iterating all of one did's records
// (exercised by ActionableEvent::DeleteAccount per the regression test below)
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

// prefix over TargetKey for iterating all (collection, path) entries of one target
#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

// one forward link: the record path it was found at, and the target it points to
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

// all forward links collected from a single record
#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);

impl RecordLinkTargets {
    // preallocate when the link count is known up front
    fn with_capacity(cap: usize) -> Self {
        Self(Vec::with_capacity(cap))
    }
    fn add(&mut self, target: RecordLinkTarget) {
        self.0.push(target)
    }
}
1695
#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    /// Regression: deleting an account must not break while prefix-iterating
    /// that account's records when other accounts' records follow in did-id
    /// order.
    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }

    /// Checks that bincoded key prefixes really do byte-prefix full keys:
    /// iterating with prefix 0x02 must yield exactly the 256 keys under it,
    /// in order, and none from neighboring prefixes.
    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix!
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
}