// Forked from ptr.pet/hydrant.
// Kind of like tap, but different, and in Rust.
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2use crate::db::{self, Db, keys, ser_repo_state};
3use crate::filter::FilterMode;
4use crate::ops;
5use crate::resolver::ResolverError;
6use crate::state::AppState;
7use crate::types::{
8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncState, StoredEvent,
9};
10
11use fjall::Slice;
12use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError};
13use jacquard::error::{ClientError, ClientErrorKind};
14use jacquard::types::cid::Cid;
15use jacquard::types::did::Did;
16use jacquard::{CowStr, IntoStatic, prelude::*};
17use jacquard_common::xrpc::XrpcError;
18use jacquard_repo::mst::Mst;
19use jacquard_repo::{BlockStore, MemoryBlockStore};
20use miette::{Diagnostic, IntoDiagnostic, Result};
21use reqwest::StatusCode;
22use smol_str::{SmolStr, ToSmolStr};
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::sync::atomic::Ordering;
26use std::time::{Duration, Instant};
27use thiserror::Error;
28use tokio::sync::Semaphore;
29use tracing::{debug, error, info, trace, warn};
30
31pub mod manager;
32
33use crate::ingest::{BufferTx, IngestMessage};
34
/// Background worker that backfills full repos for DIDs queued in the
/// `pending` keyspace, bounded by a concurrency semaphore.
pub struct BackfillWorker {
    // shared application state (db, resolver, notify handles, ...)
    state: Arc<AppState>,
    // channel into the ingest pipeline; used to signal backfill completion
    buffer_tx: BufferTx,
    // shared HTTP client used to fetch repo CARs from each PDS
    http: reqwest::Client,
    // bounds how many backfill tasks run concurrently
    semaphore: Arc<Semaphore>,
    // whether to verify the root commit signature against the DID's signing key
    verify_signatures: bool,
    // DIDs with a task currently running, to avoid double-spawning
    in_flight: Arc<scc::HashSet<Did<'static>>>,
}
43
44impl BackfillWorker {
45 pub fn new(
46 state: Arc<AppState>,
47 buffer_tx: BufferTx,
48 timeout: Duration,
49 concurrency_limit: usize,
50 verify_signatures: bool,
51 ) -> Self {
52 Self {
53 state,
54 buffer_tx,
55 http: reqwest::Client::builder()
56 .timeout(timeout)
57 .zstd(true)
58 .brotli(true)
59 .gzip(true)
60 .build()
61 .expect("failed to build http client"),
62 semaphore: Arc::new(Semaphore::new(concurrency_limit)),
63 verify_signatures,
64 in_flight: Arc::new(scc::HashSet::new()),
65 }
66 }
67}
68
/// Guard tied to one running backfill task; its `Drop` impl removes `did`
/// from the shared in-flight set so a finished (or panicked) task always
/// frees its slot.
struct InFlightGuard {
    did: Did<'static>,
    set: Arc<scc::HashSet<Did<'static>>>,
}
73
impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // best-effort removal: the entry may already be gone, which is fine
        let _ = self.set.remove_sync(&self.did);
    }
}
79
80impl BackfillWorker {
81 pub async fn run(self) {
82 info!("backfill worker started");
83
84 loop {
85 let mut spawned = 0;
86
87 for guard in self.state.db.pending.iter() {
88 let (key, value) = match guard.into_inner() {
89 Ok(kv) => kv,
90 Err(e) => {
91 error!("failed to read pending entry: {e}");
92 db::check_poisoned(&e);
93 continue;
94 }
95 };
96
97 let did = match TrimmedDid::try_from(value.as_ref()) {
98 Ok(d) => d.to_did(),
99 Err(e) => {
100 error!("invalid did in pending value: {e}");
101 continue;
102 }
103 };
104
105 if self.in_flight.contains_sync(&did) {
106 continue;
107 }
108 let _ = self.in_flight.insert_sync(did.clone().into_static());
109
110 let permit = match self.semaphore.clone().try_acquire_owned() {
111 Ok(p) => p,
112 Err(_) => break,
113 };
114
115 let guard = InFlightGuard {
116 did: did.clone().into_static(),
117 set: self.in_flight.clone(),
118 };
119
120 let state = self.state.clone();
121 let http = self.http.clone();
122 let did = did.clone();
123 let buffer_tx = self.buffer_tx.clone();
124 let verify = self.verify_signatures;
125
126 tokio::spawn(async move {
127 let _guard = guard;
128 let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await;
129
130 if let Err(e) = res {
131 error!("backfill process failed for {did}: {e}");
132 if let BackfillError::Generic(report) = &e {
133 db::check_poisoned_report(report);
134 }
135 }
136
137 // wake worker to pick up more (in case we were sleeping at limit)
138 state.backfill_notify.notify_one();
139 });
140
141 spawned += 1;
142 }
143
144 if spawned == 0 {
145 // wait for new tasks
146 self.state.backfill_notify.notified().await;
147 } else {
148 // if we spawned tasks, yield briefly to let them start and avoid tight loop
149 tokio::time::sleep(Duration::from_millis(10)).await;
150 }
151 }
152 }
153}
154
/// Runs a single backfill attempt for `did` and reconciles the outcome into
/// the db: clears pending/resync state on success, records retry/backoff
/// state on failure, and keeps the status gauges consistent either way.
///
/// `pending_key` is the key of this did's entry in the `pending` keyspace.
/// `_permit` is held for the whole call so the semaphore bounds concurrency.
async fn did_task(
    state: &Arc<AppState>,
    http: reqwest::Client,
    buffer_tx: BufferTx,
    did: &Did<'static>,
    pending_key: Slice,
    _permit: tokio::sync::OwnedSemaphorePermit,
    verify_signatures: bool,
) -> Result<(), BackfillError> {
    let db = &state.db;

    match process_did(&state, &http, &did, verify_signatures).await {
        // Backfill succeeded (or the repo is gone/inactive and process_did
        // already persisted that); `previous_state` is the repo state as it
        // was before this run.
        Ok(Some(previous_state)) => {
            let did_key = keys::repo_key(&did);

            // determine old gauge state
            // if it was error/suspended etc, we need to know which error kind it was to decrement correctly.
            // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status.
            let old_gauge = match previous_state.status {
                RepoStatus::Backfilling => GaugeState::Pending,
                RepoStatus::Error(_)
                | RepoStatus::Deactivated
                | RepoStatus::Takendown
                | RepoStatus::Suspended => {
                    // we need to fetch the resync state to know the kind
                    // if it's missing, we assume Generic (or handle gracefully)
                    // this is an extra read, but necessary for accurate gauges.
                    let resync_state = Db::get(db.resync.clone(), &did_key).await.ok().flatten();
                    let kind = resync_state.and_then(|b| {
                        rmp_serde::from_slice::<ResyncState>(&b)
                            .ok()
                            .and_then(|s| match s {
                                ResyncState::Error { kind, .. } => Some(kind),
                                _ => None,
                            })
                    });
                    GaugeState::Resync(kind)
                }
                RepoStatus::Synced => GaugeState::Synced,
            };

            // One batch for both cleanups so they land atomically.
            let mut batch = db.inner.batch();
            // remove from pending
            if old_gauge == GaugeState::Pending {
                batch.remove(&db.pending, pending_key);
            }
            // remove from resync
            if old_gauge.is_resync() {
                batch.remove(&db.resync, &did_key);
            }
            // commit is a blocking db call — run it off the async runtime
            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;

            state
                .db
                .update_gauge_diff_async(&old_gauge, &GaugeState::Synced)
                .await;

            // persist is blocking too; shadowed `state` clone is moved in
            let state = state.clone();
            tokio::task::spawn_blocking(move || {
                state
                    .db
                    .inner
                    .persist(fjall::PersistMode::Buffer)
                    .into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // Notify completion to worker shard
            if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) {
                error!("failed to send BackfillFinished for {did}: {e}");
            }
            Ok(())
        }
        Ok(None) => {
            // signal mode: repo had no matching records, was cleaned up by process_did
            state.db.update_count_async("repos", -1).await;
            state.db.update_count_async("pending", -1).await;
            Ok(())
        }
        Err(e) => {
            // Log at a severity matching the error category.
            match &e {
                BackfillError::Ratelimited => {
                    debug!("failed for {did}: too many requests");
                }
                BackfillError::Transport(reason) => {
                    error!("failed for {did}: transport error: {reason}");
                }
                BackfillError::Generic(e) => {
                    error!("failed for {did}: {e}");
                }
            }

            // Map the failure onto the resync kind recorded for backoff.
            let error_kind = match &e {
                BackfillError::Ratelimited => crate::types::ResyncErrorKind::Ratelimited,
                BackfillError::Transport(_) => crate::types::ResyncErrorKind::Transport,
                BackfillError::Generic(_) => crate::types::ResyncErrorKind::Generic,
            };

            let did_key = keys::repo_key(&did);

            // 1. get current retry count
            let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
                b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
                    .transpose()
            })?;

            let (mut retry_count, prev_kind) = match existing_state {
                Some(ResyncState::Error {
                    kind, retry_count, ..
                }) => (retry_count, Some(kind)),
                // NOTE(review): a Gone repo is returned as success without
                // removing its pending entry — confirm this is intentional.
                Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
                None => (0, None),
            };

            // Calculate new stats
            retry_count += 1;
            let next_retry = ResyncState::next_backoff(retry_count);

            let resync_state = ResyncState::Error {
                kind: error_kind.clone(),
                retry_count,
                next_retry,
            };

            let error_string = e.to_string();

            // Persist resync + repo-state update and drop the pending entry
            // in one atomic batch, off the async runtime.
            tokio::task::spawn_blocking({
                let state = state.clone();
                let did_key = did_key.into_static();
                move || {
                    // 3. save to resync
                    let serialized_resync_state =
                        rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                    // 4. and update the main repo state
                    let serialized_repo_state = if let Some(state_bytes) =
                        state.db.repos.get(&did_key).into_diagnostic()?
                    {
                        let mut state: RepoState =
                            rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
                        state.status = RepoStatus::Error(error_string.into());
                        Some(rmp_serde::to_vec(&state).into_diagnostic()?)
                    } else {
                        None
                    };

                    let mut batch = state.db.inner.batch();
                    batch.insert(&state.db.resync, &did_key, serialized_resync_state);
                    batch.remove(&state.db.pending, pending_key.clone());
                    if let Some(state_bytes) = serialized_repo_state {
                        batch.insert(&state.db.repos, &did_key, state_bytes);
                    }
                    batch.commit().into_diagnostic()
                }
            })
            .await
            .into_diagnostic()??;

            // Gauge transition: pending (or the previous resync kind) -> new
            // resync kind.
            let old_gauge = if let Some(k) = prev_kind {
                GaugeState::Resync(Some(k))
            } else {
                GaugeState::Pending
            };

            let new_gauge = GaugeState::Resync(Some(error_kind));

            state
                .db
                .update_gauge_diff_async(&old_gauge, &new_gauge)
                .await;

            // Propagate the original error so the caller can log/inspect it.
            Err(e)
        }
    }
}
333
/// Failure categories for one backfill attempt; the category determines the
/// `ResyncErrorKind` recorded for retry/backoff bookkeeping in `did_task`.
#[derive(Debug, Diagnostic, Error)]
enum BackfillError {
    /// Any other failure (parsing, db, verification, ...).
    #[error("{0}")]
    Generic(miette::Report),
    /// Upstream responded with HTTP 429.
    #[error("too many requests")]
    Ratelimited,
    /// Network-level failure while talking to the PDS.
    #[error("transport error: {0}")]
    Transport(SmolStr),
}
343
344impl From<ClientError> for BackfillError {
345 fn from(e: ClientError) -> Self {
346 match e.kind() {
347 ClientErrorKind::Http {
348 status: StatusCode::TOO_MANY_REQUESTS,
349 } => Self::Ratelimited,
350 ClientErrorKind::Transport => Self::Transport(
351 e.source_err()
352 .expect("transport error without source")
353 .to_smolstr(),
354 ),
355 _ => Self::Generic(e.into()),
356 }
357 }
358}
359
impl From<miette::Report> for BackfillError {
    // Ad-hoc reports (including `?` on diagnostics) become Generic.
    fn from(e: miette::Report) -> Self {
        Self::Generic(e)
    }
}
365
impl From<ResolverError> for BackfillError {
    // Identity-resolution failures map 1:1 onto the backfill categories.
    fn from(e: ResolverError) -> Self {
        match e {
            ResolverError::Ratelimited => Self::Ratelimited,
            ResolverError::Transport(s) => Self::Transport(s),
            ResolverError::Generic(e) => Self::Generic(e),
        }
    }
}
375
/// Fetches and imports the full repo CAR for `did`.
///
/// Returns:
/// - `Ok(Some(previous_state))` — the repo state as it was *before* this run
///   (the caller diffs its `status` to fix gauges). Used for a successful
///   import and for the "repo gone/inactive" early exits.
/// - `Ok(None)` — signal-filter mode found no matching records; the
///   optimistically-added repo was removed again inside this function.
/// - `Err(_)` — the attempt failed and should be retried by the caller.
async fn process_did<'i>(
    app_state: &Arc<AppState>,
    http: &reqwest::Client,
    did: &Did<'static>,
    verify_signatures: bool,
) -> Result<Option<RepoState<'static>>, BackfillError> {
    debug!("backfilling {}", did);

    let db = &app_state.db;
    let did_key = keys::repo_key(did);
    // the repo-state row is expected to exist by the time a did is pending
    let state_bytes = Db::get(db.repos.clone(), did_key)
        .await?
        .ok_or_else(|| miette::miette!("!!!THIS IS A BUG!!! repo state for {did} missing"))?;
    let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
        .into_diagnostic()?
        .into_static();
    // snapshot kept untouched so the caller can see the pre-backfill status
    let previous_state = state.clone();

    // 1. resolve pds
    let start = Instant::now();
    let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?;
    trace!(
        "resolved {did} to pds {pds_url} handle {handle:?} in {:?}",
        start.elapsed()
    );

    state.handle = handle.map(|h| h.into_static());

    // Broadcasts an account event reflecting `status`; active is false for
    // any of the inactive statuses.
    let emit_identity = |status: &RepoStatus| {
        let evt = AccountEvt {
            did: did.clone(),
            active: !matches!(
                status,
                RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
            ),
            status: Some(
                match status {
                    RepoStatus::Deactivated => "deactivated",
                    RepoStatus::Takendown => "takendown",
                    RepoStatus::Suspended => "suspended",
                    _ => "active",
                }
                .into(),
            ),
        };
        let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
    };

    // 2. fetch repo (car)
    let start = Instant::now();
    let req = GetRepo::new().did(did.clone()).build();
    let resp = http.xrpc(pds_url).send(&req).await?;

    let car_bytes = match resp.into_output() {
        Ok(o) => o,
        Err(XrpcError::Xrpc(e)) => {
            if matches!(e, GetRepoError::RepoNotFound(_)) {
                warn!("repo {did} not found, deleting");
                let mut batch = db.inner.batch();
                ops::delete_repo(&mut batch, db, did, state)?;
                batch.commit().into_diagnostic()?;
                // NOTE(review): returns "success" with the pre-run state so
                // the caller stops retrying — confirm the caller's cleanup is
                // correct for a deleted repo.
                return Ok(Some(previous_state)); // stop backfill
            }

            let inactive_status = match e {
                GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
                GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
                GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
                _ => None,
            };

            if let Some(status) = inactive_status {
                warn!("repo {did} is {status:?}, stopping backfill");

                emit_identity(&status);

                // record the inactive status so the resync scheduler knows
                // this repo is gone rather than failed
                let resync_state = ResyncState::Gone {
                    status: status.clone(),
                };
                let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                let app_state_clone = app_state.clone();
                app_state
                    .db
                    .update_repo_state_async(did, move |state, (key, batch)| {
                        state.status = status;
                        batch.insert(&app_state_clone.db.resync, key, resync_bytes);
                        Ok((true, ()))
                    })
                    .await?;

                // return success so wrapper stops retrying
                return Ok(Some(previous_state));
            }

            Err(e).into_diagnostic()?
        }
        Err(e) => Err(e).into_diagnostic()?,
    };

    // emit identity event so any consumers know
    emit_identity(&state.status);

    trace!(
        "fetched {} bytes for {did} in {:?}",
        car_bytes.body.len(),
        start.elapsed()
    );

    // 3. import repo
    let start = Instant::now();
    let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
        .await
        .into_diagnostic()?;
    trace!("parsed car for {did} in {:?}", start.elapsed());

    let start = Instant::now();
    let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
    trace!(
        "stored {} blocks in memory for {did} in {:?}",
        store.len(),
        start.elapsed()
    );

    // 4. parse root commit to get mst root
    let root_bytes = store
        .get(&parsed.root)
        .await
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
    debug!(
        "backfilling repo at revision {}, root cid {}",
        root_commit.rev, root_commit.data
    );

    // 4.5. verify commit signature
    if verify_signatures {
        let pubkey = app_state.resolver.resolve_signing_key(did).await?;
        root_commit
            .verify(&pubkey)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!("signature verified for {did}");
    }

    // 5. walk mst
    let start = Instant::now();
    let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
    let leaves = mst.leaves().await.into_diagnostic()?;
    trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64());

    // 6. insert records into db
    // The whole diff-and-write pass runs on a blocking thread: it compares
    // the CAR's leaves against existing records, emits create/update/delete
    // events, and commits everything in one batch.
    let start = Instant::now();
    let result = {
        let app_state = app_state.clone();
        let did = did.clone();
        let rev = root_commit.rev;

        tokio::task::spawn_blocking(move || {
            let filter = app_state.filter.load();
            let mut count = 0; // total ops emitted (creates+updates+deletes)
            let mut delta = 0; // net record-count change
            let mut added_blocks = 0;
            let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
            let mut batch = app_state.db.inner.batch();
            let store = mst.storage();

            // Index the records we already hold for this did so we can diff
            // against the freshly fetched MST leaves.
            let prefix = keys::record_prefix_did(&did);
            let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();

            for guard in app_state.db.records.prefix(&prefix) {
                let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
                // key is did|collection|rkey
                // skip did|
                let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
                let collection_raw = remaining
                    .next()
                    .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
                let rkey_raw = remaining
                    .next()
                    .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;

                let collection = std::str::from_utf8(collection_raw)
                    .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;

                let rkey = keys::parse_rkey(rkey_raw)
                    .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;

                let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
                    .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
                    .to_smolstr();

                existing_cids.insert((collection.into(), rkey), cid);
            }

            // In Signal mode the repo is kept only if at least one record
            // matches a signal; any other mode keeps it unconditionally.
            let mut signal_seen = filter.mode != FilterMode::Signal;
            debug!(
                "backfilling {did}: signal_seen initial={signal_seen}, mode={:?}, signals={:?}",
                filter.mode, filter.signals
            );

            for (key, cid) in leaves {
                // NOTE(review): block_on inside spawn_blocking — presumably
                // fine because the store is in-memory and resolves
                // immediately; confirm MemoryBlockStore::get never truly
                // suspends.
                let val_bytes = tokio::runtime::Handle::current()
                    .block_on(store.get(&cid))
                    .into_diagnostic()?;

                if let Some(val) = val_bytes {
                    let (collection, rkey) = ops::parse_path(&key)?;

                    if !filter.matches_collection(collection) {
                        continue;
                    }

                    if !signal_seen && filter.matches_signal(collection) {
                        debug!("backfill {did}: signal matched on {collection}");
                        signal_seen = true;
                    }

                    let rkey = DbRkey::new(rkey);
                    let path = (collection.to_smolstr(), rkey.clone());
                    let cid_obj = Cid::ipld(cid);

                    // check if this record already exists with same CID
                    let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) {
                        if existing_cid == cid_obj.as_str() {
                            trace!("skip {did}/{collection}/{rkey} ({cid})");
                            continue; // skip unchanged record
                        }
                        (DbAction::Update, false)
                    } else {
                        (DbAction::Create, true)
                    };
                    trace!("{action} {did}/{collection}/{rkey} ({cid})");

                    // key is did|collection|rkey
                    let db_key = keys::record_key(&did, collection, &rkey);

                    batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref());
                    batch.insert(&app_state.db.records, db_key, cid.to_bytes());

                    added_blocks += 1;
                    if is_new {
                        delta += 1;
                        *collection_counts.entry(path.0.clone()).or_default() += 1;
                    }

                    // append a (non-live) stored event for this op
                    let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                    let evt = StoredEvent {
                        live: false,
                        did: TrimmedDid::from(&did),
                        rev: DbTid::from(&rev),
                        collection: CowStr::Borrowed(collection),
                        rkey,
                        action,
                        cid: Some(cid_obj.to_ipld().expect("valid cid")),
                    };
                    let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                    batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);

                    count += 1;
                }
            }

            // remove any remaining existing records (they weren't in the new MST)
            for ((collection, rkey), cid) in existing_cids {
                trace!("remove {did}/{collection}/{rkey} ({cid})");

                batch.remove(
                    &app_state.db.records,
                    keys::record_key(&did, &collection, &rkey),
                );

                let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                let evt = StoredEvent {
                    live: false,
                    did: TrimmedDid::from(&did),
                    rev: DbTid::from(&rev),
                    collection: CowStr::Borrowed(&collection),
                    rkey,
                    action: DbAction::Delete,
                    cid: None,
                };
                let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);

                delta -= 1;
                count += 1;
            }

            // signal mode: nothing matched — the whole batch is dropped
            // uncommitted and the caller cleans up the repo row
            if !signal_seen {
                debug!("backfill {did}: no signal-matching records found, discarding repo");
                return Ok::<_, miette::Report>(None);
            }

            // 6. update data, status is updated in worker shard
            state.rev = Some((&rev).into());
            state.data = Some(root_commit.data);
            state.last_updated_at = chrono::Utc::now().timestamp();

            batch.insert(
                &app_state.db.repos,
                keys::repo_key(&did),
                ser_repo_state(&state)?,
            );

            // add the counts
            for (col, cnt) in collection_counts {
                db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt);
            }

            batch.commit().into_diagnostic()?;

            Ok::<_, miette::Report>(Some((state, delta, added_blocks, count)))
        })
        .await
        .into_diagnostic()??
    };

    let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
        // signal mode: no signal-matching records found — clean up the optimistically-added repo
        let did_key = keys::repo_key(did);
        let backfill_pending_key = keys::pending_key(previous_state.index_id);
        let repos_ks = app_state.db.repos.clone();
        let pending_ks = app_state.db.pending.clone();
        let db_inner = app_state.db.inner.clone();
        tokio::task::spawn_blocking(move || {
            let mut batch = db_inner.batch();
            batch.remove(&repos_ks, &did_key);
            batch.remove(&pending_ks, backfill_pending_key);
            batch.commit().into_diagnostic()
        })
        .await
        .into_diagnostic()??;
        return Ok(None);
    };

    trace!("did {count} ops for {did} in {:?}", start.elapsed());

    // do the counts
    // NOTE(review): block counts are only bumped when the net record delta is
    // positive, even though added_blocks can be nonzero with delta <= 0 —
    // confirm this is intended.
    if records_cnt_delta > 0 {
        app_state
            .db
            .update_count_async("records", records_cnt_delta)
            .await;
        app_state
            .db
            .update_count_async("blocks", added_blocks)
            .await;
    }
    trace!(
        "committed backfill batch for {did} in {:?}",
        start.elapsed()
    );

    // announce the highest event id written so far
    let _ = db.event_tx.send(BroadcastEvent::Persisted(
        db.next_event_id.load(Ordering::SeqCst) - 1,
    ));

    trace!("backfill complete for {did}");
    Ok(Some(previous_state))
}