Kind of like tap, but different — and in Rust.
at main 681 lines 28 kB view raw
use crate::db::{self, keys};
use crate::filter::FilterMode;
use crate::ingest::{BufferRx, IngestMessage};
use crate::ops;
use crate::resolver::{NoSigningKeyError, ResolverError};
use crate::state::AppState;
use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus};
use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage;

use fjall::OwnedWriteBatch;

use jacquard::cowstr::ToCowStr;
use jacquard::types::crypto::PublicKey;
use jacquard::types::did::Did;
use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
use jacquard_common::IntoStatic;
use jacquard_repo::error::CommitError;
use miette::{Context, Diagnostic, IntoDiagnostic, Result};
use rand::Rng;
use smol_str::ToSmolStr;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};

/// Errors produced while processing a single ingest message.
///
/// The `From` impls (derived via `#[from]` plus the manual `miette::Report`
/// conversion below) let `?` be used uniformly inside the worker; callers
/// classify variants with `check_if_retriable_failure` to decide whether a
/// failed commit should be parked in the resync buffer.
#[derive(Debug, Diagnostic, Error)]
enum IngestError {
    /// Catch-all for ad-hoc `miette::Report` errors (e.g. `into_diagnostic()` results).
    #[error("{0}")]
    Generic(miette::Report),

