//! Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use super::{
    ActionableEvent, LinkReader, LinkStorage, ManyToManyCursor, Order, PagedAppendingCollection,
    PagedOrderedCollection, StorageStats,
};
use crate::{CountsByCount, Did, ManyToManyItem, RecordId};

use anyhow::{anyhow, bail, Result};
use bincode::Options as BincodeOptions;
use links::CollectedLink;
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use ratelimit::Ratelimiter;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
    MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Read;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

// Column family names. The "ids" CFs are bidirectional id tables (orig -> id
// and id -> orig in the same CF); the other two hold the reverse and forward
// link indexes respectively.
static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";
static LINK_TARGETS_CF: &str = "link_targets";

// Global keys stored in the default column family.
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
// add reverse mappings for targets if this db was running before that was a thing
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";

static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started

/// Persisted progress marker for the one-off background repair that backfills
/// reverse (id -> target) mappings in the target id table. Stored under
/// [`TARGET_ID_REPAIR_STATE_KEY`] so a restart can resume where it left off.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetIdRepairState {
    /// start time for repair, microseconds timestamp
    current_us_started_at: u64,
    /// id table's latest id when repair started
    id_when_started: u64,
    /// id table id (how many iterator steps have been completed so far)
    latest_repaired_i: u64,
}
// Marker impls: serialize/deserialize this state through the generic
// rocks value helpers (_rv / _vr).
impl AsRocksValue for TargetIdRepairState {}
impl ValueFromRocks for TargetIdRepairState {}
// todo: actually understand and set these options probably better
/// Base rocksdb options shared by the default CF and every extra column family.
fn rocks_opts_base() -> Options {
    let mut opts = Options::default();
    opts.set_level_compaction_dynamic_level_bytes(true);
    opts.create_if_missing(true);
    opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); // this probably doesn't work because it hasn't been enabled
    // TODO: actually enable the bottommost compression. but after other changes run for a bit in case zstd is cpu- or mem-expensive.
    opts
}
/// DB-wide options for the writer process (creates missing CFs, extra parallelism).
fn get_db_opts() -> Options {
    let mut opts = rocks_opts_base();
    opts.create_missing_column_families(true);
    opts.increase_parallelism(4); // todo: make configurable if anyone else actually runs a different instance. start at # of cores
    // consider doing optimize_level_style_compaction or optimize_universal_style_compaction
    opts
}
/// DB-wide options for read-only openers, tuned for point lookups.
fn get_db_read_opts() -> Options {
    let mut opts = Options::default();
    opts.optimize_for_point_lookup(16_384); // mb (run this on big machines)
    opts
}

/// RocksDB-backed link store. Cloneable; clones share the same underlying DB
/// handle via `Arc`. `is_writer` gates the shutdown flush/cleanup in `Drop`.
#[derive(Debug, Clone)]
pub struct RocksStorage {
    pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
    did_id_table: IdTable<Did, DidIdValue>,
    target_id_table: IdTable<TargetKey, TargetId>,
    is_writer: bool,
    // join handle for the (optional) background backup thread; see start_backup/Drop
    backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
}

/// A value stored in an id table: wraps a u64 sequence number.
trait IdTableValue: ValueFromRocks + Clone {
    fn new(v: u64) -> Self;
    fn id(&self) -> u64;
}
/// Pre-init form of an id table: knows its CF name and holds the shared
/// (atomic) sequence counter, but has not yet read the persisted sequence.
#[derive(Debug, Clone)]
struct IdTableBase<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    _key_marker: PhantomData<Orig>,
    _val_marker: PhantomData<IdVal>,
    name: String,
    id_seq: Arc<AtomicU64>,
}
impl<Orig, IdVal: IdTableValue> IdTableBase<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    /// Column family descriptor for this table; must be passed to the DB open
    /// call before `.init()` is invoked.
    fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
        ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
    }
    /// Finish setup against an opened DB: restore the persisted sequence
    /// number (or start at 1) and produce the usable `IdTable`.
    fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
        if db.cf_handle(&self.name).is_none() {
            bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
        }
        let priv_id_seq = if let Some(seq_bytes) = db.get(self.seq_key())? {
            if seq_bytes.len() != 8 {
                bail!(
                    "reading bytes for u64 id seq {:?}: found the wrong number of bytes",
                    self.seq_key()
                );
            }
            let mut buf: [u8; 8] = [0; 8];
            seq_bytes.as_slice().read_exact(&mut buf)?;
            let last_seq = u64::from_le_bytes(buf);
            // resume one past the last persisted id
            last_seq + 1
        } else {
            1
        };
        self.id_seq.store(priv_id_seq, Ordering::SeqCst);
        Ok(IdTable {
            base: self,
            priv_id_seq,
        })
    }
    /// Default-CF key under which this table's latest sequence number is persisted.
    fn seq_key(&self) -> Vec<u8> {
        let mut k = b"__id_seq_key_plz_be_unique:".to_vec();
        k.extend(self.name.as_bytes());
        k
    }
}
/// Bidirectional mapping between an original key (`Orig`) and a compact u64
/// id, both directions stored in the same column family (forward entries are
/// keyed by the serialized `Orig`; reverse entries by the big-endian id).
/// `priv_id_seq` is the single-writer copy of the sequence; `base.id_seq` is
/// the shared atomic that readers use for `estimate_count`.
#[derive(Debug, Clone)]
struct IdTable<Orig, IdVal: IdTableValue>
where
    Orig: KeyFromRocks,
    for<'a> &'a Orig: AsRocksKey,
{
    base: IdTableBase<Orig, IdVal>,
    priv_id_seq: u64,
}
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
where
    Orig: KeyFromRocks,
    for<'v> &'v IdVal: AsRocksValue,
    for<'k> &'k Orig: AsRocksKey,
{
    /// Create the pre-init base for a table stored in column family `name`.
    #[must_use]
    fn setup(name: &str) -> IdTableBase<Orig, IdVal> {
        IdTableBase::<Orig, IdVal> {
            _key_marker: PhantomData,
            _val_marker: PhantomData,
            name: name.into(),
            id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
        }
    }
    /// Forward lookup: original key -> stored id value (None if never assigned).
    fn get_id_val(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        orig: &Orig,
    ) -> Result<Option<IdVal>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(_id_bytes) = db.get_cf(&cf, _rk(orig))? {
            Ok(Some(_vr(&_id_bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Forward lookup that allocates a fresh id (batched write) on miss.
    /// Keeps the private and shared sequence counters in lock-step — the
    /// assert trips if anything else ever mutates the shared atomic.
    fn __get_or_create_id_val<CF>(
        &mut self,
        cf: &CF,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal>
    where
        CF: AsColumnFamilyRef,
    {
        Ok(self.get_id_val(db, orig)?.unwrap_or_else(|| {
            let prev_priv_seq = self.priv_id_seq;
            self.priv_id_seq += 1;
            let prev_public_seq = self.base.id_seq.swap(self.priv_id_seq, Ordering::SeqCst);
            assert_eq!(
                prev_public_seq, prev_priv_seq,
                "public seq may have been modified??"
            );
            let id_value = IdVal::new(self.priv_id_seq);
            // persist both the new sequence high-water mark and the forward mapping
            batch.put(self.base.seq_key(), self.priv_id_seq.to_le_bytes());
            batch.put_cf(cf, _rk(orig), _rv(&id_value));
            id_value
        }))
    }

    /// Approximate number of ids ever allocated (readers see this via the shared atomic).
    // NOTE(review): underflows if called before init() stores a nonzero seq — TODO confirm callers.
    fn estimate_count(&self) -> u64 {
        self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
    }

    /// Like `__get_or_create_id_val` but also (always) writes the reverse
    /// id -> original mapping into the same CF.
    fn get_or_create_id_val(
        &mut self,
        db: &DBWithThreadMode<MultiThreaded>,
        batch: &mut WriteBatch,
        orig: &Orig,
    ) -> Result<IdVal> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        let id_val = self.__get_or_create_id_val(&cf, db, batch, orig)?;
        // TODO: assert that the original is never a u64 that could collide
        batch.put_cf(&cf, id_val.id().to_be_bytes(), _rk(orig)); // reversed rk/rv on purpose here :/
        Ok(id_val)
    }

    /// Reverse lookup: id -> original key, via the 8-byte big-endian id key.
    fn get_val_from_id(
        &self,
        db: &DBWithThreadMode<MultiThreaded>,
        id: u64,
    ) -> Result<Option<Orig>> {
        let cf = db.cf_handle(&self.base.name).unwrap();
        if let Some(orig_bytes) = db.get_cf(&cf, id.to_be_bytes())?
        {
            // HACK ish
            Ok(Some(_kr(&orig_bytes)?))
        } else {
            Ok(None)
        }
    }
}

// DidIdValue carries (DidId, active-flag); new dids start active.
impl IdTableValue for DidIdValue {
    fn new(v: u64) -> Self {
        DidIdValue(DidId(v), true)
    }
    fn id(&self) -> u64 {
        self.0 .0
    }
}
impl IdTableValue for TargetId {
    fn new(v: u64) -> Self {
        TargetId(v)
    }
    fn id(&self) -> u64 {
        self.0
    }
}

/// Current wall-clock time as microseconds since the unix epoch.
fn now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_micros() as u64
}

impl RocksStorage {
    /// Open (or create) the database read-write and run first-time initialization.
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        Self::describe_metrics();
        let me = RocksStorage::open_readmode(path, false)?;
        me.global_init()?;
        Ok(me)
    }

    /// Open an existing database read-only (no init, no CF creation).
    pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
        RocksStorage::open_readmode(path, true)
    }

    /// Shared open path: declares all column families (with their per-CF
    /// options, including the merge operator on the reverse-links CF), opens
    /// the DB in the requested mode, then initializes both id tables.
    fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
        let did_id_table = IdTable::setup(DID_IDS_CF);
        let target_id_table = IdTable::setup(TARGET_IDS_CF);

        // note: global stuff like jetstream cursor goes in the default cf
        // these are bonus extra cfs
        let cfs = vec![
            // id reference tables
            did_id_table.cf_descriptor(),
            target_id_table.cf_descriptor(),
            // the reverse links:
            ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, {
                let mut opts = rocks_opts_base();
                opts.set_merge_operator_associative(
                    "merge_op_extend_did_ids",
                    Self::merge_op_extend_did_ids,
                );
                opts
            }),
            // unfortunately we also need forward links to handle deletes
            ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()),
        ];

        let db = if readonly {
            DBWithThreadMode::open_cf_descriptors_read_only(&get_db_read_opts(), path, cfs, false)?
        } else {
            DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)?
