AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2use crate::db::{self, Db, keys, ser_repo_state};
3use crate::filter::FilterMode;
4use crate::ops;
5use crate::resolver::ResolverError;
6use crate::state::AppState;
7use crate::types::{
8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState,
9 StoredEvent,
10};
11
12use fjall::Slice;
13use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError};
14use jacquard::error::{ClientError, ClientErrorKind};
15use jacquard::types::cid::Cid;
16use jacquard::types::did::Did;
17use jacquard::{CowStr, IntoStatic, prelude::*};
18use jacquard_common::xrpc::XrpcError;
19use jacquard_repo::mst::Mst;
20use jacquard_repo::{BlockStore, MemoryBlockStore};
21use miette::{Diagnostic, IntoDiagnostic, Result};
22use reqwest::StatusCode;
23use smol_str::{SmolStr, ToSmolStr};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::sync::atomic::Ordering;
27use std::time::{Duration, Instant};
28use thiserror::Error;
29use tokio::sync::Semaphore;
30use tracing::{debug, error, info, trace, warn};
31
32pub mod manager;
33
34use crate::ingest::{BufferTx, IngestMessage};
35
/// Pulls DIDs out of the `pending` partition and backfills their repos
/// concurrently, bounded by a semaphore.
pub struct BackfillWorker {
    state: Arc<AppState>,
    /// Channel into the ingest pipeline; used to signal `BackfillFinished`.
    buffer_tx: BufferTx,
    /// Shared HTTP client used for all PDS fetches.
    http: reqwest::Client,
    /// Caps the number of concurrently running backfill tasks.
    semaphore: Arc<Semaphore>,
    /// When true, root commit signatures are verified against the DID's signing key.
    verify_signatures: bool,
    /// DIDs that currently have a task running, to avoid double-spawning.
    in_flight: Arc<scc::HashSet<Did<'static>>>,
}
44
45impl BackfillWorker {
46 pub fn new(
47 state: Arc<AppState>,
48 buffer_tx: BufferTx,
49 timeout: Duration,
50 concurrency_limit: usize,
51 verify_signatures: bool,
52 ) -> Self {
53 Self {
54 state,
55 buffer_tx,
56 http: reqwest::Client::builder()
57 .timeout(timeout)
58 .zstd(true)
59 .brotli(true)
60 .gzip(true)
61 .build()
62 .expect("failed to build http client"),
63 semaphore: Arc::new(Semaphore::new(concurrency_limit)),
64 verify_signatures,
65 in_flight: Arc::new(scc::HashSet::new()),
66 }
67 }
68}
69
/// Drop guard that removes `did` from the shared in-flight set when the
/// spawned backfill task finishes (or panics), allowing it to be rescheduled.
struct InFlightGuard {
    did: Did<'static>,
    set: Arc<scc::HashSet<Did<'static>>>,
}
74
impl Drop for InFlightGuard {
    // Runs even if the task panicked, so a DID can never get stuck in the
    // in-flight set once its guard exists.
    fn drop(&mut self) {
        let _ = self.set.remove_sync(&self.did);
    }
}
80
81impl BackfillWorker {
82 pub async fn run(self) {
83 info!("backfill worker started");
84
85 loop {
86 let mut spawned = 0;
87
88 for guard in self.state.db.pending.iter() {
89 let (key, value) = match guard.into_inner() {
90 Ok(kv) => kv,
91 Err(e) => {
92 error!("failed to read pending entry: {e}");
93 db::check_poisoned(&e);
94 continue;
95 }
96 };
97
98 let did = match TrimmedDid::try_from(value.as_ref()) {
99 Ok(d) => d.to_did(),
100 Err(e) => {
101 error!("invalid did in pending value: {e}");
102 continue;
103 }
104 };
105
106 if self.in_flight.contains_sync(&did) {
107 continue;
108 }
109 let _ = self.in_flight.insert_sync(did.clone().into_static());
110
111 let permit = match self.semaphore.clone().try_acquire_owned() {
112 Ok(p) => p,
113 Err(_) => break,
114 };
115
116 let guard = InFlightGuard {
117 did: did.clone().into_static(),
118 set: self.in_flight.clone(),
119 };
120
121 let state = self.state.clone();
122 let http = self.http.clone();
123 let did = did.clone();
124 let buffer_tx = self.buffer_tx.clone();
125 let verify = self.verify_signatures;
126
127 tokio::spawn(async move {
128 let _guard = guard;
129 let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await;
130
131 if let Err(e) = res {
132 error!("backfill process failed for {did}: {e}");
133 if let BackfillError::Generic(report) = &e {
134 db::check_poisoned_report(report);
135 }
136 }
137
138 // wake worker to pick up more (in case we were sleeping at limit)
139 state.backfill_notify.notify_one();
140 });
141
142 spawned += 1;
143 }
144
145 if spawned == 0 {
146 // wait for new tasks
147 self.state.backfill_notify.notified().await;
148 } else {
149 // if we spawned tasks, yield briefly to let them start and avoid tight loop
150 tokio::time::sleep(Duration::from_millis(10)).await;
151 }
152 }
153 }
154}
155
/// Processes a single pending DID end-to-end and records the outcome.
///
/// On `Ok(Some(_))` from `process_did`: removes the DID from `pending` and/or
/// `resync` (depending on its previous gauge state), updates gauges, persists,
/// and notifies the ingest shard with `BackfillFinished`. On `Ok(None)`
/// (signal mode found no matching records; `process_did` already cleaned up)
/// only the counters are decremented. On error: bumps the retry count, stores
/// a `ResyncState::Error` with backoff, marks the repo status as errored, and
/// re-raises the error to the spawner.
///
/// `_permit` is held for the whole task to bound backfill concurrency.
async fn did_task(
    state: &Arc<AppState>,
    http: reqwest::Client,
    buffer_tx: BufferTx,
    did: &Did<'static>,
    pending_key: Slice,
    _permit: tokio::sync::OwnedSemaphorePermit,
    verify_signatures: bool,
) -> Result<(), BackfillError> {
    let db = &state.db;

    match process_did(&state, &http, &did, verify_signatures).await {
        Ok(Some(repo_state)) => {
            let did_key = keys::repo_key(&did);

            // determine old gauge state
            // if it was error/suspended etc, we need to know which error kind it was to decrement correctly.
            // we have to peek at the resync state.
            let old_gauge = state.db.repo_gauge_state_async(&repo_state, &did_key).await;

            let mut batch = db.inner.batch();
            // remove from pending
            if old_gauge == GaugeState::Pending {
                batch.remove(&db.pending, pending_key);
            }
            // remove from resync
            if old_gauge.is_resync() {
                batch.remove(&db.resync, &did_key);
            }
            // commit on the blocking pool: fjall batch commits do disk I/O
            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;

            state
                .db
                .update_gauge_diff_async(&old_gauge, &GaugeState::Synced)
                .await;

            let state = state.clone();
            tokio::task::spawn_blocking(move || {
                state
                    .db
                    .inner
                    .persist(fjall::PersistMode::Buffer)
                    .into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // Notify completion to worker shard
            if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) {
                error!("failed to send BackfillFinished for {did}: {e}");
            }
            Ok(())
        }
        Ok(None) => {
            // signal mode: repo had no matching records, was cleaned up by process_did
            state.db.update_count_async("repos", -1).await;
            state.db.update_count_async("pending", -1).await;
            Ok(())
        }
        Err(e) => {
            match &e {
                BackfillError::Ratelimited => {
                    debug!("failed for {did}: too many requests");
                }
                BackfillError::Transport(reason) => {
                    error!("failed for {did}: transport error: {reason}");
                }
                BackfillError::Generic(e) => {
                    error!("failed for {did}: {e}");
                }
            }

            let error_kind = match &e {
                BackfillError::Ratelimited => ResyncErrorKind::Ratelimited,
                BackfillError::Transport(_) => ResyncErrorKind::Transport,
                BackfillError::Generic(_) => ResyncErrorKind::Generic,
            };

            let did_key = keys::repo_key(&did);

            // 1. get current retry count
            let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
                b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
                    .transpose()
            })?;

            let (mut retry_count, prev_kind) = match existing_state {
                Some(ResyncState::Error {
                    kind, retry_count, ..
                }) => (retry_count, Some(kind)),
                Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
                None => (0, None),
            };

            // 2. calculate new retry stats with backoff
            retry_count += 1;
            let next_retry = ResyncState::next_backoff(retry_count);

            let resync_state = ResyncState::Error {
                kind: error_kind.clone(),
                retry_count,
                next_retry,
            };

            let error_string = e.to_string();

            tokio::task::spawn_blocking({
                let state = state.clone();
                let did_key = did_key.into_static();
                move || {
                    // 3. save to resync
                    let serialized_resync_state =
                        rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                    // 4. and update the main repo state
                    let serialized_repo_state = if let Some(state_bytes) =
                        state.db.repos.get(&did_key).into_diagnostic()?
                    {
                        let mut state: RepoState =
                            rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
                        state.status = RepoStatus::Error(error_string.into());
                        Some(rmp_serde::to_vec(&state).into_diagnostic()?)
                    } else {
                        None
                    };

                    let mut batch = state.db.inner.batch();
                    batch.insert(&state.db.resync, &did_key, serialized_resync_state);
                    batch.remove(&state.db.pending, pending_key.clone());
                    if let Some(state_bytes) = serialized_repo_state {
                        batch.insert(&state.db.repos, &did_key, state_bytes);
                    }
                    batch.commit().into_diagnostic()
                }
            })
            .await
            .into_diagnostic()??;

            // gauge bookkeeping: the repo moves from pending (or its previous
            // resync-error kind) into the new resync-error kind
            let old_gauge = if let Some(k) = prev_kind {
                GaugeState::Resync(Some(k))
            } else {
                GaugeState::Pending
            };

            let new_gauge = GaugeState::Resync(Some(error_kind));

            state
                .db
                .update_gauge_diff_async(&old_gauge, &new_gauge)
                .await;

            Err(e)
        }
    }
}
313
/// Failure modes for a single repo backfill; the variant determines the
/// `ResyncErrorKind` stored for retry/backoff bookkeeping in `did_task`.
#[derive(Debug, Diagnostic, Error)]
enum BackfillError {
    /// Any other error, wrapped as a miette report.
    #[error("{0}")]
    Generic(miette::Report),
    /// HTTP 429 from the PDS or resolver.
    #[error("too many requests")]
    Ratelimited,
    /// Transport-level failure reported by the HTTP client, with its source message.
    #[error("transport error: {0}")]
    Transport(SmolStr),
}
323
324impl From<ClientError> for BackfillError {
325 fn from(e: ClientError) -> Self {
326 match e.kind() {
327 ClientErrorKind::Http {
328 status: StatusCode::TOO_MANY_REQUESTS,
329 } => Self::Ratelimited,
330 ClientErrorKind::Transport => Self::Transport(
331 e.source_err()
332 .expect("transport error without source")
333 .to_smolstr(),
334 ),
335 _ => Self::Generic(e.into()),
336 }
337 }
338}
339
340impl From<miette::Report> for BackfillError {
341 fn from(e: miette::Report) -> Self {
342 Self::Generic(e)
343 }
344}
345
346impl From<ResolverError> for BackfillError {
347 fn from(e: ResolverError) -> Self {
348 match e {
349 ResolverError::Ratelimited => Self::Ratelimited,
350 ResolverError::Transport(s) => Self::Transport(s),
351 ResolverError::Generic(e) => Self::Generic(e),
352 }
353 }
354}
355
356async fn process_did<'i>(
357 app_state: &Arc<AppState>,
358 http: &reqwest::Client,
359 did: &Did<'static>,
360 verify_signatures: bool,
361) -> Result<Option<RepoState<'static>>, BackfillError> {
362 debug!("backfilling {}", did);
363
364 let db = &app_state.db;
365 let did_key = keys::repo_key(did);
366 let state_bytes = Db::get(db.repos.clone(), did_key)
367 .await?
368 .ok_or_else(|| miette::miette!("!!!THIS IS A BUG!!! repo state for {did} missing"))?;
369 let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
370 .into_diagnostic()?
371 .into_static();
372 let previous_state = state.clone();
373
374 // 1. resolve pds
375 let start = Instant::now();
376 let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?;
377 trace!(
378 "resolved {did} to pds {pds_url} handle {handle:?} in {:?}",
379 start.elapsed()
380 );
381
382 state.handle = handle.map(|h| h.into_static());
383
384 let emit_identity = |status: &RepoStatus| {
385 let evt = AccountEvt {
386 did: did.clone(),
387 active: !matches!(
388 status,
389 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
390 ),
391 status: Some(
392 match status {
393 RepoStatus::Deactivated => "deactivated",
394 RepoStatus::Takendown => "takendown",
395 RepoStatus::Suspended => "suspended",
396 _ => "active",
397 }
398 .into(),
399 ),
400 };
401 let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
402 };
403
404 // 2. fetch repo (car)
405 let start = Instant::now();
406 let req = GetRepo::new().did(did.clone()).build();
407 let resp = http.xrpc(pds_url).send(&req).await?;
408
409 let car_bytes = match resp.into_output() {
410 Ok(o) => o,
411 Err(XrpcError::Xrpc(e)) => {
412 if matches!(e, GetRepoError::RepoNotFound(_)) {
413 warn!("repo {did} not found, deleting");
414 let mut batch = db.inner.batch();
415 if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
416 tracing::error!("failed to wipe repo during backfill: {e}");
417 }
418 batch.commit().into_diagnostic()?;
419 return Ok(Some(previous_state)); // stop backfill
420 }
421
422 let inactive_status = match e {
423 GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
424 GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
425 GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
426 _ => None,
427 };
428
429 if let Some(status) = inactive_status {
430 warn!("repo {did} is {status:?}, stopping backfill");
431
432 emit_identity(&status);
433
434 let resync_state = ResyncState::Gone {
435 status: status.clone(),
436 };
437 let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;
438
439 let app_state_clone = app_state.clone();
440 app_state
441 .db
442 .update_repo_state_async(did, move |state, (key, batch)| {
443 state.status = status;
444 batch.insert(&app_state_clone.db.resync, key, resync_bytes);
445 Ok((true, ()))
446 })
447 .await?;
448
449 // return success so wrapper stops retrying
450 return Ok(Some(previous_state));
451 }
452
453 Err(e).into_diagnostic()?
454 }
455 Err(e) => Err(e).into_diagnostic()?,
456 };
457
458 // emit identity event so any consumers know
459 emit_identity(&state.status);
460
461 trace!(
462 "fetched {} bytes for {did} in {:?}",
463 car_bytes.body.len(),
464 start.elapsed()
465 );
466
467 // 3. import repo
468 let start = Instant::now();
469 let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
470 .await
471 .into_diagnostic()?;
472 trace!("parsed car for {did} in {:?}", start.elapsed());
473
474 let start = Instant::now();
475 let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
476 trace!(
477 "stored {} blocks in memory for {did} in {:?}",
478 store.len(),
479 start.elapsed()
480 );
481
482 // 4. parse root commit to get mst root
483 let root_bytes = store
484 .get(&parsed.root)
485 .await
486 .into_diagnostic()?
487 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
488
489 let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
490 debug!(
491 "backfilling repo at revision {}, root cid {}",
492 root_commit.rev, root_commit.data
493 );
494
495 // 4.5. verify commit signature
496 if verify_signatures {
497 let pubkey = app_state.resolver.resolve_signing_key(did).await?;
498 root_commit
499 .verify(&pubkey)
500 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
501 trace!("signature verified for {did}");
502 }
503
504 // 5. walk mst
505 let start = Instant::now();
506 let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
507 let leaves = mst.leaves().await.into_diagnostic()?;
508 trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64());
509
510 // 6. insert records into db
511 let start = Instant::now();
512 let result = {
513 let app_state = app_state.clone();
514 let did = did.clone();
515 let rev = root_commit.rev;
516
517 tokio::task::spawn_blocking(move || {
518 let filter = app_state.filter.load();
519 let mut count = 0;
520 let mut delta = 0;
521 let mut added_blocks = 0;
522 let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
523 let mut batch = app_state.db.inner.batch();
524 let store = mst.storage();
525
526 let prefix = keys::record_prefix_did(&did);
527 let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();
528
529 for guard in app_state.db.records.prefix(&prefix) {
530 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
531 // key is did|collection|rkey
532 // skip did|
533 let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
534 let collection_raw = remaining
535 .next()
536 .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
537 let rkey_raw = remaining
538 .next()
539 .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
540
541 let collection = std::str::from_utf8(collection_raw)
542 .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;
543
544 let rkey = keys::parse_rkey(rkey_raw)
545 .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;
546
547 let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
548 .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
549 .to_smolstr();
550
551 existing_cids.insert((collection.into(), rkey), cid);
552 }
553
554 let mut signal_seen = filter.mode == FilterMode::Full || state.tracked;
555
556 debug!(
557 "backfilling {did}: signal_seen initial={signal_seen}, mode={:?}, signals={:?}",
558 filter.mode, filter.signals
559 );
560
561 for (key, cid) in leaves {
562 let val_bytes = tokio::runtime::Handle::current()
563 .block_on(store.get(&cid))
564 .into_diagnostic()?;
565
566 if let Some(val) = val_bytes {
567 let (collection, rkey) = ops::parse_path(&key)?;
568
569 if !filter.matches_collection(collection) {
570 continue;
571 }
572
573 if !signal_seen && filter.matches_signal(collection) {
574 debug!("backfill {did}: signal matched on {collection}");
575 signal_seen = true;
576 }
577
578 let rkey = DbRkey::new(rkey);
579 let path = (collection.to_smolstr(), rkey.clone());
580 let cid_obj = Cid::ipld(cid);
581
582 // check if this record already exists with same CID
583 let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) {
584 if existing_cid == cid_obj.as_str() {
585 trace!("skip {did}/{collection}/{rkey} ({cid})");
586 continue; // skip unchanged record
587 }
588 (DbAction::Update, false)
589 } else {
590 (DbAction::Create, true)
591 };
592 trace!("{action} {did}/{collection}/{rkey} ({cid})");
593
594 // key is did|collection|rkey
595 let db_key = keys::record_key(&did, collection, &rkey);
596
597 batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref());
598 batch.insert(&app_state.db.records, db_key, cid.to_bytes());
599
600 added_blocks += 1;
601 if is_new {
602 delta += 1;
603 *collection_counts.entry(path.0.clone()).or_default() += 1;
604 }
605
606 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
607 let evt = StoredEvent {
608 live: false,
609 did: TrimmedDid::from(&did),
610 rev: DbTid::from(&rev),
611 collection: CowStr::Borrowed(collection),
612 rkey,
613 action,
614 cid: Some(cid_obj.to_ipld().expect("valid cid")),
615 };
616 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
617 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
618
619 count += 1;
620 }
621 }
622
623 // remove any remaining existing records (they weren't in the new MST)
624 for ((collection, rkey), cid) in existing_cids {
625 trace!("remove {did}/{collection}/{rkey} ({cid})");
626
627 batch.remove(
628 &app_state.db.records,
629 keys::record_key(&did, &collection, &rkey),
630 );
631
632 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
633 let evt = StoredEvent {
634 live: false,
635 did: TrimmedDid::from(&did),
636 rev: DbTid::from(&rev),
637 collection: CowStr::Borrowed(&collection),
638 rkey,
639 action: DbAction::Delete,
640 cid: None,
641 };
642 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
643 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
644
645 delta -= 1;
646 count += 1;
647 }
648
649 if !signal_seen {
650 debug!("backfill {did}: no signal-matching records found, discarding repo");
651 return Ok::<_, miette::Report>(None);
652 }
653
654 // 6. update data, status is updated in worker shard
655 state.rev = Some((&rev).into());
656 state.data = Some(root_commit.data);
657 state.last_updated_at = chrono::Utc::now().timestamp();
658
659 batch.insert(
660 &app_state.db.repos,
661 keys::repo_key(&did),
662 ser_repo_state(&state)?,
663 );
664
665 // add the counts
666 for (col, cnt) in collection_counts {
667 db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt);
668 }
669
670 batch.commit().into_diagnostic()?;
671
672 Ok::<_, miette::Report>(Some((state, delta, added_blocks, count)))
673 })
674 .await
675 .into_diagnostic()??
676 };
677
678 let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
679 // signal mode: no signal-matching records found — clean up the optimistically-added repo
680 let did_key = keys::repo_key(did);
681 let backfill_pending_key = keys::pending_key(previous_state.index_id);
682 let repos_ks = app_state.db.repos.clone();
683 let pending_ks = app_state.db.pending.clone();
684 let db_inner = app_state.db.inner.clone();
685 tokio::task::spawn_blocking(move || {
686 let mut batch = db_inner.batch();
687 batch.remove(&repos_ks, &did_key);
688 batch.remove(&pending_ks, backfill_pending_key);
689 batch.commit().into_diagnostic()
690 })
691 .await
692 .into_diagnostic()??;
693 return Ok(None);
694 };
695
696 trace!("did {count} ops for {did} in {:?}", start.elapsed());
697
698 // do the counts
699 if records_cnt_delta > 0 {
700 app_state
701 .db
702 .update_count_async("records", records_cnt_delta)
703 .await;
704 app_state
705 .db
706 .update_count_async("blocks", added_blocks)
707 .await;
708 }
709 trace!(
710 "committed backfill batch for {did} in {:?}",
711 start.elapsed()
712 );
713
714 let _ = db.event_tx.send(BroadcastEvent::Persisted(
715 db.next_event_id.load(Ordering::SeqCst) - 1,
716 ));
717
718 trace!("backfill complete for {did}");
719 Ok(Some(previous_state))
720}