at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] optimize backfill loop with spawn_blocking

ptr.pet 3dd76b29 577e3c79

verified
+68 -103
+68 -103
src/backfill/mod.rs
··· 1 1 use crate::db::{keys, Db}; 2 2 use crate::ops; 3 3 use crate::state::{AppState, BackfillRx}; 4 - use crate::types::{ 5 - BroadcastEvent, IdentityEvt, RepoState, RepoStatus, ResyncState, StoredEvent, 6 - }; 4 + use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 7 5 use futures::TryFutureExt; 8 6 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 9 - use jacquard::api::com_atproto::sync::subscribe_repos::Commit; 10 7 use jacquard::prelude::*; 11 8 use jacquard::types::did::Did; 12 9 use jacquard_common::xrpc::XrpcError; ··· 84 81 _permit: tokio::sync::OwnedSemaphorePermit, 85 82 ) -> Result<()> { 86 83 let db = &state.db; 84 + 85 + // block buffer processing for this DID during backfill 86 + let _ = state.blocked_dids.insert_async(did.clone()).await; 87 + 87 88 match Self::process_did(&state, &http, &did).await { 88 89 Ok(previous_state) => { 89 90 let did_key = keys::repo_key(&did); ··· 127 128 futures::future::join_all(pending_fut.into_iter().chain(resync_fut)) 128 129 }); 129 130 130 - let state = state.clone(); 131 + let state_for_persist = state.clone(); 131 132 tokio::task::spawn_blocking(move || { 132 - state 133 + state_for_persist 133 134 .db 134 135 .inner 135 136 .persist(fjall::PersistMode::Buffer) ··· 137 138 }) 138 139 .await 139 140 .into_diagnostic()??; 140 - 141 - Ok(()) 142 141 } 143 142 Err(e) => { 144 143 error!("backfill failed for {did}: {e}"); ··· 165 164 next_retry, 166 165 }; 167 166 168 - let state = state.clone(); 169 - let did_key = did_key.to_vec(); 167 + tokio::task::spawn_blocking({ 168 + let state = state.clone(); 169 + let did_key = did_key.to_vec(); 170 + move || { 171 + // 3. save to resync 172 + let serialized_resync_state = 173 + rmp_serde::to_vec(&resync_state).into_diagnostic()?; 170 174 171 - tokio::task::spawn_blocking(move || { 172 - // 3. save to resync 173 - let serialized_resync_state = 174 - rmp_serde::to_vec(&resync_state).into_diagnostic()?; 175 + // 4. and update the main repo state 176 + let serialized_repo_state = if let Some(state_bytes) = 177 + state.db.repos.get(&did_key).into_diagnostic()? 178 + { 179 + let mut state: RepoState = 180 + rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 181 + state.status = RepoStatus::Error(e.to_string().into()); 182 + Some(rmp_serde::to_vec(&state).into_diagnostic()?) 183 + } else { 184 + None 185 + }; 175 186 176 - // 4. and update the main repo state 177 - let serialized_repo_state = if let Some(state_bytes) = 178 - state.db.repos.get(&did_key).into_diagnostic()? 179 - { 180 - let mut state: RepoState = 181 - rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 182 - state.status = RepoStatus::Error(e.to_string().into()); 183 - Some(rmp_serde::to_vec(&state).into_diagnostic()?) 184 - } else { 185 - None 186 - }; 187 + let mut batch = state.db.inner.batch(); 187 188 188 - let mut batch = state.db.inner.batch(); 189 + batch.insert(&state.db.resync, &did_key, serialized_resync_state); 189 190 190 - batch.insert(&state.db.resync, &did_key, serialized_resync_state); 191 + if let Some(state_bytes) = serialized_repo_state { 192 + batch.insert(&state.db.repos, &did_key, state_bytes); 193 + } 191 194 192 - if let Some(state_bytes) = serialized_repo_state { 193 - batch.insert(&state.db.repos, &did_key, state_bytes); 195 + // 5. remove from pending 196 + batch.remove(&state.db.pending, &did_key); 197 + batch.commit().into_diagnostic() 194 198 } 195 - 196 - // 5. remove from pending 197 - batch.remove(&state.db.pending, &did_key); 198 - 199 - batch.commit().into_diagnostic() 200 199 }) 201 200 .await 202 201 .into_diagnostic()??; 203 - 204 - Ok(()) 205 202 } 206 203 } 204 + 205 + // unblock buffer processing for this DID 206 + state.blocked_dids.remove_async(&did).await; 207 + Ok(()) 207 208 } 208 209 209 210 // returns previous repo state if successful ··· 235 236 } 236 237 237 238 let emit_identity = |status: &RepoStatus| { 238 - let evt = IdentityEvt { 239 + let evt = AccountEvt { 239 240 did: did.as_str().into(), 240 - handle: state.handle.clone().unwrap_or_default(), 241 - is_active: !matches!( 241 + active: !matches!( 242 242 status, 243 243 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 244 244 ), 245 - status: match status { 246 - RepoStatus::Deactivated => "deactivated", 247 - RepoStatus::Takendown => "takendown", 248 - RepoStatus::Suspended => "suspended", 249 - _ => "active", 250 - } 251 - .into(), 245 + status: Some( 246 + match status { 247 + RepoStatus::Deactivated => "deactivated", 248 + RepoStatus::Takendown => "takendown", 249 + RepoStatus::Suspended => "suspended", 250 + _ => "active", 251 + } 252 + .into(), 253 + ), 252 254 }; 253 - ops::emit_identity_event(db, evt); 255 + ops::emit_account_event(db, evt); 254 256 }; 255 257 256 258 // 2. fetch repo (car) ··· 351 353 352 354 // 6. insert records into db 353 355 let start = Instant::now(); 354 - let (added_records, added_blocks, collection_counts, count) = { 356 + let (_state, added_records, added_blocks, collection_counts, count) = { 355 357 let app_state = app_state.clone(); 356 - let loop_did = did.clone(); 357 - let loop_rev = commit.rev; 358 + let did = did.clone(); 359 + let rev = commit.rev; 358 360 let storage = mst.storage().clone(); 361 + 359 362 tokio::task::spawn_blocking(move || { 360 363 let mut count = 0; 361 364 let mut added_records = 0; ··· 374 377 let collection = parts[0]; 375 378 let rkey = parts[1]; 376 379 377 - let db_key = keys::record_key(&loop_did, collection, rkey); 380 + let db_key = keys::record_key(&did, collection, rkey); 378 381 let cid_str = cid.to_smolstr(); 379 382 380 383 let val_vec: Vec<u8> = val.to_vec(); ··· 400 403 app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 401 404 let evt = StoredEvent::Record { 402 405 live: false, 403 - did: loop_did.as_str().into(), 404 - rev: loop_rev.as_str().into(), 406 + did: did.as_str().into(), 407 + rev: rev.as_str().into(), 405 408 collection: collection.into(), 406 409 rkey: rkey.into(), 407 410 action: "create".into(), ··· 420 423 } 421 424 } 422 425 423 - // 6. update status to synced (inside batch) 426 + // 6. update status to synced 424 427 state.status = RepoStatus::Synced; 425 - state.rev = loop_rev.as_str().into(); 428 + state.rev = rev.as_str().into(); 426 429 state.data = commit.data.to_smolstr(); 427 430 state.last_updated_at = chrono::Utc::now().timestamp(); 428 431 429 - let did_key = keys::repo_key(&loop_did); 432 + let did_key = keys::repo_key(&did); 430 433 let bytes = rmp_serde::to_vec(&state).into_diagnostic()?; 431 434 batch.insert(&app_state.db.repos, did_key, bytes); 432 435 433 436 batch.commit().into_diagnostic()?; 434 437 435 - Ok::<_, miette::Report>((added_records, added_blocks, collection_counts, count)) 438 + Ok::<_, miette::Report>(( 439 + state, 440 + added_records, 441 + added_blocks, 442 + collection_counts, 443 + count, 444 + )) 436 445 }) 437 446 .await 438 447 .into_diagnostic()?? ··· 473 482 }); 474 483 } 475 484 trace!( 476 - "committed backfill batch for {} in {:?}", 477 - did, 485 + "committed backfill batch for {did} in {:?}", 478 486 start.elapsed() 479 487 ); 480 488 ··· 482 490 db.next_event_id.load(Ordering::SeqCst) - 1, 483 491 )); 484 492 485 - debug!("marked {did} as synced, draining buffer..."); 486 - 487 - // 7. drain buffer 488 - let start = Instant::now(); 489 - let prefix = keys::buffer_prefix(did).to_vec(); 490 - 491 - let num_buffered = tokio::task::spawn_blocking({ 492 - let state = app_state.clone(); 493 - let did = did.clone(); 494 - move || -> Result<i64> { 495 - let mut batch = state.db.inner.batch(); 496 - 497 - for res in state 498 - .db 499 - .buffer 500 - .prefix(&prefix) 501 - .map(|item| item.into_inner().into_diagnostic()) 502 - { 503 - let (key, value) = res?; 504 - let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?; 505 - debug!("applying buffered commit seq: {}", commit.seq); 506 - 507 - if let Err(e) = ops::apply_commit(&state.db, &commit, true) { 508 - error!("failed to apply buffered commit for {did}: {e}"); 509 - Db::check_poisoned_report(&e); 510 - } 511 - 512 - // delete from buffer 513 - batch.remove(&state.db.buffer, key); 514 - } 515 - 516 - batch.commit().into_diagnostic()?; 517 - 518 - Ok(count) 519 - } 520 - }) 521 - .await 522 - .into_diagnostic()??; 523 - 524 - trace!( 525 - "drained {num_buffered} buffered commits for {did} in {:?}", 526 - start.elapsed() 527 - ); 528 - 493 + // buffer processing is handled by BufferProcessor when blocked flag is cleared 529 494 debug!("backfill complete for {did}"); 530 495 Ok(previous_state) 531 496 }