296 }; 297 298 let db = Arc::new(db); 299 let did_id_table = did_id_table.init(&db)?; 300 let target_id_table = target_id_table.init(&db)?; 301 Ok(Self { 302 db, 303 did_id_table, 304 target_id_table, 305 is_writer: !readonly, 306 backup_task: None.into(), 307 }) 308 } 309 310 fn global_init(&self) -> Result<()> { 311 let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some(); 312 if first_run { 313 self.db.put(STARTED_AT_KEY, _rv(now()))?; 314 315 // hack / temporary: if we're a new db, put in a completed repair 316 // state so we don't run repairs (repairs are for old-code dbs) 317 let completed = TargetIdRepairState { 318 id_when_started: 0, 319 current_us_started_at: 0, 320 latest_repaired_i: 0, 321 }; 322 self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?; 323 } 324 Ok(()) 325 } 326 327 pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> { 328 let mut state = match self 329 .db 330 .get(TARGET_ID_REPAIR_STATE_KEY)? 331 .map(|s| _vr(&s)) 332 .transpose()? 
333 { 334 Some(s) => s, 335 None => TargetIdRepairState { 336 id_when_started: self.did_id_table.priv_id_seq, 337 current_us_started_at: now(), 338 latest_repaired_i: 0, 339 }, 340 }; 341 342 eprintln!("initial repair state: {state:?}"); 343 344 let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap(); 345 346 let mut iter = self.db.raw_iterator_cf(&cf); 347 iter.seek_to_first(); 348 349 eprintln!("repair iterator sent to first key"); 350 351 // skip ahead if we're done some, or take a single first step 352 for _ in 0..state.latest_repaired_i { 353 iter.next(); 354 } 355 356 eprintln!( 357 "repair iterator skipped to {}th key", 358 state.latest_repaired_i 359 ); 360 361 let mut maybe_done = false; 362 363 let mut write_fast = rocksdb::WriteOptions::default(); 364 write_fast.set_sync(false); 365 write_fast.disable_wal(true); 366 367 while !stay_alive.is_cancelled() && !maybe_done { 368 // let mut batch = WriteBatch::default(); 369 370 let mut any_written = false; 371 372 for _ in 0..1000 { 373 if state.latest_repaired_i % 1_000_000 == 0 { 374 eprintln!("target iter at {}", state.latest_repaired_i); 375 } 376 state.latest_repaired_i += 1; 377 378 if !iter.valid() { 379 eprintln!("invalid iter, are we done repairing?"); 380 maybe_done = true; 381 break; 382 }; 383 384 // eprintln!("iterator seems to be valid! 
getting the key..."); 385 let raw_key = iter.key().unwrap(); 386 if raw_key.len() == 8 { 387 // eprintln!("found an 8-byte key, skipping it since it's probably an id..."); 388 iter.next(); 389 continue; 390 } 391 let target: TargetKey = _kr::<TargetKey>(raw_key)?; 392 let target_id: TargetId = _vr(iter.value().unwrap())?; 393 394 self.db 395 .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?; 396 any_written = true; 397 iter.next(); 398 } 399 400 if any_written { 401 self.db 402 .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?; 403 std::thread::sleep(breather); 404 } 405 } 406 407 eprintln!("repair iterator done."); 408 409 Ok(false) 410 } 411 412 pub fn start_backup( 413 &mut self, 414 path: PathBuf, 415 auto: Option<(u64, Option<usize>)>, 416 stay_alive: CancellationToken, 417 ) -> Result<()> { 418 let task = if let Some((interval_hrs, copies)) = auto { 419 eprintln!("backups: starting background task..."); 420 self.backup_task(path, interval_hrs, copies, stay_alive) 421 } else { 422 eprintln!("backups: starting a one-off backup..."); 423 thread::spawn({ 424 let db = self.db.clone(); 425 move || Self::do_backup(db, path) 426 }) 427 }; 428 self.backup_task = Arc::new(Some(task)); 429 Ok(()) 430 } 431 432 fn backup_task( 433 &self, 434 path: PathBuf, 435 interval_hrs: u64, 436 copies: Option<usize>, 437 stay_alive: CancellationToken, 438 ) -> std::thread::JoinHandle<Result<()>> { 439 let db = self.db.clone(); 440 thread::spawn(move || { 441 let limit = 442 Ratelimiter::builder(1, Duration::from_secs(interval_hrs * 60 * 60)).build()?; 443 let minimum_sleep = Duration::from_secs(1); 444 445 'quit: loop { 446 if let Err(sleep) = limit.try_wait() { 447 eprintln!("backups: background: next backup scheduled in {sleep:?}"); 448 let waiting = Instant::now(); 449 loop { 450 let remaining = sleep - waiting.elapsed(); 451 if stay_alive.is_cancelled() { 452 break 'quit; 453 } else if remaining <= Duration::ZERO { 454 break; 455 } else if remaining 
                            < minimum_sleep {
                            // final short nap, then proceed to backup
                            thread::sleep(remaining);
                            break;
                        } else {
                            thread::sleep(minimum_sleep);
                        }
                    }
                }
                eprintln!("backups: background: starting backup...");
                if let Err(e) = Self::do_backup(db.clone(), &path) {
                    eprintln!("backups: background: backup failed: {e:?}");
                    // todo: metrics
                } else {
                    eprintln!("backups: background: backup succeeded yay");
                }
                if let Some(copies) = copies {
                    eprintln!("backups: background: trimming to {copies} saved backups...");
                    if let Err(e) = Self::trim_backups(copies, &path) {
                        eprintln!("backups: background: failed to trim backups: {e:?}");
                    } else {
                        eprintln!("backups: background: trimming worked!")
                    }
                }
            }

            Ok(())
        })
    }

    /// Run a single backup into the engine at `path`. Backup errors are
    /// logged, not propagated (only engine-open errors return Err).
    fn do_backup(db: Arc<DBWithThreadMode<MultiThreaded>>, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        eprintln!("backups: starting backup...");
        let t0 = Instant::now();
        if let Err(e) = engine.create_new_backup(&db) {
            eprintln!("backups: oh no, backup failed: {e:?}");
        } else {
            eprintln!("backups: yay, backup worked?");
        }
        eprintln!(
            "backups: backup finished after {:.2}s",
            t0.elapsed().as_secs_f32()
        );
        Ok(())
    }

    /// Delete old backups at `path`, keeping only the newest `num_backups_to_keep`.
    fn trim_backups(num_backups_to_keep: usize, path: impl AsRef<Path>) -> Result<()> {
        let mut engine =
            BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
        engine.purge_old_backups(num_backups_to_keep)?;
        Ok(())
    }

    /// Register metric metadata (units/descriptions) with the metrics recorder.
    fn describe_metrics() {
        describe_histogram!(
            "storage_rocksdb_read_seconds",
            Unit::Seconds,
            "duration of the read stage of actions"
        );
        describe_histogram!(
            "storage_rocksdb_action_seconds",
            Unit::Seconds,
            "duration of read + write of actions"
        );
        describe_counter!(
            "storage_rocksdb_batch_ops_total",
            Unit::Count,
            "total batched operations from actions"
        );
        describe_histogram!(
            "storage_rocksdb_delete_account_ops",
            Unit::Count,
            "total batched ops for account deletions"
        );
    }

    /// Associative merge operator for the reverse-links CF: concatenates
    /// linker lists. Deserialization failures are logged and the bad payload
    /// is dropped rather than poisoning the merge (data loss over corruption).
    fn merge_op_extend_did_ids(
        key: &[u8],
        existing: Option<&[u8]>,
        operands: &MergeOperands,
    ) -> Option<Vec<u8>> {
        let mut linkers: Vec<_> = if let Some(existing_bytes) = existing {
            match _vr(existing_bytes) {
                Ok(TargetLinkers(mut existing_linkers)) => {
                    existing_linkers.reserve(operands.len());
                    existing_linkers
                }
                Err(e) => {
                    eprintln!("bug? could not deserialize existing target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if existing_bytes.len() < 1000 {
                        eprintln!("dropping: {existing_bytes:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                    Vec::with_capacity(operands.len())
                }
            }
        } else {
            Vec::with_capacity(operands.len())
        };
        for new_linkers in operands {
            match _vr(new_linkers) {
                Ok(TargetLinkers(new_linkers)) => linkers.extend(new_linkers),
                Err(e) => {
                    eprintln!("bug? could not deserialize new target linkers: {e:?}. key={key:?}. continuing, but data will be lost!");
                    if new_linkers.len() < 1000 {
                        eprintln!("skipping: {new_linkers:?}");
                    } else {
                        eprintln!("(too long to print)");
                    }
                }
            }
        }
        Some(_rv(&TargetLinkers(linkers)))
    }

    /// Iterate all (key, value) pairs in `cf` whose key starts with `pre`,
    /// decoding both; entries that fail to decode end the iteration.
    fn prefix_iter_cf<K, V, CF, P>(
        &self,
        cf: &CF,
        pre: P,
    ) -> impl Iterator<Item = (K, V)> + use<'_, K, V, CF, P>
    where
        K: KeyFromRocks,
        V: ValueFromRocks,
        CF: AsColumnFamilyRef,
        for<'a> &'a P: AsRocksKeyPrefix<K>,
    {
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(PrefixRange(_rkp(&pre))); // TODO verify: inclusive bounds?
        self.db
            .iterator_cf_opt(cf, read_opts, IteratorMode::Start)
            .map_while(Result::ok)
            // map_while: stop (rather than skip) on the first undecodable entry
            .map_while(|(k, v)| Some((_kr(&k).ok()?, _vr(&v).ok()?)))
    }

    /// Read-modify-write a did's DidIdValue; returns false if the did is
    /// unknown or `update` declined (returned None). Write goes into `batch`.
    fn update_did_id_value<F>(&self, batch: &mut WriteBatch, did: &Did, update: F) -> Result<bool>
    where
        F: FnOnce(DidIdValue) -> Option<DidIdValue>,
    {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        let Some(did_id_value) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(false);
        };
        let Some(new_did_id_value) = update(did_id_value) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(did), _rv(&new_did_id_value));
        Ok(true)
    }
    /// Batch a delete of the forward did -> DidIdValue entry (reverse entry untouched).
    fn delete_did_id_value(&self, batch: &mut WriteBatch, did: &Did) {
        let cf = self.db.cf_handle(DID_IDS_CF).unwrap();
        batch.delete_cf(&cf, _rk(did));
    }

    /// All linkers of a target; empty collection if the target has none.
    fn get_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let Some(linkers_bytes) = self.db.get_cf(&cf, _rk(target_id))? else {
            return Ok(TargetLinkers::default());
        };
        _vr(&linkers_bytes)
    }
    /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh
    fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> {
        let mut seen = HashSet::new();
        let mut linkers = self.get_target_linkers(target_id)?;
        for (did_id, _) in linkers.0.iter_mut() {
            if seen.contains(did_id) {
                // DidId 0 is never allocated (seq starts at 1), so 0 marks "skip"
                did_id.0 = 0;
            } else {
                seen.insert(*did_id);
            }
        }
        Ok(linkers)
    }
    /// Append one (did, rkey) linker to a target via the CF's merge operator.
    fn merge_target_linker(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        linker_did_id: &DidId,
        linker_rkey: &RKey,
    ) {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        batch.merge_cf(
            &cf,
            _rk(target_id),
            _rv(&TargetLinkers(vec![(*linker_did_id, linker_rkey.clone())])),
        );
    }
    /// Read-modify-write a target's full linker list (plain put, not merge);
    /// returns false if `update` declined.
    fn update_target_linkers<F>(
        &self,
        batch: &mut WriteBatch,
        target_id: &TargetId,
        update: F,
    ) -> Result<bool>
    where
        F: FnOnce(TargetLinkers) -> Option<TargetLinkers>,
    {
        let cf = self.db.cf_handle(TARGET_LINKERS_CF).unwrap();
        let existing_linkers = self.get_target_linkers(target_id)?;
        let Some(new_linkers) = update(existing_linkers) else {
            return Ok(false);
        };
        batch.put_cf(&cf, _rk(target_id), _rv(&new_linkers));
        Ok(true)
    }

    /// Blind-write the forward index entry for one record's link targets.
    fn put_link_targets(
        &self,
        batch: &mut WriteBatch,
        record_link_key: &RecordLinkKey,
        targets: &RecordLinkTargets,
    ) {
        // todo: we are almost idempotent to link creates with this blind write, but we'll still
        // merge in the reverse index. we could read+modify+write here but it'll be SLOWWWWW on
        // the path that we need to be fast. we could go back to a merge op and probably be
        // consistent. or we can accept just a littttttle inconsistency and be idempotent on
        // forward links but not reverse, slightly messing up deletes :/
        // _maybe_ we could run in slow idempotent r-m-w mode during firehose catch-up at the start,
        // then switch to the fast version?
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.put_cf(&cf, _rk(record_link_key), _rv(targets));
    }
    /// Forward-index lookup: which targets does this record link to?
    fn get_record_link_targets(
        &self,
        record_link_key: &RecordLinkKey,
    ) -> Result<Option<RecordLinkTargets>> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        if let Some(bytes) = self.db.get_cf(&cf, _rk(record_link_key))? {
            Ok(Some(_vr(&bytes)?))
        } else {
            Ok(None)
        }
    }
    /// Batch a delete of one record's forward-index entry.
    fn delete_record_link(&self, batch: &mut WriteBatch, record_link_key: &RecordLinkKey) {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        batch.delete_cf(&cf, _rk(record_link_key));
    }
    /// All forward-index entries created by one did (prefix scan).
    fn iter_links_for_did_id(
        &self,
        did_id: &DidId,
    ) -> impl Iterator<Item = (RecordLinkKey, RecordLinkTargets)> + use<'_> {
        let cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        self.prefix_iter_cf(&cf, RecordLinkKeyDidIdPrefix(*did_id))
    }
    /// All (TargetKey, TargetId) pairs for a raw target, across every
    /// (collection, path) it appears under (prefix scan of the id table).
    fn iter_targets_for_target(
        &self,
        target: &Target,
    ) -> impl Iterator<Item = (TargetKey, TargetId)> + use<'_> {
        let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
        self.prefix_iter_cf(&cf, TargetIdTargetPrefix(target.clone()))
    }

    //
    // higher-level event action handlers
    //

    /// Index every link in a new/updated record: assigns ids as needed,
    /// merges into the reverse index, and blind-writes the forward index.
    fn add_links(
        &mut self,
        record_id: &RecordId,
        links: &[CollectedLink],
        batch: &mut WriteBatch,
    ) -> Result<()> {
        let DidIdValue(did_id, _) =
            self.did_id_table
                .get_or_create_id_val(&self.db, batch, &record_id.did)?;

        let record_link_key = RecordLinkKey(
            did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let mut record_link_targets = RecordLinkTargets::with_capacity(links.len());

        for CollectedLink { target, path } in links {
            let target_key = TargetKey(
                Target(target.clone().into_string()),
                Collection(record_id.collection()),
                RPath(path.clone()),
            );
            let target_id =
                self.target_id_table
                    .get_or_create_id_val(&self.db, batch,
                        &target_key)?;
            // reverse index: target -> (did, rkey)
            self.merge_target_linker(batch, &target_id, &did_id, &RKey(record_id.rkey()));

            record_link_targets.add(RecordLinkTarget(RPath(path.clone()), target_id))
        }

        // forward index: record -> targets (needed later to handle deletes)
        self.put_link_targets(batch, &record_link_key, &record_link_targets);
        Ok(())
    }

    /// Un-index a deleted (or about-to-be-replaced) record: remove its
    /// entries from each target's linker list, then drop the forward entry.
    fn remove_links(&mut self, record_id: &RecordId, batch: &mut WriteBatch) -> Result<()> {
        let Some(DidIdValue(linking_did_id, _)) =
            self.did_id_table.get_id_val(&self.db, &record_id.did)?
        else {
            return Ok(()); // we don't know her: nothing to do
        };

        let record_link_key = RecordLinkKey(
            linking_did_id,
            Collection(record_id.collection()),
            RKey(record_id.rkey()),
        );
        let Some(record_link_targets) = self.get_record_link_targets(&record_link_key)? else {
            return Ok(()); // we don't have these links
        };

        // we do read -> modify -> write here: could merge-op in the deletes instead?
        // otherwise it's another single-thread-constraining thing.
        for RecordLinkTarget(_, target_id) in record_link_targets.0 {
            self.update_target_linkers(batch, &target_id, |mut linkers| {
                if linkers.0.is_empty() {
                    eprintln!("bug? linked target was missing when removing links");
                }
                if !linkers.remove_linker(&linking_did_id, &RKey(record_id.rkey.clone())) {
                    eprintln!("bug? linked target was missing a link when removing links");
                }
                Some(linkers)
            })?;
        }

        self.delete_record_link(batch, &record_link_key);
        Ok(())
    }

    /// Flip a did's active flag (account activate/deactivate), preserving its id.
    fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
        // this needs to be read-modify-write since the did_id needs to stay the same,
        // which has a benefit of allowing to avoid adding entries for dids we don't
        // need. reading on dids needs to be cheap anyway for the current design, and
        // did active/inactive updates are low-freq in the firehose so, eh, it's fine.
        self.update_did_id_value(batch, did, |current_value| {
            Some(DidIdValue(current_value.did_id(), active))
        })?;
        Ok(())
    }

    /// Remove a did and everything it ever linked. The caller's `batch` only
    /// receives the did-entry delete; link cleanup is written immediately in
    /// chunked mini-batches. Returns the number of mini-batched ops.
    fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
        let mut total_batched_ops = 0;
        let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
            return Ok(total_batched_ops); // ignore updates for dids we don't know about
        };
        self.delete_did_id_value(batch, did);
        // TODO: also delete the reverse!!

        // use a separate batch for all their links, since it can be a lot and make us crash at around 1GiB batch size.
        // this should still hopefully be crash-safe: as long as we don't actually delete the DidId entry until after all links are cleared.
        // the above .delete_did_id_value is batched, so it shouldn't be written until we've returned from this fn successfully
        // TODO: queue a background delete task or whatever
        // TODO: test delete account with more links than chunk size
        let stuff: Vec<_> = self.iter_links_for_did_id(&did_id).collect();
        for chunk in stuff.chunks(1024) {
            let mut mini_batch = WriteBatch::default();

            for (record_link_key, links) in chunk {
                self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better

                for RecordLinkTarget(_, target_link_id) in links.0.iter() {
                    self.update_target_linkers(&mut mini_batch, target_link_id, |mut linkers| {
                        if !linkers.remove_linker(&did_id, &record_link_key.2) {
                            eprintln!("bug? could not find linker when removing links while deleting an account");
                        }
                        Some(linkers)
                    })?;
                }
            }
            total_batched_ops += mini_batch.len();
            self.db.write(mini_batch)?; // todo
        }
        Ok(total_batched_ops)
    }
}

