AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 4fe44f8a28620861fb772d40d55fbeb65716e5df 720 lines 26 kB view raw
1use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2use crate::db::{self, Db, keys, ser_repo_state}; 3use crate::filter::FilterMode; 4use crate::ops; 5use crate::resolver::ResolverError; 6use crate::state::AppState; 7use crate::types::{ 8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState, 9 StoredEvent, 10}; 11 12use fjall::Slice; 13use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 14use jacquard::error::{ClientError, ClientErrorKind}; 15use jacquard::types::cid::Cid; 16use jacquard::types::did::Did; 17use jacquard::{CowStr, IntoStatic, prelude::*}; 18use jacquard_common::xrpc::XrpcError; 19use jacquard_repo::mst::Mst; 20use jacquard_repo::{BlockStore, MemoryBlockStore}; 21use miette::{Diagnostic, IntoDiagnostic, Result}; 22use reqwest::StatusCode; 23use smol_str::{SmolStr, ToSmolStr}; 24use std::collections::HashMap; 25use std::sync::Arc; 26use std::sync::atomic::Ordering; 27use std::time::{Duration, Instant}; 28use thiserror::Error; 29use tokio::sync::Semaphore; 30use tracing::{debug, error, info, trace, warn}; 31 32pub mod manager; 33 34use crate::ingest::{BufferTx, IngestMessage}; 35 36pub struct BackfillWorker { 37 state: Arc<AppState>, 38 buffer_tx: BufferTx, 39 http: reqwest::Client, 40 semaphore: Arc<Semaphore>, 41 verify_signatures: bool, 42 in_flight: Arc<scc::HashSet<Did<'static>>>, 43} 44 45impl BackfillWorker { 46 pub fn new( 47 state: Arc<AppState>, 48 buffer_tx: BufferTx, 49 timeout: Duration, 50 concurrency_limit: usize, 51 verify_signatures: bool, 52 ) -> Self { 53 Self { 54 state, 55 buffer_tx, 56 http: reqwest::Client::builder() 57 .timeout(timeout) 58 .zstd(true) 59 .brotli(true) 60 .gzip(true) 61 .build() 62 .expect("failed to build http client"), 63 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 64 verify_signatures, 65 in_flight: Arc::new(scc::HashSet::new()), 66 } 67 } 68} 69 70struct InFlightGuard { 71 did: Did<'static>, 72 set: 
Arc<scc::HashSet<Did<'static>>>, 73} 74 75impl Drop for InFlightGuard { 76 fn drop(&mut self) { 77 let _ = self.set.remove_sync(&self.did); 78 } 79} 80 81impl BackfillWorker { 82 pub async fn run(self) { 83 info!("backfill worker started"); 84 85 loop { 86 let mut spawned = 0; 87 88 for guard in self.state.db.pending.iter() { 89 let (key, value) = match guard.into_inner() { 90 Ok(kv) => kv, 91 Err(e) => { 92 error!("failed to read pending entry: {e}"); 93 db::check_poisoned(&e); 94 continue; 95 } 96 }; 97 98 let did = match TrimmedDid::try_from(value.as_ref()) { 99 Ok(d) => d.to_did(), 100 Err(e) => { 101 error!("invalid did in pending value: {e}"); 102 continue; 103 } 104 }; 105 106 if self.in_flight.contains_sync(&did) { 107 continue; 108 } 109 let _ = self.in_flight.insert_sync(did.clone().into_static()); 110 111 let permit = match self.semaphore.clone().try_acquire_owned() { 112 Ok(p) => p, 113 Err(_) => break, 114 }; 115 116 let guard = InFlightGuard { 117 did: did.clone().into_static(), 118 set: self.in_flight.clone(), 119 }; 120 121 let state = self.state.clone(); 122 let http = self.http.clone(); 123 let did = did.clone(); 124 let buffer_tx = self.buffer_tx.clone(); 125 let verify = self.verify_signatures; 126 127 tokio::spawn(async move { 128 let _guard = guard; 129 let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await; 130 131 if let Err(e) = res { 132 error!("backfill process failed for {did}: {e}"); 133 if let BackfillError::Generic(report) = &e { 134 db::check_poisoned_report(report); 135 } 136 } 137 138 // wake worker to pick up more (in case we were sleeping at limit) 139 state.backfill_notify.notify_one(); 140 }); 141 142 spawned += 1; 143 } 144 145 if spawned == 0 { 146 // wait for new tasks 147 self.state.backfill_notify.notified().await; 148 } else { 149 // if we spawned tasks, yield briefly to let them start and avoid tight loop 150 tokio::time::sleep(Duration::from_millis(10)).await; 151 } 152 } 153 } 154} 155 156async fn 
did_task(
    state: &Arc<AppState>,
    http: reqwest::Client,
    buffer_tx: BufferTx,
    did: &Did<'static>,
    // Key of this DID's entry in the `pending` partition; removed on both the
    // success and the error path below.
    pending_key: Slice,
    // Held (unused) so the concurrency slot stays occupied until this task ends.
    _permit: tokio::sync::OwnedSemaphorePermit,
    verify_signatures: bool,
) -> Result<(), BackfillError> {
    let db = &state.db;

    match process_did(&state, &http, &did, verify_signatures).await {
        // Backfill succeeded (or terminated deliberately, e.g. repo gone):
        // clear pending/resync bookkeeping and flip the gauge to Synced.
        Ok(Some(repo_state)) => {
            let did_key = keys::repo_key(&did);

            // determine old gauge state
            // if it was error/suspended etc, we need to know which error kind it was to decrement correctly.
            // we have to peek at the resync state.
            let old_gauge = state.db.repo_gauge_state_async(&repo_state, &did_key).await;

            let mut batch = db.inner.batch();
            // remove from pending
            if old_gauge == GaugeState::Pending {
                batch.remove(&db.pending, pending_key);
            }
            // remove from resync
            if old_gauge.is_resync() {
                batch.remove(&db.resync, &did_key);
            }
            // Commit off the async runtime; fjall batch commit is blocking I/O.
            tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
                .await
                .into_diagnostic()??;

            state
                .db
                .update_gauge_diff_async(&old_gauge, &GaugeState::Synced)
                .await;

            // Flush the journal buffer (also blocking I/O).
            let state = state.clone();
            tokio::task::spawn_blocking(move || {
                state
                    .db
                    .inner
                    .persist(fjall::PersistMode::Buffer)
                    .into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // Notify completion to worker shard
            if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) {
                error!("failed to send BackfillFinished for {did}: {e}");
            }
            Ok(())
        }
        Ok(None) => {
            // signal mode: repo had no matching records, was cleaned up by process_did
            // (process_did already removed the repos + pending entries; only the
            // aggregate counters are adjusted here).
            state.db.update_count_async("repos", -1).await;
            state.db.update_count_async("pending", -1).await;
            Ok(())
        }
        // Failure: log, then persist an error resync state with incremented
        // retry count + backoff, mark the repo as errored, and fix the gauges.
        Err(e) => {
            match &e {
                BackfillError::Ratelimited => {
                    // Rate limits are expected; keep them at debug level.
                    debug!("failed for {did}: too many requests");
                }
                BackfillError::Transport(reason) => {
                    error!("failed for {did}: transport error: {reason}");
                }
                BackfillError::Generic(e) => {
                    error!("failed for {did}: {e}");
                }
            }

            let error_kind = match &e {
                BackfillError::Ratelimited => ResyncErrorKind::Ratelimited,
                BackfillError::Transport(_) => ResyncErrorKind::Transport,
                BackfillError::Generic(_) => ResyncErrorKind::Generic,
            };

            let did_key = keys::repo_key(&did);

            // 1. get current retry count
            let existing_state = Db::get(db.resync.clone(), &did_key).await.and_then(|b| {
                b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic())
                    .transpose()
            })?;

            let (mut retry_count, prev_kind) = match existing_state {
                Some(ResyncState::Error {
                    kind, retry_count, ..
                }) => (retry_count, Some(kind)),
                Some(ResyncState::Gone { .. }) => return Ok(()), // should handle gone? original code didn't really?
                None => (0, None),
            };

            // Calculate new stats
            retry_count += 1;
            let next_retry = ResyncState::next_backoff(retry_count);

            let resync_state = ResyncState::Error {
                kind: error_kind.clone(),
                retry_count,
                next_retry,
            };

            let error_string = e.to_string();

            tokio::task::spawn_blocking({
                let state = state.clone();
                let did_key = did_key.into_static();
                move || {
                    // 3. save to resync
                    let serialized_resync_state =
                        rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                    // 4. and update the main repo state
                    // (only if a repo record exists; otherwise just record the resync state)
                    let serialized_repo_state = if let Some(state_bytes) =
                        state.db.repos.get(&did_key).into_diagnostic()?
                    {
                        let mut state: RepoState =
                            rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
                        state.status = RepoStatus::Error(error_string.into());
                        Some(rmp_serde::to_vec(&state).into_diagnostic()?)
                    } else {
                        None
                    };

                    // Single atomic batch: resync entry in, pending entry out,
                    // repo state (if any) updated.
                    let mut batch = state.db.inner.batch();
                    batch.insert(&state.db.resync, &did_key, serialized_resync_state);
                    batch.remove(&state.db.pending, pending_key.clone());
                    if let Some(state_bytes) = serialized_repo_state {
                        batch.insert(&state.db.repos, &did_key, state_bytes);
                    }
                    batch.commit().into_diagnostic()
                }
            })
            .await
            .into_diagnostic()??;

            // Gauge transition: previous bucket was either an earlier resync
            // error kind or plain Pending (first failure).
            let old_gauge = if let Some(k) = prev_kind {
                GaugeState::Resync(Some(k))
            } else {
                GaugeState::Pending
            };

            let new_gauge = GaugeState::Resync(Some(error_kind));

            state
                .db
                .update_gauge_diff_async(&old_gauge, &new_gauge)
                .await;

            Err(e)
        }
    }
}

/// Failure classification for a backfill attempt; the kind decides log level
/// and the `ResyncErrorKind` stored for retry scheduling.
#[derive(Debug, Diagnostic, Error)]
enum BackfillError {
    #[error("{0}")]
    Generic(miette::Report),
    #[error("too many requests")]
    Ratelimited,
    #[error("transport error: {0}")]
    Transport(SmolStr),
}

impl From<ClientError> for BackfillError {
    fn from(e: ClientError) -> Self {
        match e.kind() {
            // HTTP 429 maps to the dedicated ratelimit variant.
            ClientErrorKind::Http {
                status: StatusCode::TOO_MANY_REQUESTS,
            } => Self::Ratelimited,
            ClientErrorKind::Transport => Self::Transport(
                e.source_err()
                    .expect("transport error without source")
                    .to_smolstr(),
            ),
            _ => Self::Generic(e.into()),
        }
    }
}

impl From<miette::Report> for BackfillError {
    fn from(e: miette::Report) -> Self {
        Self::Generic(e)
    }
}

impl From<ResolverError> for BackfillError {
    fn from(e: ResolverError) -> Self {
        // Resolver errors carry the same three-way classification; map 1:1.
        match e {
            ResolverError::Ratelimited => Self::Ratelimited,
            ResolverError::Transport(s) => Self::Transport(s),
            ResolverError::Generic(e) => Self::Generic(e),
        }
    }
}

/// Fetches `did`'s full repo CAR from its PDS, walks the MST, and diffs the
/// leaves against locally stored records (creates/updates/deletes), emitting
/// stored events for each change.
///
/// Returns `Ok(Some(previous_state))` on success or deliberate stop (repo
/// gone/inactive), `Ok(None)` when signal-mode filtering matched nothing and
/// the repo was discarded, `Err` for retryable failures.
async fn process_did<'i>(
    app_state: &Arc<AppState>,
    http: &reqwest::Client,
    did: &Did<'static>,
    verify_signatures: bool,
) -> Result<Option<RepoState<'static>>, BackfillError> {
    debug!("backfilling {}",
did);

    let db = &app_state.db;
    let did_key = keys::repo_key(did);
    // Repo state must have been created by the caller before queueing; a miss
    // here indicates a broken invariant.
    let state_bytes = Db::get(db.repos.clone(), did_key)
        .await?
        .ok_or_else(|| miette::miette!("!!!THIS IS A BUG!!! repo state for {did} missing"))?;
    let mut state: RepoState<'static> = rmp_serde::from_slice::<RepoState>(&state_bytes)
        .into_diagnostic()?
        .into_static();
    // Snapshot for the gauge-transition logic in the caller and for the
    // signal-mode cleanup path below.
    let previous_state = state.clone();

    // 1. resolve pds
    let start = Instant::now();
    let (pds_url, handle) = app_state.resolver.resolve_identity_info(did).await?;
    trace!(
        "resolved {did} to pds {pds_url} handle {handle:?} in {:?}",
        start.elapsed()
    );

    state.handle = handle.map(|h| h.into_static());

    // Broadcasts an #account-style event reflecting the repo's hosting status.
    let emit_identity = |status: &RepoStatus| {
        let evt = AccountEvt {
            did: did.clone(),
            active: !matches!(
                status,
                RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
            ),
            status: Some(
                match status {
                    RepoStatus::Deactivated => "deactivated",
                    RepoStatus::Takendown => "takendown",
                    RepoStatus::Suspended => "suspended",
                    _ => "active",
                }
                .into(),
            ),
        };
        // Send failure just means no subscribers; safe to ignore.
        let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt));
    };

    // 2. fetch repo (car)
    let start = Instant::now();
    let req = GetRepo::new().did(did.clone()).build();
    let resp = http.xrpc(pds_url).send(&req).await?;

    let car_bytes = match resp.into_output() {
        Ok(o) => o,
        Err(XrpcError::Xrpc(e)) => {
            // The PDS no longer has the repo at all: wipe our copy and stop.
            if matches!(e, GetRepoError::RepoNotFound(_)) {
                warn!("repo {did} not found, deleting");
                let mut batch = db.inner.batch();
                if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) {
                    tracing::error!("failed to wipe repo during backfill: {e}");
                }
                batch.commit().into_diagnostic()?;
                return Ok(Some(previous_state)); // stop backfill
            }

            let inactive_status = match e {
                GetRepoError::RepoDeactivated(_) => Some(RepoStatus::Deactivated),
                GetRepoError::RepoTakendown(_) => Some(RepoStatus::Takendown),
                GetRepoError::RepoSuspended(_) => Some(RepoStatus::Suspended),
                _ => None,
            };

            // Repo exists but is inactive: record a Gone resync state and the
            // new status, then stop without error so the wrapper won't retry.
            if let Some(status) = inactive_status {
                warn!("repo {did} is {status:?}, stopping backfill");

                emit_identity(&status);

                let resync_state = ResyncState::Gone {
                    status: status.clone(),
                };
                let resync_bytes = rmp_serde::to_vec(&resync_state).into_diagnostic()?;

                let app_state_clone = app_state.clone();
                app_state
                    .db
                    .update_repo_state_async(did, move |state, (key, batch)| {
                        state.status = status;
                        batch.insert(&app_state_clone.db.resync, key, resync_bytes);
                        Ok((true, ()))
                    })
                    .await?;

                // return success so wrapper stops retrying
                return Ok(Some(previous_state));
            }

            // Any other XRPC error is a retryable generic failure.
            Err(e).into_diagnostic()?
        }
        Err(e) => Err(e).into_diagnostic()?,
    };

    // emit identity event so any consumers know
    emit_identity(&state.status);

    trace!(
        "fetched {} bytes for {did} in {:?}",
        car_bytes.body.len(),
        start.elapsed()
    );

    // 3. import repo
    let start = Instant::now();
    let parsed = jacquard_repo::car::reader::parse_car_bytes(&car_bytes.body)
        .await
        .into_diagnostic()?;
    trace!("parsed car for {did} in {:?}", start.elapsed());

    let start = Instant::now();
    // Entire repo is held in memory for the MST walk.
    let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
    trace!(
        "stored {} blocks in memory for {did} in {:?}",
        store.len(),
        start.elapsed()
    );

    // 4. parse root commit to get mst root
    let root_bytes = store
        .get(&parsed.root)
        .await
        .into_diagnostic()?
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let root_commit = jacquard_repo::commit::Commit::from_cbor(&root_bytes).into_diagnostic()?;
    debug!(
        "backfilling repo at revision {}, root cid {}",
        root_commit.rev, root_commit.data
    );

    // 4.5. verify commit signature
    if verify_signatures {
        let pubkey = app_state.resolver.resolve_signing_key(did).await?;
        root_commit
            .verify(&pubkey)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!("signature verified for {did}");
    }

    // 5. walk mst
    let start = Instant::now();
    let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None);
    let leaves = mst.leaves().await.into_diagnostic()?;
    trace!("walked mst for {did} in {}", start.elapsed().as_secs_f64());

    // 6. insert records into db
    // Diffing + batching runs on the blocking pool; `result` is
    // Some((state, delta, added_blocks, count)) or None in signal mode when
    // nothing matched.
    let start = Instant::now();
    let result = {
        let app_state = app_state.clone();
        let did = did.clone();
        let rev = root_commit.rev;

        tokio::task::spawn_blocking(move || {
            let filter = app_state.filter.load();
            let mut count = 0; // total ops (create/update/delete)
            let mut delta = 0; // net change in record count
            let mut added_blocks = 0;
            let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new();
            let mut batch = app_state.db.inner.batch();
            let store = mst.storage();

            // Load this DID's existing records so we can diff: anything left in
            // this map after the leaf loop must have been deleted upstream.
            let prefix = keys::record_prefix_did(&did);
            let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new();

            for guard in app_state.db.records.prefix(&prefix) {
                let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
                // key is did|collection|rkey
                // skip did|
                let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b));
                let collection_raw = remaining
                    .next()
                    .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
                let rkey_raw = remaining
                    .next()
                    .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;

                let collection = std::str::from_utf8(collection_raw)
                    .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;

                let rkey = keys::parse_rkey(rkey_raw)
                    .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;

                let cid = cid::Cid::read_bytes(cid_bytes.as_ref())
                    .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))?
                    .to_smolstr();

                existing_cids.insert((collection.into(), rkey), cid);
            }

            // In signal mode an untracked repo is only kept if at least one
            // record matches a signal collection; Full mode keeps everything.
            let mut signal_seen = filter.mode == FilterMode::Full || state.tracked;

            debug!(
                "backfilling {did}: signal_seen initial={signal_seen}, mode={:?}, signals={:?}",
                filter.mode, filter.signals
            );

            for (key, cid) in leaves {
                // Block store is async; bridge from this blocking thread.
                let val_bytes = tokio::runtime::Handle::current()
                    .block_on(store.get(&cid))
                    .into_diagnostic()?;

                if let Some(val) = val_bytes {
                    let (collection, rkey) = ops::parse_path(&key)?;

                    if !filter.matches_collection(collection) {
                        continue;
                    }

                    if !signal_seen && filter.matches_signal(collection) {
                        debug!("backfill {did}: signal matched on {collection}");
                        signal_seen = true;
                    }

                    let rkey = DbRkey::new(rkey);
                    let path = (collection.to_smolstr(), rkey.clone());
                    let cid_obj = Cid::ipld(cid);

                    // check if this record already exists with same CID
                    let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) {
                        if existing_cid == cid_obj.as_str() {
                            trace!("skip {did}/{collection}/{rkey} ({cid})");
                            continue; // skip unchanged record
                        }
                        (DbAction::Update, false)
                    } else {
                        (DbAction::Create, true)
                    };
                    trace!("{action} {did}/{collection}/{rkey} ({cid})");

                    // key is did|collection|rkey
                    let db_key = keys::record_key(&did, collection, &rkey);

                    batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref());
                    batch.insert(&app_state.db.records, db_key, cid.to_bytes());

                    added_blocks += 1;
                    if is_new {
                        delta += 1;
                        *collection_counts.entry(path.0.clone()).or_default() += 1;
                    }

                    // Append a non-live (backfill) event to the event stream.
                    let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                    let evt = StoredEvent {
                        live: false,
                        did: TrimmedDid::from(&did),
                        rev: DbTid::from(&rev),
                        collection: CowStr::Borrowed(collection),
                        rkey,
                        action,
                        cid: Some(cid_obj.to_ipld().expect("valid cid")),
                    };
                    let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                    batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);

                    count += 1;
                }
            }

            // remove any remaining existing records (they weren't in the new MST)
            for ((collection, rkey), cid) in existing_cids {
                trace!("remove {did}/{collection}/{rkey} ({cid})");

                batch.remove(
                    &app_state.db.records,
                    keys::record_key(&did, &collection, &rkey),
                );

                let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst);
                let evt = StoredEvent {
                    live: false,
                    did: TrimmedDid::from(&did),
                    rev: DbTid::from(&rev),
                    collection: CowStr::Borrowed(&collection),
                    rkey,
                    action: DbAction::Delete,
                    cid: None,
                };
                let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
                batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);

                delta -= 1;
                count += 1;
            }

            // Signal mode and nothing matched: abandon the whole batch
            // (it is never committed) and report None to the caller.
            if !signal_seen {
                debug!("backfill {did}: no signal-matching records found, discarding repo");
                return Ok::<_, miette::Report>(None);
            }

            // 6. update data, status is updated in worker shard
            state.rev = Some((&rev).into());
            state.data = Some(root_commit.data);
            state.last_updated_at = chrono::Utc::now().timestamp();

            batch.insert(
                &app_state.db.repos,
                keys::repo_key(&did),
                ser_repo_state(&state)?,
            );

            // add the counts
            for (col, cnt) in collection_counts {
                db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt);
            }

            batch.commit().into_diagnostic()?;

            Ok::<_, miette::Report>(Some((state, delta, added_blocks, count)))
        })
        .await
        .into_diagnostic()??
    };

    let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
        // signal mode: no signal-matching records found — clean up the optimistically-added repo
        let did_key = keys::repo_key(did);
        let backfill_pending_key = keys::pending_key(previous_state.index_id);
        let repos_ks = app_state.db.repos.clone();
        let pending_ks = app_state.db.pending.clone();
        let db_inner = app_state.db.inner.clone();
        tokio::task::spawn_blocking(move || {
            let mut batch = db_inner.batch();
            batch.remove(&repos_ks, &did_key);
            batch.remove(&pending_ks, backfill_pending_key);
            batch.commit().into_diagnostic()
        })
        .await
        .into_diagnostic()??;
        return Ok(None);
    };

    trace!("did {count} ops for {did} in {:?}", start.elapsed());

    // do the counts
    // NOTE(review): when the net delta is zero or negative (e.g. a backfill
    // that mostly deletes records), neither "records" nor "blocks" counters
    // are adjusted even though blocks may have been added — confirm whether
    // this guard is intentional.
    if records_cnt_delta > 0 {
        app_state
            .db
            .update_count_async("records", records_cnt_delta)
            .await;
        app_state
            .db
            .update_count_async("blocks", added_blocks)
            .await;
    }
    trace!(
        "committed backfill batch for {did} in {:?}",
        start.elapsed()
    );

    // Tell stream consumers events up to the latest allocated id are durable.
    let _ = db.event_tx.send(BroadcastEvent::Persisted(
        db.next_event_id.load(Ordering::SeqCst) - 1,
    ));

    trace!("backfill complete for {did}");
    Ok(Some(previous_state))
}