Forked from ptr.pet/hydrant.
Kind of like tap, but different — and written in Rust.
1use crate::db::{self, keys};
2use crate::filter::FilterMode;
3use crate::ingest::{BufferRx, IngestMessage};
4use crate::ops;
5use crate::resolver::{NoSigningKeyError, ResolverError};
6use crate::state::AppState;
7use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus};
8use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage;
9
10use fjall::OwnedWriteBatch;
11
12use jacquard::cowstr::ToCowStr;
13use jacquard::types::crypto::PublicKey;
14use jacquard::types::did::Did;
15use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
16use jacquard_common::IntoStatic;
17use jacquard_repo::error::CommitError;
18use miette::{Context, Diagnostic, IntoDiagnostic, Result};
19use rand::Rng;
20use smol_str::ToSmolStr;
21use std::collections::hash_map::DefaultHasher;
22use std::hash::{Hash, Hasher};
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, trace, warn};
27
/// Errors that can arise while processing a single ingest message.
#[derive(Debug, Diagnostic, Error)]
enum IngestError {
    /// Catch-all for ad-hoc `miette::Report`s bubbled up from helpers.
    #[error("{0}")]
    Generic(miette::Report),

    /// Identity / signing-key resolution failed (see `crate::resolver`).
    #[error(transparent)]
    #[diagnostic(transparent)]
    Resolver(#[from] ResolverError),

    /// Commit verification or application failed in the repo layer.
    #[error(transparent)]
    #[diagnostic(transparent)]
    Commit(#[from] CommitError),

    /// The resolved identity had no usable signing key.
    #[error(transparent)]
    #[diagnostic(transparent)]
    NoSigningKey(#[from] NoSigningKeyError),
}
45
46impl From<miette::Report> for IngestError {
47 fn from(report: miette::Report) -> Self {
48 IngestError::Generic(report)
49 }
50}
51
/// Outcome of processing one message (or one buffered commit) for a repo.
#[derive(Debug)]
enum RepoProcessResult<'s, 'c> {
    /// The repo's account was deleted and its data wiped.
    Deleted,
    /// The repo is not in a processable (synced) state; the optional commit
    /// should be persisted to the resync buffer for replay after backfill.
    Syncing(Option<&'c Commit<'c>>),
    /// Message handled; carries the (possibly updated) repo state.
    Ok(RepoState<'s>),
}
58
/// Consumes the buffered firehose stream and fans messages out to per-shard
/// worker threads, keyed by a hash of the repo DID so that all events for a
/// given repo are processed serially on one thread.
pub struct FirehoseWorker {
    state: Arc<AppState>,
    // buffered source of firehose / backfill-finished messages
    rx: BufferRx,
    // when true, commit/sync signatures are verified against resolved keys
    verify_signatures: bool,
    // number of dedicated worker threads to spawn
    num_shards: usize,
}
65
/// Mutable per-message context threaded through processing on a shard thread.
struct WorkerContext<'a> {
    verify_signatures: bool,
    state: &'a AppState,
    /// Write batch that the current message's DB mutations accumulate into.
    batch: &'a mut OwnedWriteBatch,
    /// Running delta of blocks added (flushed to the "blocks" count afterwards).
    added_blocks: &'a mut i64,
    /// Running delta of records created/deleted (flushed to the "records" count).
    records_delta: &'a mut i64,
    /// Events to broadcast to subscribers after the batch commits.
    broadcast_events: &'a mut Vec<BroadcastEvent>,
    /// Tokio handle used to `block_on` async work (signing-key resolution).
    handle: &'a tokio::runtime::Handle,
}
75
76impl FirehoseWorker {
77 pub fn new(
78 state: Arc<AppState>,
79 rx: BufferRx,
80 verify_signatures: bool,
81 num_shards: usize,
82 ) -> Self {
83 Self {
84 state,
85 rx,
86 verify_signatures,
87 num_shards,
88 }
89 }
90
    /// Starts the worker threads and runs the main dispatch loop.
    ///
    /// The dispatch loop reads from the firehose channel and distributes
    /// messages to shards based on the consistent hash of the DID, so every
    /// event for a given repo lands on the same thread and is handled serially.
    ///
    /// Blocks the calling thread until the input channel closes or a shard
    /// channel send fails; returns `Err` only if spawning a thread fails.
    pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> {
        let mut shards = Vec::with_capacity(self.num_shards);

        for i in 0..self.num_shards {
            // unbounded so the dispatcher never blocks on a slow shard
            let (tx, rx) = mpsc::unbounded_channel();
            shards.push(tx);

            let state = self.state.clone();
            let verify = self.verify_signatures;
            let handle = handle.clone();

            std::thread::Builder::new()
                .name(format!("ingest-shard-{}", i))
                .spawn(move || {
                    Self::worker_thread(i, rx, state, verify, handle);
                })
                .into_diagnostic()?;
        }

        info!("started {} ingest shards", self.num_shards);

        let _g = handle.enter();

        // dispatch loop
        while let Some(msg) = self.rx.blocking_recv() {
            // pick the DID that identifies the repo this message belongs to;
            // unrecognized firehose message kinds are skipped entirely
            let did = match &msg {
                IngestMessage::Firehose(m) => match m {
                    SubscribeReposMessage::Commit(c) => &c.repo,
                    SubscribeReposMessage::Identity(i) => &i.did,
                    SubscribeReposMessage::Account(a) => &a.did,
                    SubscribeReposMessage::Sync(s) => &s.did,
                    _ => continue,
                },
                IngestMessage::BackfillFinished(did) => did,
            };

            // consistent shard choice: the same DID always hashes to the same shard
            let mut hasher = DefaultHasher::new();
            did.hash(&mut hasher);
            let hash = hasher.finish();
            let shard_idx = (hash as usize) % self.num_shards;

            if let Err(e) = shards[shard_idx].send(msg) {
                error!("failed to send message to shard {shard_idx}: {e}");
                // break if send fails; receiver likely closed
                break;
            }
        }

        error!("firehose worker dispatcher shutting down");

        Ok(())
    }
146
    /// Synchronous worker loop running on a dedicated thread.
    ///
    /// Pulls messages from the shard channel, accumulates each message's DB
    /// mutations into one write batch, commits it, then flushes counter deltas
    /// and broadcast events. Enters the tokio runtime only when necessary
    /// (signing-key resolution via `block_on` inside `fetch_key`).
    fn worker_thread(
        id: usize,
        mut rx: mpsc::UnboundedReceiver<IngestMessage>,
        state: Arc<AppState>,
        verify_signatures: bool,
        handle: tokio::runtime::Handle,
    ) {
        let _guard = handle.enter();
        debug!("shard {id} started");

        // reused across iterations; cleared at the start of each message
        let mut broadcast_events = Vec::new();

        while let Some(msg) = rx.blocking_recv() {
            let mut batch = state.db.inner.batch();
            broadcast_events.clear();

            let mut added_blocks = 0;
            let mut records_delta = 0;

            let mut ctx = WorkerContext {
                state: &state,
                batch: &mut batch,
                added_blocks: &mut added_blocks,
                records_delta: &mut records_delta,
                broadcast_events: &mut broadcast_events,
                handle: &handle,
                verify_signatures,
            };

            match msg {
                IngestMessage::BackfillFinished(did) => {
                    debug!("backfill finished for {did}, verifying state and draining buffer");

                    // load repo state to transition status and draining buffer
                    let repo_key = keys::repo_key(&did);
                    if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() {
                        match crate::db::deser_repo_state(&state_bytes) {
                            Ok(repo_state) => {
                                let repo_state = repo_state.into_static();

                                match Self::drain_resync_buffer(&mut ctx, &did, repo_state) {
                                    Ok(res) => match res {
                                        RepoProcessResult::Ok(s) => {
                                            // TODO: there might be a race condition here where we get a new commit
                                            // while the resync buffer is being drained, we should handle that probably
                                            // but also it should still be fine since we'll sync eventually anyway
                                            let res = ops::update_repo_status(
                                                &mut batch,
                                                &state.db,
                                                &did,
                                                s,
                                                RepoStatus::Synced,
                                            );
                                            if let Err(e) = res {
                                                // this can only fail if serde retry fails which would be really weird
                                                error!("failed to transition {did} to synced: {e}");
                                            }
                                        }
                                        // we don't have to handle this since drain_resync_buffer doesn't delete
                                        // the commits from the resync buffer so they will get retried later
                                        RepoProcessResult::Syncing(_) => {}
                                        RepoProcessResult::Deleted => {}
                                    },
                                    Err(e) => {
                                        error!("failed to drain resync buffer for {did}: {e}")
                                    }
                                };
                            }
                            Err(e) => error!("failed to deser repo state for {did}: {e}"),
                        }
                    }
                }
                IngestMessage::Firehose(msg) => {
                    let (did, seq) = match &msg {
                        SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
                        SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
                        SubscribeReposMessage::Account(a) => (&a.did, a.seq),
                        SubscribeReposMessage::Sync(s) => (&s.did, s.seq),
                        _ => continue,
                    };

                    match Self::process_message(&mut ctx, &msg, did) {
                        Ok(RepoProcessResult::Ok(_)) => {}
                        Ok(RepoProcessResult::Deleted) => {}
                        Ok(RepoProcessResult::Syncing(Some(commit))) => {
                            // repo fell out of sync: stash the commit for replay after backfill
                            if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) {
                                error!("failed to persist commit to resync_buffer for {did}: {e}");
                            }
                        }
                        Ok(RepoProcessResult::Syncing(None)) => {}
                        Err(e) => {
                            if let IngestError::Generic(e) = &e {
                                db::check_poisoned_report(e);
                            }
                            error!("error processing message for {did}: {e}");
                            // retriable commit failures are buffered so the drain pass retries them
                            if Self::check_if_retriable_failure(&e) {
                                if let SubscribeReposMessage::Commit(commit) = &msg {
                                    if let Err(e) =
                                        ops::persist_to_resync_buffer(&state.db, did, commit)
                                    {
                                        error!(
                                            "failed to persist commit to resync_buffer for {did}: {e}"
                                        );
                                    }
                                }
                            }
                        }
                    }

                    // advance the firehose cursor even when processing failed
                    state
                        .cur_firehose
                        .store(seq, std::sync::atomic::Ordering::SeqCst);
                }
            }

            if let Err(e) = batch.commit() {
                error!("failed to commit batch in shard {id}: {e}");
            }

            if added_blocks > 0 {
                state.db.update_count("blocks", added_blocks);
            }
            if records_delta != 0 {
                state.db.update_count("records", records_delta);
            }
            for evt in broadcast_events.drain(..) {
                let _ = state.db.event_tx.send(evt);
            }

            // best-effort flush; errors intentionally ignored
            state.db.inner.persist(fjall::PersistMode::Buffer).ok();
        }
    }
282
283 // dont retry commit or sync on key fetch errors
284 // since we'll just try again later if we get commit or sync again
285 fn check_if_retriable_failure(e: &IngestError) -> bool {
286 matches!(
287 e,
288 IngestError::Generic(_)
289 | IngestError::Resolver(ResolverError::Ratelimited)
290 | IngestError::Resolver(ResolverError::Transport(_))
291 )
292 }
293
    /// Processes one firehose message for `did` against its stored repo state.
    ///
    /// First consults `check_repo_state`; repos that are deleted, unknown, or
    /// not yet synced short-circuit with the corresponding result. Otherwise
    /// dispatches on the message kind: commits are applied, sync events may
    /// flip the repo to `Backfilling`, and identity/account events update
    /// status and queue broadcast events on `ctx`.
    fn process_message<'s, 'c>(
        ctx: &mut WorkerContext,
        msg: &'c SubscribeReposMessage<'static>,
        did: &Did,
    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
        let check_repo_res = Self::check_repo_state(ctx, did, msg)?;
        let mut repo_state = match check_repo_res {
            RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => {
                return Ok(check_repo_res);
            }
            RepoProcessResult::Ok(s) => s,
        };

        match msg {
            SubscribeReposMessage::Commit(commit) => {
                trace!("processing buffered commit for {did}");

                return Self::process_commit(ctx, did, repo_state, commit);
            }
            SubscribeReposMessage::Sync(sync) => {
                debug!("processing buffered sync for {did}");

                match ops::verify_sync_event(
                    sync.blocks.as_ref(),
                    Self::fetch_key(ctx, did)?.as_ref(),
                ) {
                    Ok((root, rev)) => {
                        // no-op if the sync root matches the data we already have
                        if let Some(current_data) = &repo_state.data {
                            if current_data == &root.to_ipld().expect("valid cid") {
                                debug!("skipping noop sync for {did}");
                                return Ok(RepoProcessResult::Ok(repo_state));
                            }
                        }

                        // ignore syncs whose rev is not newer than our current rev
                        if let Some(current_rev) = &repo_state.rev {
                            if rev.as_str() <= current_rev.to_tid().as_str() {
                                debug!("skipping replayed sync for {did}");
                                return Ok(RepoProcessResult::Ok(repo_state));
                            }
                        }

                        // genuine divergence: flip to Backfilling in its own
                        // immediately-committed batch and wake the backfiller
                        warn!("sync event for {did}: triggering backfill");
                        let mut batch = ctx.state.db.inner.batch();
                        repo_state = ops::update_repo_status(
                            &mut batch,
                            &ctx.state.db,
                            did,
                            repo_state,
                            RepoStatus::Backfilling,
                        )?;
                        batch.commit().into_diagnostic()?;
                        ctx.state
                            .db
                            .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
                        ctx.state.notify_backfill();
                        return Ok(RepoProcessResult::Ok(repo_state));
                    }
                    Err(e) => {
                        // best-effort: a failed sync verification is logged and ignored
                        error!("failed to process sync event for {did}: {e}");
                    }
                }
            }
            SubscribeReposMessage::Identity(identity) => {
                debug!("processing buffered identity for {did}");
                let handle = identity
                    .handle
                    .as_ref()
                    .map(|h| h.to_cowstr().into_static());

                let evt = IdentityEvt {
                    did: did.clone().into_static(),
                    handle,
                };
                ctx.broadcast_events
                    .push(ops::make_identity_event(&ctx.state.db, evt));
            }
            SubscribeReposMessage::Account(account) => {
                debug!("processing buffered account for {did}");
                let evt = AccountEvt {
                    did: did.clone().into_static(),
                    active: account.active,
                    status: account.status.as_ref().map(|s| s.to_cowstr().into_static()),
                };

                if !account.active {
                    use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus;
                    match &account.status {
                        Some(AccountStatus::Deleted) => {
                            debug!("account {did} deleted, wiping data");
                            ops::delete_repo(ctx.batch, &ctx.state.db, did, repo_state)?;
                            return Ok(RepoProcessResult::Deleted);
                        }
                        status => {
                            // map the (possibly unknown or missing) account status
                            // onto the RepoStatus we persist
                            let target_status = match status {
                                Some(status) => match status {
                                    AccountStatus::Deleted => {
                                        unreachable!("deleted account status is handled before")
                                    }
                                    AccountStatus::Takendown => RepoStatus::Takendown,
                                    AccountStatus::Suspended => RepoStatus::Suspended,
                                    AccountStatus::Deactivated => RepoStatus::Deactivated,
                                    AccountStatus::Throttled => {
                                        RepoStatus::Error("throttled".into())
                                    }
                                    AccountStatus::Desynchronized => {
                                        RepoStatus::Error("desynchronized".into())
                                    }
                                    AccountStatus::Other(s) => {
                                        warn!(
                                            "unknown account status for {did}, will put in error state: {s}"
                                        );
                                        RepoStatus::Error(s.to_smolstr())
                                    }
                                },
                                None => {
                                    warn!("account {did} inactive but no status provided");
                                    RepoStatus::Error("unknown".into())
                                }
                            };

                            if repo_state.status == target_status {
                                debug!("account status unchanged for {did}: {target_status:?}");
                                return Ok(RepoProcessResult::Ok(repo_state));
                            }

                            repo_state = ops::update_repo_status(
                                ctx.batch,
                                &ctx.state.db,
                                did,
                                repo_state,
                                target_status,
                            )?;
                            ctx.state
                                .db
                                .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None));
                        }
                    }
                } else {
                    // normally we would initiate backfill here
                    // but we don't have to do anything because:
                    // 1. we handle changing repo status to Synced before this (in check repo state)
                    // 2. initiating backfilling is also handled there
                }
                ctx.broadcast_events
                    .push(ops::make_account_event(&ctx.state.db, evt));
            }
            _ => {
                warn!("unknown message type in buffer for {did}");
            }
        }

        Ok(RepoProcessResult::Ok(repo_state))
    }
447
    /// Verifies ordering/continuity of a single commit and applies it.
    ///
    /// Replayed commits (rev not newer than the stored rev) are skipped. If the
    /// commit's declared previous root does not match our current one, a gap is
    /// assumed: the repo is flipped to `Backfilling` and `Syncing(Some(commit))`
    /// is returned so the caller can buffer the commit for later replay.
    /// Otherwise the commit is applied via `ops::apply_commit`, and block /
    /// record deltas plus a `Persisted` broadcast event are recorded on `ctx`.
    fn process_commit<'c, 'ns, 's: 'ns>(
        ctx: &mut WorkerContext,
        did: &Did,
        repo_state: RepoState<'s>,
        commit: &'c Commit<'c>,
    ) -> Result<RepoProcessResult<'ns, 'c>, IngestError> {
        // check for replayed events (already seen revision)
        if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
            debug!(
                "skipping replayed event for {}: {} <= {}",
                did,
                commit.rev,
                repo_state
                    .rev
                    .as_ref()
                    .map(|r| r.to_tid())
                    .expect("we checked in if")
            );
            return Ok(RepoProcessResult::Ok(repo_state));
        }