impl Drop for RocksStorage {
    /// Writer shutdown: flush WAL and memtables, join the backup thread if we
    /// hold the last reference to it, then stop background compactions.
    fn drop(&mut self) {
        if self.is_writer {
            println!("rocksdb writer: cleaning up for shutdown...");
            if let Err(e) = self.db.flush_wal(true) {
                eprintln!("rocks: flushing wal failed: {e:?}");
            }
            if let Err(e) = self.db.flush_opt(&{
                let mut opt = rocksdb::FlushOptions::default();
                opt.set_wait(true);
                opt
            }) {
                eprintln!("rocks: flushing memtables failed: {e:?}");
            }
            // Arc::get_mut only succeeds for the sole remaining clone
            match Arc::get_mut(&mut self.backup_task) {
                Some(maybe_task) => {
                    if let Some(task) = maybe_task.take() {
                        eprintln!("waiting for backup task to complete...");
                        if let Err(e) = task.join() {
                            eprintln!("failed to join backup task: {e:?}");
                        }
                    }
                }
                None => eprintln!("rocks: failed to get backup task, likely a bug."),
            }
            self.db.cancel_all_background_work(true);
        }
    }
}

// the jetstream cursor (and other plain u64s) round-trip through _rv/_vr
impl AsRocksValue for u64 {}
impl ValueFromRocks for u64 {}

