AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] extract buffer processor to dedicated module

ptr.pet 577e3c79 befe77f8

verified
+271 -485
+23
Cargo.lock
··· 1636 1636 "n0-future 0.3.2", 1637 1637 "reqwest", 1638 1638 "rmp-serde", 1639 + "scc", 1639 1640 "serde", 1640 1641 "serde_ipld_dagcbor", 1641 1642 "serde_json", ··· 3528 3529 checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" 3529 3530 3530 3531 [[package]] 3532 + name = "saa" 3533 + version = "5.4.5" 3534 + source = "registry+https://github.com/rust-lang/crates.io-index" 3535 + checksum = "78d0325695c3a94a3751f300fbf253ccc33db53eeed81b1474eec1081921e43e" 3536 + 3537 + [[package]] 3531 3538 name = "safemem" 3532 3539 version = "0.3.3" 3533 3540 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3543 3550 ] 3544 3551 3545 3552 [[package]] 3553 + name = "scc" 3554 + version = "3.5.4" 3555 + source = "registry+https://github.com/rust-lang/crates.io-index" 3556 + checksum = "4d69c5cd152521a58dce71a935578f472af5f01bfc6c48d453e869d47285e31c" 3557 + dependencies = [ 3558 + "saa", 3559 + "sdd", 3560 + ] 3561 + 3562 + [[package]] 3546 3563 name = "schannel" 3547 3564 version = "0.1.28" 3548 3565 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3562 3579 version = "1.2.0" 3563 3580 source = "registry+https://github.com/rust-lang/crates.io-index" 3564 3581 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 3582 + 3583 + [[package]] 3584 + name = "sdd" 3585 + version = "4.6.0" 3586 + source = "registry+https://github.com/rust-lang/crates.io-index" 3587 + checksum = "caac2546b7d3b36e0ca5d57a5435b4d5967d3d2f9b72c5a492c7a9893684daed" 3565 3588 3566 3589 [[package]] 3567 3590 name = "sec1"
+1
Cargo.toml
··· 38 38 humantime = "2.3.0" 39 39 40 40 mimalloc = { version = "0.1", features = ["v3"] } 41 + scc = "3"
+9
src/buffer/mod.rs
··· 1 + use jacquard::{api::com_atproto::sync::subscribe_repos::SubscribeReposMessage, types::did::Did}; 2 + 3 + pub mod processor; 4 + 5 + pub struct BufferedMessage { 6 + pub did: Did<'static>, 7 + pub msg: SubscribeReposMessage<'static>, 8 + pub buffered_at: i64, 9 + }
+156
src/buffer/processor.rs
··· 1 + use crate::db::{keys, Db}; 2 + use crate::ops; 3 + use crate::state::AppState; 4 + use crate::types::{AccountEvt, IdentityEvt}; 5 + 6 + use super::BufferedMessage; 7 + use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; 8 + use jacquard::types::did::Did; 9 + use jacquard_common::IntoStatic; 10 + use miette::{IntoDiagnostic, Result}; 11 + use smol_str::ToSmolStr; 12 + use std::collections::{HashMap, VecDeque}; 13 + use std::sync::Arc; 14 + use tokio::sync::mpsc; 15 + use tokio::task::spawn_blocking; 16 + use tracing::{debug, error, info, trace, warn}; 17 + 18 + pub struct BufferProcessor { 19 + state: Arc<AppState>, 20 + rx: mpsc::UnboundedReceiver<BufferedMessage>, 21 + } 22 + 23 + impl BufferProcessor { 24 + pub fn new(state: Arc<AppState>, rx: mpsc::UnboundedReceiver<BufferedMessage>) -> Self { 25 + Self { state, rx } 26 + } 27 + 28 + pub async fn run(mut self) -> Result<()> { 29 + let mut queues: HashMap<Did<'static>, VecDeque<BufferedMessage>> = HashMap::new(); 30 + 31 + // recover from DB 32 + let recovered = self.recover_from_db().await?; 33 + if !recovered.is_empty() { 34 + info!("recovered {} buffered messages from db", recovered.len()); 35 + for msg in recovered { 36 + queues.entry(msg.did.clone()).or_default().push_back(msg); 37 + } 38 + } 39 + 40 + let mut to_remove: Vec<Did<'static>> = Vec::new(); 41 + 42 + loop { 43 + // receive new messages (non-blocking drain) 44 + while let Ok(msg) = self.rx.try_recv() { 45 + queues.entry(msg.did.clone()).or_default().push_back(msg); 46 + } 47 + 48 + // process unblocked DIDs 49 + for (did, queue) in &mut queues { 50 + if self.state.blocked_dids.contains_sync(did) { 51 + continue; 52 + } 53 + 54 + while let Some(buffered) = queue.pop_front() { 55 + if let Err(e) = self.process_message(buffered).await { 56 + error!("failed to process buffered message for {did}: {e}"); 57 + Db::check_poisoned_report(&e); 58 + } 59 + } 60 + 61 + if queue.is_empty() { 62 + to_remove.push(did.clone()); 
63 + } 64 + } 65 + 66 + for did in to_remove.drain(..) { 67 + queues.remove(&did); 68 + } 69 + 70 + // wait until we receive a new message 71 + match self.rx.recv().await { 72 + Some(msg) => { 73 + queues.entry(msg.did.clone()).or_default().push_back(msg); 74 + } 75 + None => { 76 + debug!("buffer processor channel closed, exiting"); 77 + break; 78 + } 79 + } 80 + } 81 + 82 + Ok(()) 83 + } 84 + 85 + async fn process_message(&self, buffered: BufferedMessage) -> Result<()> { 86 + let did = buffered.did; 87 + let buffered_at = buffered.buffered_at; 88 + 89 + match buffered.msg { 90 + SubscribeReposMessage::Commit(commit) => { 91 + trace!("processing buffered commit for {did}"); 92 + let state = self.state.clone(); 93 + tokio::task::spawn_blocking(move || ops::apply_commit(&state.db, &commit, true)) 94 + .await 95 + .into_diagnostic()??; 96 + } 97 + SubscribeReposMessage::Identity(identity) => { 98 + debug!("processing buffered identity for {did}"); 99 + let handle = identity.handle.as_ref().map(|h| h.as_str().to_smolstr()); 100 + let evt = IdentityEvt { 101 + did: did.to_smolstr(), 102 + handle, 103 + }; 104 + ops::emit_identity_event(&self.state.db, evt); 105 + } 106 + SubscribeReposMessage::Account(ref account) => { 107 + debug!("processing buffered account for {did}"); 108 + let evt = AccountEvt { 109 + did: did.to_smolstr(), 110 + active: account.active, 111 + status: account.status.as_ref().map(|s| s.to_smolstr()), 112 + }; 113 + ops::emit_account_event(&self.state.db, evt); 114 + } 115 + _ => { 116 + warn!("unknown message type in buffer for {did}"); 117 + } 118 + } 119 + 120 + // remove from DB buffer 121 + self.remove_from_db_buffer(&did, buffered_at).await?; 122 + 123 + Ok(()) 124 + } 125 + 126 + async fn recover_from_db(&self) -> Result<Vec<BufferedMessage>> { 127 + let state = self.state.clone(); 128 + 129 + spawn_blocking(move || { 130 + let mut recovered = Vec::new(); 131 + for item in state.db.buffer.iter() { 132 + let (key, value) = 
item.into_inner().into_diagnostic()?; 133 + let (did, ts) = keys::parse_buffer_key(&key)?; 134 + let msg = 135 + rmp_serde::from_slice::<SubscribeReposMessage>(&value).into_diagnostic()?; 136 + recovered.push(BufferedMessage { 137 + did, 138 + msg: msg.into_static(), 139 + buffered_at: ts, 140 + }); 141 + } 142 + // ensure chronological order across all DIDs 143 + recovered.sort_by_key(|m| m.buffered_at); 144 + Ok(recovered) 145 + }) 146 + .await 147 + .into_diagnostic() 148 + .flatten() 149 + } 150 + 151 + async fn remove_from_db_buffer(&self, did: &str, buffered_at: i64) -> Result<()> { 152 + let key = keys::buffer_key(did, buffered_at); 153 + self.state.db.buffer.remove(key).into_diagnostic()?; 154 + Ok(()) 155 + } 156 + }
+27 -5
src/db/keys.rs
··· 1 1 use jacquard_common::types::string::Did; 2 - use miette::{Context, IntoDiagnostic}; 2 + use miette::{Context, IntoDiagnostic, Result}; 3 3 4 4 /// separator used for composite keys 5 5 pub const SEP: u8 = 0x00; ··· 8 8 did.as_str().trim_start_matches("did:") 9 9 } 10 10 11 - pub fn reconstruct_did<'a>(trimmed_did: &'a str) -> Result<Did<'static>, miette::Error> { 11 + pub fn reconstruct_did<'a>(trimmed_did: &'a str) -> Result<Did<'static>> { 12 12 Did::new_owned(format!("did:{trimmed_did}")) 13 13 .into_diagnostic() 14 14 .wrap_err("expected did to be trimmed") ··· 32 32 } 33 33 34 34 // key format: {DID} 35 - pub fn buffer_prefix<'a>(did: &'a Did) -> &'a [u8] { 36 - repo_key(did) 37 - } 38 35 39 36 // key format: {SEQ} 40 37 pub fn event_key(seq: i64) -> [u8; 8] { ··· 67 64 key.extend_from_slice(collection.as_bytes()); 68 65 key 69 66 } 67 + 68 + // key format: {DID}\x00{timestamp} (for buffer entries) 69 + pub fn buffer_key(did: &str, timestamp: i64) -> Vec<u8> { 70 + let mut key = Vec::with_capacity(did.len() + 1 + 8); 71 + key.extend_from_slice(did.as_bytes()); 72 + key.push(SEP); 73 + key.extend_from_slice(&timestamp.to_be_bytes()); 74 + key 75 + } 76 + 77 + pub fn parse_buffer_key(key: &[u8]) -> Result<(Did<'static>, i64)> { 78 + let pos = key 79 + .iter() 80 + .rposition(|&b| b == SEP) 81 + .ok_or_else(|| miette::miette!("buffer key invalid, no seperator found"))?; 82 + let did_bytes = &key[..pos]; 83 + let ts_bytes = &key[pos + 1..]; 84 + let timestamp = i64::from_be_bytes( 85 + ts_bytes 86 + .try_into() 87 + .map_err(|e| miette::miette!("buffer key invalid, {e}"))?, 88 + ); 89 + let did = reconstruct_did(&String::from_utf8_lossy(did_bytes))?; 90 + Ok((did, timestamp)) 91 + }
+29 -475
src/ingest/mod.rs
··· 1 - use crate::db::{deser_repo_state, keys, ser_repo_state, Db}; 2 - use crate::ops::{self, emit_identity_event}; 1 + use crate::db::{keys, Db}; 3 2 use crate::state::AppState; 4 - use crate::types::{IdentityEvt, RepoState, RepoStatus}; 5 3 use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 4 + use jacquard::types::did::Did; 6 5 use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 7 6 use jacquard_common::IntoStatic; 8 7 use miette::{IntoDiagnostic, Result}; ··· 10 9 use smol_str::SmolStr; 11 10 use std::sync::atomic::Ordering; 12 11 use std::sync::Arc; 13 - use tokio::sync::mpsc; 14 12 use tracing::{debug, error, info}; 15 13 use url::Url; 16 14 17 15 pub struct Ingestor { 18 16 state: Arc<AppState>, 19 17 relay_host: SmolStr, 20 - buffer_tx: mpsc::Sender<(Vec<u8>, Vec<u8>)>, 21 - full_network: bool, 22 18 } 23 19 24 20 impl Ingestor { 25 - pub fn new(state: Arc<AppState>, relay_host: SmolStr, full_network: bool) -> Self { 26 - let (buffer_tx, mut buffer_rx) = mpsc::channel::<(Vec<u8>, Vec<u8>)>(1000); 27 - 28 - let state_clone = state.clone(); 29 - tokio::spawn(async move { 30 - let mut batch_items = Vec::with_capacity(100); 31 - const MAX_BATCH_SIZE: usize = 100; 32 - const BATCH_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10); 33 - 34 - loop { 35 - // wait for at least one item 36 - match buffer_rx.recv().await { 37 - Some(item) => batch_items.push(item), 38 - None => break, 39 - } 40 - 41 - // collect more items until batch is full or timeout 42 - let deadline = tokio::time::Instant::now() + BATCH_TIMEOUT; 43 - while batch_items.len() < MAX_BATCH_SIZE { 44 - match tokio::time::timeout_at(deadline, buffer_rx.recv()).await { 45 - Ok(Some(item)) => batch_items.push(item), 46 - Ok(None) => break, // channel closed 47 - Err(_) => break, // timeout reached 48 - } 49 - } 50 - 51 - if !batch_items.is_empty() { 52 - let db_inner = state_clone.db.inner.clone(); 53 - let 
buffer = state_clone.db.buffer.clone(); 54 - 55 - let mut items_to_persist = Vec::with_capacity(batch_items.len()); 56 - items_to_persist.extend(batch_items.drain(..)); 57 - 58 - let res = tokio::task::spawn_blocking(move || { 59 - let mut batch = db_inner.batch(); 60 - for (k, v) in items_to_persist { 61 - batch.insert(&buffer, k, v); 62 - } 63 - batch.commit() 64 - }) 65 - .await; 66 - 67 - match res { 68 - Ok(Ok(_)) => {} 69 - Ok(Err(e)) => { 70 - Db::check_poisoned(&e); 71 - error!("failed to persist buffer batch: {}", e) 72 - } 73 - Err(e) => error!("buffer worker join error: {}", e), 74 - } 75 - } 76 - } 77 - }); 78 - 79 - Self { 80 - state, 81 - relay_host, 82 - buffer_tx, 83 - full_network, 84 - } 21 + pub fn new(state: Arc<AppState>, relay_host: SmolStr, _full_network: bool) -> Self { 22 + Self { state, relay_host } 85 23 } 86 24 87 25 pub async fn run(mut self) -> Result<()> { ··· 95 33 } else { 96 34 let cursor_key = b"firehose_cursor"; 97 35 if let Ok(Some(bytes)) = 98 - Db::get(self.state.db.cursors.clone(), cursor_key.to_vec()).await 36 + crate::db::Db::get(self.state.db.cursors.clone(), cursor_key.to_vec()).await 99 37 { 100 38 let s = String::from_utf8_lossy(&bytes); 101 39 debug!("resuming from cursor: {}", s); ··· 148 86 } 149 87 150 88 async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) { 151 - match msg { 152 - SubscribeReposMessage::Commit(commit) => { 153 - self.state.cur_firehose.store(commit.seq, Ordering::SeqCst); 154 - 155 - if let Err(e) = self.process_commit(&commit).await { 156 - error!("failed to process commit {}: {e}", commit.seq); 157 - Db::check_poisoned_report(&e); 158 - // buffer for later inspection/retry 159 - let _ = self.buffer_event(&commit).await; 160 - } 161 - } 162 - SubscribeReposMessage::Identity(identity) => { 163 - self.state 164 - .cur_firehose 165 - .store(identity.seq, Ordering::SeqCst); 166 - self.process_identity(*identity).await; 167 - } 168 - SubscribeReposMessage::Account(account) => { 169 - 
self.state.cur_firehose.store(account.seq, Ordering::SeqCst); 170 - self.process_account(&account).await; 171 - } 172 - _ => {} // ignore info/sync for now 173 - } 174 - } 175 - 176 - async fn process_identity( 177 - &mut self, 178 - identity: jacquard::api::com_atproto::sync::subscribe_repos::Identity<'_>, 179 - ) { 180 - // identity update implies activity 181 - self.check_reactivation(&identity.did).await; 182 - 183 - let Some(handle) = identity.handle else { 184 - return; 89 + let (did, seq) = match &msg { 90 + SubscribeReposMessage::Commit(commit) => (&commit.repo, commit.seq), 91 + SubscribeReposMessage::Identity(identity) => (&identity.did, identity.seq), 92 + SubscribeReposMessage::Account(account) => (&account.did, account.seq), 93 + _ => return, 185 94 }; 186 95 187 - let handle_str = handle.as_str(); 188 - 189 - let res = self 190 - .state 191 - .db 192 - .update_repo_state_async(&identity.did, { 193 - let handle_str = handle_str.to_string(); 194 - move |state, _| { 195 - if state.handle.as_deref() == Some(&handle_str) { 196 - return Ok((false, ())); 197 - } 198 - info!("updating handle for {} to {}", state.did, handle_str); 199 - state.handle = Some(handle_str.into()); 200 - Ok((true, ())) 201 - } 202 - }) 203 - .await; 204 - 205 - match res { 206 - Ok(Some((state, _))) => { 207 - // emit identity event 208 - let evt = IdentityEvt { 209 - did: identity.did.as_str().into(), 210 - handle: handle_str.into(), 211 - is_active: matches!( 212 - state.status, 213 - RepoStatus::Synced | RepoStatus::Backfilling | RepoStatus::New 214 - ), 215 - status: match state.status { 216 - RepoStatus::Deactivated => "deactivated".into(), 217 - RepoStatus::Takendown => "takendown".into(), 218 - RepoStatus::Suspended => "suspended".into(), 219 - _ => "active".into(), 220 - }, 221 - }; 222 - 223 - if state.status == RepoStatus::Synced { 224 - self.emit_identity_event(evt).await; 225 - } 226 - } 227 - Ok(None) => {} 228 - Err(e) => { 229 - error!("failed to update repo state for 
{}: {e}", identity.did); 230 - Db::check_poisoned_report(&e); 231 - } 232 - } 233 - } 234 - 235 - async fn process_account( 236 - &mut self, 237 - account: &jacquard::api::com_atproto::sync::subscribe_repos::Account<'_>, 238 - ) { 239 - use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus; 240 - 241 - if account.active { 242 - self.check_reactivation(&account.did).await; 243 - } 244 - 245 - if let Some(status) = &account.status { 246 - match status { 247 - AccountStatus::Deleted => { 248 - info!("repo {} deleted, removing", account.did); 249 - // emit deletion event FIRST if live 250 - let did_key = keys::repo_key(&account.did); 251 - if let Ok(Some(bytes)) = Db::get(self.state.db.repos.clone(), did_key).await { 252 - if let Ok(state) = deser_repo_state(bytes) { 253 - if state.status == RepoStatus::Synced { 254 - let evt = IdentityEvt { 255 - did: account.did.as_str().into(), 256 - handle: state.handle.unwrap_or_default(), 257 - is_active: false, 258 - status: "deleted".into(), 259 - }; 260 - self.emit_identity_event(evt).await; 261 - } 262 - } 263 - } 264 - 265 - if let Err(e) = tokio::task::spawn_blocking({ 266 - let state = self.state.clone(); 267 - let did = account.did.clone().into_static(); 268 - move || ops::delete_repo(&state.db, &did) 269 - }) 270 - .await 271 - .into_diagnostic() 272 - .and_then(|r| r) 273 - { 274 - error!("failed to delete repo {}: {e}", account.did); 275 - Db::check_poisoned_report(&e); 276 - } 277 - } 278 - AccountStatus::Deactivated 279 - | AccountStatus::Takendown 280 - | AccountStatus::Suspended => { 281 - let new_status = match status { 282 - AccountStatus::Deactivated => RepoStatus::Deactivated, 283 - AccountStatus::Takendown => RepoStatus::Takendown, 284 - AccountStatus::Suspended => RepoStatus::Suspended, 285 - _ => unreachable!(), 286 - }; 287 - 288 - let res = self 289 - .state 290 - .db 291 - .update_repo_state_async(&account.did, { 292 - let new_status = new_status.clone(); 293 - move |state, _| { 294 - if 
state.status != new_status { 295 - let was_synced = state.status == RepoStatus::Synced; 296 - state.status = new_status; 297 - Ok((true, was_synced)) 298 - } else { 299 - Ok((false, false)) 300 - } 301 - } 302 - }) 303 - .await; 304 - 305 - match res { 306 - Ok(Some((state, was_synced))) => { 307 - if was_synced { 308 - let evt = IdentityEvt { 309 - did: account.did.as_str().into(), 310 - handle: state.handle.unwrap_or_default(), 311 - is_active: false, 312 - status: match state.status { 313 - RepoStatus::Deactivated => "deactivated".into(), 314 - RepoStatus::Takendown => "takendown".into(), 315 - RepoStatus::Suspended => "suspended".into(), 316 - _ => "active".into(), 317 - }, 318 - }; 319 - self.emit_identity_event(evt).await; 320 - } 321 - } 322 - Ok(None) => {} 323 - Err(e) => { 324 - error!("failed to update repo status for {}: {e}", account.did); 325 - Db::check_poisoned_report(&e); 326 - } 327 - } 328 - } 329 - _ => {} // ignore others 330 - } 331 - } else if account.active { 332 - if let Ok(Some(bytes)) = 333 - Db::get(self.state.db.repos.clone(), keys::repo_key(&account.did)).await 334 - { 335 - if let Ok(state) = rmp_serde::from_slice::<RepoState>(&bytes) { 336 - if matches!( 337 - state.status, 338 - RepoStatus::Backfilling | RepoStatus::Synced | RepoStatus::New 339 - ) { 340 - let evt = IdentityEvt { 341 - did: account.did.as_str().into(), 342 - handle: state.handle.unwrap_or_default(), 343 - is_active: true, 344 - status: "active".into(), 345 - }; 346 - self.emit_identity_event(evt).await; 347 - } 348 - } 349 - } 350 - } 351 - } 352 - 353 - async fn emit_identity_event(&self, evt: IdentityEvt) { 354 - emit_identity_event(&self.state.db, evt); 355 - } 356 - 357 - async fn check_reactivation(&mut self, did: &jacquard::types::did::Did<'_>) { 358 - let did_key = keys::repo_key(did); 359 - let Ok(Some(bytes)) = Db::get(self.state.db.repos.clone(), did_key).await else { 96 + if !self.should_process(did).await.unwrap_or(false) { 360 97 return; 361 - }; 362 - 
let Ok(mut state) = deser_repo_state(bytes) else { 363 - return; 364 - }; 365 - 366 - match state.status { 367 - RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => { 368 - info!("reactivating repo {did} (was {:?})", state.status); 369 - 370 - state.status = RepoStatus::Backfilling; 371 - let Ok(bytes) = ser_repo_state(&state) else { 372 - return; 373 - }; 374 - 375 - let mut batch = self.state.db.inner.batch(); 376 - batch.insert(&self.state.db.repos, did_key, bytes); 377 - batch.insert(&self.state.db.pending, did_key, Vec::new()); 378 - batch.remove(&self.state.db.resync, did_key); 379 - 380 - let res = tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 381 - .await 382 - .into_diagnostic() 383 - .flatten(); 384 - if let Err(e) = res { 385 - error!("failed to commit reactivation for {did}: {e}"); 386 - return; 387 - } 388 - 389 - if self 390 - .state 391 - .backfill_tx 392 - .send(did.clone().into_static()) 393 - .is_err() 394 - { 395 - error!("failed to queue backfill for {did}, backfill worker crashed?"); 396 - } 397 - } 398 - _ => {} 399 98 } 400 - } 401 - 402 - async fn process_commit( 403 - &self, 404 - commit: &jacquard::api::com_atproto::sync::subscribe_repos::Commit<'_>, 405 - ) -> Result<()> { 406 - let db = &self.state.db; 407 - let did = &commit.repo; 408 99 409 - let mut should_process = self.full_network; 410 - let did_key = keys::repo_key(&did); 100 + self.state.cur_firehose.store(seq, Ordering::SeqCst); 411 101 412 - // check repo state 413 - let repo_state_bytes = Db::get(db.repos.clone(), did_key).await?; 102 + let buffered_at = chrono::Utc::now().timestamp_millis(); 414 103 415 - if !should_process && repo_state_bytes.is_some() { 416 - should_process = true; 104 + // persist to DB for crash recovery 105 + let db_key = keys::buffer_key(&did, buffered_at); 106 + if let Ok(bytes) = rmp_serde::to_vec(&msg) { 107 + if let Err(e) = Db::insert(self.state.db.buffer.clone(), db_key, bytes).await { 108 + 
error!("failed to persist buffered message: {e}"); 109 + } 417 110 } 418 111 419 - if !should_process { 420 - return Ok(()); 421 - } 422 - 423 - let repo_state = if let Some(bytes) = repo_state_bytes { 424 - deser_repo_state(bytes).ok() 425 - } else { 426 - None 112 + // always buffer through the BufferProcessor 113 + let buffered_msg = crate::buffer::BufferedMessage { 114 + did: did.clone().into_static(), 115 + msg: msg.into_static(), 116 + buffered_at, 427 117 }; 428 118 429 - let status = repo_state 430 - .as_ref() 431 - .map(|s| s.status.clone()) 432 - .unwrap_or(RepoStatus::New); 433 - 434 - match status { 435 - RepoStatus::New 436 - | RepoStatus::Deactivated 437 - | RepoStatus::Takendown 438 - | RepoStatus::Suspended => { 439 - if matches!( 440 - status, 441 - RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 442 - ) { 443 - info!("reactivating repo {did} due to new commit"); 444 - } else { 445 - debug!("new repo detected: {did}"); 446 - } 447 - 448 - // 1. save state as backfilling 449 - tokio::task::spawn_blocking({ 450 - let state = self.state.clone(); 451 - let did = did.clone().into_static(); 452 - let did_key = did_key.to_vec(); 453 - move || { 454 - let mut new_state = RepoState::new(did); 455 - new_state.status = RepoStatus::Backfilling; 456 - 457 - let mut batch = state.db.inner.batch(); 458 - batch.insert(&state.db.repos, &did_key, ser_repo_state(&new_state)?); 459 - batch.insert(&state.db.pending, &did_key, Vec::new()); 460 - batch.remove(&state.db.resync, &did_key); 461 - batch.commit().into_diagnostic() 462 - } 463 - }) 464 - .await 465 - .into_diagnostic()??; 466 - 467 - // 2. queue for backfill 468 - if let Err(e) = self.state.backfill_tx.send(did.clone().into_static()) { 469 - error!("failed to queue backfill for {}: {}", did, e); 470 - } 471 - 472 - // 3. 
buffer this event 473 - self.buffer_event(commit).await?; 474 - } 475 - RepoStatus::Backfilling => { 476 - debug!("buffering event for backfilling repo: {}", did); 477 - self.buffer_event(commit).await?; 478 - } 479 - RepoStatus::Synced => { 480 - // check revision 481 - if let Some(mut state) = repo_state { 482 - if !state.rev.is_empty() && commit.rev.as_str() <= state.rev.as_str() { 483 - debug!( 484 - "skipping replayed event for {}: {} <= {}", 485 - did, commit.rev, state.rev 486 - ); 487 - return Ok(()); 488 - } 489 - 490 - // check gap 491 - if let Some(prev) = &commit.prev_data { 492 - if !state.data.is_empty() && prev.as_str() != state.data.as_str() { 493 - tracing::warn!( 494 - "gap detected for {}: prev {} != stored {}. triggering backfill", 495 - did, 496 - prev, 497 - state.data 498 - ); 499 - 500 - // 1. update status to Backfilling 501 - state.status = RepoStatus::Backfilling; 502 - tokio::task::spawn_blocking({ 503 - let app_state = self.state.clone(); 504 - let did_key = did_key.to_vec(); 505 - move || { 506 - let mut batch = app_state.db.inner.batch(); 507 - batch.insert( 508 - &app_state.db.repos, 509 - &did_key, 510 - ser_repo_state(&state)?, 511 - ); 512 - batch.insert(&app_state.db.pending, &did_key, Vec::new()); // ensure it's in pending for recovery 513 - batch.commit().into_diagnostic() 514 - } 515 - }) 516 - .await 517 - .into_diagnostic()??; 518 - 519 - // 2. queue backfill 520 - if let Err(e) = self.state.backfill_tx.send(did.clone().into_static()) { 521 - error!("failed to queue backfill for {}: {}", did, e); 522 - } 523 - 524 - // 3. 
buffer this event 525 - self.buffer_event(commit).await?; 526 - return Ok(()); 527 - } 528 - } 529 - } 530 - 531 - let res = tokio::task::spawn_blocking({ 532 - let app_state = self.state.clone(); 533 - let commit_static = commit.clone().into_static(); 534 - move || ops::apply_commit(&app_state.db, &commit_static, true) 535 - }) 536 - .await 537 - .into_diagnostic()?; 538 - 539 - if let Err(e) = res { 540 - error!("failed to apply live commit for {did}: {e}"); 541 - Db::check_poisoned_report(&e); 542 - self.buffer_event(commit).await?; 543 - } else { 544 - debug!("synced event for {did}, {} ops", commit.ops.len()); 545 - } 546 - } 547 - RepoStatus::Error(_) => { 548 - // maybe retry? for now ignore. 549 - } 119 + if let Err(e) = self.state.buffer_tx.send(buffered_msg) { 120 + error!("failed to send message to buffer processor: {e}"); 550 121 } 551 - Ok(()) 552 122 } 553 123 554 - async fn buffer_event( 555 - &self, 556 - commit: &jacquard::api::com_atproto::sync::subscribe_repos::Commit<'_>, 557 - ) -> Result<()> { 558 - // we need to store the event to replay it later. 559 - // key: {DID}\x00{SEQ} -> guarantees ordering 560 - let mut key = Vec::new(); 561 - key.extend_from_slice(keys::buffer_prefix(&commit.repo)); 562 - key.push(0x00); 563 - key.extend_from_slice(&commit.seq.to_be_bytes()); 564 - 565 - // value: serialized commit 566 - let val = rmp_serde::to_vec(commit).into_diagnostic()?; 567 - 568 - if let Err(e) = self.buffer_tx.send((key, val)).await { 569 - error!("failed to buffer event (channel closed): {e}"); 570 - } 571 - 572 - Ok(()) 124 + async fn should_process(&self, did: &Did<'_>) -> Result<bool> { 125 + let did_key = keys::repo_key(did); 126 + Db::contains_key(self.state.db.repos.clone(), did_key).await 573 127 } 574 128 }
+15 -4
src/main.rs
··· 1 1 mod api; 2 2 mod backfill; 3 + mod buffer; 3 4 mod config; 4 5 mod crawler; 5 6 mod db; ··· 10 11 mod types; 11 12 12 13 use crate::backfill::Worker; 14 + use crate::buffer::processor::BufferProcessor; 13 15 use crate::config::Config; 14 16 use crate::crawler::Crawler; 15 17 use crate::db::Db; 16 18 use crate::ingest::Ingestor; 17 19 use crate::state::AppState; 18 - use futures::TryFutureExt; 20 + use futures::{future::BoxFuture, FutureExt, TryFutureExt}; 19 21 use miette::IntoDiagnostic; 20 22 use mimalloc::MiMalloc; 21 23 use std::sync::atomic::Ordering; ··· 35 37 36 38 info!("starting hydrant with config: {cfg:#?}"); 37 39 38 - let (state, backfill_rx) = AppState::new(&cfg)?; 40 + let (state, backfill_rx, buffer_rx) = AppState::new(&cfg)?; 39 41 let state = Arc::new(state); 40 42 41 43 tokio::spawn({ ··· 52 54 let state = state.clone(); 53 55 let timeout = cfg.repo_fetch_timeout; 54 56 Worker::new(state, backfill_rx, timeout, cfg.backfill_concurrency_limit).run() 57 + }); 58 + 59 + let buffer_processor_task = tokio::spawn({ 60 + let state = state.clone(); 61 + BufferProcessor::new(state, buffer_rx).run() 55 62 }); 56 63 57 64 if let Err(e) = spawn_blocking({ ··· 152 159 153 160 let ingestor = Ingestor::new(state.clone(), cfg.relay_host.clone(), cfg.full_network); 154 161 155 - if let Err(e) = ingestor.run().await { 156 - error!("ingestor died: {e}"); 162 + let res = futures::future::try_join_all::<[BoxFuture<_>; _]>([ 163 + Box::pin(buffer_processor_task.map(|r| r.into_diagnostic().flatten())), 164 + Box::pin(ingestor.run()), 165 + ]); 166 + if let Err(e) = res.await { 167 + error!("ingestor or buffer processor died: {e}"); 157 168 Db::check_poisoned_report(&e); 158 169 } 159 170
+11 -1
src/state.rs
··· 5 5 6 6 use miette::Result; 7 7 8 + use crate::buffer::BufferedMessage; 8 9 use crate::{config::Config, db::Db, resolver::Resolver}; 9 10 10 11 pub type BackfillTx = mpsc::UnboundedSender<Did<'static>>; 11 12 pub type BackfillRx = mpsc::UnboundedReceiver<Did<'static>>; 12 13 14 + pub type BufferTx = mpsc::UnboundedSender<BufferedMessage>; 15 + pub type BufferRx = mpsc::UnboundedReceiver<BufferedMessage>; 16 + 13 17 pub struct AppState { 14 18 pub db: Db, 15 19 pub backfill_tx: BackfillTx, 16 20 pub resolver: Resolver, 17 21 pub cur_firehose: AtomicI64, 22 + pub blocked_dids: scc::HashSet<Did<'static>>, 23 + pub buffer_tx: BufferTx, 18 24 } 19 25 20 26 impl AppState { 21 - pub fn new(config: &Config) -> Result<(Self, BackfillRx)> { 27 + pub fn new(config: &Config) -> Result<(Self, BackfillRx, BufferRx)> { 22 28 let db = Db::open( 23 29 &config.database_path, 24 30 config.cache_size, ··· 26 32 )?; 27 33 let resolver = Resolver::new(config.plc_url.clone()); 28 34 let (backfill_tx, backfill_rx) = mpsc::unbounded_channel(); 35 + let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 29 36 30 37 Ok(( 31 38 Self { ··· 33 40 backfill_tx, 34 41 resolver, 35 42 cur_firehose: AtomicI64::new(0), 43 + blocked_dids: scc::HashSet::new(), 44 + buffer_tx, 36 45 }, 37 46 backfill_rx, 47 + buffer_rx, 38 48 )) 39 49 } 40 50 }