        // gap detection: the commit's prev_data must match our current root
        if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data)
            && repo
                != &prev_commit
                    .0
                    .to_ipld()
                    .into_diagnostic()
                    .wrap_err("invalid cid from relay")?
        {
            warn!(
                "gap detected for {}: repo {} != commit prev {}. triggering backfill",
                did, repo, prev_commit.0
            );

            // status flip goes into its own immediately-committed batch,
            // separate from the per-message batch on ctx
            let mut batch = ctx.state.db.inner.batch();
            let _repo_state = ops::update_repo_status(
                &mut batch,
                &ctx.state.db,
                did,
                repo_state,
                RepoStatus::Backfilling,
            )?;
            batch.commit().into_diagnostic()?;
            ctx.state.db.update_gauge_diff(
                &crate::types::GaugeState::Synced,
                &crate::types::GaugeState::Pending,
            );
            ctx.state.notify_backfill();
            return Ok(RepoProcessResult::Syncing(Some(commit)));
        }

        let res = ops::apply_commit(
            ctx.batch,
            &ctx.state.db,
            repo_state,
            &commit,
            Self::fetch_key(ctx, did)?.as_ref(),
            &ctx.state.filter.load(),
        )?;
        let repo_state = res.repo_state;
        *ctx.added_blocks += res.blocks_count;
        *ctx.records_delta += res.records_delta;
        // NOTE(review): assumes apply_commit advanced next_event_id, so the
        // just-persisted event id is next_event_id - 1 — confirm in ops
        ctx.broadcast_events.push(BroadcastEvent::Persisted(
            ctx.state
                .db
                .next_event_id
                .load(std::sync::atomic::Ordering::SeqCst)
                - 1,
        ));

        Ok(RepoProcessResult::Ok(repo_state))
    }