impl LinkStorage for RocksStorage {
    /// Last persisted jetstream cursor, if any.
    fn get_cursor(&mut self) -> Result<Option<u64>> {
        self.db
            .get(JETSTREAM_CURSOR_KEY)?
            .map(|b| _vr(&b))
            .transpose()
    }

    /// Apply one firehose event atomically (one WriteBatch including the new
    /// cursor), recording per-action timing/op-count metrics. Account
    /// deletion goes through a separate, chunked path (see delete_account).
    fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> {
        // normal ops
        let mut batch = WriteBatch::default();
        let t0 = Instant::now();
        if let Some(action) = match event {
            ActionableEvent::CreateLinks { record_id, links } => {
                self.add_links(record_id, links, &mut batch)?;
                Some("create_links")
            }
            ActionableEvent::UpdateLinks {
                record_id,
                new_links,
            } => {
                self.remove_links(record_id, &mut batch)?;
                self.add_links(record_id, new_links, &mut batch)?;
                Some("update_links")
            }
            ActionableEvent::DeleteRecord(record_id) => {
                self.remove_links(record_id, &mut batch)?;
                Some("delete_record")
            }
            ActionableEvent::ActivateAccount(did) => {
                self.set_account(did, true, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeactivateAccount(did) => {
                self.set_account(did, false, &mut batch)?;
                Some("set_account_status")
            }
            ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially
        } {
            let t_read = t0.elapsed();
            // cursor rides in the same batch so progress is atomic with the data
            batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
            let batch_ops = batch.len();
            self.db.write(batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_read_seconds", "action" => action)
                .record(t_read.as_secs_f64());
            histogram!("storage_rocksdb_action_seconds", "action" => action)
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => action)
                .increment(batch_ops as u64);
        }

        // special metrics for account deletion which can be arbitrarily expensive
        let mut outer_batch = WriteBatch::default();
        let t0 = Instant::now();
        if let ActionableEvent::DeleteAccount(did) = event {
            let inner_batch_ops = self.delete_account(did, &mut outer_batch)?;
            let total_batch_ops = inner_batch_ops + outer_batch.len();
            self.db.write(outer_batch)?;
            let t_total = t0.elapsed();

            histogram!("storage_rocksdb_action_seconds", "action" => "delete_account")
                .record(t_total.as_secs_f64());
            counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account")
                .increment(total_batch_ops as u64);
            histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64);
        }

        Ok(())
    }

    /// Cheap read-only view: a clone that skips writer cleanup on drop.
    fn to_readable(&mut self) -> impl LinkReader {
        let mut readable = self.clone();
        readable.is_writer = false;
        readable
    }
}

impl LinkReader for RocksStorage {
    /// For every record linking to `target` (at `collection`/`path`), follow
    /// that record's link at `path_to_other` and count records/distinct dids
    /// per destination target. Paged by target id (stringified in the cursor),
    /// optionally filtered by linker did and/or destination target.
    fn get_many_to_many_counts(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        path_to_other: &str,
        limit: u64,
        after: Option<String>,
        filter_link_dids: &HashSet<Did>,
        filter_to_targets: &HashSet<String>,
    ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
        let collection = Collection(collection.to_string());
        let path = RPath(path.to_string());

        let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());

