kind of like tap but different and in rust
at main 737 lines 26 kB view raw
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2use crate::db::{self, Db, keys, ser_repo_state}; 3use crate::filter::FilterMode; 4use crate::ops; 5use crate::resolver::ResolverError; 6use crate::state::AppState; 7use crate::types::{ 8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncState, StoredEvent, 9}; 10 11use fjall::Slice; 12use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 13use jacquard::error::{ClientError, ClientErrorKind}; 14use jacquard::types::cid::Cid; 15use jacquard::types::did::Did; 16use jacquard::{CowStr, IntoStatic, prelude::*}; 17use jacquard_common::xrpc::XrpcError; 18use jacquard_repo::mst::Mst; 19use jacquard_repo::{BlockStore, MemoryBlockStore}; 20use miette::{Diagnostic, IntoDiagnostic, Result}; 21use reqwest::StatusCode; 22use smol_str::{SmolStr, ToSmolStr}; 23use std::collections::HashMap; 24use std::sync::Arc; 25use std::sync::atomic::Ordering; 26use std::time::{Duration, Instant}; 27use thiserror::Error; 28use tokio::sync::Semaphore; 29use tracing::{debug, error, info, trace, warn}; 30 31pub mod manager; 32 33use crate::ingest::{BufferTx, IngestMessage}; 34 35pub struct BackfillWorker { 36 state: Arc<AppState>, 37 buffer_tx: BufferTx, 38 http: reqwest::Client, 39 semaphore: Arc<Semaphore>, 40 verify_signatures: bool, 41 in_flight: Arc<scc::HashSet<Did<'static>>>, 42} 43 44impl BackfillWorker { 45 pub fn new( 46 state: Arc<AppState>, 47 buffer_tx: BufferTx, 48 timeout: Duration, 49 concurrency_limit: usize, 50 verify_signatures: bool, 51 ) -> Self { 52 Self { 53 state, 54 buffer_tx, 55 http: reqwest::Client::builder() 56 .timeout(timeout) 57 .zstd(true) 58 .brotli(true) 59 .gzip(true) 60 .build() 61 .expect("failed to build http client"), 62 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 63 verify_signatures, 64 in_flight: Arc::new(scc::HashSet::new()), 65 } 66 } 67} 68 69struct InFlightGuard { 70 did: Did<'static>, 71 set: Arc<scc::HashSet<Did<'static>>>, 72} 73 74impl Drop for InFlightGuard { 75 fn drop(&mut self) { 76 let _ = self.set.remove_sync(&self.did); 77 } 78} 79 80impl BackfillWorker { 81 pub async fn run(self) { 82 info!("backfill worker started"); 83 84 loop { 85 let mut spawned = 0; 86 87 for guard in self.state.db.pending.iter() { 88 let (key, value) = match guard.into_inner() { 89 Ok(kv) => kv, 90 Err(e) => { 91 error!("failed to read pending entry: {e}"); 92 db::check_poisoned(&e); 93 continue; 94 } 95 }; 96 97 let did = match TrimmedDid::try_from(value.as_ref()) { 98 Ok(d) => d.to_did(), 99 Err(e) => { 100 error!("invalid did in pending value: {e}"); 101 continue; 102 } 103 }; 104 105 if self.in_flight.contains_sync(&did) { 106 continue; 107 } 108 let _ = self.in_flight.insert_sync(did.clone().into_static()); 109 110 let permit = match self.semaphore.clone().try_acquire_owned() { 111 Ok(p) => p, 112 Err(_) => break, 113 }; 114 115 let guard = InFlightGuard { 116 did: did.clone().into_static(), 117 set: self.in_flight.clone(), 118 }; 119 120 let state = self.state.clone(); 121 let http = self.http.clone(); 122 let did = did.clone(); 123 let buffer_tx = self.buffer_tx.clone(); 124 let verify = self.verify_signatures; 125 126 tokio::spawn(async move { 127 let _guard = guard; 128 let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await; 129 130 if let Err(e) = res { 131 error!("backfill process failed for {did}: {e}"); 132 if let BackfillError::Generic(report) = &e { 133 db::check_poisoned_report(report); 134 } 135 } 136 137 // wake worker to pick up more (in case we were sleeping at limit) 138 state.backfill_notify.notify_one(); 139 }); 140 141 spawned += 1; 142 } 143 144 if spawned == 0 { 145 // wait for new tasks 146 self.state.backfill_notify.notified().await; 147 } else { 148 // if we spawned tasks, yield briefly to let them start and avoid tight loop 149 tokio::time::sleep(Duration::from_millis(10)).await; 150 } 151 } 152 } 153} 154 155async fn did_task( 156 state: &Arc<AppState>, 157 http: reqwest::Client, 158 buffer_tx: BufferTx, 159 did: &Did<'static>, 160 pending_key: Slice, 161 _permit: tokio::sync::OwnedSemaphorePermit, 162 verify_signatures: bool, 163) -> Result<(), BackfillError> { 164 let db = &state.db; 165 166 match process_did(&state, &http, &did, verify_signatures).await { 167 Ok(Some(previous_state)) => { 168 let did_key = keys::repo_key(&did); 169 170 // determine old gauge state 171 // if it was error/suspended etc, we need to know which error kind it was to decrement correctly. 172 // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 173 let old_gauge = match previous_state.status { 174 RepoStatus::Backfilling => GaugeState::Pending, 175 RepoStatus::Error(_) 176 | RepoStatus::Deactivated 177 | RepoStatus::Takendown 178 | RepoStatus::Suspended => { 179 // we need to fetch the resync state to know the kind 180 // if it's missing, we assume Generic (or handle gracefully) 181 // this is an extra read, but necessary for accurate gauges. 182 let resync_state = Db::get(db.resync.clone(), &did_key).await.ok().flatten(); 183 let kind = resync_state.and_then(|b| { 184 rmp_serde::from_slice::<ResyncState>(&b) 185 .ok() 186 .and_then(|s| match s { 187 ResyncState::Error { kind, .. } => Some(kind), 188 _ => None, 189 }) 190 }); 191 GaugeState::Resync(kind) 192 } 193 RepoStatus::Synced => GaugeState::Synced, 194 }; 195 196 let mut batch = db.inner.batch(); 197 // remove from pending 198 if old_gauge == GaugeState::Pending { 199 batch.remove(&db.pending, pending_key); 200 } 201 // remove from resync 202 if old_gauge.is_resync() { 203 batch.remove(&db.resync, &did_key); 204 } 205 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 206 .await 207 .into_diagnostic()??; 208 209 state 210 .db 211 .update_gauge_diff_async(&old_gauge, &GaugeState::Synced) 212 .await; 213 214 let state = state.clone(); 215 tokio::task::spawn_blocking(move || { 216 state 217 .db 218 .inner 219 .persist(fjall::PersistMode::Buffer) 220 .into_diagnostic() 221 }) 222 .await 223 .into_diagnostic()??; 224 225 // Notify completion to worker shard 226 if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) { 227 error!("failed to send BackfillFinished for {did}: {e}"); 228 } 229 Ok(()) 230 } 231 Ok(None) => { 232 // signal mode: repo had no matching records, was cleaned up by process_did 233 state.db.update_count_async("repos", -1).await; 234 state.db.update_count_async("pending", -1).await; 235 Ok(()) 236 } 237 Err(e) => { 238 match &e { 239 BackfillError::Ratelimited => { 240 debug!("failed for {did}: too many requests"); 241 } 242 BackfillError::Transport(reason) => { 243 error!("failed for {did}: transport error: {reason}"); 244 } 245 BackfillError::Generic(e) => { 246 error!("failed for {did}: {e}"); 247 } 248 } 249 250 let error_kind = match &e { 251 BackfillError::Ratelimited => crate::types::ResyncErrorKind::Ratelimited, 252 BackfillError::Transport(_) => crate::types::ResyncErrorKind::Transport, 253 BackfillError::Generic(_) => crate::types::ResyncErrorKind::Generic, 254 }; 255 256 let did_key = keys::repo_key(&did); 257 258 // 1. get current retry count 259 let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| { 260 b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic()) 261 .transpose() 262 })?; 263 264 let (mut retry_count, prev_kind) = match existing_state { 265 Some(ResyncState::Error { 266 kind, retry_count, .. 267 }) => (retry_count, Some(kind)), 268 Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really? 269 None => (0, None), 270 }; 271 272 // Calculate new stats 273 retry_count += 1; 274 let next_retry = ResyncState::next_backoff(retry_count); 275 276 let resync_state = ResyncState::Error { 277 kind: error_kind.clone(), 278 retry_count, 279 next_retry, 280 }; 281 282 let error_string = e.to_string(); 283 284 tokio::task::spawn_blocking({ 285 let state = state.clone(); 286 let did_key = did_key.into_static(); 287 move || { 288 // 3. save to resync 289 let serialized_resync_state = 290 rmp_serde::to_vec(&resync_state).into_diagnostic()?; 291 292 // 4. and update the main repo state 293 let serialized_repo_state = if let Some(state_bytes) = 294 state.db.repos.get(&did_key).into_diagnostic()? 295 { 296 let mut state: RepoState = 297 rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 298 state.status = RepoStatus::Error(error_string.into()); 299 Some(rmp_serde::to_vec(&state).into_diagnostic()?) 300 } else { 301 None 302 }; 303 304 let mut batch = state.db.inner.batch(); 305 batch.insert(&state.db.resync, &did_key, serialized_resync_state); 306 batch.remove(&state.db.pending, pending_key.clone()); 307 if let Some(state_bytes) = serialized_repo_state { 308 batch.insert(&state.db.repos, &did_key, state_bytes); 309 } 310 batch.commit().into_diagnostic() 311 } 312 }) 313 .await 314 .into_diagnostic()??; 315 316 let old_gauge = if let Some(k) = prev_kind { 317 GaugeState::Resync(Some(k)) 318 } else { 319 GaugeState::Pending 320 }; 321 322 let new_gauge = GaugeState::Resync(Some(error_kind)); 323 324 state 325 .db 326 .update_gauge_diff_async(&old_gauge, &new_gauge) 327 .await; 328 329 Err(e) 330 } 331 } 332} 333 334#[derive(Debug, Diagnostic, Error)] 335enum BackfillError { 336 #[error("{0}")] 337 Generic(miette::Report), 338 #[error("too many requests")] 339 Ratelimited, 340 #[error("transport error: {0}")] 341 Transport(SmolStr), 342} 343 344impl From<ClientError> for BackfillError { 345 fn from(e: ClientError) -> Self { 346 match e.kind() { 347 ClientErrorKind::Http { 348 status: StatusCode::TOO_MANY_REQUESTS, 349 } => Self::Ratelimited, 350 ClientErrorKind::Transport => Self::Transport( 351 e.source_err() 352 .expect("transport error without source") 353 .to_smolstr(), 354 ), 355 _ => Self::Generic(e.into()), 356 } 357 } 358} 359 360impl From<miette::Report> for BackfillError { 361 fn from(e: miette::Report) -> Self { 362 Self::Generic(e) 363 } 364} 365 366impl From<ResolverError> for BackfillError { 367 fn from(e: ResolverError) -> Self { 368 match e { 369 ResolverError::Ratelimited => Self::Ratelimited, 370 ResolverError::Transport(s) => Self::Transport(s), 371 ResolverError::Generic(e) => Self::Generic(e), 372 } 373 } 374} 375 376async fn process_did<'i>( 377 app_state: &Arc<AppState>, 378 http: &reqwest::Client, 379 did: &Did<'static>, 380 verify_signatures: bool, 381) -> Result<Option<RepoState<'static>>, BackfillError> { 382 debug!("backfilling {}", did); 383 384 let db = &app_state.db; 385 let did_key = keys::repo_key(did); 386 let state_bytes = Db::get(db.repos.clone(), did_key) 387 .await? 388 .ok_or_else(|| miette::miette!("!!!THIS IS A BUG!!! repo state for {did} missing"))?; 389 let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes) 390 .into_diagnostic()? 391 .into_static(); 392 let previous_state = state.clone(); 393 394 // 1. resolve pds 395 let start = Instant::now(); 396 let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?; 397 trace!( 398 "resolved {did} to pds {pds_url} handle {handle:?} in {:?}", 399 start.elapsed() 400 ); 401 402 state.handle = handle.map(|h| h.into_static()); 403 404 let emit_identity = |status: &RepoStatus| { 405 let evt = AccountEvt { 406 did: did.clone(), 407 active: !matches!( 408 status, 409 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 410 ), 411 status: Some( 412 match status { 413 RepoStatus::Deactivated => "deactivated", 414 RepoStatus::Takendown => "takendown", 415 RepoStatus::Suspended => "suspended", 416 _ => "active", 417 } 418 .into(), 419 ), 420 }; 421 let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt)); 422 }; 423 424 // 2. fetch repo (car) 425 let start = Instant::now(); 426 let req = GetRepo::new().did(did.clone()).build(); 427 let resp = http.xrpc(pds_url).send(&req).await?; 428 429 let car_bytes = match resp.into_output() { 430 Ok(o) => o, 431 Err(XrpcError::Xrpc(e)) => { 432 if matches!(e, GetRepoError::RepoNotFound(_)) { 433 warn!("repo {did} not found, deleting"); 434 let mut batch = db.inner.batch(); 435 ops::delete_repo(&mut batch, db, did, state)?; 436 batch.commit().into_diagnostic()?; 437 return Ok(Some(previous_state)); // stop backfill 438 } 439 440 let inactive_status = match e { 441 GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated), 442 GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown), 443 GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended), 444 _ => None, 445 }; 446 447 if let Some(status) = inactive_status { 448 warn!("repo {did} is {status:?}, stopping backfill"); 449 450 emit_identity(&status); 451 452 let resync_state = ResyncState::Gone { 453 status: status.clone(), 454 }; 455 let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?; 456 457 let app_state_clone = app_state.clone(); 458 app_state 459 .db 460 .update_repo_state_async(did, move |state, (key, batch)| { 461 state.status = status; 462 batch.insert(&app_state_clone.db.resync, key, resync_bytes); 463 Ok((true, ())) 464 }) 465 .await?; 466 467 // return success so wrapper stops retrying 468 return Ok(Some(previous_state)); 469 } 470 471 Err(e).into_diagnostic()? 472 } 473 Err(e) => Err(e).into_diagnostic()?, 474 }; 475 476 // emit identity event so any consumers know 477 emit_identity(&state.status); 478 479 trace!( 480 "fetched {} bytes for {did} in {:?}", 481 car_bytes.body.len(), 482 start.elapsed() 483 ); 484 485 // 3. import repo 486 let start = Instant::now(); 487 let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body) 488 .await 489 .into_diagnostic()?; 490 trace!("parsed car for {did} in {:?}", start.elapsed()); 491 492 let start = Instant::now(); 493 let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 494 trace!( 495 "stored {} blocks in memory for {did} in {:?}", 496 store.len(), 497 start.elapsed() 498 ); 499 500 // 4. parse root commit to get mst root 501 let root_bytes = store 502 .get(&parsed.root) 503 .await 504 .into_diagnostic()? 505 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 506 507 let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?; 508 debug!( 509 "backfilling repo at revision {}, root cid {}", 510 root_commit.rev, root_commit.data 511 ); 512 513 // 4.5. verify commit signature 514 if verify_signatures { 515 let pubkey = app_state.resolver.resolve_signing_key(did).await?; 516 root_commit 517 .verify(&pubkey) 518 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 519 trace!("signature verified for {did}"); 520 } 521 522 // 5. walk mst 523 let start = Instant::now(); 524 let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None); 525 let leaves = mst.leaves().await.into_diagnostic()?; 526 trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64()); 527 528 // 6. insert records into db 529 let start = Instant::now(); 530 let result = { 531 let app_state = app_state.clone(); 532 let did = did.clone(); 533 let rev = root_commit.rev; 534 535 tokio::task::spawn_blocking(move || { 536 let filter = app_state.filter.load(); 537 let mut count = 0; 538 let mut delta = 0; 539 let mut added_blocks = 0; 540 let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new(); 541 let mut batch = app_state.db.inner.batch(); 542 let store = mst.storage(); 543 544 let prefix = keys::record_prefix_did(&did); 545 let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new(); 546 547 for guard in app_state.db.records.prefix(&prefix) { 548 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 549 // key is did|collection|rkey 550 // skip did| 551 let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b)); 552 let collection_raw = remaining 553 .next() 554 .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 555 let rkey_raw = remaining 556 .next() 557 .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 558 559 let collection = std::str::from_utf8(collection_raw) 560 .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?; 561 562 let rkey = keys::parse_rkey(rkey_raw) 563 .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?; 564 565 let cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 566 .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))? 567 .to_smolstr(); 568 569 existing_cids.insert((collection.into(), rkey), cid); 570 } 571 572 let mut signal_seen = filter.mode != FilterMode::Signal; 573 debug!( 574 "backfilling {did}: signal_seen initial={signal_seen}, mode={:?}, signals={:?}", 575 filter.mode, filter.signals 576 ); 577 578 for (key, cid) in leaves { 579 let val_bytes = tokio::runtime::Handle::current() 580 .block_on(store.get(&cid)) 581 .into_diagnostic()?; 582 583 if let Some(val) = val_bytes { 584 let (collection, rkey) = ops::parse_path(&key)?; 585 586 if !filter.matches_collection(collection) { 587 continue; 588 } 589 590 if !signal_seen && filter.matches_signal(collection) { 591 debug!("backfill {did}: signal matched on {collection}"); 592 signal_seen = true; 593 } 594 595 let rkey = DbRkey::new(rkey); 596 let path = (collection.to_smolstr(), rkey.clone()); 597 let cid_obj = Cid::ipld(cid); 598 599 // check if this record already exists with same CID 600 let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) { 601 if existing_cid == cid_obj.as_str() { 602 trace!("skip {did}/{collection}/{rkey} ({cid})"); 603 continue; // skip unchanged record 604 } 605 (DbAction::Update, false) 606 } else { 607 (DbAction::Create, true) 608 }; 609 trace!("{action} {did}/{collection}/{rkey} ({cid})"); 610 611 // key is did|collection|rkey 612 let db_key = keys::record_key(&did, collection, &rkey); 613 614 batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); 615 batch.insert(&app_state.db.records, db_key, cid.to_bytes()); 616 617 added_blocks += 1; 618 if is_new { 619 delta += 1; 620 *collection_counts.entry(path.0.clone()).or_default() += 1; 621 } 622 623 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 624 let evt = StoredEvent { 625 live: false, 626 did: TrimmedDid::from(&did), 627 rev: DbTid::from(&rev), 628 collection: CowStr::Borrowed(collection), 629 rkey, 630 action, 631 cid: Some(cid_obj.to_ipld().expect("valid cid")), 632 }; 633 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 634 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 635 636 count += 1; 637 } 638 } 639 640 // remove any remaining existing records (they weren't in the new MST) 641 for ((collection, rkey), cid) in existing_cids { 642 trace!("remove {did}/{collection}/{rkey} ({cid})"); 643 644 batch.remove( 645 &app_state.db.records, 646 keys::record_key(&did, &collection, &rkey), 647 ); 648 649 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 650 let evt = StoredEvent { 651 live: false, 652 did: TrimmedDid::from(&did), 653 rev: DbTid::from(&rev), 654 collection: CowStr::Borrowed(&collection), 655 rkey, 656 action: DbAction::Delete, 657 cid: None, 658 }; 659 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 660 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 661 662 delta -= 1; 663 count += 1; 664 } 665 666 if !signal_seen { 667 debug!("backfill {did}: no signal-matching records found, discarding repo"); 668 return Ok::<_, miette::Report>(None); 669 } 670 671 // 6. update data, status is updated in worker shard 672 state.rev = Some((&rev).into()); 673 state.data = Some(root_commit.data); 674 state.last_updated_at = chrono::Utc::now().timestamp(); 675 676 batch.insert( 677 &app_state.db.repos, 678 keys::repo_key(&did), 679 ser_repo_state(&state)?, 680 ); 681 682 // add the counts 683 for (col, cnt) in collection_counts { 684 db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt); 685 } 686 687 batch.commit().into_diagnostic()?; 688 689 Ok::<_, miette::Report>(Some((state, delta, added_blocks, count))) 690 }) 691 .await 692 .into_diagnostic()?? 693 }; 694 695 let Some((_state, records_cnt_delta, added_blocks, count)) = result else { 696 // signal mode: no signal-matching records found — clean up the optimistically-added repo 697 let did_key = keys::repo_key(did); 698 let backfill_pending_key = keys::pending_key(previous_state.index_id); 699 let repos_ks = app_state.db.repos.clone(); 700 let pending_ks = app_state.db.pending.clone(); 701 let db_inner = app_state.db.inner.clone(); 702 tokio::task::spawn_blocking(move || { 703 let mut batch = db_inner.batch(); 704 batch.remove(&repos_ks, &did_key); 705 batch.remove(&pending_ks, backfill_pending_key); 706 batch.commit().into_diagnostic() 707 }) 708 .await 709 .into_diagnostic()??; 710 return Ok(None); 711 }; 712 713 trace!("did {count} ops for {did} in {:?}", start.elapsed()); 714 715 // do the counts 716 if records_cnt_delta > 0 { 717 app_state 718 .db 719 .update_count_async("records", records_cnt_delta) 720 .await; 721 app_state 722 .db 723 .update_count_async("blocks", added_blocks) 724 .await; 725 } 726 trace!( 727 "committed backfill batch for {did} in {:?}", 728 start.elapsed() 729 ); 730 731 let _ = db.event_tx.send(BroadcastEvent::Persisted( 732 db.next_event_id.load(Ordering::SeqCst) - 1, 733 )); 734 735 trace!("backfill complete for {did}"); 736 Ok(Some(previous_state)) 737}