520
    /// Checks the current state of the repo in the database.
    ///
    /// If the repo is new, creates initial `Backfilling` state and triggers
    /// backfill (subject to signal filtering in `FilterMode::Signal`).
    /// Handles transitions between states (backfilling -> synced, etc).
    fn check_repo_state<'s, 'c>(
        ctx: &mut WorkerContext,
        did: &Did<'_>,
        msg: &'c SubscribeReposMessage<'static>,
    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
        let repo_key = keys::repo_key(&did);
        let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else {
            // unknown repo: decide whether to start tracking it at all
            let filter = ctx.state.filter.load();

            if filter.mode == FilterMode::Signal {
                // in signal mode only commits touching a signal collection
                // cause a new repo to be picked up; everything else is dropped
                let commit = match msg {
                    SubscribeReposMessage::Commit(c) => c,
                    _ => return Ok(RepoProcessResult::Syncing(None)),
                };
                // op paths look like "collection/rkey"; match on the collection part
                let touches_signal = commit.ops.iter().any(|op| {
                    op.path
                        .split_once('/')
                        .map(|(col, _)| {
                            let m = filter.matches_signal(col);
                            debug!(
                                "signal check for {did}: op path={} col={col} signals={:?} -> {m}",
                                op.path, filter.signals
                            );
                            m
                        })
                        .unwrap_or(false)
                });
                if !touches_signal {
                    debug!("dropping {did}: commit has no signal-matching ops");
                    return Ok(RepoProcessResult::Syncing(None));
                }
                debug!("{did}: commit touches a signal, queuing backfill");
            }

            debug!("discovered new account {did} from firehose, queueing backfill");

            // persist initial state (with a random index id) and a pending-queue
            // entry in one immediately-committed batch
            let repo_state = RepoState::backfilling(rand::rng().next_u64());
            let mut batch = ctx.state.db.inner.batch();
            batch.insert(
                &ctx.state.db.repos,
                &repo_key,
                crate::db::ser_repo_state(&repo_state)?,
            );
            batch.insert(
                &ctx.state.db.pending,
                keys::pending_key(repo_state.index_id),
                &repo_key,
            );
            batch.commit().into_diagnostic()?;

            ctx.state.db.update_count("repos", 1);
            ctx.state.db.update_gauge_diff(
                &crate::types::GaugeState::Synced,
                &crate::types::GaugeState::Pending,
            );

            ctx.state.notify_backfill();

            return Ok(RepoProcessResult::Syncing(None));
        };
        let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();

        // if we are backfilling or it is new, DON'T mark it as synced yet
        // the backfill worker will do that when it finishes
        match &repo_state.status {
            RepoStatus::Synced => {
                // lazy drain: if there are buffered commits, drain them now
                if ops::has_buffered_commits(&ctx.state.db, did) {
                    Self::drain_resync_buffer(ctx, did, repo_state)
                } else {
                    Ok(RepoProcessResult::Ok(repo_state))
                }
            }
            RepoStatus::Backfilling | RepoStatus::Error(_) => {
                debug!(
                    "ignoring active status for {did} as it is {:?}",
                    repo_state.status
                );
                Ok(RepoProcessResult::Syncing(None))
            }
            RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => {
                // if it was in deactivated/takendown/suspended state, we can mark it as synced
                // because we are receiving live events now
                // UNLESS it is an account status event that keeps it deactivated
                if let SubscribeReposMessage::Account(acc) = msg {
                    if !acc.active {
                        return Ok(RepoProcessResult::Ok(repo_state));
                    }
                }
                repo_state = ops::update_repo_status(
                    ctx.batch,
                    &ctx.state.db,
                    did,
                    repo_state,
                    RepoStatus::Synced,
                )?;
                ctx.state.db.update_gauge_diff(
                    &crate::types::GaugeState::Resync(None),
                    &crate::types::GaugeState::Synced,
                );
                Ok(RepoProcessResult::Ok(repo_state))
            }
        }
    }