        // unfortunately the cursor is a, uh, stringified number.
        // this was easier for the memstore (plain target, not target id), and
        // making it generic is a bit awful.
        // so... parse the number out of a string here :(
        // TODO: this should bubble up to a BAD_REQUEST response
        let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedOrderedCollection::empty());
        };

        // resolve the did filter to (did_id -> active) pairs up front
        let filter_did_ids: HashMap<DidId, bool> = filter_link_dids
            .iter()
            .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
            .collect::<Result<Vec<DidIdValue>>>()?
            .into_iter()
            .map(|DidIdValue(id, active)| (id, active))
            .collect();

        // stored targets are keyed by triples of (target, collection, path).
        // target filtering only considers the target itself, so we actually
        // need to do a prefix iteration of all target ids for this target and
        // keep them all.
        // i *think* the number of keys at a target prefix should usually be
        // pretty small, so this is hopefully fine. but if it turns out to be
        // large, we can push this filtering back into the main links loop and
        // do forward db queries per backlink to get the raw target back out.
        let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
        for t in filter_to_targets {
            for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
                filter_to_target_ids.insert(target_id);
            }
        }

        let linkers = self.get_target_linkers(&target_id)?;

        // accumulator: other-target id -> (record count, distinct linker dids).
        // BTreeMap keeps keys ordered so paging by target id works.
        let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();

        for (did_id, rkey) in linkers.0 {
            // deleted links are tombstoned as the zero did-id; skip them.
            if did_id.is_empty() {
                continue;
            }

            // did filter: must be present AND marked active.
            if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
                continue;
            }

            // forward lookup: what does this linking record point at?
            let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
            let Some(targets) = self.get_record_link_targets(&record_link_key)?
            else {
                continue;
            };

            // take the first forward link at path_to_other that survives the
            // target filter (if any); records without one don't contribute.
            let Some(fwd_target) = targets
                .0
                .into_iter()
                .filter_map(|RecordLinkTarget(rpath, target_id)| {
                    if rpath.0 == path_to_other
                        && (filter_to_target_ids.is_empty()
                            || filter_to_target_ids.contains(&target_id))
                    {
                        Some(target_id)
                    } else {
                        None
                    }
                })
                .take(1)
                .next()
            else {
                continue;
            };

            // small relief: we page over target ids, so we can already bail
            // reprocessing previous pages here
            if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
                continue;
            }

            // and we can skip target ids that must be on future pages
            // (this check continues after the did-lookup, which we have to do)
            let page_is_full = grouped_counts.len() as u64 > limit;
            if page_is_full {
                let current_max = grouped_counts.keys().next_back().unwrap();
                if fwd_target > *current_max {
                    continue;
                }
            }

            // bit painful: 2-step lookup to make sure this did is active
            let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
                eprintln!("failed to look up did from did_id {did_id:?}");
                continue;
            };
            let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
            else {
                eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                continue;
            };
            if !active {
                continue;
            }

            // page-management, continued
            // if we have a full page, and we're inserting a *new* key less than
            // the current max, then we can evict the current max
            let mut should_evict = false;
            let entry = grouped_counts.entry(fwd_target).or_insert_with(|| {
                // this is a *new* key, so kick the max if we're full
                should_evict = page_is_full;
                Default::default()
            });
            entry.0 += 1;
            entry.1.insert(did_id);

            if should_evict {
                grouped_counts.pop_last();
            }
        }

        // If we accumulated more than limit groups, there's another page.
        // Pop the extra before building items so it doesn't appear in results.
        let next = if grouped_counts.len() as u64 > limit {
            grouped_counts.pop_last();
            grouped_counts
                .keys()
                .next_back()
                .map(|k| format!("{}", k.0))
        } else {
            None
        };

        // resolve surviving target ids back to their string form for output.
        let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
        for (target_id, (n, dids)) in &grouped_counts {
            let Some(target) = self
                .target_id_table
                .get_val_from_id(&self.db, target_id.0)?
            else {
                eprintln!("failed to look up target from target_id {target_id:?}");
                continue;
            };
            items.push((target.0 .0, *n, dids.len() as u64));
        }

        Ok(PagedOrderedCollection { items, next })
    }

    /// Count live (non-tombstoned) links to `target` at (collection, path).
    fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );
        if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)?
        {
            let (alive, _) = self.get_target_linkers(&target_id)?.count();
            Ok(alive)
        } else {
            Ok(0)
        }
    }

    /// Count distinct DIDs with at least one live link to `target` at
    /// (collection, path). Note: does not check per-did active flags.
    fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );
        if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? {
            Ok(self.get_target_linkers(&target_id)?.count_distinct_dids())
        } else {
            Ok(0)
        }
    }

    /// Page through (linking record, other subject) pairs: records that link
    /// to `target` at `path` and also link to some other target at
    /// `path_to_other`. Cursor is a comma-separated (backlink_idx, fwd_idx).
    fn get_many_to_many(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        path_to_other: &str,
        limit: u64,
        after: Option<String>,
        filter_link_dids: &HashSet<Did>,
        filter_to_targets: &HashSet<String>,
    ) -> Result<PagedOrderedCollection<ManyToManyItem, String>> {
        // helper to resolve dids: returns Some(did) only if the did-id maps
        // back to a did AND that did's account is currently active.
        let resolve_active_did = |did_id: &DidId| -> Result<Option<Did>> {
            let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
                eprintln!("failed to look up did from did_id {did_id:?}");
                return Ok(None);
            };
            let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
            else {
                eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                return Ok(None);
            };
            Ok(active.then_some(did))
        };

        // setup variables that we need later
        let collection = Collection(collection.to_string());
        let path = RPath(path.to_string());

        // extract parts from composite cursor
        let cursor = match after {
            Some(a) => {
                let (b, f) = a.split_once(',').ok_or(anyhow!("invalid cursor format"))?;
                let backlink_idx = b
                    .parse::<u64>()
                    .map_err(|e| anyhow!("invalid cursor.0: {e}"))?;
                let other_link_idx = f
                    .parse::<u64>()
                    .map_err(|e| anyhow!("invalid cursor.1: {e}"))?;
                Some(ManyToManyCursor {
                    backlink_idx,
                    other_link_idx,
                })
            }
            None => None,
        };

        // (__active__) did ids and filter targets
        let filter_did_ids: HashMap<DidId, bool> = filter_link_dids
            .iter()
            .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
            .collect::<Result<Vec<DidIdValue>>>()?
            .into_iter()
            .map(|DidIdValue(id, active)| (id, active))
            .collect();
        let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
        for t in filter_to_targets {
            for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
                filter_to_target_ids.insert(target_id);
            }
        }

        let target_key = TargetKey(Target(target.to_string()), collection.clone(), path);
        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            eprintln!("Target not found for {target_key:?}");
            return Ok(PagedOrderedCollection::empty());
        };
        let linkers = self.get_target_linkers(&target_id)?;

        // (backlink_idx, other_link_idx, item) — indices kept for the cursor.
        let mut items: Vec<(usize, usize, ManyToManyItem)> = Vec::new();

        // iterate backlinks (who linked to the target?)