    /// Signing-key resolution failed (rate limit, transport, DID doc issues, ...).
    #[error(transparent)]
    #[diagnostic(transparent)]
    Resolver(#[from] ResolverError),

    /// Commit verification/application failed in the repo layer.
    #[error(transparent)]
    #[diagnostic(transparent)]
    Commit(#[from] CommitError),

    /// The resolved identity has no usable signing key.
    #[error(transparent)]
    #[diagnostic(transparent)]
    NoSigningKey(#[from] NoSigningKeyError),
}

impl From<miette::Report> for IngestError {
    // Manual impl because miette::Report does not implement std::error::Error,
    // so thiserror's #[from] cannot be used for this variant.
    fn from(report: miette::Report) -> Self {
        IngestError::Generic(report)
    }
}

/// Outcome of processing one message against a repo's stored state.
///
/// - `Deleted`: the repo was wiped (account deletion); stop processing it.
/// - `Syncing(Some(commit))`: the repo is not in a live-applyable state and the
///   caller should persist this commit to the resync buffer for later replay.
/// - `Syncing(None)`: not applyable, and there is nothing to buffer.
/// - `Ok(state)`: processed successfully; carries the (possibly updated) repo state.
#[derive(Debug)]
enum RepoProcessResult<'s, 'c> {
    Deleted,
    Syncing(Option<&'c Commit<'c>>),
    Ok(RepoState<'s>),
}

/// Owns the firehose ingest pipeline: one dispatcher loop plus `num_shards`
/// dedicated worker threads. Messages are routed to a shard by hashing the
/// DID, so all events for one repo are processed in order on one thread.
pub struct FirehoseWorker {
    state: Arc<AppState>,
    // Buffered receiver fed by the firehose subscriber.
    rx: BufferRx,
    // When true, commits/sync events are verified against the repo's signing key.
    verify_signatures: bool,
    num_shards: usize,
}

/// Per-message bundle of mutable state threaded through the processing
/// helpers on a shard thread. Deltas and events are accumulated here and
/// flushed by `worker_thread` after the write batch commits.
struct WorkerContext<'a> {
    verify_signatures: bool,
    state: &'a AppState,
    // The shard's current fjall write batch; all per-message DB writes go here.
    batch: &'a mut OwnedWriteBatch,
    // Net new blocks written, applied to the "blocks" counter after commit.
    added_blocks: &'a mut i64,
    // Net record count change, applied to the "records" counter after commit.
    records_delta: &'a mut i64,
    // Events to broadcast only after the batch commits successfully.
    broadcast_events: &'a mut Vec<BroadcastEvent>,
    // Tokio handle used to block_on async key resolution from this sync thread.
    handle: &'a tokio::runtime::Handle,
}

impl FirehoseWorker {
    /// Creates a worker; nothing runs until [`Self::run`] is called.
    pub fn new(
        state: Arc<AppState>,
        rx: BufferRx,
        verify_signatures: bool,
        num_shards: usize,
    ) -> Self {
        Self {
            state,
            rx,
            verify_signatures,
            num_shards,
        }
    }

    // starts the worker threads and the main dispatch loop
    // the dispatch loop reads from the firehose channel and distributes messages to shards
    // based on the consistent hash of the DID
    //
    // Blocks the calling thread until the ingest channel closes or a shard's
    // channel is dropped. Shard threads are OS threads (not tokio tasks); the
    // runtime handle is passed through so they can block_on async work.
    pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> {
        let mut shards = Vec::with_capacity(self.num_shards);

        for i in 0..self.num_shards {
            // Unbounded: backpressure is assumed to be applied upstream of
            // BufferRx. NOTE(review): a slow shard can grow its queue without
            // limit — confirm upstream bounding.
            let (tx, rx) = mpsc::unbounded_channel();
            shards.push(tx);

            let state = self.state.clone();
            let verify = self.verify_signatures;
            let handle = handle.clone();

            std::thread::Builder::new()
                .name(format!("ingest-shard-{}", i))
                .spawn(move || {
                    Self::worker_thread(i, rx, state, verify, handle);
                })
                .into_diagnostic()?;
        }

        info!("started {} ingest shards", self.num_shards);

        let _g = handle.enter();

        // dispatch loop
        while let Some(msg) = self.rx.blocking_recv() {
            // Extract the routing key (the repo's DID) from each message kind;
            // message kinds without a DID we care about are skipped entirely.
            let did = match &msg {
                IngestMessage::Firehose(m) => match m {
                    SubscribeReposMessage::Commit(c) => &c.repo,
                    SubscribeReposMessage::Identity(i) => &i.did,
                    SubscribeReposMessage::Account(a) => &a.did,
                    SubscribeReposMessage::Sync(s) => &s.did,
                    _ => continue,
                },
                IngestMessage::BackfillFinished(did) => did,
            };

            // Same DID -> same hash -> same shard, which serializes all events
            // for a given repo on one thread.
            // NOTE(review): DefaultHasher's output is stable within a process
            // but not guaranteed across Rust releases; fine here since the
            // mapping only needs to hold for the process lifetime.
            let mut hasher = DefaultHasher::new();
            did.hash(&mut hasher);
            let hash = hasher.finish();
            let shard_idx = (hash as usize) % self.num_shards;

            if let Err(e) = shards[shard_idx].send(msg) {
                error!("failed to send message to shard {shard_idx}: {e}");
                // break if send fails; receiver likely closed
                break;
            }
        }

        error!("firehose worker dispatcher shutting down");

        Ok(())
    }

    // synchronous worker loop running on a dedicated thread
    // pulls messages from the channel, builds batches, and processes them
    // enters the tokio runtime only when necessary (key resolution)
    //
    // Per message: open a fresh write batch, process, commit the batch, then
    // flush side effects (counters, broadcast events, persist hint) — so
    // counters and broadcasts only reflect committed data.
    fn worker_thread(
        id: usize,
        mut rx: mpsc::UnboundedReceiver<IngestMessage>,
        state: Arc<AppState>,
        verify_signatures: bool,
        handle: tokio::runtime::Handle,
    ) {
        let _guard = handle.enter();
        debug!("shard {id} started");

        // Reused across iterations (cleared each loop) to avoid reallocating.
        let mut broadcast_events = Vec::new();

        while let Some(msg) = rx.blocking_recv() {
            let mut batch = state.db.inner.batch();
            broadcast_events.clear();

            let mut added_blocks = 0;
            let mut records_delta = 0;

            let mut ctx = WorkerContext {
                state: &state,
                batch: &mut batch,
                added_blocks: &mut added_blocks,
                records_delta: &mut records_delta,
                broadcast_events: &mut broadcast_events,
                handle: &handle,
                verify_signatures,
            };

            match msg {
                IngestMessage::BackfillFinished(did) => {
                    debug!("backfill finished for {did}, verifying state and draining buffer");

                    // load repo state to transition status and draining buffer
                    let repo_key = keys::repo_key(&did);
                    if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() {
                        match crate::db::deser_repo_state(&state_bytes) {
                            Ok(repo_state) => {
                                let repo_state = repo_state.into_static();

                                // Replay any commits buffered while the backfill ran,
                                // then mark the repo synced if the drain completed.
                                match Self::drain_resync_buffer(&mut ctx, &did, repo_state) {
                                    Ok(res) => match res {
                                        RepoProcessResult::Ok(s) => {
                                            // TODO: there might be a race condition here where we get a new commit
                                            // while the resync buffer is being drained, we should handle that probably
                                            // but also it should still be fine since we'll sync eventually anyway
                                            let res = ops::update_repo_status(
                                                &mut batch,
                                                &state.db,
                                                &did,
                                                s,
                                                RepoStatus::Synced,
                                            );
                                            if let Err(e) = res {
                                                // this can only fail if serde retry fails which would be really weird
                                                error!("failed to transition {did} to synced: {e}");
                                            }
                                        }
                                        // we don't have to handle this since drain_resync_buffer doesn't delete
                                        // the commits from the resync buffer so they will get retried later
                                        RepoProcessResult::Syncing(_) => {}
                                        RepoProcessResult::Deleted => {}
                                    },
                                    Err(e) => {
                                        error!("failed to drain resync buffer for {did}: {e}")
                                    }
                                };
                            }
                            Err(e) => error!("failed to deser repo state for {did}: {e}"),
                        }
                    }
                }
                IngestMessage::Firehose(msg) => {
                    // Re-extract DID plus the firehose sequence number; the
                    // catch-all mirrors the dispatcher's filtering.
                    let (did, seq) = match &msg {
                        SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
                        SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
                        SubscribeReposMessage::Account(a) => (&a.did, a.seq),
                        SubscribeReposMessage::Sync(s) => (&s.did, s.seq),
                        _ => continue,
                    };

                    match Self::process_message(&mut ctx, &msg, did) {
                        Ok(RepoProcessResult::Ok(_)) => {}
                        Ok(RepoProcessResult::Deleted) => {}
                        // Repo not ready for live commits: park this commit so the
                        // drain pass can replay it later.
                        Ok(RepoProcessResult::Syncing(Some(commit))) => {
                            if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) {
                                error!("failed to persist commit to resync_buffer for {did}: {e}");
                            }
                        }
                        Ok(RepoProcessResult::Syncing(None)) => {}
                        Err(e) => {
                            if let IngestError::Generic(e) = &e {
                                // Detect fatal storage poisoning wrapped in a report.
                                db::check_poisoned_report(e);
                            }
                            error!("error processing message for {did}: {e}");
                            // Transient failures (see check_if_retriable_failure):
                            // buffer the commit so it is retried instead of lost.
                            if Self::check_if_retriable_failure(&e) {
                                if let SubscribeReposMessage::Commit(commit) = &msg {
                                    if let Err(e) =
                                        ops::persist_to_resync_buffer(&state.db, did, commit)
                                    {
                                        error!(
                                            "failed to persist commit to resync_buffer for {did}: {e}"
                                        );
                                    }
                                }
                            }
                        }
                    }

                    // Record the latest firehose cursor seen by this shard.
                    // NOTE(review): cur_firehose is shared by all shards, so
                    // concurrent stores race (last writer wins) — confirm the
                    // cursor consumer tolerates slight regressions.
                    state
                        .cur_firehose
                        .store(seq, std::sync::atomic::Ordering::SeqCst);
                }
            }

            if let Err(e) = batch.commit() {
                error!("failed to commit batch in shard {id}: {e}");
            }

            // Flush accumulated side effects only after the batch commit attempt.
            if added_blocks > 0 {
                state.db.update_count("blocks", added_blocks);
            }
            if records_delta != 0 {
                state.db.update_count("records", records_delta);
            }
            for evt in broadcast_events.drain(..) {
                // Receivers may be gone (e.g. no subscribers); that's fine.
                let _ = state.db.event_tx.send(evt);
            }

            // Best-effort durability hint; errors intentionally ignored.
            state.db.inner.persist(fjall::PersistMode::Buffer).ok();
        }
    }

    // dont retry commit or sync on key fetch errors
    // since we'll just try again later if we get commit or sync again
    //
    // Returns true for failures considered transient (generic reports, resolver
    // rate limits, resolver transport errors) — these cause the commit to be
    // kept/persisted in the resync buffer. Commit verification errors and
    // missing-signing-key errors return false and are not retried.
    fn check_if_retriable_failure(e: &IngestError) -> bool {
        matches!(
            e,
            IngestError::Generic(_)
                | IngestError::Resolver(ResolverError::Ratelimited)
                | IngestError::Resolver(ResolverError::Transport(_))
        )
    }

    /// Routes one firehose message through the repo state machine.
    ///
    /// First consults `check_repo_state` (which may create/queue a brand-new
    /// repo, or short-circuit when the repo is backfilling/errored), then
    /// handles each message kind: commits are applied, sync events may trigger
    /// a backfill, identity/account events update metadata and emit broadcast
    /// events; account deletion wipes the repo.
    fn process_message<'s, 'c>(
        ctx: &mut WorkerContext,
        msg: &'c SubscribeReposMessage<'static>,
        did: &Did,
    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
        let check_repo_res = Self::check_repo_state(ctx, did, msg)?;
        let mut repo_state = match check_repo_res {
            RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => {
                return Ok(check_repo_res);
            }
            RepoProcessResult::Ok(s) => s,
        };

        match msg {
            SubscribeReposMessage::Commit(commit) => {
                trace!("processing buffered commit for {did}");

                return Self::process_commit(ctx, did, repo_state, commit);
            }
            SubscribeReposMessage::Sync(sync) => {
                debug!("processing buffered sync for {did}");

                match ops::verify_sync_event(
                    sync.blocks.as_ref(),
                    Self::fetch_key(ctx, did)?.as_ref(),
                ) {
                    Ok((root, rev)) => {
                        // No-op if the sync root already matches our stored data root.
                        if let Some(current_data) = &repo_state.data {
                            if current_data == &root.to_ipld().expect("valid cid") {
                                debug!("skipping noop sync for {did}");
                                return Ok(RepoProcessResult::Ok(repo_state));
                            }
                        }

                        // Revisions are TIDs, so lexicographic comparison is a
                        // valid ordering check; skip replays at or before our rev.
                        if let Some(current_rev) = &repo_state.rev {
                            if rev.as_str() <= current_rev.to_tid().as_str() {
                                debug!("skipping replayed sync for {did}");
                                return Ok(RepoProcessResult::Ok(repo_state));
                            }
                        }

                        // Genuine divergence: flip to Backfilling and wake the
                        // backfill worker. Committed in its own batch so the
                        // status change lands even if the outer batch fails.
                        warn!("sync event for {did}: triggering backfill");
                        let mut batch = ctx.state.db.inner.batch();
                        repo_state = ops::update_repo_status(
                            &mut batch,
                            &ctx.state.db,
                            did,
                            repo_state,
                            RepoStatus::Backfilling,
                        )?;
                        batch.commit().into_diagnostic()?;
                        ctx.state
                            .db
                            .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
                        ctx.state.notify_backfill();
                        return Ok(RepoProcessResult::Ok(repo_state));
                    }
                    Err(e) => {
                        // Verification failure is logged but not propagated;
                        // falls through to return the unchanged repo state.
                        error!("failed to process sync event for {did}: {e}");
                    }
                }
            }
            SubscribeReposMessage::Identity(identity) => {
                debug!("processing buffered identity for {did}");
                let handle = identity
                    .handle
                    .as_ref()
                    .map(|h| h.to_cowstr().into_static());

                let evt = IdentityEvt {
                    did: did.clone().into_static(),
                    handle,
                };
                ctx.broadcast_events
                    .push(ops::make_identity_event(&ctx.state.db, evt));
            }
            SubscribeReposMessage::Account(account) => {
                debug!("processing buffered account for {did}");
                let evt = AccountEvt {
                    did: did.clone().into_static(),
                    active: account.active,
                    status: account.status.as_ref().map(|s| s.to_cowstr().into_static()),
                };

                if !account.active {
                    use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus;
                    match &account.status {
                        // Deletion is terminal: wipe all repo data now.
                        Some(AccountStatus::Deleted) => {
                            debug!("account {did} deleted, wiping data");
                            ops::delete_repo(ctx.batch, &ctx.state.db, did, repo_state)?;
                            return Ok(RepoProcessResult::Deleted);
                        }
                        // Every other inactive status maps onto a RepoStatus.
                        status => {
                            let target_status = match status {
                                Some(status) => match status {
                                    AccountStatus::Deleted => {
                                        unreachable!("deleted account status is handled before")
                                    }
                                    AccountStatus::Takendown => RepoStatus::Takendown,
                                    AccountStatus::Suspended => RepoStatus::Suspended,
                                    AccountStatus::Deactivated => RepoStatus::Deactivated,
                                    AccountStatus::Throttled => {
                                        RepoStatus::Error("throttled".into())
                                    }
                                    AccountStatus::Desynchronized => {
                                        RepoStatus::Error("desynchronized".into())
                                    }
                                    AccountStatus::Other(s) => {
                                        warn!(
                                            "unknown account status for {did}, will put in error state: {s}"
                                        );
                                        RepoStatus::Error(s.to_smolstr())
                                    }
                                },
                                None => {
                                    warn!("account {did} inactive but no status provided");
                                    RepoStatus::Error("unknown".into())
                                }
                            };

                            if repo_state.status == target_status {
                                debug!("account status unchanged for {did}: {target_status:?}");
                                return Ok(RepoProcessResult::Ok(repo_state));
                            }

                            repo_state = ops::update_repo_status(
                                ctx.batch,
                                &ctx.state.db,
                                did,
                                repo_state,
                                target_status,
                            )?;
                            ctx.state
                                .db
                                .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None));
                        }
                    }
                } else {
                    // normally we would initiate backfill here
                    // but we don't have to do anything because:
                    // 1. we handle changing repo status to Synced before this (in check repo state)
                    // 2. initiating backfilling is also handled there
                }
                ctx.broadcast_events
                    .push(ops::make_account_event(&ctx.state.db, evt));
            }
            _ => {
                warn!("unknown message type in buffer for {did}");
            }
        }

        Ok(RepoProcessResult::Ok(repo_state))
    }

