AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
2use crate::db::{self, Db, keys, ser_repo_state};
3use crate::filter::FilterMode;
4use crate::ops;
5use crate::resolver::ResolverError;
6use crate::state::AppState;
7use crate::types::{
8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState,
9 StoredEvent,
10};
11
12use fjall::Slice;
13use jacquard_api::com_atproto::sync::get_repo::{GetRepo, GetRepoError};
14use jacquard_common::error::{ClientError, ClientErrorKind};
15use jacquard_common::types::cid::Cid;
16use jacquard_common::types::did::Did;
17use jacquard_common::xrpc::{XrpcError, XrpcExt};
18use jacquard_common::{CowStr, IntoStatic};
19use jacquard_repo::mst::Mst;
20use jacquard_repo::{BlockStore, MemoryBlockStore};
21use miette::{Diagnostic, IntoDiagnostic, Result};
22use reqwest::StatusCode;
23use smol_str::{SmolStr, ToSmolStr};
24use std::collections::HashMap;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::atomic::Ordering;
28use std::time::{Duration, Instant};
29use thiserror::Error;
30use tokio::sync::Semaphore;
31use tracing::{debug, error, info, trace, warn};
32
33pub mod manager;
34
35use crate::ingest::{BufferTx, IngestMessage};
36
/// Drains the `pending` partition: for each queued DID it fetches the full
/// repo CAR from the owning PDS and indexes its records into the db.
pub struct BackfillWorker {
    // shared application state (db, resolver, filter, notify handles)
    state: Arc<AppState>,
    // channel back into the ingest pipeline, used to signal completion
    buffer_tx: BufferTx,
    // HTTP client used for XRPC calls to PDS hosts
    http: reqwest::Client,
    // bounds the number of concurrently running backfill tasks
    semaphore: Arc<Semaphore>,
    // when set, verify repo commit signatures against the DID's signing key
    verify_signatures: bool,
    // when set, store only events/blocks and skip the `records` partition
    ephemeral: bool,
    // DIDs that currently have a spawned task running (dedup guard)
    in_flight: Arc<scc::HashSet<Did<'static>>>,
}
46
47impl BackfillWorker {
48 pub fn new(
49 state: Arc<AppState>,
50 buffer_tx: BufferTx,
51 timeout: Duration,
52 concurrency_limit: usize,
53 verify_signatures: bool,
54 ephemeral: bool,
55 ) -> Self {
56 Self {
57 state,
58 buffer_tx,
59 http: reqwest::Client::builder()
60 .timeout(timeout)
61 .zstd(true)
62 .brotli(true)
63 .gzip(true)
64 .build()
65 .expect("failed to build http client"),
66 semaphore: Arc::new(Semaphore::new(concurrency_limit)),
67 verify_signatures,
68 ephemeral,
69 in_flight: Arc::new(scc::HashSet::new()),
70 }
71 }
72}
73
/// RAII marker held by each spawned backfill task: removes the DID from the
/// shared in-flight set when the task finishes (or panics), so the worker
/// loop can schedule that DID again.
struct InFlightGuard {
    did: Did<'static>,
    set: Arc<scc::HashSet<Did<'static>>>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // result intentionally ignored: the entry may already be gone
        let _ = self.set.remove_sync(&self.did);
    }
}
84
85impl BackfillWorker {
86 pub async fn run(self) {
87 info!("backfill worker started");
88
89 loop {
90 let mut spawned = 0;
91
92 for guard in self.state.db.pending.iter() {
93 let (key, value) = match guard.into_inner() {
94 Ok(kv) => kv,
95 Err(e) => {
96 error!(err = %e, "failed to read pending entry");
97 db::check_poisoned(&e);
98 continue;
99 }
100 };
101
102 let did = match TrimmedDid::try_from(value.as_ref()) {
103 Ok(d) => d.to_did(),
104 Err(e) => {
105 error!(err = %e, "invalid did in pending value");
106 continue;
107 }
108 };
109
110 if self.in_flight.contains_sync(&did) {
111 continue;
112 }
113 let _ = self.in_flight.insert_sync(did.clone().into_static());
114
115 let permit = match self.semaphore.clone().try_acquire_owned() {
116 Ok(p) => p,
117 Err(_) => break,
118 };
119
120 let guard = InFlightGuard {
121 did: did.clone().into_static(),
122 set: self.in_flight.clone(),
123 };
124
125 let state = self.state.clone();
126 let http = self.http.clone();
127 let did = did.clone();
128 let buffer_tx = self.buffer_tx.clone();
129 let verify = self.verify_signatures;
130 let ephemeral = self.ephemeral;
131
132 tokio::spawn(async move {
133 let _guard = guard;
134 let res = did_task(
135 &state, http, buffer_tx, &did, key, permit, verify, ephemeral,
136 )
137 .await;
138
139 if let Err(e) = res {
140 error!(did = %did, err = %e, "backfill process failed");
141 if let BackfillError::Generic(report) = &e {
142 db::check_poisoned_report(report);
143 }
144 }
145
146 // wake worker to pick up more (in case we were sleeping at limit)
147 state.backfill_notify.notify_one();
148 });
149
150 spawned += 1;
151 }
152
153 if spawned == 0 {
154 // wait for new tasks
155 self.state.backfill_notify.notified().await;
156 } else {
157 // if we spawned tasks, yield briefly to let them start and avoid tight loop
158 tokio::time::sleep(Duration::from_millis(10)).await;
159 }
160 }
161 }
162}
163
/// Runs one backfill attempt for `did` and reconciles the db bookkeeping
/// (pending/resync partitions, gauges, counters) with the outcome.
///
/// Outcomes from `process_did`:
/// - `Ok(Some(_))`: repo synced — drop the pending entry, clear any resync
///   entry, move the gauge to `Synced`, and notify the worker shard.
/// - `Ok(None)`: signal mode found no matching records; `process_did`
///   already removed the repo row, so only counters are adjusted here.
/// - `Err(Deleted)`: our repo row vanished — the pending entry is orphaned
///   and is removed.
/// - any other `Err`: record `ResyncState::Error` with an incremented retry
///   count and backoff, mark the repo status as `Error`, drop the pending
///   entry, then re-raise the error.
///
/// The semaphore permit is held for the whole call and released on drop,
/// freeing a concurrency slot for the worker loop.
async fn did_task(
    state: &Arc<AppState>,
    http: reqwest::Client,
    buffer_tx: BufferTx,
    did: &Did<'static>,
    pending_key: Slice,
    _permit: tokio::sync::OwnedSemaphorePermit,
    verify_signatures: bool,
    ephemeral: bool,
) -> Result<(), BackfillError> {
    let db = &state.db;

    match process_did(&state, &http, &did, verify_signatures, ephemeral).await {
        Ok(Some(repo_state)) => {
            let did_key = keys::repo_key(&did);

            // determine old gauge state
            // if it was error/suspended etc, we need to know which error kind it was to decrement correctly.
            // we have to peek at the resync state.
            let old_gauge = state.db.repo_gauge_state_async(&repo_state, &did_key).await;

            let mut batch = db.inner.batch();
            // remove from pending
            if old_gauge == GaugeState::Pending {
                batch.remove(&db.pending, pending_key);
            }
            // remove from resync
            if old_gauge.is_resync() {
                batch.remove(&db.resync, &did_key);
            }
            // commit on a blocking thread: fjall batch commits do disk I/O
            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;

            state
                .db
                .update_gauge_diff_async(&old_gauge, &GaugeState::Synced)
                .await;

            let state = state.clone();
            tokio::task::spawn_blocking(move || {
                state
                    .db
                    .inner
                    .persist(fjall::PersistMode::Buffer)
                    .into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // Notify completion to worker shard
            if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) {
                error!(did = %did, err = %e, "failed to send BackfillFinished");
            }
            Ok(())
        }
        Ok(None) => {
            // signal mode: repo had no matching records, was cleaned up by process_did
            state.db.update_count_async("repos", -1).await;
            state.db.update_count_async("pending", -1).await;
            Ok(())
        }
        Err(BackfillError::Deleted) => {
            warn!(did = %did, "orphaned pending entry, cleaning up");
            // orphaned pending entry, clean it up
            Db::remove(db.pending.clone(), pending_key).await?;
            state.db.update_count_async("pending", -1).await;
            Ok(())
        }
        Err(e) => {
            // log with a severity matching the error class
            match &e {
                BackfillError::Ratelimited => {
                    debug!(did = %did, "too many requests");
                }
                BackfillError::Transport(reason) => {
                    error!(did = %did, %reason, "transport error");
                }
                BackfillError::Generic(e) => {
                    error!(did = %did, err = %e, "failed");
                }
                BackfillError::Deleted => unreachable!("already handled"),
            }

            // classify for resync/retry bookkeeping
            let error_kind = match &e {
                BackfillError::Ratelimited => ResyncErrorKind::Ratelimited,
                BackfillError::Transport(_) => ResyncErrorKind::Transport,
                BackfillError::Generic(_) => ResyncErrorKind::Generic,
                BackfillError::Deleted => unreachable!("already handled"),
            };

            let did_key = keys::repo_key(&did);

            // 1. get current retry count
            let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
                b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
                    .transpose()
            })?;

            let (mut retry_count, prev_kind) = match existing_state {
                Some(ResyncState::Error {
                    kind, retry_count, ..
                }) => (retry_count, Some(kind)),
                Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
                None => (0, None),
            };

            // Calculate new stats
            retry_count += 1;
            let next_retry = ResyncState::next_backoff(retry_count);

            let resync_state = ResyncState::Error {
                kind: error_kind.clone(),
                retry_count,
                next_retry,
            };

            let error_string = e.to_string();

            tokio::task::spawn_blocking({
                let state = state.clone();
                let did_key = did_key.into_static();
                move || {
                    // 3. save to resync
                    let serialized_resync_state =
                        rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                    // 4. and update the main repo state
                    let serialized_repo_state = if let Some(state_bytes) =
                        state.db.repos.get(&did_key).into_diagnostic()?
                    {
                        let mut state: RepoState =
                            rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
                        state.status = RepoStatus::Error(error_string.into());
                        Some(rmp_serde::to_vec(&state).into_diagnostic()?)
                    } else {
                        None
                    };

                    // resync/pending/repos updates land atomically in one batch
                    let mut batch = state.db.inner.batch();
                    batch.insert(&state.db.resync, &did_key, serialized_resync_state);
                    batch.remove(&state.db.pending, pending_key.clone());
                    if let Some(state_bytes) = serialized_repo_state {
                        batch.insert(&state.db.repos, &did_key, state_bytes);
                    }
                    batch.commit().into_diagnostic()
                }
            })
            .await
            .into_diagnostic()??;

            // no prior resync entry means it was counted as Pending
            let old_gauge = prev_kind
                .map(|k| GaugeState::Resync(Some(k)))
                .unwrap_or(GaugeState::Pending);

            let new_gauge = GaugeState::Resync(Some(error_kind));

            state
                .db
                .update_gauge_diff_async(&old_gauge, &new_gauge)
                .await;

            Err(e)
        }
    }
}
329
/// Failure modes of a single backfill attempt; the variant determines the
/// `ResyncErrorKind` recorded for retry scheduling in `did_task`.
#[derive(Debug, Diagnostic, Error)]
enum BackfillError {
    /// Any other failure, wrapped as a diagnostic report.
    #[error("{0}")]
    Generic(miette::Report),
    /// Upstream answered HTTP 429 (or the resolver was ratelimited).
    #[error("too many requests")]
    Ratelimited,
    /// Network-level failure; payload is the source error's description.
    #[error("transport error: {0}")]
    Transport(SmolStr),
    /// The repo row disappeared while the pending entry still existed.
    #[error("repo was concurrently deleted")]
    Deleted,
}
341
342impl From<ClientError> for BackfillError {
343 fn from(e: ClientError) -> Self {
344 match e.kind() {
345 ClientErrorKind::Http {
346 status: StatusCode::TOO_MANY_REQUESTS,
347 } => Self::Ratelimited,
348 ClientErrorKind::Transport => Self::Transport(
349 e.source_err()
350 .expect("transport error without source")
351 .to_smolstr(),
352 ),
353 _ => Self::Generic(e.into()),
354 }
355 }
356}
357
358impl From<miette::Report> for BackfillError {
359 fn from(e: miette::Report) -> Self {
360 Self::Generic(e)
361 }
362}
363
364impl From<ResolverError> for BackfillError {
365 fn from(e: ResolverError) -> Self {
366 match e {
367 ResolverError::Ratelimited => Self::Ratelimited,
368 ResolverError::Transport(s) => Self::Transport(s),
369 ResolverError::Generic(e) => Self::Generic(e),
370 }
371 }
372}
373
374async fn process_did<'i>(
375 app_state: &Arc<AppState>,
376 http: &reqwest::Client,
377 did: &Did<'static>,
378 verify_signatures: bool,
379 ephemeral: bool,
380) -> Result<Option<RepoState<'static>>, BackfillError> {
381 debug!(did = %did, "backfilling");
382
383 // always invalidate doc before backfilling
384 app_state.resolver.invalidate(did).await;
385
386 let db = &app_state.db;
387 let did_key = keys::repo_key(did);
388 let Some(state_bytes) = Db::get(db.repos.clone(), did_key).await? else {
389 return Err(BackfillError::Deleted);
390 };
391 let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
392 .into_diagnostic()?
393 .into_static();
394 let previous_state = state.clone();
395
396 // 1. resolve pds
397 let start = Instant::now();
398 let doc = app_state.resolver.resolve_doc(did).await?;
399 let pds = doc.pds.clone();
400 trace!(
401 did = %did,
402 pds = %doc.pds,
403 handle = ?doc.handle,
404 elapsed = ?start.elapsed(),
405 "resolved to pds"
406 );
407 state.update_from_doc(doc);
408
409 let emit_identity = |status: &RepoStatus| {
410 let evt = AccountEvt {
411 did: did.clone(),
412 active: !matches!(
413 status,
414 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
415 ),
416 status: Some(
417 match status {
418 RepoStatus::Deactivated => "deactivated",
419 RepoStatus::Takendown => "takendown",
420 RepoStatus::Suspended => "suspended",
421 _ => "active",
422 }
423 .into(),
424 ),
425 };
426 let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
427 };
428
429 // 2. fetch repo (car)
430 let start = Instant::now();
431 let req = GetRepo::new().did(did.clone()).build();
432 let resp = http.xrpc(pds).send(&req).await?;
433
434 let car_bytes = match resp.into_output() {
435 Ok(o) => o,
436 Err(XrpcError::Xrpc(e)) => {
437 if matches!(e, GetRepoError::RepoNotFound(_)) {
438 warn!(did = %did, "repo not found, deleting");
439 let mut batch = db::refcount::RefcountedBatch::new(db);
440 if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
441 tracing::error!(err = %e, "failed to wipe repo during backfill");
442 }
443 batch.commit().into_diagnostic()?;
444 return Ok(Some(previous_state)); // stop backfill
445 }
446
447 let inactive_status = match e {
448 GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
449 GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
450 GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
451 _ => None,
452 };
453
454 if let Some(status) = inactive_status {
455 warn!(did = %did, ?status, "repo is inactive, stopping backfill");
456
457 emit_identity(&status);
458
459 let resync_state = ResyncState::Gone {
460 status: status.clone(),
461 };
462 let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;
463
464 let app_state_clone = app_state.clone();
465 app_state
466 .db
467 .update_repo_state_async(did, move |state, (key, batch)| {
468 state.status = status;
469 batch.insert(&app_state_clone.db.resync, key, resync_bytes);
470 Ok((true, ()))
471 })
472 .await?;
473
474 // return success so wrapper stops retrying
475 return Ok(Some(previous_state));
476 }
477
478 Err(e).into_diagnostic()?
479 }
480 Err(e) => Err(e).into_diagnostic()?,
481 };
482
483 // emit identity event so any consumers know
484 emit_identity(&state.status);
485
486 trace!(
487 did = %did,
488 bytes = car_bytes.body.len(),
489 elapsed = ?start.elapsed(),
490 "fetched car bytes"
491 );
492
493 // 3. import repo
494 let start = Instant::now();
495 let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
496 .await
497 .into_diagnostic()?;
498 trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");
499
500 let start = Instant::now();
501 let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
502 trace!(
503 did = %did,
504 blocks = store.len(),
505 elapsed = ?start.elapsed(),
506 "stored blocks in memory"
507 );
508
509 // 4. parse root commit to get mst root
510 let root_bytes = store
511 .get(&parsed.root)
512 .await
513 .into_diagnostic()?
514 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
515
516 let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
517 debug!(
518 did = %did,
519 rev = %root_commit.rev,
520 cid = %root_commit.data,
521 "backfilling repo at revision"
522 );
523
524 // 4.5. verify commit signature
525 if verify_signatures {
526 let pubkey = app_state.resolver.resolve_signing_key(did).await?;
527 root_commit
528 .verify(&pubkey)
529 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
530 trace!(did = %did, "signature verified");
531 }
532
533 // 5. walk mst
534 let start = Instant::now();
535 let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
536 let leaves = mst.leaves().await.into_diagnostic()?;
537 trace!(did = %did, elapsed = ?start.elapsed(), "walked mst");
538
539 // 6. insert records into db
540 let start = Instant::now();
541 let result = {
542 let app_state = app_state.clone();
543 let did = did.clone();
544 let rev = root_commit.rev;
545
546 tokio::task::spawn_blocking(move || {
547 let filter = app_state.filter.load();
548 let mut count = 0;
549 let mut delta = 0;
550 let mut added_blocks = 0;
551 let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
552 let mut batch = db::refcount::RefcountedBatch::new(&app_state.db);
553 let store = mst.storage();
554
555 let prefix = keys::record_prefix_did(&did);
556 let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();
557
558 if !ephemeral {
559 for guard in app_state.db.records.prefix(&prefix) {
560 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
561 // key is did|collection|rkey
562 // skip did|
563 let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
564 let collection_raw = remaining
565 .next()
566 .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
567 let rkey_raw = remaining
568 .next()
569 .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
570
571 let collection = std::str::from_utf8(collection_raw)
572 .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;
573
574 let rkey = keys::parse_rkey(rkey_raw)
575 .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;
576
577 let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
578 .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
579 .to_smolstr();
580
581 existing_cids.insert((collection.into(), rkey), cid);
582 }
583 }
584
585 let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty();
586
587 debug!(
588 did = %did,
589 initial = signal_seen,
590 mode = ?filter.mode,
591 signals = ?filter.signals,
592 "backfilling signal check"
593 );
594
595 for (key, cid) in leaves {
596 let val_bytes = tokio::runtime::Handle::current()
597 .block_on(store.get(&cid))
598 .into_diagnostic()?;
599
600 if let Some(val) = val_bytes {
601 let (collection, rkey) = ops::parse_path(&key)?;
602
603 if !filter.matches_collection(collection) {
604 continue;
605 }
606
607 if !signal_seen && filter.matches_signal(collection) {
608 debug!(did = %did, collection = %collection, "signal matched");
609 signal_seen = true;
610 }
611
612 let rkey = DbRkey::new(rkey);
613 let path = (collection.to_smolstr(), rkey.clone());
614 let cid_obj = Cid::ipld(cid);
615
616 // check if this record already exists with same CID
617 let existing_cid = existing_cids.remove(&path);
618 let action = if let Some(existing_cid) = &existing_cid {
619 if existing_cid == cid_obj.as_str() {
620 trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "skip unchanged record");
621 continue; // skip unchanged record
622 }
623 DbAction::Update
624 } else {
625 DbAction::Create
626 };
627 trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, ?action, "action record");
628
629 // key is did|collection|rkey
630 let db_key = keys::record_key(&did, collection, &rkey);
631
632 batch.batch_mut().insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref());
633 if !ephemeral {
634 batch.batch_mut().insert(&app_state.db.records, db_key, cid.to_bytes());
635 }
636
637 added_blocks += 1;
638 if action == DbAction::Create {
639 delta += 1;
640 *collection_counts.entry(path.0.clone()).or_default() += 1;
641 }
642
643 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
644 let evt = StoredEvent {
645 live: false,
646 did: TrimmedDid::from(&did),
647 rev: DbTid::from(&rev),
648 collection: CowStr::Borrowed(collection),
649 rkey,
650 action,
651 cid: Some(cid_obj.to_ipld().expect("valid cid")),
652 };
653 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
654 batch.batch_mut().insert(&app_state.db.events, keys::event_key(event_id), bytes);
655
656 // update block refcount
657 let cid_bytes = Slice::from(cid.to_bytes());
658 // if ephemeral, we only care about events, so its 1
659 // if not, then its 2 since we also insert to records
660 batch.update_block_refcount(cid_bytes, ephemeral.then_some(1).unwrap_or(2))?;
661 // for Update, also decrement old CID refcount
662 // event will still be there, so we only decrement for records
663 // which means only if not ephemeral
664 if !ephemeral && action == DbAction::Update {
665 let existing_cid = existing_cid.expect("that cid exists since this is Update");
666 if existing_cid != cid_obj.as_str() {
667 let old_cid_bytes = Slice::from(
668 cid::Cid::from_str(&existing_cid)
669 .expect("valid cid from existing_cids")
670 .to_bytes()
671 );
672 batch.update_block_refcount(old_cid_bytes, -1)?;
673 }
674 }
675
676 count += 1;
677 }
678 }
679
680 // remove any remaining existing records (they weren't in the new MST)
681 for ((collection, rkey), cid) in existing_cids {
682 trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "remove existing record");
683
684 // we dont have to put if ephemeral around here since
685 // existing_cids will be empty anyyway
686 batch.batch_mut().remove(
687 &app_state.db.records,
688 keys::record_key(&did, &collection, &rkey),
689 );
690
691 // decrement block refcount
692 let cid_bytes = Slice::from(
693 cid::Cid::from_str(&cid)
694 .expect("valid cid from existing_cids")
695 .to_bytes()
696 );
697 batch.update_block_refcount(cid_bytes, -1)?;
698
699 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
700 let evt = StoredEvent {
701 live: false,
702 did: TrimmedDid::from(&did),
703 rev: DbTid::from(&rev),
704 collection: CowStr::Borrowed(&collection),
705 rkey,
706 action: DbAction::Delete,
707 cid: None,
708 };
709 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
710 batch.batch_mut().insert(&app_state.db.events, keys::event_key(event_id), bytes);
711
712 delta -= 1;
713 count += 1;
714 }
715
716 if !signal_seen {
717 debug!(did = %did, "no signal-matching records found, discarding repo");
718 return Ok::<_, miette::Report>(None);
719 }
720
721 // 6. update data, status is updated in worker shard
722 state.tracked = true;
723 state.rev = Some((&rev).into());
724 state.data = Some(root_commit.data);
725 state.last_updated_at = chrono::Utc::now().timestamp();
726
727 batch.batch_mut().insert(
728 &app_state.db.repos,
729 keys::repo_key(&did),
730 ser_repo_state(&state)?,
731 );
732
733 // add the counts
734 if !ephemeral {
735 for (col, cnt) in collection_counts {
736 db::set_record_count(batch.batch_mut(), &app_state.db, &did, &col, cnt);
737 }
738 }
739
740 batch.commit().into_diagnostic()?;
741
742 Ok::<_, miette::Report>(Some((state, delta, added_blocks, count)))
743 })
744 .await
745 .into_diagnostic()??
746 };
747
748 let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
749 // signal mode: no signal-matching records found — clean up the optimistically-added repo
750 let did_key = keys::repo_key(did);
751 let backfill_pending_key = keys::pending_key(previous_state.index_id);
752 let app_state = app_state.clone();
753 tokio::task::spawn_blocking(move || {
754 let mut batch = app_state.db.inner.batch();
755 batch.remove(&app_state.db.repos, &did_key);
756 batch.remove(&app_state.db.pending, backfill_pending_key);
757 batch.commit().into_diagnostic()
758 })
759 .await
760 .into_diagnostic()??;
761 return Ok(None);
762 };
763
764 trace!(did = %did, ops = count, elapsed = ?start.elapsed(), "did ops");
765
766 // do the counts
767 if records_cnt_delta > 0 {
768 app_state
769 .db
770 .update_count_async("records", records_cnt_delta)
771 .await;
772 }
773 if added_blocks > 0 {
774 app_state
775 .db
776 .update_count_async("blocks", added_blocks)
777 .await;
778 }
779 trace!(
780 did = %did,
781 elapsed = ?start.elapsed(),
782 "committed backfill batch"
783 );
784
785 let _ = db.event_tx.send(BroadcastEvent::Persisted(
786 db.next_event_id.load(Ordering::SeqCst) - 1,
787 ));
788
789 trace!(did = %did, "backfill complete");
790 Ok(Some(previous_state))
791}