1198 for (backlink_idx, (did_id, rkey)) in 1199 linkers 1200 .0 1201 .iter() 1202 .enumerate() 1203 .skip_while(|(backlink_idx, _)| { 1204 cursor.is_some_and(|c| *backlink_idx < c.backlink_idx as usize) 1205 }) 1206 { 1207 if did_id.is_empty() 1208 || (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id)) 1209 { 1210 continue; 1211 } 1212 1213 let Some(links) = self.get_record_link_targets(&RecordLinkKey( 1214 *did_id, 1215 collection.clone(), 1216 rkey.clone(), 1217 ))? 1218 else { 1219 continue; 1220 }; 1221 1222 // iterate fwd links (which of these links point to the "other" target?) 1223 for (other_link_idx, RecordLinkTarget(_, fwd_target_id)) in links 1224 .0 1225 .into_iter() 1226 .enumerate() 1227 .filter(|(_, RecordLinkTarget(rpath, target_id))| { 1228 rpath.0 == path_to_other 1229 && (filter_to_target_ids.is_empty() 1230 || filter_to_target_ids.contains(target_id)) 1231 }) 1232 .skip_while(|(other_link_idx, _)| { 1233 cursor.is_some_and(|c| { 1234 backlink_idx == c.backlink_idx as usize 1235 && *other_link_idx <= c.other_link_idx as usize 1236 }) 1237 }) 1238 .take(limit as usize + 1 - items.len()) 1239 { 1240 // extract forward target did (target that links to the __other__ target) 1241 let Some(did) = resolve_active_did(did_id)? else { 1242 continue; 1243 }; 1244 // resolve to target string 1245 let Some(fwd_target_key) = self 1246 .target_id_table 1247 .get_val_from_id(&self.db, fwd_target_id.0)? 1248 else { 1249 continue; 1250 }; 1251 1252 // link to be added 1253 let record_id = RecordId { 1254 did, 1255 collection: collection.0.clone(), 1256 rkey: rkey.0.clone(), 1257 }; 1258 let item = ManyToManyItem { 1259 link_record: record_id, 1260 other_subject: fwd_target_key.0 .0, 1261 }; 1262 items.push((backlink_idx, other_link_idx, item)); 1263 } 1264 1265 // page full - eject 1266 if items.len() > limit as usize { 1267 break; 1268 } 1269 } 1270 1271 // We collect up to limit + 1 fully-resolved items. 
If we got more than 1272 // limit, there are more results beyond this page. We truncate to limit 1273 // items (the actual page) and build a composite cursor from the last 1274 // item on the page — a base64-encoded pair of (backlink_vec_idx, 1275 // forward_link_idx). On the next request, skip_while advances past 1276 // this position: backlinks before backlink_vec_idx are skipped entirely, 1277 // and at backlink_vec_idx itself, forward links at or before 1278 // forward_link_idx are skipped. This correctly resumes mid-record when 1279 // a single backlinker has multiple forward links at path_to_other. 1280 let next = (items.len() > limit as usize).then(|| { 1281 let (b, o, _) = items[limit as usize - 1]; 1282 format!("{b},{o}") 1283 }); 1284 1285 let items = items 1286 .into_iter() 1287 .take(limit as usize) 1288 .map(|(_, _, item)| item) 1289 .collect(); 1290 1291 Ok(PagedOrderedCollection { items, next }) 1292 } 1293 1294 fn get_links( 1295 &self, 1296 target: &str, 1297 collection: &str, 1298 path: &str, 1299 order: Order, 1300 limit: u64, 1301 until: Option<u64>, 1302 filter_dids: &HashSet<Did>, 1303 ) -> Result<PagedAppendingCollection<RecordId>> { 1304 let target_key = TargetKey( 1305 Target(target.to_string()), 1306 Collection(collection.to_string()), 1307 RPath(path.to_string()), 1308 ); 1309 1310 let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1311 return Ok(PagedAppendingCollection::empty()); 1312 }; 1313 1314 let mut linkers = self.get_target_linkers(&target_id)?; 1315 if !filter_dids.is_empty() { 1316 let mut did_filter = HashSet::new(); 1317 for did in filter_dids { 1318 let Some(DidIdValue(did_id, active)) = 1319 self.did_id_table.get_id_val(&self.db, did)? 
1320 else { 1321 eprintln!("failed to find a did_id for {did:?}"); 1322 continue; 1323 }; 1324 if !active { 1325 eprintln!("excluding inactive did from filtered results"); 1326 continue; 1327 } 1328 did_filter.insert(did_id); 1329 } 1330 linkers.0.retain(|linker| did_filter.contains(&linker.0)); 1331 } 1332 1333 let (alive, gone) = linkers.count(); 1334 let total = alive + gone; 1335 1336 let (start, take, next_until) = match order { 1337 // OldestToNewest: start from the beginning, paginate forward 1338 Order::OldestToNewest => { 1339 let start = until.unwrap_or(0); 1340 let next = start + limit + 1; 1341 let next_until = if next < total { Some(next) } else { None }; 1342 (start, limit, next_until) 1343 } 1344 // NewestToOldest: start from the end, paginate backward 1345 Order::NewestToOldest => { 1346 let until = until.unwrap_or(total); 1347 match until.checked_sub(limit) { 1348 Some(s) if s > 0 => (s, limit, Some(s)), 1349 Some(s) => (s, limit, None), 1350 None => (0, until, None), 1351 } 1352 } 1353 }; 1354 1355 let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize); 1356 let did_id_rkeys: Vec<_> = match order { 1357 Order::OldestToNewest => did_id_rkeys.collect(), 1358 Order::NewestToOldest => did_id_rkeys.rev().collect(), 1359 }; 1360 1361 let mut items = Vec::with_capacity(did_id_rkeys.len()); 1362 // TODO: use get-many (or multi-get or whatever it's called) 1363 for (did_id, rkey) in did_id_rkeys { 1364 if did_id.is_empty() { 1365 continue; 1366 } 1367 if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 1368 let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                // silently drop links from deactivated accounts
                if !active {
                    continue;
                }
                items.push(RecordId {
                    did,
                    collection: collection.to_string(),
                    rkey: rkey.0.clone(),
                });
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next: next_until,
            total: alive,
        })
    }

    /// Page (newest-first) through the distinct DIDs that link to `target`
    /// at (collection, path); inactive accounts are skipped.
    fn get_distinct_dids(
        &self,
        target: &str,
        collection: &str,
        path: &str,
        limit: u64,
        until: Option<u64>,
    ) -> Result<PagedAppendingCollection<Did>> {
        let target_key = TargetKey(
            Target(target.to_string()),
            Collection(collection.to_string()),
            RPath(path.to_string()),
        );

        let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
            return Ok(PagedAppendingCollection::empty());
        };

        let linkers = self.get_distinct_target_linkers(&target_id)?;

        // totals include tombstones so backward cursors stay stable.
        let (alive, gone) = linkers.count();
        let total = alive + gone;

        // backward pagination from the end, same shape as NewestToOldest above.
        let until = until.unwrap_or(total);
        let (start, take, next_until) = match until.checked_sub(limit) {
            Some(s) if s > 0 => (s, limit, Some(s)),
            Some(s) => (s, limit, None),
            None => (0, until, None),
        };

        let did_id_rkeys = linkers.0.iter().skip(start as usize).take(take as usize);
        let did_id_rkeys: Vec<_> = did_id_rkeys.rev().collect();

        let mut items = Vec::with_capacity(did_id_rkeys.len());
        // TODO: use get-many (or multi-get or whatever it's called)
        for (did_id, _) in did_id_rkeys {
            if did_id.is_empty() {
                continue;
            }
            if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
                let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
                else {
                    eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
                    continue;
                };
                if !active {
                    continue;
                }
                items.push(did);
            } else {
                eprintln!("failed to look up did from did_id {did_id:?}");
            }
        }

        Ok(PagedAppendingCollection {
            version: (total, gone),
            items,
            next: next_until,
            total: alive,
        })
    }

    /// Live record counts for every (collection, path) pair stored under
    /// `target`, as collection -> path -> count.
    fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> {
        let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
        for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
            let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
            let (count, _) = self.get_target_linkers(&target_id)?.count();
            out.entry(collection.into())
                .or_default()
                .insert(path.clone(), count);
        }
        Ok(out)
    }

    /// Like get_all_record_counts, but also includes the distinct-did count
    /// per (collection, path).
    fn get_all_counts(
        &self,
        target: &str,
    ) -> Result<HashMap<String, HashMap<String, CountsByCount>>> {
        let mut out: HashMap<String, HashMap<String, CountsByCount>> = HashMap::new();
        for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {
            let TargetKey(_, Collection(ref collection), RPath(ref path)) = target_key;
            let target_linkers = self.get_target_linkers(&target_id)?;
            let (records, _) = target_linkers.count();
            let distinct_dids = target_linkers.count_distinct_dids();
            out.entry(collection.into()).or_default().insert(
                path.clone(),
                CountsByCount {
                    records,
                    distinct_dids,
                },
            );
        }
        Ok(out)
    }

    /// Storage-wide stats: estimated table sizes (rocksdb key estimates, not
    /// exact), the first jetstream cursor seen, and any in-progress target-id
    /// repair state.
    fn get_stats(&self) -> Result<StorageStats> {
        let dids = self.did_id_table.estimate_count();
        let targetables = self.target_id_table.estimate_count();
        let lr_cf = self.db.cf_handle(LINK_TARGETS_CF).unwrap();
        // rocksdb's key-count property is an estimate; good enough for stats.
        let linking_records = self
            .db
            .property_value_cf(&lr_cf, rocksdb::properties::ESTIMATE_NUM_KEYS)?
            .map(|s| s.parse::<u64>())
            .transpose()?
            .unwrap_or(0);
        // fall back to the known service launch cursor for dbs that predate
        // the STARTED_AT_KEY record.
        let started_at = self
            .db
            .get(STARTED_AT_KEY)?
            .map(|c| _vr(&c))
            .transpose()?
            .unwrap_or(COZY_FIRST_CURSOR);

        // surface repair progress (if a repair is running) as loose key/values.
        let other_data = self
            .db
            .get(TARGET_ID_REPAIR_STATE_KEY)?
            .map(|s| _vr(&s))
            .transpose()?
            .map(
                |TargetIdRepairState {
                     current_us_started_at,
                     id_when_started,
                     latest_repaired_i,
                 }| {
                    HashMap::from([
                        ("current_us_started_at".to_string(), current_us_started_at),
                        ("id_when_started".to_string(), id_when_started),
                        ("latest_repaired_i".to_string(), latest_repaired_i),
                    ])
                },
            )
            .unwrap_or(HashMap::default());

        Ok(StorageStats {
            dids,
            targetables,
            linking_records,
            started_at: Some(started_at),
            other_data,
        })
    }
}

