AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 791 lines 29 kB view raw
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2use crate::db::{self, Db, keys, ser_repo_state}; 3use crate::filter::FilterMode; 4use crate::ops; 5use crate::resolver::ResolverError; 6use crate::state::AppState; 7use crate::types::{ 8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState, 9 StoredEvent, 10}; 11 12use fjall::Slice; 13use jacquard_api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 14use jacquard_common::error::{ClientError, ClientErrorKind}; 15use jacquard_common::types::cid::Cid; 16use jacquard_common::types::did::Did; 17use jacquard_common::xrpc::{XrpcError, XrpcExt}; 18use jacquard_common::{CowStr, IntoStatic}; 19use jacquard_repo::mst::Mst; 20use jacquard_repo::{BlockStore, MemoryBlockStore}; 21use miette::{Diagnostic, IntoDiagnostic, Result}; 22use reqwest::StatusCode; 23use smol_str::{SmolStr, ToSmolStr}; 24use std::collections::HashMap; 25use std::str::FromStr; 26use std::sync::Arc; 27use std::sync::atomic::Ordering; 28use std::time::{Duration, Instant}; 29use thiserror::Error; 30use tokio::sync::Semaphore; 31use tracing::{debug, error, info, trace, warn}; 32 33pub mod manager; 34 35use crate::ingest::{BufferTx, IngestMessage}; 36 37pub struct BackfillWorker { 38 state: Arc<AppState>, 39 buffer_tx: BufferTx, 40 http: reqwest::Client, 41 semaphore: Arc<Semaphore>, 42 verify_signatures: bool, 43 ephemeral: bool, 44 in_flight: Arc<scc::HashSet<Did<'static>>>, 45} 46 47impl BackfillWorker { 48 pub fn new( 49 state: Arc<AppState>, 50 buffer_tx: BufferTx, 51 timeout: Duration, 52 concurrency_limit: usize, 53 verify_signatures: bool, 54 ephemeral: bool, 55 ) -> Self { 56 Self { 57 state, 58 buffer_tx, 59 http: reqwest::Client::builder() 60 .timeout(timeout) 61 .zstd(true) 62 .brotli(true) 63 .gzip(true) 64 .build() 65 .expect("failed to build http client"), 66 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 67 verify_signatures, 68 ephemeral, 69 in_flight: 
Arc::new(scc::HashSet::new()), 70 } 71 } 72} 73 74struct InFlightGuard { 75 did: Did<'static>, 76 set: Arc<scc::HashSet<Did<'static>>>, 77} 78 79impl Drop for InFlightGuard { 80 fn drop(&mut self) { 81 let _ = self.set.remove_sync(&self.did); 82 } 83} 84 85impl BackfillWorker { 86 pub async fn run(self) { 87 info!("backfill worker started"); 88 89 loop { 90 let mut spawned = 0; 91 92 for guard in self.state.db.pending.iter() { 93 let (key, value) = match guard.into_inner() { 94 Ok(kv) => kv, 95 Err(e) => { 96 error!(err = %e, "failed to read pending entry"); 97 db::check_poisoned(&e); 98 continue; 99 } 100 }; 101 102 let did = match TrimmedDid::try_from(value.as_ref()) { 103 Ok(d) => d.to_did(), 104 Err(e) => { 105 error!(err = %e, "invalid did in pending value"); 106 continue; 107 } 108 }; 109 110 if self.in_flight.contains_sync(&did) { 111 continue; 112 } 113 let _ = self.in_flight.insert_sync(did.clone().into_static()); 114 115 let permit = match self.semaphore.clone().try_acquire_owned() { 116 Ok(p) => p, 117 Err(_) => break, 118 }; 119 120 let guard = InFlightGuard { 121 did: did.clone().into_static(), 122 set: self.in_flight.clone(), 123 }; 124 125 let state = self.state.clone(); 126 let http = self.http.clone(); 127 let did = did.clone(); 128 let buffer_tx = self.buffer_tx.clone(); 129 let verify = self.verify_signatures; 130 let ephemeral = self.ephemeral; 131 132 tokio::spawn(async move { 133 let _guard = guard; 134 let res = did_task( 135 &state, http, buffer_tx, &did, key, permit, verify, ephemeral, 136 ) 137 .await; 138 139 if let Err(e) = res { 140 error!(did = %did, err = %e, "backfill process failed"); 141 if let BackfillError::Generic(report) = &e { 142 db::check_poisoned_report(report); 143 } 144 } 145 146 // wake worker to pick up more (in case we were sleeping at limit) 147 state.backfill_notify.notify_one(); 148 }); 149 150 spawned += 1; 151 } 152 153 if spawned == 0 { 154 // wait for new tasks 155 self.state.backfill_notify.notified().await; 
156 } else { 157 // if we spawned tasks, yield briefly to let them start and avoid tight loop 158 tokio::time::sleep(Duration::from_millis(10)).await; 159 } 160 } 161 } 162} 163 164async fn did_task( 165 state: &Arc<AppState>, 166 http: reqwest::Client, 167 buffer_tx: BufferTx, 168 did: &Did<'static>, 169 pending_key: Slice, 170 _permit: tokio::sync::OwnedSemaphorePermit, 171 verify_signatures: bool, 172 ephemeral: bool, 173) -> Result<(), BackfillError> { 174 let db = &state.db; 175 176 match process_did(&state, &http, &did, verify_signatures, ephemeral).await { 177 Ok(Some(repo_state)) => { 178 let did_key = keys::repo_key(&did); 179 180 // determine old gauge state 181 // if it was error/suspended etc, we need to know which error kind it was to decrement correctly. 182 // we have to peek at the resync state. 183 let old_gauge = state.db.repo_gauge_state_async(&repo_state, &did_key).await; 184 185 let mut batch = db.inner.batch(); 186 // remove from pending 187 if old_gauge == GaugeState::Pending { 188 batch.remove(&db.pending, pending_key); 189 } 190 // remove from resync 191 if old_gauge.is_resync() { 192 batch.remove(&db.resync, &did_key); 193 } 194 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 195 .await 196 .into_diagnostic()??; 197 198 state 199 .db 200 .update_gauge_diff_async(&old_gauge, &GaugeState::Synced) 201 .await; 202 203 let state = state.clone(); 204 tokio::task::spawn_blocking(move || { 205 state 206 .db 207 .inner 208 .persist(fjall::PersistMode::Buffer) 209 .into_diagnostic() 210 }) 211 .await 212 .into_diagnostic()??; 213 214 // Notify completion to worker shard 215 if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) { 216 error!(did = %did, err = %e, "failed to send BackfillFinished"); 217 } 218 Ok(()) 219 } 220 Ok(None) => { 221 // signal mode: repo had no matching records, was cleaned up by process_did 222 state.db.update_count_async("repos", -1).await; 223 
state.db.update_count_async("pending", -1).await; 224 Ok(()) 225 } 226 Err(BackfillError::Deleted) => { 227 warn!(did = %did, "orphaned pending entry, cleaning up"); 228 // orphaned pending entry, clean it up 229 Db::remove(db.pending.clone(), pending_key).await?; 230 state.db.update_count_async("pending", -1).await; 231 Ok(()) 232 } 233 Err(e) => { 234 match &e { 235 BackfillError::Ratelimited => { 236 debug!(did = %did, "too many requests"); 237 } 238 BackfillError::Transport(reason) => { 239 error!(did = %did, %reason, "transport error"); 240 } 241 BackfillError::Generic(e) => { 242 error!(did = %did, err = %e, "failed"); 243 } 244 BackfillError::Deleted => unreachable!("already handled"), 245 } 246 247 let error_kind = match &e { 248 BackfillError::Ratelimited => ResyncErrorKind::Ratelimited, 249 BackfillError::Transport(_) => ResyncErrorKind::Transport, 250 BackfillError::Generic(_) => ResyncErrorKind::Generic, 251 BackfillError::Deleted => unreachable!("already handled"), 252 }; 253 254 let did_key = keys::repo_key(&did); 255 256 // 1. get current retry count 257 let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| { 258 b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic()) 259 .transpose() 260 })?; 261 262 let (mut retry_count, prev_kind) = match existing_state { 263 Some(ResyncState::Error { 264 kind, retry_count, .. 265 }) => (retry_count, Some(kind)), 266 Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really? 267 None => (0, None), 268 }; 269 270 // Calculate new stats 271 retry_count += 1; 272 let next_retry = ResyncState::next_backoff(retry_count); 273 274 let resync_state = ResyncState::Error { 275 kind: error_kind.clone(), 276 retry_count, 277 next_retry, 278 }; 279 280 let error_string = e.to_string(); 281 282 tokio::task::spawn_blocking({ 283 let state = state.clone(); 284 let did_key = did_key.into_static(); 285 move || { 286 // 3. 
save to resync 287 let serialized_resync_state = 288 rmp_serde::to_vec(&resync_state).into_diagnostic()?; 289 290 // 4. and update the main repo state 291 let serialized_repo_state = if let Some(state_bytes) = 292 state.db.repos.get(&did_key).into_diagnostic()? 293 { 294 let mut state: RepoState = 295 rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 296 state.status = RepoStatus::Error(error_string.into()); 297 Some(rmp_serde::to_vec(&state).into_diagnostic()?) 298 } else { 299 None 300 }; 301 302 let mut batch = state.db.inner.batch(); 303 batch.insert(&state.db.resync, &did_key, serialized_resync_state); 304 batch.remove(&state.db.pending, pending_key.clone()); 305 if let Some(state_bytes) = serialized_repo_state { 306 batch.insert(&state.db.repos, &did_key, state_bytes); 307 } 308 batch.commit().into_diagnostic() 309 } 310 }) 311 .await 312 .into_diagnostic()??; 313 314 let old_gauge = prev_kind 315 .map(|k| GaugeState::Resync(Some(k))) 316 .unwrap_or(GaugeState::Pending); 317 318 let new_gauge = GaugeState::Resync(Some(error_kind)); 319 320 state 321 .db 322 .update_gauge_diff_async(&old_gauge, &new_gauge) 323 .await; 324 325 Err(e) 326 } 327 } 328} 329 330#[derive(Debug, Diagnostic, Error)] 331enum BackfillError { 332 #[error("{0}")] 333 Generic(miette::Report), 334 #[error("too many requests")] 335 Ratelimited, 336 #[error("transport error: {0}")] 337 Transport(SmolStr), 338 #[error("repo was concurrently deleted")] 339 Deleted, 340} 341 342impl From<ClientError> for BackfillError { 343 fn from(e: ClientError) -> Self { 344 match e.kind() { 345 ClientErrorKind::Http { 346 status: StatusCode::TOO_MANY_REQUESTS, 347 } => Self::Ratelimited, 348 ClientErrorKind::Transport => Self::Transport( 349 e.source_err() 350 .expect("transport error without source") 351 .to_smolstr(), 352 ), 353 _ => Self::Generic(e.into()), 354 } 355 } 356} 357 358impl From<miette::Report> for BackfillError { 359 fn from(e: miette::Report) -> Self { 360 Self::Generic(e) 361 } 362} 
impl From<ResolverError> for BackfillError {
    /// Resolver failures map one-to-one onto the equivalent backfill variants
    /// so the retry/backoff classification stays consistent.
    fn from(e: ResolverError) -> Self {
        match e {
            ResolverError::Ratelimited => Self::Ratelimited,
            ResolverError::Transport(s) => Self::Transport(s),
            ResolverError::Generic(e) => Self::Generic(e),
        }
    }
}

/// Performs one full backfill of `did`'s repo:
/// resolve the DID document -> fetch the repo CAR from its PDS -> (optionally)
/// verify the root commit signature -> walk the MST -> diff its leaves against
/// the existing `records` index -> write records/blocks/events + refcounts in
/// a single batch.
///
/// Returns:
/// - `Ok(Some(previous_state))` on success (and on the "repo gone/inactive"
///   paths, so the caller stops retrying),
/// - `Ok(None)` in signal-filter mode when no signal-matching record was
///   found (the optimistically-added repo row and pending entry are removed
///   here; the caller only adjusts counters),
/// - `Err(BackfillError::Deleted)` if the repo row vanished before we started.
///
// NOTE(review): the `'i` lifetime parameter appears unused — confirm.
async fn process_did<'i>(
    app_state: &Arc<AppState>,
    http: &reqwest::Client,
    did: &Did<'static>,
    verify_signatures: bool,
    ephemeral: bool,
) -> Result<Option<RepoState<'static>>, BackfillError> {
    debug!(did = %did, "backfilling");

    // always invalidate doc before backfilling so we see a fresh PDS endpoint
    app_state.resolver.invalidate(did).await;

    let db = &app_state.db;
    let did_key = keys::repo_key(did);
    // The repo row must already exist (added optimistically by the ingest
    // side); if it's gone, this pending entry is orphaned.
    let Some(state_bytes) = Db::get(db.repos.clone(), did_key).await? else {
        return Err(BackfillError::Deleted);
    };
    let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
        .into_diagnostic()?
        .into_static();
    // Kept so the early-exit paths can return the pre-backfill state.
    let previous_state = state.clone();

    // 1. resolve pds
    let start = Instant::now();
    let doc = app_state.resolver.resolve_doc(did).await?;
    let pds = doc.pds.clone();
    trace!(
        did = %did,
        pds = %doc.pds,
        handle = ?doc.handle,
        elapsed = ?start.elapsed(),
        "resolved to pds"
    );
    state.update_from_doc(doc);

    // Broadcasts an account event reflecting the repo's (in)active status.
    let emit_identity = |status: &RepoStatus| {
        let evt = AccountEvt {
            did: did.clone(),
            active: !matches!(
                status,
                RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
            ),
            status: Some(
                match status {
                    RepoStatus::Deactivated => "deactivated",
                    RepoStatus::Takendown => "takendown",
                    RepoStatus::Suspended => "suspended",
                    _ => "active",
                }
                .into(),
            ),
        };
        let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
    };

    // 2. fetch repo (car)
    let start = Instant::now();
    let req = GetRepo::new().did(did.clone()).build();
    let resp = http.xrpc(pds).send(&req).await?;

    let car_bytes = match resp.into_output() {
        Ok(o) => o,
        Err(XrpcError::Xrpc(e)) => {
            // PDS says the repo doesn't exist: wipe everything we hold for it.
            if matches!(e, GetRepoError::RepoNotFound(_)) {
                warn!(did = %did, "repo not found, deleting");
                let mut batch = db::refcount::RefcountedBatch::new(db);
                if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
                    tracing::error!(err = %e, "failed to wipe repo during backfill");
                }
                batch.commit().into_diagnostic()?;
                return Ok(Some(previous_state)); // stop backfill
            }

            // Repo exists but is inactive; record that and stop retrying.
            let inactive_status = match e {
                GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
                GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
                GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
                _ => None,
            };

            if let Some(status) = inactive_status {
                warn!(did = %did, ?status, "repo is inactive, stopping backfill");

                emit_identity(&status);

                // Mark as Gone in resync so the retry scheduler skips it.
                let resync_state = ResyncState::Gone {
                    status: status.clone(),
                };
                let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                let app_state_clone = app_state.clone();
                app_state
                    .db
                    .update_repo_state_async(did, move |state, (key, batch)| {
                        state.status = status;
                        batch.insert(&app_state_clone.db.resync, key, resync_bytes);
                        Ok((true, ()))
                    })
                    .await?;

                // return success so wrapper stops retrying
                return Ok(Some(previous_state));
            }

            // Any other XRPC error is a generic failure (will be retried).
            Err(e).into_diagnostic()?
        }
        Err(e) => Err(e).into_diagnostic()?,
    };

    // emit identity event so any consumers know
    emit_identity(&state.status);

    trace!(
        did = %did,
        bytes = car_bytes.body.len(),
        elapsed = ?start.elapsed(),
        "fetched car bytes"
    );

    // 3. import repo: parse the CAR and load all blocks into memory
    let start = Instant::now();
    let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
        .await
        .into_diagnostic()?;
    trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");

    let start = Instant::now();
    let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
    trace!(
        did = %did,
        blocks = store.len(),
        elapsed = ?start.elapsed(),
        "stored blocks in memory"
    );

    // 4. parse root commit to get mst root
    let root_bytes = store
        .get(&parsed.root)
        .await
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
    debug!(
        did = %did,
        rev = %root_commit.rev,
        cid = %root_commit.data,
        "backfilling repo at revision"
    );

    // 4.5. verify commit signature against the DID's signing key (opt-in)
    if verify_signatures {
        let pubkey = app_state.resolver.resolve_signing_key(did).await?;
        root_commit
            .verify(&pubkey)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!(did = %did, "signature verified");
    }

    // 5. walk mst to enumerate all (record-path, cid) leaves
    let start = Instant::now();
    let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
    let leaves = mst.leaves().await.into_diagnostic()?;
    trace!(did = %did, elapsed = ?start.elapsed(), "walked mst");

    // 6. insert records into db — heavy CPU + fjall work, so it runs on the
    // blocking pool. The closure diffs the MST leaves against the existing
    // records index and emits create/update/delete events plus refcounts.
    let start = Instant::now();
    let result = {
        let app_state = app_state.clone();
        let did = did.clone();
        let rev = root_commit.rev;

        tokio::task::spawn_blocking(move || {
            let filter = app_state.filter.load();
            let mut count = 0;          // total ops emitted (any action)
            let mut delta = 0;          // net record count change (creates - deletes)
            let mut added_blocks = 0;   // blocks inserted into the blocks partition
            let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
            let mut batch = db::refcount::RefcountedBatch::new(&app_state.db);
            let store = mst.storage();

            let prefix = keys::record_prefix_did(&did);
            // (collection, rkey) -> cid string for every record we already
            // hold; entries still present after the leaf loop are deletions.
            let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();

            if !ephemeral {
                for guard in app_state.db.records.prefix(&prefix) {
                    let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
                    // key is did|collection|rkey
                    // skip did|
                    let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
                    let collection_raw = remaining
                        .next()
                        .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
                    let rkey_raw = remaining
                        .next()
                        .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;

                    let collection = std::str::from_utf8(collection_raw)
                        .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;

                    let rkey = keys::parse_rkey(rkey_raw)
                        .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;

                    let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
                        .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
                        .to_smolstr();

                    existing_cids.insert((collection.into(), rkey), cid);
                }
            }

            // In signal mode the whole repo is discarded unless at least one
            // record matches a signal collection; full mode / no signals
            // means the repo is always kept.
            let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty();

            debug!(
                did = %did,
                initial = signal_seen,
                mode = ?filter.mode,
                signals = ?filter.signals,
                "backfilling signal check"
            );

            for (key, cid) in leaves {
                // Block lookups are async; bridge from this blocking thread.
                let val_bytes = tokio::runtime::Handle::current()
                    .block_on(store.get(&cid))
                    .into_diagnostic()?;

                if let Some(val) = val_bytes {
                    let (collection, rkey) = ops::parse_path(&key)?;

                    if !filter.matches_collection(collection) {
                        continue;
                    }

                    if !signal_seen && filter.matches_signal(collection) {
                        debug!(did = %did, collection = %collection, "signal matched");
                        signal_seen = true;
                    }

                    let rkey = DbRkey::new(rkey);
                    let path = (collection.to_smolstr(), rkey.clone());
                    let cid_obj = Cid::ipld(cid);

                    // check if this record already exists with same CID
                    let existing_cid = existing_cids.remove(&path);
                    let action = if let Some(existing_cid) = &existing_cid {
                        if existing_cid == cid_obj.as_str() {
                            trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "skip unchanged record");
                            continue; // skip unchanged record
                        }
                        DbAction::Update
                    } else {
                        DbAction::Create
                    };
                    trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, ?action, "action record");

                    // key is did|collection|rkey
                    let db_key = keys::record_key(&did, collection, &rkey);

                    batch.batch_mut().insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref());
                    if !ephemeral {
                        batch.batch_mut().insert(&app_state.db.records, db_key, cid.to_bytes());
                    }

                    added_blocks += 1;
                    if action == DbAction::Create {
                        delta += 1;
                        *collection_counts.entry(path.0.clone()).or_default() += 1;
                    }

                    // Allocate a monotonic event id and store the event.
                    let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                    let evt = StoredEvent {
                        live: false,
                        did: TrimmedDid::from(&did),
                        rev: DbTid::from(&rev),
                        collection: CowStr::Borrowed(collection),
                        rkey,
                        action,
                        cid: Some(cid_obj.to_ipld().expect("valid cid")),
                    };
                    let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                    batch.batch_mut().insert(&app_state.db.events, keys::event_key(event_id), bytes);

                    // update block refcount
                    let cid_bytes = Slice::from(cid.to_bytes());
                    // if ephemeral, we only care about events, so its 1
                    // if not, then its 2 since we also insert to records
                    batch.update_block_refcount(cid_bytes, ephemeral.then_some(1).unwrap_or(2))?;
                    // for Update, also decrement old CID refcount
                    // event will still be there, so we only decrement for records
                    // which means only if not ephemeral
                    if !ephemeral && action == DbAction::Update {
                        let existing_cid = existing_cid.expect("that cid exists since this is Update");
                        if existing_cid != cid_obj.as_str() {
                            let old_cid_bytes = Slice::from(
                                cid::Cid::from_str(&existing_cid)
                                    .expect("valid cid from existing_cids")
                                    .to_bytes()
                            );
                            batch.update_block_refcount(old_cid_bytes, -1)?;
                        }
                    }

                    count += 1;
                }
            }

            // remove any remaining existing records (they weren't in the new MST)
            for ((collection, rkey), cid) in existing_cids {
                trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "remove existing record");

                // we dont have to put if ephemeral around here since
                // existing_cids will be empty anyway
                batch.batch_mut().remove(
                    &app_state.db.records,
                    keys::record_key(&did, &collection, &rkey),
                );

                // decrement block refcount
                let cid_bytes = Slice::from(
                    cid::Cid::from_str(&cid)
                        .expect("valid cid from existing_cids")
                        .to_bytes()
                );
                batch.update_block_refcount(cid_bytes, -1)?;

                // Emit a Delete event for the removed record.
                let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                let evt = StoredEvent {
                    live: false,
                    did: TrimmedDid::from(&did),
                    rev: DbTid::from(&rev),
                    collection: CowStr::Borrowed(&collection),
                    rkey,
                    action: DbAction::Delete,
                    cid: None,
                };
                let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                batch.batch_mut().insert(&app_state.db.events, keys::event_key(event_id), bytes);

                delta -= 1;
                count += 1;
            }

            // Signal mode and nothing matched: abandon the batch entirely
            // (nothing above is committed) and signal the caller to clean up.
            if !signal_seen {
                debug!(did = %did, "no signal-matching records found, discarding repo");
                return Ok::<_, miette::Report>(None);
            }

            // 7. update repo data; status is updated in worker shard
            state.tracked = true;
            state.rev = Some((&rev).into());
            state.data = Some(root_commit.data);
            state.last_updated_at = chrono::Utc::now().timestamp();

            batch.batch_mut().insert(
                &app_state.db.repos,
                keys::repo_key(&did),
                ser_repo_state(&state)?,
            );

            // add the per-collection record counts
            if !ephemeral {
                for (col, cnt) in collection_counts {
                    db::set_record_count(batch.batch_mut(), &app_state.db, &did, &col, cnt);
                }
            }

            batch.commit().into_diagnostic()?;

            Ok::<_, miette::Report>(Some((state, delta, added_blocks, count)))
        })
        .await
        .into_diagnostic()??
    };

    let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
        // signal mode: no signal-matching records found — clean up the optimistically-added repo
        let did_key = keys::repo_key(did);
        let backfill_pending_key = keys::pending_key(previous_state.index_id);
        let app_state = app_state.clone();
        tokio::task::spawn_blocking(move || {
            let mut batch = app_state.db.inner.batch();
            batch.remove(&app_state.db.repos, &did_key);
            batch.remove(&app_state.db.pending, backfill_pending_key);
            batch.commit().into_diagnostic()
        })
        .await
        .into_diagnostic()??;
        return Ok(None);
    };

    trace!(did = %did, ops = count, elapsed = ?start.elapsed(), "did ops");

    // do the global counters (only increments matter here; per-repo deletes
    // were already folded into records_cnt_delta)
    if records_cnt_delta > 0 {
        app_state
            .db
            .update_count_async("records", records_cnt_delta)
            .await;
    }
    if added_blocks > 0 {
        app_state
            .db
            .update_count_async("blocks", added_blocks)
            .await;
    }
    trace!(
        did = %did,
        elapsed = ?start.elapsed(),
        "committed backfill batch"
    );

    // Tell stream consumers how far the persisted event log now reaches.
    let _ = db.event_tx.send(BroadcastEvent::Persisted(
        db.next_event_id.load(Ordering::SeqCst) - 1,
    ));

    trace!(did = %did, "backfill complete");
    Ok(Some(previous_state))
}