···11-use crate::db::{keys, Db};
22-use crate::state::AppState;
33-use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
44-use jacquard::types::did::Did;
55-use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
66-use jacquard_common::IntoStatic;
77-use miette::Result;
88-use n0_future::StreamExt;
99-use std::sync::atomic::Ordering;
1010-use std::sync::Arc;
1111-use std::time::Duration;
1212-use tracing::{debug, error, info};
1313-use url::Url;
1414-1515-pub struct Ingestor {
1616- state: Arc<AppState>,
1717- relay_host: Url,
1818- full_network: bool,
1919-}
2020-2121-impl Ingestor {
2222- pub fn new(state: Arc<AppState>, relay_host: Url, full_network: bool) -> Self {
2323- Self {
2424- state,
2525- relay_host,
2626- full_network,
2727- }
2828- }
2929-3030- pub async fn run(mut self) -> Result<()> {
3131- loop {
3232- // 1. load cursor
3333- let current_cursor = self.state.cur_firehose.load(Ordering::SeqCst);
3434- let start_cursor = if current_cursor > 0 {
3535- Some(current_cursor)
3636- } else {
3737- let cursor_key = b"firehose_cursor";
3838- if let Ok(Some(bytes)) =
3939- crate::db::Db::get(self.state.db.cursors.clone(), cursor_key.to_vec()).await
4040- {
4141- let s = String::from_utf8_lossy(&bytes);
4242- debug!("resuming from cursor: {}", s);
4343- s.parse::<i64>().ok()
4444- } else {
4545- info!("no cursor found, live tailing");
4646- None
4747- }
4848- };
11+use jacquard_api::com_atproto::sync::subscribe_repos::SubscribeReposMessage;
22+use tokio::sync::mpsc;
4935050- if let Some(c) = start_cursor {
5151- self.state.cur_firehose.store(c, Ordering::SeqCst);
5252- }
44+pub mod firehose;
55+pub mod worker;
5365454- // 2. connect
5555- let client = TungsteniteSubscriptionClient::from_base_uri(self.relay_host.clone());
5656- let params = if let Some(c) = start_cursor {
5757- SubscribeRepos::new().cursor(c).build()
5858- } else {
5959- SubscribeRepos::new().build()
6060- };
77+pub type BufferedMessage = SubscribeReposMessage<'static>;
6186262- let stream = match client.subscribe(¶ms).await {
6363- Ok(s) => s,
6464- Err(e) => {
6565- error!("failed to connect to firehose: {e}, retrying in 5s...");
6666- tokio::time::sleep(Duration::from_secs(5)).await;
6767- continue;
6868- }
6969- };
7070-7171- let (_sink, mut messages) = stream.into_stream();
7272-7373- info!("firehose connected");
7474-7575- // 3. process loop
7676- while let Some(msg_res) = messages.next().await {
7777- match msg_res {
7878- Ok(msg) => self.handle_message(msg).await,
7979- Err(e) => {
8080- error!("firehose stream error: {e}");
8181- break;
8282- }
8383- }
8484- }
8585-8686- error!("firehose disconnected, reconnecting in 5s...");
8787- tokio::time::sleep(Duration::from_secs(5)).await;
8888- }
8989- }
9090-9191- async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) {
9292- let (did, seq) = match &msg {
9393- SubscribeReposMessage::Commit(commit) => (&commit.repo, commit.seq),
9494- SubscribeReposMessage::Identity(identity) => (&identity.did, identity.seq),
9595- SubscribeReposMessage::Account(account) => (&account.did, account.seq),
9696- _ => return,
9797- };
9898-9999- if !self.should_process(did).await.unwrap_or(false) {
100100- return;
101101- }
102102-103103- self.state.cur_firehose.store(seq, Ordering::SeqCst);
104104-105105- let buffered_at = chrono::Utc::now().timestamp_millis();
106106-107107- // persist to DB for crash recovery
108108- let db_key = keys::buffer_key(&did, buffered_at);
109109- if let Ok(bytes) = rmp_serde::to_vec(&msg) {
110110- if let Err(e) = Db::insert(self.state.db.buffer.clone(), db_key, bytes).await {
111111- error!("failed to persist buffered message: {e}");
112112- }
113113- }
114114-115115- // always buffer through the BufferProcessor
116116- let buffered_msg = crate::buffer::BufferedMessage {
117117- did: did.clone().into_static(),
118118- msg: msg.into_static(),
119119- buffered_at,
120120- };
121121-122122- if let Err(e) = self.state.buffer_tx.send(buffered_msg) {
123123- error!("failed to send message to buffer processor: {e}");
124124- }
125125- }
126126-127127- async fn should_process(&self, did: &Did<'_>) -> Result<bool> {
128128- if self.full_network {
129129- return Ok(true);
130130- }
131131- let did_key = keys::repo_key(did);
132132- Db::contains_key(self.state.db.repos.clone(), did_key).await
133133- }
134134-}
99+pub type BufferTx = mpsc::UnboundedSender<BufferedMessage>;
1010+#[allow(dead_code)]
1111+pub type BufferRx = mpsc::UnboundedReceiver<BufferedMessage>;
+302
src/ingest/worker.rs
···11+use crate::db::{self, keys};
22+use crate::ingest::BufferedMessage;
33+use crate::ops::{self, send_backfill_req};
44+use crate::state::AppState;
55+use crate::types::{AccountEvt, IdentityEvt, RepoState, RepoStatus};
66+use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage;
77+88+use fjall::OwnedWriteBatch;
99+use jacquard::cowstr::ToCowStr;
1010+use jacquard::types::did::Did;
1111+use jacquard_common::IntoStatic;
1212+use miette::{IntoDiagnostic, Result};
1313+use smol_str::ToSmolStr;
1414+use std::collections::HashSet;
1515+use std::sync::Arc;
1616+use std::time::Duration;
1717+use tokio::sync::mpsc;
1818+use tracing::{debug, error, trace, warn};
/// Outcome of applying one buffered firehose message.
#[derive(Debug, Clone, Copy)]
enum ProcessResult {
    // The message caused the repo to be wiped; later messages for the same
    // DID within this batch can be skipped.
    Deleted,
    // Message handled (or intentionally skipped) without deleting the repo.
    Ok,
}
2525+2626+enum RepoCheckResult {
2727+ Syncing,
2828+ Ok(RepoState<'static>),
2929+}
3030+3131+pub struct FirehoseWorker {
3232+ state: Arc<AppState>,
3333+ rx: mpsc::UnboundedReceiver<BufferedMessage>,
3434+}
3535+3636+impl FirehoseWorker {
3737+ pub fn new(state: Arc<AppState>, rx: mpsc::UnboundedReceiver<BufferedMessage>) -> Self {
3838+ Self { state, rx }
3939+ }
4040+4141+ pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> {
4242+ const BUF_SIZE: usize = 500;
4343+ let mut buf = Vec::<BufferedMessage>::with_capacity(BUF_SIZE);
4444+ let mut failed = Vec::<BufferedMessage>::new();
4545+4646+ loop {
4747+ let mut batch = self.state.db.inner.batch();
4848+ let mut deleted = HashSet::new();
4949+5050+ for msg in buf.drain(..) {
5151+ let (did, seq) = match &msg {
5252+ SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
5353+ SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
5454+ SubscribeReposMessage::Account(a) => (&a.did, a.seq),
5555+ SubscribeReposMessage::Sync(s) => (&s.did, s.seq),
5656+ _ => continue,
5757+ };
5858+5959+ if self.state.blocked_dids.contains_sync(did) {
6060+ failed.push(msg);
6161+ continue;
6262+ }
6363+ if deleted.contains(did) {
6464+ continue;
6565+ }
6666+6767+ match Self::process_message(&self.state, &mut batch, &msg, did) {
6868+ Ok(ProcessResult::Ok) => {}
6969+ Ok(ProcessResult::Deleted) => {
7070+ deleted.insert(did.clone());
7171+ }
7272+ Err(e) => {
7373+ error!("failed to process buffered message for {did}: {e}");
7474+ db::check_poisoned_report(&e);
7575+ failed.push(msg);
7676+ }
7777+ }
7878+7979+ self.state
8080+ .cur_firehose
8181+ .store(seq, std::sync::atomic::Ordering::SeqCst);
8282+ }
8383+8484+ // commit all changes to db
8585+ batch.commit().into_diagnostic()?;
8686+ self.state
8787+ .db
8888+ .inner
8989+ .persist(fjall::PersistMode::Buffer)
9090+ .into_diagnostic()?;
9191+9292+ // add failed back to buf here so the ordering is preserved
9393+ if !failed.is_empty() {
9494+ buf.append(&mut failed);
9595+ }
9696+9797+ // wait until we receive some messages
9898+ // this does mean we will have an up to 1 second delay, before we send events to consumers
9999+ // but thats reasonable imo, could also be configured of course
100100+ let _ = handle.block_on(tokio::time::timeout(
101101+ Duration::from_secs(1),
102102+ self.rx.recv_many(&mut buf, BUF_SIZE),
103103+ ));
104104+ if buf.is_empty() {
105105+ if self.rx.is_closed() {
106106+ error!("ingestor crashed? shutting down buffer processor");
107107+ break;
108108+ }
109109+ continue;
110110+ }
111111+ }
112112+113113+ Ok(())
114114+ }
115115+116116+ fn process_message(
117117+ state: &AppState,
118118+ batch: &mut OwnedWriteBatch,
119119+ msg: &BufferedMessage,
120120+ did: &Did,
121121+ ) -> Result<ProcessResult> {
122122+ let RepoCheckResult::Ok(repo_state) = Self::check_repo_state(batch, state, did)? else {
123123+ return Ok(ProcessResult::Ok);
124124+ };
125125+126126+ match msg {
127127+ SubscribeReposMessage::Commit(commit) => {
128128+ trace!("processing buffered commit for {did}");
129129+130130+ if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.as_str()) {
131131+ debug!(
132132+ "skipping replayed event for {}: {} <= {}",
133133+ did,
134134+ commit.rev,
135135+ repo_state.rev.as_ref().expect("we checked in if")
136136+ );
137137+ return Ok(ProcessResult::Ok);
138138+ }
139139+140140+ if let (Some(prev_repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data)
141141+ && prev_repo != &prev_commit.0
142142+ {
143143+ warn!(
144144+ "gap detected for {}: prev {} != stored {}. triggering backfill",
145145+ did, prev_repo, prev_commit.0
146146+ );
147147+148148+ let mut batch = state.db.inner.batch();
149149+ ops::update_repo_status(
150150+ &mut batch,
151151+ &state.db,
152152+ did,
153153+ repo_state,
154154+ RepoStatus::Backfilling,
155155+ )?;
156156+ batch.commit().into_diagnostic()?;
157157+158158+ send_backfill_req(state, did.clone().into_static())?;
159159+160160+ return Ok(ProcessResult::Ok);
161161+ }
162162+163163+ ops::apply_commit(batch, &state.db, repo_state, &commit)?();
164164+ }
165165+ SubscribeReposMessage::Identity(identity) => {
166166+ debug!("processing buffered identity for {did}");
167167+ let handle = identity
168168+ .handle
169169+ .as_ref()
170170+ .map(|h| h.to_cowstr().into_static());
171171+172172+ let evt = IdentityEvt {
173173+ did: did.clone().into_static(),
174174+ handle,
175175+ };
176176+ ops::emit_identity_event(&state.db, evt);
177177+ }
178178+ SubscribeReposMessage::Account(account) => {
179179+ debug!("processing buffered account for {did}");
180180+ let evt = AccountEvt {
181181+ did: did.clone().into_static(),
182182+ active: account.active,
183183+ status: account.status.as_ref().map(|s| s.to_cowstr().into_static()),
184184+ };
185185+186186+ if !account.active {
187187+ use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus;
188188+ match &account.status {
189189+ Some(AccountStatus::Deleted) => {
190190+ debug!("account {did} deleted, wiping data");
191191+ ops::delete_repo(batch, &state.db, did)?;
192192+ return Ok(ProcessResult::Deleted);
193193+ }
194194+ status => {
195195+ let status = match status {
196196+ Some(status) => match status {
197197+ AccountStatus::Deleted => {
198198+ unreachable!("deleted account status is handled before")
199199+ }
200200+ AccountStatus::Takendown => RepoStatus::Takendown,
201201+ AccountStatus::Suspended => RepoStatus::Suspended,
202202+ AccountStatus::Deactivated => RepoStatus::Deactivated,
203203+ AccountStatus::Throttled => {
204204+ RepoStatus::Error("throttled".into())
205205+ }
206206+ AccountStatus::Desynchronized => {
207207+ RepoStatus::Error("desynchronized".into())
208208+ }
209209+ AccountStatus::Other(s) => {
210210+ warn!(
211211+ "unknown account status for {did}, will put in error state: {s}"
212212+ );
213213+ RepoStatus::Error(s.to_smolstr())
214214+ }
215215+ },
216216+ None => {
217217+ warn!("account {did} inactive but no status provided");
218218+ RepoStatus::Error("unknown".into())
219219+ }
220220+ };
221221+ ops::update_repo_status(batch, &state.db, did, repo_state, status)?;
222222+ }
223223+ }
224224+ } else {
225225+ // normally we would initiate backfill here
226226+ // but we don't have to do anything because:
227227+ // 1. we handle changing repo status to Synced before this (in check repo state)
228228+ // 2. initiating backfilling is also handled there
229229+ }
230230+231231+ ops::emit_account_event(&state.db, evt);
232232+ }
233233+ _ => {
234234+ warn!("unknown message type in buffer for {did}");
235235+ }
236236+ }
237237+238238+ Ok(ProcessResult::Ok)
239239+ }
240240+241241+ fn check_repo_state(
242242+ batch: &mut OwnedWriteBatch,
243243+ state: &AppState,
244244+ did: &Did<'_>,
245245+ ) -> Result<RepoCheckResult> {
246246+ // check if we have this repo
247247+ let repo_key = keys::repo_key(&did);
248248+ let Some(state_bytes) = state.db.repos.get(&repo_key).into_diagnostic()? else {
249249+ // we don't know this repo, but we are receiving events for it
250250+ // this means we should backfill it before processing its events
251251+ debug!("discovered new account {did} from firehose, queueing backfill");
252252+253253+ let new_state = RepoState::backfilling(did);
254254+ // using a separate batch here since we want to make it known its being backfilled
255255+ // immediately. we could use the batch for the unit of work we are doing but
256256+ // then we wouldn't be able to start backfilling until the unit of work is done
257257+ let mut batch = state.db.inner.batch();
258258+259259+ batch.insert(
260260+ &state.db.repos,
261261+ &repo_key,
262262+ crate::db::ser_repo_state(&new_state)?,
263263+ );
264264+ batch.insert(&state.db.pending, &repo_key, &[]);
265265+ batch.commit().into_diagnostic()?;
266266+267267+ send_backfill_req(state, did.clone().into_static())?;
268268+269269+ return Ok(RepoCheckResult::Syncing);
270270+ };
271271+ let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();
272272+273273+ // if we are backfilling or it is new, DON'T mark it as synced yet
274274+ // the backfill worker will do that when it finishes
275275+ match &repo_state.status {
276276+ RepoStatus::Synced => Ok(RepoCheckResult::Ok(repo_state)),
277277+ RepoStatus::Backfilling | RepoStatus::Error(_) => {
278278+ // repo is being backfilled or is in error state
279279+ // we dont touch the state because the backfill worker will do that
280280+ // we should not really get here because the backfill worker should have marked it as
281281+ // being worked on (blocked repos) meaning we would have returned earlier
282282+ debug!(
283283+ "ignoring active status for {did} as it is {:?}",
284284+ repo_state.status
285285+ );
286286+ Ok(RepoCheckResult::Syncing)
287287+ }
288288+ RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => {
289289+ // if it was in deactivated/takendown/suspended state, we can mark it as synced
290290+ // because we are receiving live events now
291291+ repo_state = ops::update_repo_status(
292292+ batch,
293293+ &state.db,
294294+ &did,
295295+ repo_state,
296296+ RepoStatus::Synced,
297297+ )?;
298298+ Ok(RepoCheckResult::Ok(repo_state))
299299+ }
300300+ }
301301+ }
302302+}
+37-25
src/main.rs
···11mod api;
22mod backfill;
33-mod buffer;
43mod config;
54mod crawler;
65mod db;
···109mod state;
1110mod types;
12111313-use crate::backfill::Worker;
1414-use crate::buffer::processor::BufferProcessor;
1512use crate::config::Config;
1613use crate::crawler::Crawler;
1717-use crate::db::Db;
1818-use crate::ingest::Ingestor;
1414+use crate::db::set_firehose_cursor;
1515+use crate::ingest::firehose::FirehoseIngestor;
1916use crate::state::AppState;
1717+use crate::{backfill::BackfillWorker, ingest::worker::FirehoseWorker};
2018use futures::{future::BoxFuture, FutureExt, TryFutureExt};
2119use miette::IntoDiagnostic;
2220use mimalloc::MiMalloc;
2321use std::sync::atomic::Ordering;
2422use std::sync::Arc;
2525-use tokio::task::spawn_blocking;
2323+use tokio::{sync::mpsc, task::spawn_blocking};
2624use tracing::{error, info};
27252826#[global_allocator]
···37353836 info!("{cfg}");
39374040- let (state, backfill_rx, buffer_rx) = AppState::new(&cfg)?;
3838+ let (state, backfill_rx) = AppState::new(&cfg)?;
3939+ let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
4140 let state = Arc::new(state);
42414342 tokio::spawn(
···5453 tokio::spawn({
5554 let state = state.clone();
5655 let timeout = cfg.repo_fetch_timeout;
5757- Worker::new(state, backfill_rx, timeout, cfg.backfill_concurrency_limit).run()
5656+ BackfillWorker::new(state, backfill_rx, timeout, cfg.backfill_concurrency_limit).run()
5857 });
59586060- let buffer_processor_task = tokio::spawn({
5959+ let firehose_worker = std::thread::spawn({
6160 let state = state.clone();
6262- BufferProcessor::new(state, buffer_rx).run()
6161+ let handle = tokio::runtime::Handle::current();
6262+ move || FirehoseWorker::new(state, buffer_rx).run(handle)
6363 });
64646565 if let Err(e) = spawn_blocking({
···7070 .into_diagnostic()?
7171 {
7272 error!("failed to queue pending backfills: {e}");
7373- Db::check_poisoned_report(&e);
7373+ db::check_poisoned_report(&e);
7474 }
75757676 if let Err(e) = spawn_blocking({
···8181 .into_diagnostic()?
8282 {
8383 error!("failed to queue gone backfills: {e}");
8484- Db::check_poisoned_report(&e);
8484+ db::check_poisoned_report(&e);
8585 }
86868787 std::thread::spawn({
···127127 loop {
128128 std::thread::sleep(persist_interval);
129129130130+ // persist firehose cursor
130131 let seq = state.cur_firehose.load(Ordering::SeqCst);
131131- const CURSOR_KEY: &[u8] = b"firehose_cursor";
132132- if let Err(e) = state
133133- .db
134134- .cursors
135135- .insert(CURSOR_KEY, seq.to_string().into_bytes())
136136- {
132132+ if let Err(e) = set_firehose_cursor(&state.db, seq) {
137133 error!("failed to save cursor: {e}");
138138- Db::check_poisoned(&e);
134134+ db::check_poisoned_report(&e);
139135 }
140136137137+ // persist counts
138138+ // TODO: make this more durable
139139+ if let Err(e) = db::persist_counts(&state.db) {
140140+ error!("failed to persist counts: {e}");
141141+ db::check_poisoned_report(&e);
142142+ }
143143+144144+ // persist journal
141145 if let Err(e) = state.db.persist() {
142146 error!("db persist failed: {e}");
143143- Db::check_poisoned_report(&e);
147147+ db::check_poisoned_report(&e);
144148 }
145149 }
146150 }
···152156 .run()
153157 .inspect_err(|e| {
154158 error!("crawler died: {e}");
155155- Db::check_poisoned_report(&e);
159159+ db::check_poisoned_report(&e);
156160 }),
157161 );
158162 }
159163160160- let ingestor = Ingestor::new(state.clone(), cfg.relay_host, cfg.full_network);
164164+ let ingestor =
165165+ FirehoseIngestor::new(state.clone(), buffer_tx, cfg.relay_host, cfg.full_network);
161166162167 let res = futures::future::try_join_all::<[BoxFuture<_>; _]>([
163163- Box::pin(buffer_processor_task.map(|r| r.into_diagnostic().flatten())),
168168+ Box::pin(
169169+ tokio::task::spawn_blocking(move || {
170170+ firehose_worker
171171+ .join()
172172+ .map_err(|e| miette::miette!("buffer processor thread died: {e:?}"))
173173+ })
174174+ .map(|r| r.into_diagnostic().flatten().flatten()),
175175+ ),
164176 Box::pin(ingestor.run()),
165177 ]);
166178 if let Err(e) = res.await {
167179 error!("ingestor or buffer processor died: {e}");
168168- Db::check_poisoned_report(&e);
180180+ db::check_poisoned_report(&e);
169181 }
170182171183 if let Err(e) = state.db.persist() {
172172- Db::check_poisoned_report(&e);
184184+ db::check_poisoned_report(&e);
173185 return Err(e);
174186 }
175187