// Marker traits tying bincode-serializable types to their role in the db:
// keys, key prefixes (for prefix iteration), and values, in both directions.
trait AsRocksKey: Serialize {}
trait AsRocksKeyPrefix<K: KeyFromRocks>: Serialize {}
trait AsRocksValue: Serialize {}
trait KeyFromRocks: for<'de> Deserialize<'de> {}
trait ValueFromRocks: for<'de> Deserialize<'de> {}

// did_id table
impl AsRocksKey for &Did {}
impl AsRocksValue for &DidIdValue {}
impl ValueFromRocks for DidIdValue {}

// temp
impl KeyFromRocks for Did {}
impl AsRocksKey for &DidId {}

// target_ids table
impl AsRocksKey for &TargetKey {}
impl AsRocksKeyPrefix<TargetKey> for &TargetIdTargetPrefix {}
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}

// temp?
impl KeyFromRocks for TargetId {}
impl AsRocksValue for &TargetKey {}

// target_links table
impl AsRocksKey for &TargetId {}
impl AsRocksValue for &TargetLinkers {}
impl ValueFromRocks for TargetLinkers {}

// record_link_targets table
impl AsRocksKey for &RecordLinkKey {}
impl AsRocksKeyPrefix<RecordLinkKey> for &RecordLinkKeyDidIdPrefix {}
impl AsRocksValue for &RecordLinkTargets {}
impl KeyFromRocks for RecordLinkKey {}
impl ValueFromRocks for RecordLinkTargets {}

