The smokesignal.events web application
at main 216 lines 8.2 kB view raw
1//! TAP event processor for AT Protocol events. 2//! 3//! This module provides a unified TAP stream consumer and event processor, 4//! eliminating the need for channel-based message passing. 5 6use anyhow::Result; 7use atproto_tap::{RecordAction, TapConfig, TapEvent, connect}; 8use std::sync::Arc; 9use std::time::Duration; 10use tokio_stream::StreamExt; 11use tokio_util::sync::CancellationToken; 12 13use crate::processor::ContentFetcher; 14use crate::task_search_indexer::SearchIndexer; 15 16/// Unified TAP stream consumer and event processor. 17/// 18/// Combines TAP event consumption with direct processing, eliminating 19/// channel overhead. Events are processed inline as they arrive from 20/// the TAP stream. 21pub struct TapProcessor { 22 config: TapConfig, 23 content_fetcher: ContentFetcher, 24 search_indexer: Option<SearchIndexer>, 25 cancel_token: CancellationToken, 26} 27 28impl TapProcessor { 29 /// Create a new TAP processor. 30 /// 31 /// # Arguments 32 /// 33 /// * `hostname` - TAP service hostname (e.g., "localhost:2480") 34 /// * `password` - Optional admin password for authentication 35 /// * `user_agent` - User-Agent header for WebSocket connections 36 /// * `content_fetcher` - Processor for event/RSVP/profile content 37 /// * `search_indexer` - Optional search indexer for OpenSearch 38 /// * `cancel_token` - Token for graceful shutdown 39 pub fn new( 40 hostname: &str, 41 password: Option<String>, 42 user_agent: &str, 43 content_fetcher: ContentFetcher, 44 search_indexer: Option<SearchIndexer>, 45 cancel_token: CancellationToken, 46 ) -> Self { 47 let mut config_builder = TapConfig::builder() 48 .hostname(hostname) 49 .send_acks(true) 50 .user_agent(user_agent) 51 .initial_reconnect_delay(Duration::from_secs(1)) 52 .max_reconnect_delay(Duration::from_secs(30)); 53 54 if let Some(password) = password { 55 config_builder = config_builder.admin_password(password); 56 } 57 58 Self { 59 config: config_builder.build(), 60 content_fetcher, 61 search_indexer, 62 cancel_token, 63 } 64 } 65 66 /// Run the TAP processor, consuming events until cancelled. 67 pub async fn run(self) -> Result<()> { 68 tracing::info!( 69 "TAP processor starting, connecting to {}", 70 self.config.hostname 71 ); 72 let mut stream = connect(self.config.clone()); 73 74 loop { 75 tokio::select! { 76 () = self.cancel_token.cancelled() => { 77 tracing::info!("TAP processor cancelled, closing stream"); 78 stream.close().await; 79 break; 80 } 81 Some(result) = stream.next() => { 82 match result { 83 Ok(event) => { 84 self.process_event(&event).await; 85 } 86 Err(err) => { 87 if err.is_fatal() { 88 tracing::error!(?err, "Fatal TAP error, exiting processor"); 89 break; 90 } 91 tracing::warn!(?err, "TAP stream error (will retry)"); 92 } 93 } 94 } 95 } 96 } 97 98 tracing::info!("TAP processor stopped"); 99 Ok(()) 100 } 101 102 async fn process_event(&self, event: &Arc<TapEvent>) { 103 match event.as_ref() { 104 TapEvent::Record { record, .. } => { 105 let collection = record.collection.as_ref(); 106 107 // Skip irrelevant collections (TAP should filter, but double-check) 108 if !Self::is_relevant_collection(collection) { 109 return; 110 } 111 112 let did = record.did.as_ref(); 113 let rkey = record.rkey.as_ref(); 114 let live = record.live; 115 116 if !live { 117 tracing::debug!("Processing backfill event: {} {} {}", collection, did, rkey); 118 } 119 120 match record.action { 121 RecordAction::Create | RecordAction::Update => { 122 let cid = record.cid.as_ref().map(|c| c.as_ref()).unwrap_or(""); 123 let record_value = record 124 .record 125 .as_ref() 126 .cloned() 127 .unwrap_or(serde_json::Value::Null); 128 129 // Process content fetcher and search indexer in parallel 130 // Clone record_value for search indexer so both can take ownership 131 let indexer_record = if self.search_indexer.is_some() { 132 Some(record_value.clone()) 133 } else { 134 None 135 }; 136 137 let content_future = self.content_fetcher.handle_commit( 138 did, 139 collection, 140 rkey, 141 cid, 142 record_value, 143 live, 144 ); 145 146 let index_future = async { 147 if let (Some(indexer), Some(record)) = 148 (&self.search_indexer, indexer_record) 149 { 150 indexer.index_commit(did, collection, rkey, record).await 151 } else { 152 Ok(()) 153 } 154 }; 155 156 let (content_result, index_result) = 157 tokio::join!(content_future, index_future); 158 159 if let Err(e) = content_result { 160 tracing::error!(?e, "Error processing commit"); 161 } 162 if let Err(e) = index_result { 163 tracing::error!(?e, "Error indexing commit"); 164 } 165 } 166 RecordAction::Delete => { 167 // Process content fetcher and search indexer delete in parallel 168 let content_future = self 169 .content_fetcher 170 .handle_delete(did, collection, rkey, live); 171 172 let index_future = async { 173 if let Some(ref indexer) = self.search_indexer { 174 indexer.delete_record(did, collection, rkey).await 175 } else { 176 Ok(()) 177 } 178 }; 179 180 let (content_result, index_result) = 181 tokio::join!(content_future, index_future); 182 183 if let Err(e) = content_result { 184 tracing::error!(?e, "Error processing delete"); 185 } 186 if let Err(e) = index_result { 187 tracing::error!(?e, "Error deleting from index"); 188 } 189 } 190 } 191 } 192 TapEvent::Identity { identity, .. } => { 193 // Identity events ignored (handle updates can be added in future iteration) 194 tracing::trace!( 195 did = %identity.did, 196 handle = %identity.handle, 197 "Identity event ignored" 198 ); 199 } 200 } 201 } 202 203 fn is_relevant_collection(collection: &str) -> bool { 204 matches!( 205 collection, 206 "community.lexicon.calendar.rsvp" 207 | "community.lexicon.calendar.event" 208 | "events.smokesignal.profile" 209 | "events.smokesignal.calendar.acceptance" 210 | "events.smokesignal.lfg" 211 | "app.beaconbits.bookmark.item" 212 | "app.beaconbits.beacon" 213 | "app.dropanchor.checkin" 214 ) 215 } 216}