The smokesignal.events web application
//! TAP event processor for AT Protocol events.
//!
//! This module provides a unified TAP stream consumer and event processor,
//! eliminating the need for channel-based message passing.
5
6use anyhow::Result;
7use atproto_tap::{RecordAction, TapConfig, TapEvent, connect};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio_stream::StreamExt;
11use tokio_util::sync::CancellationToken;
12
13use crate::processor::ContentFetcher;
14use crate::task_search_indexer::SearchIndexer;
15
16/// Unified TAP stream consumer and event processor.
17///
18/// Combines TAP event consumption with direct processing, eliminating
19/// channel overhead. Events are processed inline as they arrive from
20/// the TAP stream.
21pub struct TapProcessor {
22 config: TapConfig,
23 content_fetcher: ContentFetcher,
24 search_indexer: Option<SearchIndexer>,
25 cancel_token: CancellationToken,
26}
27
28impl TapProcessor {
29 /// Create a new TAP processor.
30 ///
31 /// # Arguments
32 ///
33 /// * `hostname` - TAP service hostname (e.g., "localhost:2480")
34 /// * `password` - Optional admin password for authentication
35 /// * `user_agent` - User-Agent header for WebSocket connections
36 /// * `content_fetcher` - Processor for event/RSVP/profile content
37 /// * `search_indexer` - Optional search indexer for OpenSearch
38 /// * `cancel_token` - Token for graceful shutdown
39 pub fn new(
40 hostname: &str,
41 password: Option<String>,
42 user_agent: &str,
43 content_fetcher: ContentFetcher,
44 search_indexer: Option<SearchIndexer>,
45 cancel_token: CancellationToken,
46 ) -> Self {
47 let mut config_builder = TapConfig::builder()
48 .hostname(hostname)
49 .send_acks(true)
50 .user_agent(user_agent)
51 .initial_reconnect_delay(Duration::from_secs(1))
52 .max_reconnect_delay(Duration::from_secs(30));
53
54 if let Some(password) = password {
55 config_builder = config_builder.admin_password(password);
56 }
57
58 Self {
59 config: config_builder.build(),
60 content_fetcher,
61 search_indexer,
62 cancel_token,
63 }
64 }
65
66 /// Run the TAP processor, consuming events until cancelled.
67 pub async fn run(self) -> Result<()> {
68 tracing::info!(
69 "TAP processor starting, connecting to {}",
70 self.config.hostname
71 );
72 let mut stream = connect(self.config.clone());
73
74 loop {
75 tokio::select! {
76 () = self.cancel_token.cancelled() => {
77 tracing::info!("TAP processor cancelled, closing stream");
78 stream.close().await;
79 break;
80 }
81 Some(result) = stream.next() => {
82 match result {
83 Ok(event) => {
84 self.process_event(&event).await;
85 }
86 Err(err) => {
87 if err.is_fatal() {
88 tracing::error!(?err, "Fatal TAP error, exiting processor");
89 break;
90 }
91 tracing::warn!(?err, "TAP stream error (will retry)");
92 }
93 }
94 }
95 }
96 }
97
98 tracing::info!("TAP processor stopped");
99 Ok(())
100 }
101
102 async fn process_event(&self, event: &Arc<TapEvent>) {
103 match event.as_ref() {
104 TapEvent::Record { record, .. } => {
105 let collection = record.collection.as_ref();
106
107 // Skip irrelevant collections (TAP should filter, but double-check)
108 if !Self::is_relevant_collection(collection) {
109 return;
110 }
111
112 let did = record.did.as_ref();
113 let rkey = record.rkey.as_ref();
114 let live = record.live;
115
116 if !live {
117 tracing::debug!("Processing backfill event: {} {} {}", collection, did, rkey);
118 }
119
120 match record.action {
121 RecordAction::Create | RecordAction::Update => {
122 let cid = record.cid.as_ref().map(|c| c.as_ref()).unwrap_or("");
123 let record_value = record
124 .record
125 .as_ref()
126 .cloned()
127 .unwrap_or(serde_json::Value::Null);
128
129 // Process content fetcher and search indexer in parallel
130 // Clone record_value for search indexer so both can take ownership
131 let indexer_record = if self.search_indexer.is_some() {
132 Some(record_value.clone())
133 } else {
134 None
135 };
136
137 let content_future = self.content_fetcher.handle_commit(
138 did,
139 collection,
140 rkey,
141 cid,
142 record_value,
143 live,
144 );
145
146 let index_future = async {
147 if let (Some(indexer), Some(record)) =
148 (&self.search_indexer, indexer_record)
149 {
150 indexer.index_commit(did, collection, rkey, record).await
151 } else {
152 Ok(())
153 }
154 };
155
156 let (content_result, index_result) =
157 tokio::join!(content_future, index_future);
158
159 if let Err(e) = content_result {
160 tracing::error!(?e, "Error processing commit");
161 }
162 if let Err(e) = index_result {
163 tracing::error!(?e, "Error indexing commit");
164 }
165 }
166 RecordAction::Delete => {
167 // Process content fetcher and search indexer delete in parallel
168 let content_future = self
169 .content_fetcher
170 .handle_delete(did, collection, rkey, live);
171
172 let index_future = async {
173 if let Some(ref indexer) = self.search_indexer {
174 indexer.delete_record(did, collection, rkey).await
175 } else {
176 Ok(())
177 }
178 };
179
180 let (content_result, index_result) =
181 tokio::join!(content_future, index_future);
182
183 if let Err(e) = content_result {
184 tracing::error!(?e, "Error processing delete");
185 }
186 if let Err(e) = index_result {
187 tracing::error!(?e, "Error deleting from index");
188 }
189 }
190 }
191 }
192 TapEvent::Identity { identity, .. } => {
193 // Identity events ignored (handle updates can be added in future iteration)
194 tracing::trace!(
195 did = %identity.did,
196 handle = %identity.handle,
197 "Identity event ignored"
198 );
199 }
200 }
201 }
202
203 fn is_relevant_collection(collection: &str) -> bool {
204 matches!(
205 collection,
206 "community.lexicon.calendar.rsvp"
207 | "community.lexicon.calendar.event"
208 | "events.smokesignal.profile"
209 | "events.smokesignal.calendar.acceptance"
210 | "events.smokesignal.lfg"
211 | "app.beaconbits.bookmark.item"
212 | "app.beaconbits.beacon"
213 | "app.dropanchor.checkin"
214 )
215 }
216}