/// Shared bincode config for all rocksdb keys/values. Big-endian so that
/// serialized numeric prefixes sort in numeric order inside the lsm.
pub fn _bincode_opts() -> impl BincodeOptions {
    bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm
}
// serialize a key
fn _rk(k: impl AsRocksKey) -> Vec<u8> {
    _bincode_opts().serialize(&k).unwrap()
}
// serialize a key prefix (for prefix iteration over K)
fn _rkp<K: KeyFromRocks>(kp: impl AsRocksKeyPrefix<K>) -> Vec<u8> {
    _bincode_opts().serialize(&kp).unwrap()
}
// serialize a value
fn _rv(v: impl AsRocksValue) -> Vec<u8> {
    _bincode_opts().serialize(&v).unwrap()
}
// deserialize a key
fn _kr<T: KeyFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}
// deserialize a value
fn _vr<T: ValueFromRocks>(bytes: &[u8]) -> Result<T> {
    Ok(_bincode_opts().deserialize(bytes)?)
}

// NSID of the collection a record lives in (e.g. "app.bsky.feed.like").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Collection(pub String);

// JSON path within a record where a link was found (e.g. ".subject.uri").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RPath(pub String);

// Record key within a collection.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RKey(pub String);

impl RKey {
    // tombstone value used when a link is deleted in place
    fn empty() -> Self {
        RKey("".to_string())
    }
}

// did ids
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DidId(pub u64);

impl DidId {
    // id 0 is reserved as the tombstone / "deleted" marker
    fn empty() -> Self {
        DidId(0)
    }
    fn is_empty(&self) -> bool {
        self.0 == 0
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct DidIdValue(DidId, bool); // active or not

impl DidIdValue {
    fn did_id(&self) -> DidId {
        let Self(id, _) = self;
        *id
    }
}

// target ids
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
struct TargetId(u64); // key

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Target(pub String); // the actual target/uri

// targets (uris, dids, etc.): the reverse index
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetKey(pub Target, pub Collection, pub RPath);

// Everyone who linked to a target. Deleted links are tombstoned in place
// (zeroed did-id + empty rkey) rather than removed, keeping indices stable.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TargetLinkers(pub Vec<(DidId, RKey)>);

impl TargetLinkers {
    // Tombstone the most recent matching (did, rkey) entry; returns whether
    // a matching linker was found.
    fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool {
        if let Some(entry) = self.0.iter_mut().rfind(|d| **d == (*did, rkey.clone())) {
            *entry = (DidId::empty(), RKey::empty());
            true
        } else {
            false
        }
    }
    pub fn count(&self) -> (u64, u64) {
        // (linkers, deleted links)
        let total = self.0.len() as u64;
        let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64;
        let gone = total - alive;
        (alive, gone)
    }
    // distinct non-tombstoned dids among the linkers
    fn count_distinct_dids(&self) -> u64 {
        self.0
            .iter()
            .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) })
            .collect::<HashSet<_>>()
            .len() as u64
    }
}

// forward links to targets so we can delete links
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKey(DidId, Collection, RKey);

// does this even work????
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkKeyDidIdPrefix(DidId);

// prefix type for iterating every (target, collection, path) key of a target
#[derive(Debug, Serialize, Deserialize)]
struct TargetIdTargetPrefix(Target);

// one forward link: where in the record it was found, and what it points at
#[derive(Debug, Serialize, Deserialize)]
struct RecordLinkTarget(RPath, TargetId);

#[derive(Debug, Default, Serialize, Deserialize)]
struct RecordLinkTargets(Vec<RecordLinkTarget>);

impl RecordLinkTargets {
    fn with_capacity(cap: usize) -> Self {
        Self(Vec::with_capacity(cap))
    }
    fn add(&mut self, target: RecordLinkTarget) {
        self.0.push(target)
    }
}

#[cfg(test)]
mod tests {
    use super::super::ActionableEvent;
    use super::*;
    use links::Link;
    use tempfile::tempdir;

    /// Regression: deleting an account must not break while other accounts'
    /// records exist later in the did-id prefix iteration order.
    #[test]
    fn rocks_delete_iterator_regression() -> Result<()> {
        let mut store = RocksStorage::new(tempdir()?)?;

        // create a link from the deleter account
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:will-shortly-delete".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // and a different link from a separate, new account (later in didid prefix iteration)
        store.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: "did:plc:someone-else".into(),
                    collection: "a.b.c".into(),
                    rkey: "asdf".into(),
                },
                links: vec![CollectedLink {
                    target: Link::Uri("another.example.com".into()),
                    path: ".uri".into(),
                }],
            },
            0,
        )?;

        // now delete the first account (this is where the buggy version explodes)
        store.push(
            &ActionableEvent::DeleteAccount("did:plc:will-shortly-delete".into()),
            0,
        )?;

        Ok(())
    }

    /// Exercises prefix_iter_cf: only keys under the requested prefix are
    /// yielded, in order, and all of them are visited exactly once.
    #[test]
    fn rocks_prefix_iteration_helper() -> Result<()> {
        #[derive(Serialize, Deserialize)]
        struct Key(u8, u8);

        #[derive(Serialize)]
        struct KeyPrefix(u8);

        #[derive(Serialize, Deserialize)]
        struct Value(());

        impl AsRocksKey for &Key {}
        impl AsRocksKeyPrefix<Key> for &KeyPrefix {}
        impl AsRocksValue for &Value {}

        impl KeyFromRocks for Key {}
        impl ValueFromRocks for Value {}

        let data = RocksStorage::new(tempdir()?)?;
        let cf = data.db.cf_handle(DID_IDS_CF).unwrap();
        let mut batch = WriteBatch::default();

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x01, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x01, 0xFF)), _rv(&Value(())));

        // our prefix!
        for i in 0..=0xFF {
            batch.put_cf(&cf, _rk(&Key(0x02, i)), _rv(&Value(())));
        }

        // not our prefix
        batch.put_cf(&cf, _rk(&Key(0x03, 0x00)), _rv(&Value(())));
        batch.put_cf(&cf, _rk(&Key(0x03, 0xFF)), _rv(&Value(())));

        data.db.write(batch)?;

        let mut okays: [bool; 256] = [false; 256];
        for (i, (k, Value(_))) in data.prefix_iter_cf(&cf, KeyPrefix(0x02)).enumerate() {
            assert!(i < 256);
            assert_eq!(k.0, 0x02, "prefix iterated key has the right prefix");
            assert_eq!(k.1 as usize, i, "prefixed keys are iterated in exact order");
            okays[k.1 as usize] = true;
        }
        assert!(okays.iter().all(|b| *b), "every key was iterated");

        Ok(())
    }

    // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
}