    /// Applies one commit to a synced repo.
    ///
    /// Skips replayed revisions, detects gaps (our stored data root vs. the
    /// commit's `prev_data`) by triggering a backfill and asking the caller to
    /// buffer the commit, and otherwise delegates to `ops::apply_commit`,
    /// accumulating block/record deltas and a `Persisted` broadcast event in
    /// the context.
    fn process_commit<'c, 'ns, 's: 'ns>(
        ctx: &mut WorkerContext,
        did: &Did,
        repo_state: RepoState<'s>,
        commit: &'c Commit<'c>,
    ) -> Result<RepoProcessResult<'ns, 'c>, IngestError> {
        // check for replayed events (already seen revision)
        // revs are TIDs, so string comparison orders them chronologically
        if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
            debug!(
                "skipping replayed event for {}: {} <= {}",
                did,
                commit.rev,
                repo_state
                    .rev
                    .as_ref()
                    .map(|r| r.to_tid())
                    .expect("we checked in if")
            );
            return Ok(RepoProcessResult::Ok(repo_state));
        }

        // Gap check: if the commit's claimed previous data root doesn't match
        // what we have stored, we missed events — resync via backfill.
        if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data)
            && repo
                != &prev_commit
                    .0
                    .to_ipld()
                    .into_diagnostic()
                    .wrap_err("invalid cid from relay")?
        {
            warn!(
                "gap detected for {}: repo {} != commit prev {}. triggering backfill",
                did, repo, prev_commit.0
            );

            // Separate batch: the Backfilling status must persist immediately,
            // independent of the outer per-message batch.
            let mut batch = ctx.state.db.inner.batch();
            let _repo_state = ops::update_repo_status(
                &mut batch,
                &ctx.state.db,
                did,
                repo_state,
                RepoStatus::Backfilling,
            )?;
            batch.commit().into_diagnostic()?;
            ctx.state.db.update_gauge_diff(
                &crate::types::GaugeState::Synced,
                &crate::types::GaugeState::Pending,
            );
            ctx.state.notify_backfill();
            // Hand the commit back so the caller parks it in the resync buffer.
            return Ok(RepoProcessResult::Syncing(Some(commit)));
        }

        let res = ops::apply_commit(
            ctx.batch,
            &ctx.state.db,
            repo_state,
            &commit,
            Self::fetch_key(ctx, did)?.as_ref(),
            &ctx.state.filter.load(),
        )?;
        let repo_state = res.repo_state;
        *ctx.added_blocks += res.blocks_count;
        *ctx.records_delta += res.records_delta;
        // NOTE(review): next_event_id - 1 assumes apply_commit already bumped
        // the counter for the event it wrote — confirm against ops::apply_commit.
        ctx.broadcast_events.push(BroadcastEvent::Persisted(
            ctx.state
                .db
                .next_event_id
                .load(std::sync::atomic::Ordering::SeqCst)
                - 1,
        ));

        Ok(RepoProcessResult::Ok(repo_state))
    }

    // checks the current state of the repo in the database
    // if the repo is new, creates initial state and triggers backfill
    // handles transitions between states (backfilling -> synced, etc)
    //
    // For unknown repos: in Signal filter mode, only commits whose ops touch a
    // signal collection cause the repo to be created and queued for backfill;
    // everything else is dropped. For known repos, returns Ok for Synced
    // (after lazily draining any buffered commits), Syncing for
    // Backfilling/Error, and promotes Deactivated/Suspended/Takendown repos
    // back to Synced when a live non-deactivating event arrives.
    fn check_repo_state<'s, 'c>(
        ctx: &mut WorkerContext,
        did: &Did<'_>,
        msg: &'c SubscribeReposMessage<'static>,
    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
        let repo_key = keys::repo_key(&did);
        let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else {
            // Unknown repo: decide whether to start tracking it.
            let filter = ctx.state.filter.load();

            if filter.mode == FilterMode::Signal {
                let commit = match msg {
                    SubscribeReposMessage::Commit(c) => c,
                    _ => return Ok(RepoProcessResult::Syncing(None)),
                };
                // An op path looks like "collection/rkey"; the collection part
                // is what's matched against the configured signals.
                let touches_signal = commit.ops.iter().any(|op| {
                    op.path
                        .split_once('/')
                        .map(|(col, _)| {
                            let m = filter.matches_signal(col);
                            debug!(
                                "signal check for {did}: op path={} col={col} signals={:?} -> {m}",
                                op.path, filter.signals
                            );
                            m
                        })
                        .unwrap_or(false)
                });
                if !touches_signal {
                    debug!("dropping {did}: commit has no signal-matching ops");
                    return Ok(RepoProcessResult::Syncing(None));
                }
                debug!("{did}: commit touches a signal, queuing backfill");
            }

            debug!("discovered new account {did} from firehose, queueing backfill");

            // Random index_id spreads new repos across the pending keyspace.
            let repo_state = RepoState::backfilling(rand::rng().next_u64());
            // Own batch, committed immediately: the new repo record and its
            // pending entry must exist before notify_backfill wakes the worker.
            let mut batch = ctx.state.db.inner.batch();
            batch.insert(
                &ctx.state.db.repos,
                &repo_key,
                crate::db::ser_repo_state(&repo_state)?,
            );
            batch.insert(
                &ctx.state.db.pending,
                keys::pending_key(repo_state.index_id),
                &repo_key,
            );
            batch.commit().into_diagnostic()?;

            ctx.state.db.update_count("repos", 1);
            ctx.state.db.update_gauge_diff(
                &crate::types::GaugeState::Synced,
                &crate::types::GaugeState::Pending,
            );

            ctx.state.notify_backfill();

            return Ok(RepoProcessResult::Syncing(None));
        };
        let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();

        // if we are backfilling or it is new, DON'T mark it as synced yet
        // the backfill worker will do that when it finishes
        match &repo_state.status {
            RepoStatus::Synced => {
                // lazy drain: if there are buffered commits, drain them now
                if ops::has_buffered_commits(&ctx.state.db, did) {
                    Self::drain_resync_buffer(ctx, did, repo_state)
                } else {
                    Ok(RepoProcessResult::Ok(repo_state))
                }
            }
            RepoStatus::Backfilling | RepoStatus::Error(_) => {
                debug!(
                    "ignoring active status for {did} as it is {:?}",
                    repo_state.status
                );
                // Syncing(None) here; the caller's commit (if any) gets buffered
                // by the worker loop when process_message propagates Some(commit).
                Ok(RepoProcessResult::Syncing(None))
            }
            RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => {
                // if it was in deactivated/takendown/suspended state, we can mark it as synced
                // because we are receiving live events now
                // UNLESS it is an account status event that keeps it deactivated
                if let SubscribeReposMessage::Account(acc) = msg {
                    if !acc.active {
                        return Ok(RepoProcessResult::Ok(repo_state));
                    }
                }
                repo_state = ops::update_repo_status(
                    ctx.batch,
                    &ctx.state.db,
                    did,
                    repo_state,
                    RepoStatus::Synced,
                )?;
                ctx.state.db.update_gauge_diff(
                    &crate::types::GaugeState::Resync(None),
                    &crate::types::GaugeState::Synced,
                );
                Ok(RepoProcessResult::Ok(repo_state))
            }
        }
    }

    /// Replays commits parked in the resync buffer for `did`, in key order.
    ///
    /// Each successfully applied (or permanently failed) commit is removed
    /// from the buffer via the context's batch; transiently failed commits are
    /// left in place so a later drain retries them. Stops early if a replayed
    /// commit itself triggers a resync (`Syncing`) or deletes the repo.
    fn drain_resync_buffer<'s>(
        ctx: &mut WorkerContext,
        did: &Did,
        mut repo_state: RepoState<'s>,
    ) -> Result<RepoProcessResult<'s, 'static>, IngestError> {
        let prefix = keys::resync_buffer_prefix(did);

        for guard in ctx.state.db.resync_buffer.prefix(&prefix) {
            let (key, value) = guard.into_inner().into_diagnostic()?;
            // Buffered commits are stored MessagePack-encoded.
            let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?;

            let res = Self::process_commit(ctx, did, repo_state, &commit);
            let res = match res {
                Ok(r) => r,
                Err(e) => {
                    // Permanent failure: drop the poison commit so it can't
                    // wedge the buffer forever. Retriable ones stay for later.
                    if !Self::check_if_retriable_failure(&e) {
                        ctx.batch.remove(&ctx.state.db.resync_buffer, key);
                    }
                    return Err(e);
                }
            };
            match res {
                RepoProcessResult::Ok(rs) => {
                    ctx.batch.remove(&ctx.state.db.resync_buffer, key);
                    repo_state = rs;
                }
                RepoProcessResult::Syncing(_) => {
                    // Backfill was (re)triggered; remaining buffered commits are
                    // intentionally kept for the post-backfill drain.
                    return Ok(RepoProcessResult::Syncing(None));
                }
                RepoProcessResult::Deleted => {
                    ctx.batch.remove(&ctx.state.db.resync_buffer, key);
                    return Ok(RepoProcessResult::Deleted);
                }
            }
        }

        Ok(RepoProcessResult::Ok(repo_state))
    }

    /// Resolves the repo's signing key when signature verification is enabled;
    /// returns `None` (skip verification) otherwise.
    ///
    /// This is the one place the sync worker blocks on async work, via the
    /// stored runtime handle.
    fn fetch_key(
        ctx: &WorkerContext,
        did: &Did,
    ) -> Result<Option<PublicKey<'static>>, IngestError> {
        if ctx.verify_signatures {
            let key = ctx
                .handle
                .block_on(ctx.state.resolver.resolve_signing_key(did))?;
            Ok(Some(key))
        } else {
            Ok(None)
        }
    }
}