628
    /// Replays commits buffered in the resync buffer for `did`, in key order.
    ///
    /// Successfully applied (or account-deleting) commits are removed from the
    /// buffer via `ctx.batch`. A commit that fails retriably is left in place
    /// so it is retried on the next drain; non-retriable failures remove the
    /// entry before propagating the error. If a commit reports the repo as
    /// still syncing, draining stops without deleting the remaining entries.
    fn drain_resync_buffer<'s>(
        ctx: &mut WorkerContext,
        did: &Did,
        mut repo_state: RepoState<'s>,
    ) -> Result<RepoProcessResult<'s, 'static>, IngestError> {
        let prefix = keys::resync_buffer_prefix(did);

        for guard in ctx.state.db.resync_buffer.prefix(&prefix) {
            let (key, value) = guard.into_inner().into_diagnostic()?;
            // buffered commits are stored msgpack-encoded
            let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?;

            let res = Self::process_commit(ctx, did, repo_state, &commit);
            let res = match res {
                Ok(r) => r,
                Err(e) => {
                    // drop the entry only for non-retriable failures
                    if !Self::check_if_retriable_failure(&e) {
                        ctx.batch.remove(&ctx.state.db.resync_buffer, key);
                    }
                    return Err(e);
                }
            };
            match res {
                RepoProcessResult::Ok(rs) => {
                    ctx.batch.remove(&ctx.state.db.resync_buffer, key);
                    repo_state = rs;
                }
                RepoProcessResult::Syncing(_) => {
                    // keep the remaining buffered commits for after backfill
                    return Ok(RepoProcessResult::Syncing(None));
                }
                RepoProcessResult::Deleted => {
                    ctx.batch.remove(&ctx.state.db.resync_buffer, key);
                    return Ok(RepoProcessResult::Deleted);
                }
            }
        }

        Ok(RepoProcessResult::Ok(repo_state))
    }
667
668 fn fetch_key(
669 ctx: &WorkerContext,
670 did: &Did,
671 ) -> Result<Option<PublicKey<'static>>, IngestError> {
672 if ctx.verify_signatures {
673 let key = ctx
674 .handle
675 .block_on(ctx.state.resolver.resolve_signing_key(did))?;
676 Ok(Some(key))
677 } else {
678 Ok(None)
679 }
680 }
681}