at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] shard worker threads and replace backfill channel with notify

dispatch firehose messages to worker shards by consistent DID hash.
replace mpsc backfill channel with Notify + pending keyspace polling.
buffer live commits for repos mid-backfill in resync_buffer, drained
on completion via BackfillFinished messages routed to the owning shard.

ptr.pet 68cc0286 5996ab14

verified
+583 -390
+3 -11
src/api/repo.rs
··· 1 use crate::api::AppState; 2 use crate::db::{Db, keys, ser_repo_state}; 3 - use crate::ops::send_backfill_req; 4 use crate::types::RepoState; 5 use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 6 use jacquard::types::did::Did; ··· 25 let db = &state.db; 26 let mut batch = db.inner.batch(); 27 let mut added = 0; 28 - let mut to_backfill = Vec::new(); 29 30 for did_str in req.dids { 31 let did = Did::new_owned(did_str.as_str()) ··· 43 batch.insert(&db.pending, &did_key, Vec::new()); 44 45 added += 1; 46 - 47 - let jacquard_did = Did::new_owned(did.as_str()) 48 - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 49 - to_backfill.push(jacquard_did); 50 } 51 } 52 ··· 55 .await 56 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 57 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 58 state.db.update_count_async("repos", added).await; 59 state.db.update_count_async("pending", added).await; 60 61 - // trigger backfill 62 - for did in to_backfill { 63 - send_backfill_req(&state, did) 64 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 65 - } 66 } 67 Ok(StatusCode::OK) 68 }
··· 1 use crate::api::AppState; 2 use crate::db::{Db, keys, ser_repo_state}; 3 use crate::types::RepoState; 4 use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 5 use jacquard::types::did::Did; ··· 24 let db = &state.db; 25 let mut batch = db.inner.batch(); 26 let mut added = 0; 27 28 for did_str in req.dids { 29 let did = Did::new_owned(did_str.as_str()) ··· 41 batch.insert(&db.pending, &did_key, Vec::new()); 42 43 added += 1; 44 } 45 } 46 ··· 49 .await 50 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 51 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 52 + 53 state.db.update_count_async("repos", added).await; 54 state.db.update_count_async("pending", added).await; 55 56 + // trigger backfill worker 57 + state.notify_backfill(); 58 } 59 Ok(StatusCode::OK) 60 }
+10 -40
src/backfill/manager.rs
··· 7 use std::time::Duration; 8 use tracing::{debug, error, info}; 9 10 - pub fn queue_pending_backfills(state: &AppState) -> Result<()> { 11 - info!("scanning for pending backfills..."); 12 - let mut count = 0; 13 - 14 - for guard in state.db.pending.iter() { 15 - let key = guard.key().into_diagnostic()?; 16 - let did = match TrimmedDid::try_from(key.as_ref()) { 17 - Ok(did) => did.to_did(), 18 - Err(e) => { 19 - error!("invalid did in db, skipping: {e}"); 20 - continue; 21 - } 22 - }; 23 - 24 - debug!("queuing did {did}"); 25 - if let Err(e) = state.backfill_tx.send(did.clone()) { 26 - error!("failed to queue pending backfill for did:{did}: {e}"); 27 - } else { 28 - count += 1; 29 - } 30 - } 31 - 32 - info!("queued {count} pending backfills"); 33 - Ok(()) 34 - } 35 - 36 pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> { 37 - info!("scanning for deactivated/takendown repos to retry..."); 38 let mut count = 0; 39 40 for guard in state.db.resync.iter() { ··· 49 50 if let Ok(resync_state) = rmp_serde::from_slice::<ResyncState>(&val) { 51 if matches!(resync_state, ResyncState::Gone { .. }) { 52 - info!("queuing retry for gone repo: {did}"); 53 54 // move back to pending 55 let mut batch = state.db.inner.batch(); ··· 64 batch.insert(&state.db.repos, &repo_key, ser_repo_state(&repo_state)?); 65 } 66 67 batch.commit().into_diagnostic()?; 68 69 - if let Err(e) = state.backfill_tx.send(did.clone()) { 70 - error!("failed to queue retry for {did}: {e}"); 71 - } else { 72 - count += 1; 73 - } 74 } 75 } 76 } ··· 110 rmp_serde::from_slice::<ResyncState>(&value) 111 { 112 if next_retry <= now { 113 - info!("retrying backfill for {did}"); 114 115 // move back to pending 116 if let Err(e) = db.pending.insert(key, Vec::new()) { 117 error!("failed to move {did} to pending: {e}"); 118 db::check_poisoned(&e); 119 continue; 120 } 121 122 - // queue 123 - if let Err(e) = state.backfill_tx.send(did.clone()) { 124 - error!("failed to queue retry for {did}: {e}"); 125 - } else { 126 - count += 1; 127 - } 128 } 129 } 130 }
··· 7 use std::time::Duration; 8 use tracing::{debug, error, info}; 9 10 pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> { 11 + debug!("scanning for deactivated/takendown repos to retry..."); 12 let mut count = 0; 13 14 for guard in state.db.resync.iter() { ··· 23 24 if let Ok(resync_state) = rmp_serde::from_slice::<ResyncState>(&val) { 25 if matches!(resync_state, ResyncState::Gone { .. }) { 26 + debug!("queuing retry for gone repo: {did}"); 27 28 // move back to pending 29 let mut batch = state.db.inner.batch(); ··· 38 batch.insert(&state.db.repos, &repo_key, ser_repo_state(&repo_state)?); 39 } 40 41 + state.db.update_count("resync", -1); 42 + state.db.update_count("pending", 1); 43 batch.commit().into_diagnostic()?; 44 45 + state.notify_backfill(); 46 + count += 1; 47 } 48 } 49 } ··· 83 rmp_serde::from_slice::<ResyncState>(&value) 84 { 85 if next_retry <= now { 86 + debug!("retrying backfill for {did}"); 87 88 // move back to pending 89 + state.db.update_count("pending", 1); 90 if let Err(e) = db.pending.insert(key, Vec::new()) { 91 error!("failed to move {did} to pending: {e}"); 92 db::check_poisoned(&e); 93 continue; 94 } 95 96 + state.notify_backfill(); 97 + count += 1; 98 } 99 } 100 }
+96 -30
src/backfill/mod.rs
··· 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 use crate::ops; 4 - use crate::state::{AppState, BackfillRx}; 5 use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 6 - use futures::TryFutureExt; 7 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 8 use jacquard::error::{ClientError, ClientErrorKind}; 9 use jacquard::types::cid::Cid; ··· 24 25 pub mod manager; 26 27 pub struct BackfillWorker { 28 state: Arc<AppState>, 29 - rx: BackfillRx, 30 http: reqwest::Client, 31 semaphore: Arc<Semaphore>, 32 verify_signatures: bool, 33 } 34 35 impl BackfillWorker { 36 pub fn new( 37 state: Arc<AppState>, 38 - rx: BackfillRx, 39 timeout: Duration, 40 concurrency_limit: usize, 41 verify_signatures: bool, 42 ) -> Self { 43 Self { 44 state, 45 - rx, 46 http: reqwest::Client::builder() 47 .timeout(timeout) 48 .zstd(true) ··· 52 .expect("failed to build http client"), 53 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 54 verify_signatures, 55 } 56 } 57 58 - pub async fn run(mut self) { 59 info!("backfill worker started"); 60 - while let Some(did) = self.rx.recv().await { 61 - let permit = self 62 - .semaphore 63 - .clone() 64 - .acquire_owned() 65 - .await 66 - .expect("semaphore closed"); 67 68 - tokio::spawn( 69 - Self::process_did_wrapper( 70 - self.state.clone(), 71 - self.http.clone(), 72 - did.clone(), 73 - permit, 74 - self.verify_signatures, 75 - ) 76 - .inspect_err(move |e| { 77 - error!("backfill process failed for {did}: {e}"); 78 - db::check_poisoned_report(e); 79 - }), 80 - ); 81 } 82 } 83 84 async fn process_did_wrapper( 85 state: Arc<AppState>, 86 http: reqwest::Client, 87 did: Did<'static>, 88 _permit: tokio::sync::OwnedSemaphorePermit, 89 verify_signatures: bool, ··· 132 }) 133 .await 134 .into_diagnostic()??; 135 } 136 Err(e) => { 137 let mut was_ratelimited = false; ··· 232 } 233 } 234 235 - // unblock buffer processing for this DID 236 - state.blocked_dids.remove_async(&did).await; 237 Ok(()) 238 } 239 ··· 512 count += 1; 513 } 514 515 - // 6. update status to synced 516 - state.status = RepoStatus::Synced; 517 state.rev = Some(rev.clone().into()); 518 state.data = Some(root_commit.data); 519 state.last_updated_at = chrono::Utc::now().timestamp();
··· 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 use crate::ops; 4 + use crate::state::AppState; 5 use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 6 + 7 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 8 use jacquard::error::{ClientError, ClientErrorKind}; 9 use jacquard::types::cid::Cid; ··· 24 25 pub mod manager; 26 27 + use crate::ingest::{BufferTx, IngestMessage}; 28 + 29 pub struct BackfillWorker { 30 state: Arc<AppState>, 31 + buffer_tx: BufferTx, 32 http: reqwest::Client, 33 semaphore: Arc<Semaphore>, 34 verify_signatures: bool, 35 + in_flight: Arc<scc::HashSet<Did<'static>>>, 36 } 37 38 impl BackfillWorker { 39 pub fn new( 40 state: Arc<AppState>, 41 + buffer_tx: BufferTx, 42 timeout: Duration, 43 concurrency_limit: usize, 44 verify_signatures: bool, 45 ) -> Self { 46 Self { 47 state, 48 + buffer_tx, 49 http: reqwest::Client::builder() 50 .timeout(timeout) 51 .zstd(true) ··· 55 .expect("failed to build http client"), 56 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 57 verify_signatures, 58 + in_flight: Arc::new(scc::HashSet::new()), 59 } 60 } 61 + } 62 63 + struct InFlightGuard { 64 + did: Did<'static>, 65 + set: Arc<scc::HashSet<Did<'static>>>, 66 + } 67 + 68 + impl Drop for InFlightGuard { 69 + fn drop(&mut self) { 70 + let _ = self.set.remove_sync(&self.did); 71 + } 72 + } 73 + 74 + impl BackfillWorker { 75 + pub async fn run(self) { 76 info!("backfill worker started"); 77 + loop { 78 + let mut spawned = 0; 79 + 80 + for guard in self.state.db.pending.iter() { 81 + let key = match guard.key() { 82 + Ok(k) => k, 83 + Err(e) => { 84 + error!("failed to read pending key: {e}"); 85 + db::check_poisoned(&e); 86 + continue; 87 + } 88 + }; 89 90 + let did = match TrimmedDid::try_from(key.as_ref()) { 91 + Ok(d) => d.to_did(), 92 + Err(e) => { 93 + error!("invalid did in pending: {e}"); 94 + continue; 95 + } 96 + }; 97 + 98 + if self.in_flight.contains_sync(&did) { 99 + continue; 100 + } 101 + let _ = self.in_flight.insert_sync(did.clone().into_static()); 102 + 103 + let permit = match self.semaphore.clone().try_acquire_owned() { 104 + Ok(p) => p, 105 + Err(_) => break, 106 + }; 107 + 108 + let guard = InFlightGuard { 109 + did: did.clone().into_static(), 110 + set: self.in_flight.clone(), 111 + }; 112 + 113 + let state = self.state.clone(); 114 + let http = self.http.clone(); 115 + let buffer_tx_clone = self.buffer_tx.clone(); 116 + let did_clone = did.clone(); 117 + let verify = self.verify_signatures; 118 + 119 + tokio::spawn(async move { 120 + let _guard = guard; 121 + Self::process_did_wrapper( 122 + state, 123 + http, 124 + buffer_tx_clone, 125 + did_clone.clone(), 126 + permit, 127 + verify, 128 + ) 129 + .await 130 + .inspect_err(move |e| { 131 + error!("backfill process failed for {did_clone}: {e}"); 132 + db::check_poisoned_report(e); 133 + }) 134 + }); 135 + 136 + spawned += 1; 137 + } 138 + 139 + if spawned == 0 { 140 + self.state.backfill_notify.notified().await; 141 + } 142 } 143 } 144 145 async fn process_did_wrapper( 146 state: Arc<AppState>, 147 http: reqwest::Client, 148 + buffer_tx: BufferTx, 149 did: Did<'static>, 150 _permit: tokio::sync::OwnedSemaphorePermit, 151 verify_signatures: bool, ··· 194 }) 195 .await 196 .into_diagnostic()??; 197 + 198 + // Notify completion to worker shard 199 + if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) { 200 + error!("failed to send BackfillFinished for {did}: {e}"); 201 + } 202 } 203 Err(e) => { 204 let mut was_ratelimited = false; ··· 299 } 300 } 301 302 + // wake worker to pick up more 303 + state.backfill_notify.notify_one(); 304 Ok(()) 305 } 306 ··· 579 count += 1; 580 } 581 582 + // 6. update data, status is updated in worker shard 583 state.rev = Some(rev.clone().into()); 584 state.data = Some(root_commit.data); 585 state.last_updated_at = chrono::Utc::now().timestamp();
+3 -4
src/crawler/mod.rs
··· 1 use crate::db::{Db, keys, ser_repo_state}; 2 - use crate::ops::send_backfill_req; 3 use crate::state::AppState; 4 use crate::types::RepoState; 5 use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput}; ··· 120 self.state.db.update_count_async("pending", count).await; 121 } 122 123 - // 5. queue for backfill 124 - for did in to_queue { 125 - send_backfill_req(&self.state, did)?; 126 } 127 128 if cursor.is_none() {
··· 1 use crate::db::{Db, keys, ser_repo_state}; 2 use crate::state::AppState; 3 use crate::types::RepoState; 4 use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput}; ··· 119 self.state.db.update_count_async("pending", count).await; 120 } 121 122 + // 5. notify backfill worker 123 + if !to_queue.is_empty() { 124 + self.state.notify_backfill(); 125 } 126 127 if cursor.is_none() {
+4 -4
src/ingest/firehose.rs
··· 1 use crate::db::{self, Db, keys}; 2 - use crate::ingest::BufferTx; 3 use crate::state::AppState; 4 use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 5 use jacquard::types::did::Did; ··· 17 buffer_tx: BufferTx, 18 relay_host: Url, 19 full_network: bool, 20 - verify_signatures: bool, 21 } 22 23 impl FirehoseIngestor { ··· 33 buffer_tx, 34 relay_host, 35 full_network, 36 - verify_signatures, 37 } 38 } 39 ··· 114 // }); 115 // } 116 117 - if let Err(e) = self.buffer_tx.send(msg) { 118 error!("failed to send message to buffer processor: {e}"); 119 } 120 }
··· 1 use crate::db::{self, Db, keys}; 2 + use crate::ingest::{BufferTx, IngestMessage}; 3 use crate::state::AppState; 4 use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 5 use jacquard::types::did::Did; ··· 17 buffer_tx: BufferTx, 18 relay_host: Url, 19 full_network: bool, 20 + _verify_signatures: bool, 21 } 22 23 impl FirehoseIngestor { ··· 33 buffer_tx, 34 relay_host, 35 full_network, 36 + _verify_signatures: verify_signatures, 37 } 38 } 39 ··· 114 // }); 115 // } 116 117 + if let Err(e) = self.buffer_tx.send(IngestMessage::Firehose(msg)) { 118 error!("failed to send message to buffer processor: {e}"); 119 } 120 }
+9 -1
src/ingest/mod.rs
··· 4 pub mod firehose; 5 pub mod worker; 6 7 - pub type BufferedMessage = SubscribeReposMessage<'static>; 8 9 pub type BufferTx = mpsc::UnboundedSender<BufferedMessage>; 10 #[allow(dead_code)]
··· 4 pub mod firehose; 5 pub mod worker; 6 7 + use jacquard::types::did::Did; 8 + 9 + #[derive(Debug)] 10 + pub enum IngestMessage { 11 + Firehose(SubscribeReposMessage<'static>), 12 + BackfillFinished(Did<'static>), 13 + } 14 + 15 + pub type BufferedMessage = IngestMessage; 16 17 pub type BufferTx = mpsc::UnboundedSender<BufferedMessage>; 18 #[allow(dead_code)]
+405 -248
src/ingest/worker.rs
··· 1 use crate::db::{self, keys}; 2 - use crate::ingest::BufferedMessage; 3 - use crate::ops::{self, send_backfill_req}; 4 use crate::resolver::NoSigningKeyError; 5 use crate::state::AppState; 6 use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, RepoState, RepoStatus}; 7 use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; 8 9 use fjall::OwnedWriteBatch; 10 - use futures::future::join_all; 11 use jacquard::cowstr::ToCowStr; 12 use jacquard::types::did::Did; 13 use jacquard_common::IntoStatic; 14 - use jacquard_common::types::crypto::PublicKey; 15 use jacquard_repo::error::CommitError; 16 - use miette::{Diagnostic, IntoDiagnostic, Result}; 17 use smol_str::ToSmolStr; 18 - use std::collections::{HashMap, HashSet}; 19 use std::sync::Arc; 20 - use std::time::Duration; 21 use tokio::sync::mpsc; 22 - use tracing::{debug, error, trace, warn}; 23 24 #[derive(Debug)] 25 struct KeyFetchError(miette::Report); ··· 70 } 71 } 72 73 - #[derive(Debug, Clone, Copy)] 74 - enum ProcessResult { 75 Deleted, 76 - Ok, 77 - } 78 - 79 - enum RepoCheckResult { 80 - Syncing, 81 - Ok(RepoState<'static>), 82 } 83 84 pub struct FirehoseWorker { 85 state: Arc<AppState>, 86 rx: mpsc::UnboundedReceiver<BufferedMessage>, 87 verify_signatures: bool, 88 } 89 90 impl FirehoseWorker { ··· 97 state, 98 rx, 99 verify_signatures, 100 } 101 } 102 103 pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> { 104 - const BUF_SIZE: usize = 500; 105 - let mut buf = Vec::<BufferedMessage>::with_capacity(BUF_SIZE); 106 - let mut failed = Vec::<BufferedMessage>::new(); 107 108 let _g = handle.enter(); 109 let mut repo_cache = HashMap::new(); 110 let mut deleted = HashSet::new(); 111 - let mut broadcast_events = Vec::<BroadcastEvent>::with_capacity(BUF_SIZE); 112 113 - loop { 114 - let mut batch = self.state.db.inner.batch(); 115 repo_cache.clear(); 116 deleted.clear(); 117 broadcast_events.clear(); 118 119 - // resolve signing keys for commits and syncs if verification is enabled 120 - let keys = if self.verify_signatures { 121 - let dids: HashSet<Did> = buf 122 - .iter() 123 - .filter_map(|msg| match msg { 124 - SubscribeReposMessage::Commit(c) => Some(c.repo.clone()), 125 - SubscribeReposMessage::Sync(s) => Some(s.did.clone()), 126 - _ => None, 127 - }) 128 - .collect(); 129 130 - let futures = dids.into_iter().map(|did| async { 131 - let res = self.state.resolver.resolve_signing_key(&did).await; 132 - (did, res) 133 - }); 134 135 - handle.block_on(join_all(futures)).into_iter().collect() 136 - } else { 137 - HashMap::new() 138 - }; 139 140 - let mut added_blocks = 0; 141 - let mut records_delta = 0; 142 - for msg in buf.drain(..) { 143 - let (did, seq) = match &msg { 144 - SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), 145 - SubscribeReposMessage::Identity(i) => (&i.did, i.seq), 146 - SubscribeReposMessage::Account(a) => (&a.did, a.seq), 147 - SubscribeReposMessage::Sync(s) => (&s.did, s.seq), 148 - _ => continue, 149 - }; 150 151 - if self.state.blocked_dids.contains_sync(did) { 152 - failed.push(msg); 153 - continue; 154 - } 155 - if deleted.contains(did) { 156 - continue; 157 } 158 159 - match self.process_message( 160 - &mut repo_cache, 161 - &mut batch, 162 - &mut added_blocks, 163 - &mut records_delta, 164 - &mut broadcast_events, 165 - &msg, 166 - did, 167 - &keys, 168 - ) { 169 - Ok(ProcessResult::Ok) => {} 170 - Ok(ProcessResult::Deleted) => { 171 - deleted.insert(did.clone()); 172 } 173 - Err(e) => { 174 - error!("error processing message for {did}: {e}"); 175 - db::check_poisoned_report(&e); 176 - // dont retry commit or sync on key fetch errors 177 - // since we'll just try again later if we get commit or sync again 178 - if e.downcast_ref::<KeyFetchError>().is_none() 179 - && e.downcast_ref::<CommitError>().is_none() 180 - && e.downcast_ref::<NoSigningKeyError>().is_none() 181 - { 182 - failed.push(msg); 183 } 184 } 185 - } 186 187 - self.state 188 - .cur_firehose 189 - .store(seq, std::sync::atomic::Ordering::SeqCst); 190 } 191 192 - // commit all changes to db 193 - batch.commit().into_diagnostic()?; 194 195 if added_blocks > 0 { 196 - self.state.db.update_count("blocks", added_blocks); 197 } 198 if records_delta != 0 { 199 - self.state.db.update_count("records", records_delta); 200 } 201 for evt in broadcast_events.drain(..) { 202 - let _ = self.state.db.event_tx.send(evt); 203 - } 204 - 205 - self.state 206 - .db 207 - .inner 208 - .persist(fjall::PersistMode::Buffer) 209 - .into_diagnostic()?; 210 - 211 - // add failed back to buf here so the ordering is preserved 212 - if !failed.is_empty() { 213 - buf.append(&mut failed); 214 } 215 216 - // wait until we receive some messages 217 - // this does mean we will have an up to 1 second delay, before we send events to consumers 218 - // but thats reasonable imo, could also be configured of course 219 - let _ = handle.block_on(async { 220 - tokio::time::timeout( 221 - Duration::from_secs(1), 222 - self.rx.recv_many(&mut buf, BUF_SIZE), 223 - ) 224 - .await 225 - }); 226 - if buf.is_empty() { 227 - if self.rx.is_closed() { 228 - error!("ingestor crashed? shutting down buffer processor"); 229 - break; 230 - } 231 - continue; 232 - } 233 } 234 235 - Ok(()) 236 } 237 238 - fn process_message( 239 - &self, 240 - repo_cache: &mut HashMap<Did<'static>, RepoState<'static>>, 241 - batch: &mut OwnedWriteBatch, 242 - added_blocks: &mut i64, 243 - records_delta: &mut i64, 244 - broadcast_events: &mut Vec<BroadcastEvent>, 245 - msg: &BufferedMessage, 246 did: &Did, 247 - keys: &HashMap<Did<'static>, Result<PublicKey<'static>>>, 248 - ) -> Result<ProcessResult> { 249 - let state = &self.state; 250 - let verify_signatures = self.verify_signatures; 251 - 252 - let RepoCheckResult::Ok(repo_state) = 253 - Self::check_repo_state(repo_cache, batch, state, did, msg)? 254 - else { 255 - return Ok(ProcessResult::Ok); 256 - }; 257 - 258 - let get_key = || { 259 - if verify_signatures { 260 - let key = keys.get(did).ok_or_else(|| { 261 - KeyFetchError(miette::miette!( 262 - "!!! THIS IS A BUG !!! missing pubkey for {did}" 263 - )) 264 - })?; 265 - match key { 266 - Ok(key) => Ok(Some(key)), 267 - Err(e) => { 268 - return Err(KeyFetchError(miette::miette!( 269 - "failed to get pubkey for {did}: {e}" 270 - ))); 271 - } 272 - } 273 - } else { 274 - Ok(None) 275 } 276 }; 277 278 match msg { 279 SubscribeReposMessage::Commit(commit) => { 280 trace!("processing buffered commit for {did}"); 281 282 - if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) 283 - { 284 - debug!( 285 - "skipping replayed event for {}: {} <= {}", 286 - did, 287 - commit.rev, 288 - repo_state 289 - .rev 290 - .as_ref() 291 - .map(|r| r.to_tid()) 292 - .expect("we checked in if") 293 - ); 294 - return Ok(ProcessResult::Ok); 295 - } 296 - 297 - if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) 298 - && repo != &prev_commit.0.to_ipld().expect("valid cid") 299 - { 300 - warn!( 301 - "gap detected for {}: repo {} != commit prev {}. triggering backfill", 302 - did, repo, prev_commit.0 303 - ); 304 - 305 - let mut batch = state.db.inner.batch(); 306 - ops::update_repo_status( 307 - &mut batch, 308 - &state.db, 309 - did, 310 - repo_state, 311 - RepoStatus::Backfilling, 312 - )?; 313 - batch.commit().into_diagnostic()?; 314 - send_backfill_req(state, did.clone().into_static())?; 315 - 316 - return Ok(ProcessResult::Ok); 317 - } 318 - 319 - let res = ops::apply_commit(batch, &state.db, repo_state, &commit, get_key()?)?; 320 - repo_cache.insert(did.clone().into_static(), res.repo_state); 321 - *added_blocks += res.blocks_count; 322 - *records_delta += res.records_delta; 323 - broadcast_events.push(BroadcastEvent::Persisted( 324 - self.state 325 - .db 326 - .next_event_id 327 - .load(std::sync::atomic::Ordering::SeqCst) 328 - - 1, 329 - )); 330 } 331 SubscribeReposMessage::Sync(sync) => { 332 debug!("processing buffered sync for {did}"); 333 334 - match ops::verify_sync_event(sync.blocks.as_ref(), get_key()?) { 335 Ok((root, rev)) => { 336 if let Some(current_data) = &repo_state.data { 337 if current_data == &root.to_ipld().expect("valid cid") { 338 debug!("skipping noop sync for {did}"); 339 - return Ok(ProcessResult::Ok); 340 } 341 } 342 343 if let Some(current_rev) = &repo_state.rev { 344 if rev.as_str() <= current_rev.to_tid().as_str() { 345 debug!("skipping replayed sync for {did}"); 346 - return Ok(ProcessResult::Ok); 347 } 348 } 349 350 warn!("sync event for {did}: triggering backfill"); 351 - let mut batch = state.db.inner.batch(); 352 - ops::update_repo_status( 353 &mut batch, 354 - &state.db, 355 did, 356 repo_state, 357 RepoStatus::Backfilling, 358 )?; 359 batch.commit().into_diagnostic()?; 360 - 361 - send_backfill_req(state, did.clone().into_static())?; 362 - return Ok(ProcessResult::Ok); 363 } 364 Err(e) => { 365 error!("failed to process sync event for {did}: {e}"); ··· 377 did: did.clone().into_static(), 378 handle, 379 }; 380 - broadcast_events.push(ops::make_identity_event(&state.db, evt)); 381 } 382 SubscribeReposMessage::Account(account) => { 383 debug!("processing buffered account for {did}"); ··· 392 match &account.status { 393 Some(AccountStatus::Deleted) => { 394 debug!("account {did} deleted, wiping data"); 395 - ops::delete_repo(batch, &state.db, did)?; 396 - return Ok(ProcessResult::Deleted); 397 } 398 status => { 399 let target_status = match status { ··· 425 426 if repo_state.status == target_status { 427 debug!("account status unchanged for {did}: {target_status:?}"); 428 - return Ok(ProcessResult::Ok); 429 } 430 431 - let new_state = ops::update_repo_status( 432 - batch, 433 - &state.db, 434 did, 435 repo_state, 436 target_status, 437 )?; 438 - repo_cache.insert(did.clone().into_static(), new_state); 439 } 440 } 441 } else { ··· 444 // 1. we handle changing repo status to Synced before this (in check repo state) 445 // 2. initiating backfilling is also handled there 446 } 447 - 448 - broadcast_events.push(ops::make_account_event(&state.db, evt)); 449 } 450 _ => { 451 warn!("unknown message type in buffer for {did}"); 452 } 453 } 454 455 - Ok(ProcessResult::Ok) 456 } 457 458 - fn check_repo_state( 459 - repo_cache: &mut HashMap<Did<'static>, RepoState<'static>>, 460 - batch: &mut OwnedWriteBatch, 461 - state: &AppState, 462 did: &Did<'_>, 463 - msg: &BufferedMessage, 464 - ) -> Result<RepoCheckResult> { 465 // check if we have this repo 466 - if let Some(state) = repo_cache.get(did) { 467 - return Ok(RepoCheckResult::Ok(state.clone())); 468 } 469 470 let repo_key = keys::repo_key(&did); 471 - let Some(state_bytes) = state.db.repos.get(&repo_key).into_diagnostic()? else { 472 // we don't know this repo, but we are receiving events for it 473 // this means we should backfill it before processing its events 474 debug!("discovered new account {did} from firehose, queueing backfill"); ··· 477 // using a separate batch here since we want to make it known its being backfilled 478 // immediately. we could use the batch for the unit of work we are doing but 479 // then we wouldn't be able to start backfilling until the unit of work is done 480 - let mut batch = state.db.inner.batch(); 481 482 batch.insert( 483 - &state.db.repos, 484 &repo_key, 485 crate::db::ser_repo_state(&new_state)?, 486 ); 487 - batch.insert(&state.db.pending, &repo_key, &[]); 488 batch.commit().into_diagnostic()?; 489 490 - send_backfill_req(state, did.clone().into_static())?; 491 492 - return Ok(RepoCheckResult::Syncing); 493 }; 494 let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 495 496 // if we are backfilling or it is new, DON'T mark it as synced yet 497 // the backfill worker will do that when it finishes 498 match &repo_state.status { 499 - RepoStatus::Synced => Ok(RepoCheckResult::Ok(repo_state)), 500 RepoStatus::Backfilling | RepoStatus::Error(_) => { 501 - // repo is being backfilled or is in error state 502 - // we dont touch the state because the backfill worker will do that 503 - // we should not really get here because the backfill worker should have marked it as 504 - // being worked on (blocked repos) meaning we would have returned earlier 505 debug!( 506 "ignoring active status for {did} as it is {:?}", 507 repo_state.status 508 ); 509 - Ok(RepoCheckResult::Syncing) 510 } 511 RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => { 512 // if it was in deactivated/takendown/suspended state, we can mark it as synced ··· 514 // UNLESS it is an account status event that keeps it deactivated 515 if let SubscribeReposMessage::Account(acc) = msg { 516 if !acc.active { 517 - return Ok(RepoCheckResult::Ok(repo_state)); 518 } 519 } 520 - 521 repo_state = ops::update_repo_status( 522 - batch, 523 - &state.db, 524 - &did, 525 repo_state, 526 RepoStatus::Synced, 527 )?; 528 - repo_cache.insert(did.clone().into_static(), repo_state.clone()); 529 - Ok(RepoCheckResult::Ok(repo_state)) 530 } 531 } 532 } 533 }
··· 1 use crate::db::{self, keys}; 2 + use crate::ingest::{BufferedMessage, IngestMessage}; 3 + use crate::ops; 4 use crate::resolver::NoSigningKeyError; 5 use crate::state::AppState; 6 use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, RepoState, RepoStatus}; 7 use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; 8 9 use fjall::OwnedWriteBatch; 10 + 11 use jacquard::cowstr::ToCowStr; 12 + use jacquard::types::crypto::PublicKey; 13 use jacquard::types::did::Did; 14 + use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 15 use jacquard_common::IntoStatic; 16 use jacquard_repo::error::CommitError; 17 + use miette::{Context, Diagnostic, IntoDiagnostic, Result}; 18 use smol_str::ToSmolStr; 19 + use std::collections::{HashMap, HashSet, hash_map::DefaultHasher}; 20 + use std::hash::{Hash, Hasher}; 21 use std::sync::Arc; 22 use tokio::sync::mpsc; 23 + use tracing::{debug, error, info, trace, warn}; 24 25 #[derive(Debug)] 26 struct KeyFetchError(miette::Report); ··· 71 } 72 } 73 74 + #[derive(Debug)] 75 + enum RepoProcessResult<'s, 'c> { 76 Deleted, 77 + Syncing(Option<&'c Commit<'c>>), 78 + Ok(RepoState<'s>), 79 } 80 81 pub struct FirehoseWorker { 82 state: Arc<AppState>, 83 rx: mpsc::UnboundedReceiver<BufferedMessage>, 84 verify_signatures: bool, 85 + num_shards: usize, 86 + } 87 + 88 + struct WorkerContext<'a> { 89 + verify_signatures: bool, 90 + state: &'a AppState, 91 + repo_cache: &'a mut HashMap<Did<'static>, RepoState<'static>>, 92 + batch: &'a mut OwnedWriteBatch, 93 + added_blocks: &'a mut i64, 94 + records_delta: &'a mut i64, 95 + broadcast_events: &'a mut Vec<BroadcastEvent>, 96 + handle: &'a tokio::runtime::Handle, 97 } 98 99 impl FirehoseWorker { ··· 106 state, 107 rx, 108 verify_signatures, 109 + num_shards: 64, 110 } 111 } 112 113 + // starts the worker threads and the main dispatch loop 114 + // the dispatch loop reads from the firehose channel and distributes messages to shards 115 + // based on the consistent hash of the DID 116 pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> { 117 + let mut shards = Vec::with_capacity(self.num_shards); 118 + 119 + for i in 0..self.num_shards { 120 + let (tx, rx) = mpsc::unbounded_channel(); 121 + shards.push(tx); 122 + 123 + let state = self.state.clone(); 124 + let verify = self.verify_signatures; 125 + let handle = handle.clone(); 126 + 127 + std::thread::Builder::new() 128 + .name(format!("ingest-shard-{}", i)) 129 + .spawn(move || { 130 + Self::worker_thread(i, rx, state, verify, handle); 131 + }) 132 + .into_diagnostic()?; 133 + } 134 + 135 + info!("started {} ingest shards", self.num_shards); 136 137 let _g = handle.enter(); 138 + 139 + // dispatch loop 140 + while let Some(msg) = self.rx.blocking_recv() { 141 + let did = match &msg { 142 + IngestMessage::Firehose(m) => match m { 143 + SubscribeReposMessage::Commit(c) => &c.repo, 144 + SubscribeReposMessage::Identity(i) => &i.did, 145 + SubscribeReposMessage::Account(a) => &a.did, 146 + SubscribeReposMessage::Sync(s) => &s.did, 147 + _ => continue, 148 + }, 149 + IngestMessage::BackfillFinished(did) => did, 150 + }; 151 + 152 + let mut hasher = DefaultHasher::new(); 153 + did.hash(&mut hasher); 154 + let hash = hasher.finish(); 155 + let shard_idx = (hash as usize) % self.num_shards; 156 + 157 + if let Err(e) = shards[shard_idx].send(msg) { 158 + error!("failed to send message to shard {shard_idx}: {e}"); 159 + // break if send fails; receiver likely closed 160 + break; 161 + } 162 + } 163 + 164 + error!("firehose worker dispatcher shutting down"); 165 + 166 + Ok(()) 167 + } 168 + 169 + // synchronous worker loop running on a dedicated thread 170 + // pulls messages from the channel, builds batches, and processes them 171 + // enters the tokio runtime only when necessary (key resolution) 172 + fn worker_thread( 173 + id: usize, 174 + mut rx: mpsc::UnboundedReceiver<BufferedMessage>, 175 + state: Arc<AppState>, 176 + verify_signatures: bool, 177 + handle: tokio::runtime::Handle, 178 + ) { 179 + let _guard = handle.enter(); 180 + debug!("shard {id} started"); 181 + 182 let mut repo_cache = HashMap::new(); 183 let mut deleted = HashSet::new(); 184 + let mut broadcast_events = Vec::new(); 185 186 + while let Some(msg) = rx.blocking_recv() { 187 + let mut batch = state.db.inner.batch(); 188 repo_cache.clear(); 189 deleted.clear(); 190 broadcast_events.clear(); 191 192 + let mut added_blocks = 0; 193 + let mut records_delta = 0; 194 195 + let mut ctx = WorkerContext { 196 + state: &state, 197 + repo_cache: &mut repo_cache, 198 + batch: &mut batch, 199 + added_blocks: &mut added_blocks, 200 + records_delta: &mut records_delta, 201 + broadcast_events: &mut broadcast_events, 202 + handle: &handle, 203 + verify_signatures, 204 + }; 205 206 + match msg { 207 + IngestMessage::BackfillFinished(did) => { 208 + debug!("backfill finished for {did}, verifying state and draining buffer"); 209 210 + // load repo state to transition status and draining buffer 211 + let repo_key = keys::repo_key(&did); 212 + if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() { 213 + match crate::db::deser_repo_state(&state_bytes) { 214 + Ok(repo_state) => { 215 + let repo_state = repo_state.into_static(); 216 217 + match Self::drain_resync_buffer(&mut ctx, &did, repo_state) { 218 + Ok(res) => match res { 219 + RepoProcessResult::Ok(s) => { 220 + // TODO: there might be a race condition here where we get a new commit 221 + // while the resync buffer is being drained, we should handle that probably 222 + // but also it should still be fine since we'll sync eventually anyway 223 + match ops::update_repo_status( 224 + &mut batch, 225 + &state.db, 226 + &did, 227 + s, 228 + RepoStatus::Synced, 229 + ) { 230 + Ok(s) => { 231 + repo_cache.insert(did.clone(), s.into_static()); 232 + } 233 + Err(e) => { 234 + // this can only fail if serde retry fails which would be really weird 235 + error!( 236 + "failed to transition {did} to synced: {e}" 237 + ); 238 + } 239 + } 240 + } 241 + RepoProcessResult::Deleted => { 242 + deleted.insert(did.clone()); 243 + } 244 + // we don't have to handle this since drain_resync_buffer doesn't delete 245 + // the commits from the resync buffer so they will get retried later 246 + RepoProcessResult::Syncing(_) => {} 247 + }, 248 + Err(e) => { 249 + error!("failed to drain resync buffer for {did}: {e}") 250 + } 251 + }; 252 + } 253 + Err(e) => error!("failed to deser repo state for {did}: {e}"), 254 + } 255 + } 256 } 257 + IngestMessage::Firehose(msg) => { 258 + let (did, seq) = match &msg { 259 + SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), 260 + SubscribeReposMessage::Identity(i) => (&i.did, i.seq), 261 + SubscribeReposMessage::Account(a) => (&a.did, a.seq), 262 + SubscribeReposMessage::Sync(s) => (&s.did, s.seq), 263 + _ => continue, 264 + }; 265 266 + if deleted.contains(did) { 267 + continue; 268 } 269 + 270 + match Self::process_message(&mut ctx, &msg, did) { 271 + Ok(RepoProcessResult::Ok(_)) => {} 272 + Ok(RepoProcessResult::Deleted) => { 273 + deleted.insert(did.clone()); 274 + } 275 + Ok(RepoProcessResult::Syncing(Some(commit))) => { 276 + if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) { 277 + error!("failed to persist commit to resync_buffer for {did}: {e}"); 278 + } 279 + } 280 + Ok(RepoProcessResult::Syncing(None)) => {} 281 + Err(e) => { 282 + error!("error processing message for {did}: {e}"); 283 + db::check_poisoned_report(&e); 284 + if Self::check_if_retriable_failure(&e) { 285 + if let SubscribeReposMessage::Commit(commit) = &msg { 286 + if let Err(e) = 287 + ops::persist_to_resync_buffer(&state.db, did, commit) 288 + { 289 + error!( 290 + "failed to persist commit to resync_buffer for {did}: {e}" 291 + ); 292 + } 293 + } 294 + } 295 } 296 } 297 298 + state 299 + .cur_firehose 300 + .store(seq, std::sync::atomic::Ordering::SeqCst); 301 + } 302 } 303 304 + if let Err(e) = batch.commit() { 305 + error!("failed to commit batch in shard {id}: {e}"); 306 + } 307 308 if added_blocks > 0 { 309 + state.db.update_count("blocks", added_blocks); 310 } 311 if records_delta != 0 { 312 + state.db.update_count("records", records_delta); 313 } 314 for evt in broadcast_events.drain(..) { 315 + let _ = state.db.event_tx.send(evt); 316 } 317 318 + state.db.inner.persist(fjall::PersistMode::Buffer).ok(); 319 } 320 + } 321 322 + // dont retry commit or sync on key fetch errors 323 + // since we'll just try again later if we get commit or sync again 324 + fn check_if_retriable_failure(e: &miette::Report) -> bool { 325 + e.downcast_ref::<KeyFetchError>().is_none() 326 + && e.downcast_ref::<CommitError>().is_none() 327 + && e.downcast_ref::<NoSigningKeyError>().is_none() 328 } 329 330 + fn process_message<'s, 'c>( 331 + ctx: &mut WorkerContext, 332 + msg: &'c SubscribeReposMessage<'static>, 333 did: &Did, 334 + ) -> Result<RepoProcessResult<'s, 'c>> { 335 + let check_repo_res = Self::check_repo_state(ctx, did, msg)?; 336 + let mut repo_state = match check_repo_res { 337 + RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => { 338 + return Ok(check_repo_res); 339 } 340 + RepoProcessResult::Ok(s) => s, 341 }; 342 343 match msg { 344 SubscribeReposMessage::Commit(commit) => { 345 trace!("processing buffered commit for {did}"); 346 347 + return Self::process_commit(ctx, did, repo_state, commit); 348 } 349 SubscribeReposMessage::Sync(sync) => { 350 debug!("processing buffered sync for {did}"); 351 352 + match ops::verify_sync_event( 353 + sync.blocks.as_ref(), 354 + Self::fetch_key(ctx, did)?.as_ref(), 355 + ) { 356 Ok((root, rev)) => { 357 if let Some(current_data) = &repo_state.data { 358 if current_data == &root.to_ipld().expect("valid cid") { 359 debug!("skipping noop sync for {did}"); 360 + return Ok(RepoProcessResult::Ok(repo_state)); 361 } 362 } 363 364 if let Some(current_rev) = &repo_state.rev { 365 if rev.as_str() <= current_rev.to_tid().as_str() { 366 debug!("skipping replayed sync for {did}"); 367 + return Ok(RepoProcessResult::Ok(repo_state)); 368 } 369 } 370 371 warn!("sync event for {did}: triggering backfill"); 372 + let mut batch = ctx.state.db.inner.batch(); 373 + repo_state = ops::update_repo_status( 374 &mut batch, 375 + &ctx.state.db, 376 did, 377 repo_state, 378 RepoStatus::Backfilling, 379 )?; 380 + ctx.state.db.update_count("pending", 1); 381 batch.commit().into_diagnostic()?; 382 + ctx.state.notify_backfill(); 383 + return Ok(RepoProcessResult::Ok(repo_state)); 384 } 385 Err(e) => { 386 error!("failed to process sync event for {did}: {e}"); ··· 398 did: did.clone().into_static(), 399 handle, 400 }; 401 + ctx.broadcast_events 402 + .push(ops::make_identity_event(&ctx.state.db, evt)); 403 } 404 SubscribeReposMessage::Account(account) => { 405 debug!("processing buffered account for {did}"); ··· 414 match &account.status { 415 Some(AccountStatus::Deleted) => { 416 debug!("account {did} deleted, wiping data"); 417 + ops::delete_repo(ctx.batch, &ctx.state.db, did)?; 418 + return Ok(RepoProcessResult::Deleted); 419 } 420 status => { 421 let target_status = match status { ··· 447 448 if repo_state.status == target_status { 449 debug!("account status unchanged for {did}: {target_status:?}"); 450 + return Ok(RepoProcessResult::Ok(repo_state)); 451 } 452 453 + repo_state = ops::update_repo_status( 454 + ctx.batch, 455 + &ctx.state.db, 456 did, 457 repo_state, 458 target_status, 459 )?; 460 + ctx.repo_cache.insert( 461 + did.clone().into_static(), 462 + repo_state.clone().into_static(), 463 + ); 464 } 465 } 466 } else { ··· 469 // 1. we handle changing repo status to Synced before this (in check repo state) 470 // 2. initiating backfilling is also handled there 471 } 472 + ctx.broadcast_events 473 + .push(ops::make_account_event(&ctx.state.db, evt)); 474 } 475 _ => { 476 warn!("unknown message type in buffer for {did}"); 477 } 478 } 479 480 + Ok(RepoProcessResult::Ok(repo_state)) 481 } 482 483 + fn process_commit<'c, 'ns, 's: 'ns>( 484 + ctx: &mut WorkerContext, 485 + did: &Did, 486 + repo_state: RepoState<'s>, 487 + commit: &'c Commit<'c>, 488 + ) -> Result<RepoProcessResult<'ns, 'c>> { 489 + // check for replayed events (already seen revision) 490 + if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) { 491 + debug!( 492 + "skipping replayed event for {}: {} <= {}", 493 + did, 494 + commit.rev, 495 + repo_state 496 + .rev 497 + .as_ref() 498 + .map(|r| r.to_tid()) 499 + .expect("we checked in if") 500 + ); 501 + return Ok(RepoProcessResult::Ok(repo_state)); 502 + } 503 + 504 + if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) 505 + && repo 506 + != &prev_commit 507 + .0 508 + .to_ipld() 509 + .into_diagnostic() 510 + .wrap_err("invalid cid from relay")? 511 + { 512 + warn!( 513 + "gap detected for {}: repo {} != commit prev {}. triggering backfill", 514 + did, repo, prev_commit.0 515 + ); 516 + 517 + let mut batch = ctx.state.db.inner.batch(); 518 + let repo_state = ops::update_repo_status( 519 + &mut batch, 520 + &ctx.state.db, 521 + did, 522 + repo_state, 523 + RepoStatus::Backfilling, 524 + )?; 525 + ctx.state.db.update_count("pending", 1); 526 + batch.commit().into_diagnostic()?; 527 + ctx.repo_cache 528 + .insert(did.clone().into_static(), repo_state.clone().into_static()); 529 + ctx.state.notify_backfill(); 530 + return Ok(RepoProcessResult::Syncing(Some(commit))); 531 + } 532 + 533 + let res = ops::apply_commit( 534 + ctx.batch, 535 + &ctx.state.db, 536 + repo_state, 537 + &commit, 538 + Self::fetch_key(ctx, did)?.as_ref(), 539 + )?; 540 + let repo_state = res.repo_state; 541 + ctx.repo_cache 542 + .insert(did.clone().into_static(), repo_state.clone().into_static()); 543 + *ctx.added_blocks += res.blocks_count; 544 + *ctx.records_delta += res.records_delta; 545 + ctx.broadcast_events.push(BroadcastEvent::Persisted( 546 + ctx.state 547 + .db 548 + .next_event_id 549 + .load(std::sync::atomic::Ordering::SeqCst) 550 + - 1, 551 + )); 552 + 553 + Ok(RepoProcessResult::Ok(repo_state)) 554 + } 555 + 556 + // checks the current state of the repo in the database 557 + // if the repo is new, creates initial state and triggers backfill 558 + // handles transitions between states (backfilling -> synced, etc) 559 + fn check_repo_state<'s, 'c>( 560 + ctx: &mut WorkerContext, 561 did: &Did<'_>, 562 + msg: &'c SubscribeReposMessage<'static>, 563 + ) -> Result<RepoProcessResult<'s, 'c>> { 564 // check if we have this repo 565 + if let Some(state) = ctx.repo_cache.get(did) { 566 + return Ok(RepoProcessResult::Ok(state.clone())); 567 } 568 569 let repo_key = keys::repo_key(&did); 570 + let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else { 571 // we don't know this repo, but we are receiving events for it 572 // this means we should backfill it before processing its events 573 debug!("discovered new account {did} from firehose, queueing backfill"); ··· 576 // using a separate batch here since we want to make it known its being backfilled 577 // immediately. we could use the batch for the unit of work we are doing but 578 // then we wouldn't be able to start backfilling until the unit of work is done 579 580 + let mut batch = ctx.state.db.inner.batch(); 581 batch.insert( 582 + &ctx.state.db.repos, 583 &repo_key, 584 crate::db::ser_repo_state(&new_state)?, 585 ); 586 + batch.insert(&ctx.state.db.pending, &repo_key, &[]); 587 + ctx.state.db.update_count("repos", 1); 588 + ctx.state.db.update_count("pending", 1); 589 batch.commit().into_diagnostic()?; 590 591 + ctx.state.notify_backfill(); 592 593 + return Ok(RepoProcessResult::Syncing(None)); 594 }; 595 let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 596 597 // if we are backfilling or it is new, DON'T mark it as synced yet 598 // the backfill worker will do that when it finishes 599 match &repo_state.status { 600 + RepoStatus::Synced => { 601 + // lazy drain: if there are buffered commits, drain them now 602 + if ops::has_buffered_commits(&ctx.state.db, did) { 603 + Self::drain_resync_buffer(ctx, did, repo_state) 604 + } else { 605 + Ok(RepoProcessResult::Ok(repo_state)) 606 + } 607 + } 608 RepoStatus::Backfilling | RepoStatus::Error(_) => { 609 debug!( 610 "ignoring active status for {did} as it is {:?}", 611 repo_state.status 612 ); 613 + Ok(RepoProcessResult::Syncing(None)) 614 } 615 RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => { 616 // if it was in deactivated/takendown/suspended state, we can mark it as synced ··· 618 // UNLESS it is an account status event that keeps it deactivated 619 if let SubscribeReposMessage::Account(acc) = msg { 620 if !acc.active { 621 + return Ok(RepoProcessResult::Ok(repo_state)); 622 } 623 } 624 repo_state = ops::update_repo_status( 625 + ctx.batch, 626 + &ctx.state.db, 627 + did, 628 repo_state, 629 RepoStatus::Synced, 630 )?; 631 + ctx.repo_cache 632 + .insert(did.clone().into_static(), repo_state.clone()); 633 + Ok(RepoProcessResult::Ok(repo_state)) 634 + } 635 + } 636 + } 637 + 638 + fn drain_resync_buffer<'s>( 639 + ctx: &mut WorkerContext, 640 + did: &Did, 641 + mut repo_state: RepoState<'s>, 642 + ) -> Result<RepoProcessResult<'s, 'static>> { 643 + let prefix = keys::resync_buffer_prefix(did); 644 + 645 + for guard in ctx.state.db.resync_buffer.prefix(&prefix) { 646 + let (key, value) = guard.into_inner().into_diagnostic()?; 647 + let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?; 648 + 649 + let res = Self::process_commit(ctx, did, repo_state, &commit); 650 + let res = match res { 651 + Ok(r) => r, 652 + Err(e) => { 653 + if !Self::check_if_retriable_failure(&e) { 654 + ctx.batch.remove(&ctx.state.db.resync_buffer, key); 655 + } 656 + return Err(e); 657 + } 658 + }; 659 + match res { 660 + RepoProcessResult::Ok(rs) => { 661 + ctx.batch.remove(&ctx.state.db.resync_buffer, key); 662 + repo_state = rs; 663 + } 664 + RepoProcessResult::Syncing(_) => { 665 + return Ok(RepoProcessResult::Syncing(None)); 666 + } 667 + RepoProcessResult::Deleted => { 668 + ctx.batch.remove(&ctx.state.db.resync_buffer, key); 669 + return Ok(RepoProcessResult::Deleted); 670 + } 671 } 672 + } 673 + 674 + Ok(RepoProcessResult::Ok(repo_state)) 675 + } 676 + 677 + fn fetch_key(ctx: &WorkerContext, did: &Did) -> Result<Option<PublicKey<'static>>> { 678 + if ctx.verify_signatures { 679 + let key = ctx 680 + .handle 681 + .block_on(ctx.state.resolver.resolve_signing_key(did)) 682 + .map_err(|e| { 683 + KeyFetchError(miette::miette!("failed to get pubkey for {did}: {e}")) 684 + })?; 685 + Ok(Some(key)) 686 + } else { 687 + Ok(None) 688 } 689 } 690 }
+19 -25
src/main.rs
··· 24 25 info!("{cfg}"); 26 27 - let (state, backfill_rx) = AppState::new(&cfg)?; 28 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 29 let state = Arc::new(state); 30 - 31 - tokio::spawn( 32 - api::serve(state.clone(), cfg.api_port).inspect_err(|e| error!("API server failed: {e}")), 33 - ); 34 - 35 - if cfg.enable_debug { 36 - tokio::spawn( 37 - api::serve_debug(state.clone(), cfg.debug_port) 38 - .inspect_err(|e| error!("debug server failed: {e}")), 39 - ); 40 - } 41 42 if !cfg.disable_backfill { 43 tokio::spawn({ ··· 45 let timeout = cfg.repo_fetch_timeout; 46 BackfillWorker::new( 47 state, 48 - backfill_rx, 49 timeout, 50 cfg.backfill_concurrency_limit, 51 matches!( ··· 55 ) 56 .run() 57 }); 58 - } 59 - 60 - if let Err(e) = spawn_blocking({ 61 - let state = state.clone(); 62 - move || hydrant::backfill::manager::queue_pending_backfills(&state) 63 - }) 64 - .await 65 - .into_diagnostic()? 66 - { 67 - error!("failed to queue pending backfills: {e}"); 68 - db::check_poisoned_report(&e); 69 } 70 71 if let Err(e) = spawn_blocking({ ··· 156 ); 157 } 158 159 - let tasks = if !cfg.disable_firehose { 160 let firehose_worker = std::thread::spawn({ 161 let state = state.clone(); 162 let handle = tokio::runtime::Handle::current(); ··· 195 // essentially we just want to keep the main thread alive for the other components 196 vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>] 197 }; 198 199 let res = futures::future::select_all(tasks); 200 if let (Err(e), _, _) = res.await {
··· 24 25 info!("{cfg}"); 26 27 + let state = AppState::new(&cfg)?; 28 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 29 let state = Arc::new(state); 30 31 if !cfg.disable_backfill { 32 tokio::spawn({ ··· 34 let timeout = cfg.repo_fetch_timeout; 35 BackfillWorker::new( 36 state, 37 + buffer_tx.clone(), 38 timeout, 39 cfg.backfill_concurrency_limit, 40 matches!( ··· 44 ) 45 .run() 46 }); 47 } 48 49 if let Err(e) = spawn_blocking({ ··· 134 ); 135 } 136 137 + let mut tasks = if !cfg.disable_firehose { 138 let firehose_worker = std::thread::spawn({ 139 let state = state.clone(); 140 let handle = tokio::runtime::Handle::current(); ··· 173 // essentially we just want to keep the main thread alive for the other components 174 vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>] 175 }; 176 + 177 + let state_api = state.clone(); 178 + tasks.push(Box::pin(async move { 179 + api::serve(state_api, cfg.api_port) 180 + .await 181 + .map_err(|e| miette::miette!("API server failed: {e}")) 182 + }) as BoxFuture<_>); 183 + 184 + if cfg.enable_debug { 185 + let state_debug = state.clone(); 186 + tasks.push(Box::pin(async move { 187 + api::serve_debug(state_debug, cfg.debug_port) 188 + .await 189 + .map_err(|e| miette::miette!("debug server failed: {e}")) 190 + }) as BoxFuture<_>); 191 + } 192 193 let res = futures::future::select_all(tasks); 194 if let (Err(e), _, _) = res.await {
+21 -7
src/ops.rs
··· 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 - use crate::state::AppState; 4 use crate::types::{ 5 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 6 StoredEvent, ··· 10 use jacquard::IntoStatic; 11 12 use jacquard::types::cid::Cid; 13 use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14 use jacquard_common::types::crypto::PublicKey; 15 use jacquard_repo::car::reader::parse_car_bytes; ··· 19 use std::time::Instant; 20 use tracing::{debug, trace, warn}; 21 22 - pub fn send_backfill_req(state: &AppState, did: jacquard::types::did::Did<'static>) -> Result<()> { 23 - state 24 - .backfill_tx 25 - .send(did.clone()) 26 - .map_err(|_| miette::miette!("failed to send backfill request for {did}"))?; 27 - let _ = state.blocked_dids.insert_sync(did); 28 Ok(()) 29 } 30 31 // emitting identity is ephemeral ··· 66 batch.remove(&db.repos, &repo_key); 67 batch.remove(&db.pending, &repo_key); 68 batch.remove(&db.resync, &repo_key); 69 70 // 2. delete from records (all partitions) 71 let mut partitions = Vec::new(); ··· 151 152 Ok(repo_state) 153 } 154 pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> { 155 let parsed = tokio::task::block_in_place(|| { 156 tokio::runtime::Handle::current()
··· 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 use crate::types::{ 4 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 5 StoredEvent, ··· 9 use jacquard::IntoStatic; 10 11 use jacquard::types::cid::Cid; 12 + use jacquard::types::did::Did; 13 use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14 use jacquard_common::types::crypto::PublicKey; 15 use jacquard_repo::car::reader::parse_car_bytes; ··· 19 use std::time::Instant; 20 use tracing::{debug, trace, warn}; 21 22 + pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> { 23 + let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev)); 24 + let value = rmp_serde::to_vec(commit).into_diagnostic()?; 25 + db.resync_buffer.insert(key, value).into_diagnostic()?; 26 + debug!( 27 + "buffered commit seq {} for {did} to resync_buffer", 28 + commit.seq 29 + ); 30 Ok(()) 31 + } 32 + 33 + pub fn has_buffered_commits(db: &Db, did: &Did) -> bool { 34 + let prefix = keys::resync_buffer_prefix(did); 35 + db.resync_buffer.prefix(&prefix).next().is_some() 36 } 37 38 // emitting identity is ephemeral ··· 73 batch.remove(&db.repos, &repo_key); 74 batch.remove(&db.pending, &repo_key); 75 batch.remove(&db.resync, &repo_key); 76 + 77 + let resync_prefix = keys::resync_buffer_prefix(did); 78 + for guard in db.resync_buffer.prefix(&resync_prefix) { 79 + let k = guard.key().into_diagnostic()?; 80 + batch.remove(&db.resync_buffer, k); 81 + } 82 83 // 2. delete from records (all partitions) 84 let mut partitions = Vec::new(); ··· 164 165 Ok(repo_state) 166 } 167 + 168 pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> { 169 let parsed = tokio::task::block_in_place(|| { 170 tokio::runtime::Handle::current()
+13 -20
src/state.rs
··· 1 use std::sync::atomic::AtomicI64; 2 3 - use jacquard_common::types::string::Did; 4 - use tokio::sync::mpsc; 5 - 6 use miette::Result; 7 8 use crate::{config::Config, db::Db, resolver::Resolver}; 9 10 - pub type BackfillTx = mpsc::UnboundedSender<Did<'static>>; 11 - pub type BackfillRx = mpsc::UnboundedReceiver<Did<'static>>; 12 - 13 pub struct AppState { 14 pub db: Db, 15 - pub backfill_tx: BackfillTx, 16 pub resolver: Resolver, 17 pub cur_firehose: AtomicI64, 18 - pub blocked_dids: scc::HashSet<Did<'static>>, 19 } 20 21 impl AppState { 22 - pub fn new(config: &Config) -> Result<(Self, BackfillRx)> { 23 let db = Db::open( 24 &config.database_path, 25 config.cache_size, 26 config.disable_lz4_compression, 27 )?; 28 let resolver = Resolver::new(config.plc_url.clone(), config.identity_cache_size); 29 - let (backfill_tx, backfill_rx) = mpsc::unbounded_channel(); 30 31 - Ok(( 32 - Self { 33 - db, 34 - backfill_tx, 35 - resolver, 36 - cur_firehose: AtomicI64::new(0), 37 - blocked_dids: scc::HashSet::new(), 38 - }, 39 - backfill_rx, 40 - )) 41 } 42 }
··· 1 use std::sync::atomic::AtomicI64; 2 3 use miette::Result; 4 + use tokio::sync::Notify; 5 6 use crate::{config::Config, db::Db, resolver::Resolver}; 7 8 pub struct AppState { 9 pub db: Db, 10 pub resolver: Resolver, 11 pub cur_firehose: AtomicI64, 12 + pub backfill_notify: Notify, 13 } 14 15 impl AppState { 16 + pub fn new(config: &Config) -> Result<Self> { 17 let db = Db::open( 18 &config.database_path, 19 config.cache_size, 20 config.disable_lz4_compression, 21 )?; 22 let resolver = Resolver::new(config.plc_url.clone(), config.identity_cache_size); 23 + 24 + Ok(Self { 25 + db, 26 + resolver, 27 + cur_firehose: AtomicI64::new(0), 28 + backfill_notify: Notify::new(), 29 + }) 30 + } 31 32 + pub fn notify_backfill(&self) { 33 + self.backfill_notify.notify_one(); 34 } 35 }