//! TAP event processor for AT Protocol events. //! //! This module provides a unified TAP stream consumer and event processor, //! eliminating the need for channel-based message passing. use anyhow::Result; use atproto_tap::{RecordAction, TapConfig, TapEvent, connect}; use std::sync::Arc; use std::time::Duration; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use crate::processor::ContentFetcher; use crate::task_search_indexer::SearchIndexer; /// Unified TAP stream consumer and event processor. /// /// Combines TAP event consumption with direct processing, eliminating /// channel overhead. Events are processed inline as they arrive from /// the TAP stream. pub struct TapProcessor { config: TapConfig, content_fetcher: ContentFetcher, search_indexer: Option, cancel_token: CancellationToken, } impl TapProcessor { /// Create a new TAP processor. /// /// # Arguments /// /// * `hostname` - TAP service hostname (e.g., "localhost:2480") /// * `password` - Optional admin password for authentication /// * `user_agent` - User-Agent header for WebSocket connections /// * `content_fetcher` - Processor for event/RSVP/profile content /// * `search_indexer` - Optional search indexer for OpenSearch /// * `cancel_token` - Token for graceful shutdown pub fn new( hostname: &str, password: Option, user_agent: &str, content_fetcher: ContentFetcher, search_indexer: Option, cancel_token: CancellationToken, ) -> Self { let mut config_builder = TapConfig::builder() .hostname(hostname) .send_acks(true) .user_agent(user_agent) .initial_reconnect_delay(Duration::from_secs(1)) .max_reconnect_delay(Duration::from_secs(30)); if let Some(password) = password { config_builder = config_builder.admin_password(password); } Self { config: config_builder.build(), content_fetcher, search_indexer, cancel_token, } } /// Run the TAP processor, consuming events until cancelled. pub async fn run(self) -> Result<()> { tracing::info!( "TAP processor starting, connecting to {}", self.config.hostname ); let mut stream = connect(self.config.clone()); loop { tokio::select! { () = self.cancel_token.cancelled() => { tracing::info!("TAP processor cancelled, closing stream"); stream.close().await; break; } Some(result) = stream.next() => { match result { Ok(event) => { self.process_event(&event).await; } Err(err) => { if err.is_fatal() { tracing::error!(?err, "Fatal TAP error, exiting processor"); break; } tracing::warn!(?err, "TAP stream error (will retry)"); } } } } } tracing::info!("TAP processor stopped"); Ok(()) } async fn process_event(&self, event: &Arc) { match event.as_ref() { TapEvent::Record { record, .. } => { let collection = record.collection.as_ref(); // Skip irrelevant collections (TAP should filter, but double-check) if !Self::is_relevant_collection(collection) { return; } let did = record.did.as_ref(); let rkey = record.rkey.as_ref(); let live = record.live; if !live { tracing::debug!("Processing backfill event: {} {} {}", collection, did, rkey); } match record.action { RecordAction::Create | RecordAction::Update => { let cid = record.cid.as_ref().map(|c| c.as_ref()).unwrap_or(""); let record_value = record .record .as_ref() .cloned() .unwrap_or(serde_json::Value::Null); // Process content fetcher and search indexer in parallel // Clone record_value for search indexer so both can take ownership let indexer_record = if self.search_indexer.is_some() { Some(record_value.clone()) } else { None }; let content_future = self.content_fetcher.handle_commit( did, collection, rkey, cid, record_value, live, ); let index_future = async { if let (Some(indexer), Some(record)) = (&self.search_indexer, indexer_record) { indexer.index_commit(did, collection, rkey, record).await } else { Ok(()) } }; let (content_result, index_result) = tokio::join!(content_future, index_future); if let Err(e) = content_result { tracing::error!(?e, "Error processing commit"); } if let Err(e) = index_result { tracing::error!(?e, "Error indexing commit"); } } RecordAction::Delete => { // Process content fetcher and search indexer delete in parallel let content_future = self .content_fetcher .handle_delete(did, collection, rkey, live); let index_future = async { if let Some(ref indexer) = self.search_indexer { indexer.delete_record(did, collection, rkey).await } else { Ok(()) } }; let (content_result, index_result) = tokio::join!(content_future, index_future); if let Err(e) = content_result { tracing::error!(?e, "Error processing delete"); } if let Err(e) = index_result { tracing::error!(?e, "Error deleting from index"); } } } } TapEvent::Identity { identity, .. } => { // Identity events ignored (handle updates can be added in future iteration) tracing::trace!( did = %identity.did, handle = %identity.handle, "Identity event ignored" ); } } } fn is_relevant_collection(collection: &str) -> bool { matches!( collection, "community.lexicon.calendar.rsvp" | "community.lexicon.calendar.event" | "events.smokesignal.profile" | "events.smokesignal.calendar.acceptance" | "events.smokesignal.lfg" | "app.beaconbits.bookmark.item" | "app.beaconbits.beacon" | "app.dropanchor.checkin" ) } }