//! The smokesignal.events web application.
1use anyhow::Result;
2use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage};
3use atproto_record::lexicon::community::lexicon::calendar::event::NSID as LexiconCommunityEventNSID;
4use opensearch::{
5 DeleteParts, IndexParts, OpenSearch,
6 http::transport::Transport,
7 indices::{IndicesCreateParts, IndicesExistsParts},
8};
9use serde_json::{Value, json};
10use std::sync::Arc;
11
12use crate::atproto::lexicon::lfg::{Lfg, NSID as LFG_NSID};
13use crate::atproto::lexicon::profile::{NSID as PROFILE_NSID, Profile};
14use crate::atproto::utils::get_profile_hashtags;
15use crate::search_index::SearchIndexManager;
16use crate::storage::StoragePool;
17use crate::storage::event::event_get;
18use crate::task_search_indexer_errors::SearchIndexerError;
19
/// Build an AT URI of the form `at://{did}/{nsid}/{rkey}`.
///
/// Uses slice `concat`, which sums the piece lengths up front, so the
/// result is produced with a single exact-size allocation (no `format!`
/// machinery involved).
#[inline]
fn build_aturi(did: &str, nsid: &str, rkey: &str) -> String {
    ["at://", did, "/", nsid, "/", rkey].concat()
}
33
/// OpenSearch index holding calendar event documents.
const EVENTS_INDEX_NAME: &str = "smokesignal-events";
/// OpenSearch index holding user profile documents.
const PROFILES_INDEX_NAME: &str = "smokesignal-profiles";
/// OpenSearch index holding LFG profile documents.
const LFG_INDEX_NAME: &str = "smokesignal-lfg-profile";
37
/// Mirrors AT Protocol records (events, profiles, LFG profiles) into
/// OpenSearch indices.
pub struct SearchIndexer {
    // OpenSearch client used directly for index management and for the
    // profile index read/write paths.
    client: Arc<OpenSearch>,
    // Database pool used to load the full event row before indexing.
    pool: StoragePool,
    // Resolves DIDs to identity documents (provides the handle).
    identity_resolver: Arc<dyn IdentityResolver>,
    // Persistence/cache layer for resolved DID documents.
    document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
    // Delegate that owns event and LFG-profile indexing/deletion logic.
    event_index_manager: SearchIndexManager,
}
45
46impl SearchIndexer {
47 /// Create a new SearchIndexer.
48 ///
49 /// # Arguments
50 ///
51 /// * `endpoint` - OpenSearch endpoint URL
52 /// * `pool` - Database connection pool for fetching events
53 /// * `identity_resolver` - Resolver for DID identities
54 /// * `document_storage` - Storage for DID documents
55 pub async fn new(
56 endpoint: &str,
57 pool: StoragePool,
58 identity_resolver: Arc<dyn IdentityResolver>,
59 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
60 ) -> Result<Self> {
61 let transport = Transport::single_node(endpoint)?;
62 let client = Arc::new(OpenSearch::new(transport));
63 let event_index_manager = SearchIndexManager::new(endpoint)?;
64
65 let indexer = Self {
66 client,
67 pool,
68 identity_resolver,
69 document_storage,
70 event_index_manager,
71 };
72
73 indexer.ensure_index().await?;
74
75 Ok(indexer)
76 }
77
78 async fn ensure_index(&self) -> Result<()> {
79 // Ensure events index
80 self.ensure_events_index().await?;
81 // Ensure profiles index
82 self.ensure_profiles_index().await?;
83 // Ensure LFG profiles index
84 self.ensure_lfg_profiles_index().await?;
85 Ok(())
86 }
87
88 async fn ensure_events_index(&self) -> Result<()> {
89 let exists_response = self
90 .client
91 .indices()
92 .exists(IndicesExistsParts::Index(&[EVENTS_INDEX_NAME]))
93 .send()
94 .await?;
95
96 if exists_response.status_code().is_success() {
97 tracing::info!("OpenSearch index {} already exists", EVENTS_INDEX_NAME);
98 return Ok(());
99 }
100
101 let index_body = json!({
102 "mappings": {
103 "properties": {
104 "did": { "type": "keyword" },
105 "handle": { "type": "keyword" },
106 "name": { "type": "text" },
107 "description": { "type": "text" },
108 "tags": { "type": "keyword" },
109 "location_cids": { "type": "keyword" },
110 "locations_geo": { "type": "geo_point" },
111 "start_time": { "type": "date" },
112 "end_time": { "type": "date" },
113 "created_at": { "type": "date" },
114 "updated_at": { "type": "date" }
115 }
116 },
117 });
118
119 let response = self
120 .client
121 .indices()
122 .create(IndicesCreateParts::Index(EVENTS_INDEX_NAME))
123 .body(index_body)
124 .send()
125 .await?;
126
127 if response.status_code().is_success() {
128 tracing::info!("Created OpenSearch index {}", EVENTS_INDEX_NAME);
129 } else {
130 let error_body = response.text().await?;
131 return Err(SearchIndexerError::IndexCreationFailed { error_body }.into());
132 }
133
134 Ok(())
135 }
136
137 async fn ensure_profiles_index(&self) -> Result<()> {
138 let exists_response = self
139 .client
140 .indices()
141 .exists(IndicesExistsParts::Index(&[PROFILES_INDEX_NAME]))
142 .send()
143 .await?;
144
145 if exists_response.status_code().is_success() {
146 tracing::info!("OpenSearch index {} already exists", PROFILES_INDEX_NAME);
147 return Ok(());
148 }
149
150 let index_body = json!({
151 "mappings": {
152 "properties": {
153 "did": { "type": "keyword" },
154 "handle": { "type": "keyword" },
155 "display_name": { "type": "text" },
156 "description": { "type": "text" },
157 "tags": { "type": "keyword" },
158 "created_at": { "type": "date" },
159 "updated_at": { "type": "date" }
160 }
161 },
162 });
163
164 let response = self
165 .client
166 .indices()
167 .create(IndicesCreateParts::Index(PROFILES_INDEX_NAME))
168 .body(index_body)
169 .send()
170 .await?;
171
172 if response.status_code().is_success() {
173 tracing::info!("Created OpenSearch index {}", PROFILES_INDEX_NAME);
174 } else {
175 let error_body = response.text().await?;
176 return Err(SearchIndexerError::IndexCreationFailed { error_body }.into());
177 }
178
179 Ok(())
180 }
181
182 async fn ensure_lfg_profiles_index(&self) -> Result<()> {
183 let exists_response = self
184 .client
185 .indices()
186 .exists(IndicesExistsParts::Index(&[LFG_INDEX_NAME]))
187 .send()
188 .await?;
189
190 if exists_response.status_code().is_success() {
191 tracing::info!("OpenSearch index {} already exists", LFG_INDEX_NAME);
192 return Ok(());
193 }
194
195 let index_body = json!({
196 "mappings": {
197 "properties": {
198 "aturi": { "type": "keyword" },
199 "did": { "type": "keyword" },
200 "location": { "type": "geo_point" },
201 "tags": { "type": "keyword" },
202 "starts_at": { "type": "date" },
203 "ends_at": { "type": "date" },
204 "active": { "type": "boolean" },
205 "created_at": { "type": "date" }
206 }
207 },
208 });
209
210 let response = self
211 .client
212 .indices()
213 .create(IndicesCreateParts::Index(LFG_INDEX_NAME))
214 .body(index_body)
215 .send()
216 .await?;
217
218 if response.status_code().is_success() {
219 tracing::info!("Created OpenSearch index {}", LFG_INDEX_NAME);
220 } else {
221 let error_body = response.text().await?;
222 return Err(SearchIndexerError::IndexCreationFailed { error_body }.into());
223 }
224
225 Ok(())
226 }
227
228 /// Index a commit event (create or update).
229 ///
230 /// Dispatches to the appropriate indexer based on collection type.
231 /// Takes ownership of `record` to avoid cloning during deserialization.
232 pub async fn index_commit(
233 &self,
234 did: &str,
235 collection: &str,
236 rkey: &str,
237 record: Value,
238 ) -> Result<()> {
239 match collection {
240 "community.lexicon.calendar.event" => self.index_event(did, rkey, record).await,
241 c if c == PROFILE_NSID => self.index_profile(did, rkey, record).await,
242 c if c == LFG_NSID => self.index_lfg_profile(did, rkey, record).await,
243 _ => Ok(()),
244 }
245 }
246
247 /// Delete a record from the search index.
248 ///
249 /// Dispatches to the appropriate delete method based on collection type.
250 pub async fn delete_record(&self, did: &str, collection: &str, rkey: &str) -> Result<()> {
251 match collection {
252 "community.lexicon.calendar.event" => self.delete_event(did, rkey).await,
253 c if c == PROFILE_NSID => self.delete_profile(did, rkey).await,
254 c if c == LFG_NSID => self.delete_lfg_profile(did, rkey).await,
255 _ => Ok(()),
256 }
257 }
258
259 async fn index_event(&self, did: &str, rkey: &str, _record: Value) -> Result<()> {
260 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
261
262 // Fetch the event from the database and delegate to SearchIndexManager
263 // This ensures we use the same indexing logic as the web handlers
264 match event_get(&self.pool, &aturi).await {
265 Ok(event) => {
266 self.event_index_manager
267 .index_event(&self.pool, self.identity_resolver.clone(), &event)
268 .await?;
269 tracing::debug!("Indexed event {} for DID {}", rkey, did);
270 }
271 Err(err) => {
272 // Event might not be in the database yet if content fetcher hasn't processed it
273 tracing::warn!(
274 "Could not fetch event {} for indexing: {}. It may be indexed on next update.",
275 aturi,
276 err
277 );
278 }
279 }
280
281 Ok(())
282 }
283
284 async fn delete_event(&self, did: &str, rkey: &str) -> Result<()> {
285 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
286
287 // Delegate to SearchIndexManager for consistent deletion logic
288 self.event_index_manager
289 .delete_indexed_event(&aturi)
290 .await?;
291
292 tracing::debug!("Deleted event {} for DID {} from search index", rkey, did);
293 Ok(())
294 }
295
296 async fn index_profile(&self, did: &str, rkey: &str, record: Value) -> Result<()> {
297 let profile: Profile = serde_json::from_value(record)?;
298
299 let document = self.ensure_identity_stored(did).await?;
300 let handle = document.handles().unwrap_or("invalid.handle");
301
302 let aturi = build_aturi(did, PROFILE_NSID, rkey);
303
304 // Extract fields from the Profile struct
305 let display_name = profile.display_name.as_deref().unwrap_or("");
306 let description = profile.description.as_deref().unwrap_or("");
307
308 // Extract hashtags from facets
309 let tags = get_profile_hashtags(&profile);
310
311 let doc = json!({
312 "did": did,
313 "handle": handle,
314 "display_name": display_name,
315 "description": description,
316 "tags": tags,
317 "created_at": json!(chrono::Utc::now()),
318 "updated_at": json!(chrono::Utc::now())
319 });
320
321 let response = self
322 .client
323 .index(IndexParts::IndexId(PROFILES_INDEX_NAME, &aturi))
324 .body(doc)
325 .send()
326 .await?;
327
328 if response.status_code().is_success() {
329 tracing::debug!("Indexed profile {} for DID {}", rkey, did);
330 } else {
331 let error_body = response.text().await?;
332 tracing::error!("Failed to index profile: {}", error_body);
333 }
334
335 Ok(())
336 }
337
338 async fn delete_profile(&self, did: &str, rkey: &str) -> Result<()> {
339 let aturi = build_aturi(did, PROFILE_NSID, rkey);
340
341 let response = self
342 .client
343 .delete(DeleteParts::IndexId(PROFILES_INDEX_NAME, &aturi))
344 .send()
345 .await?;
346
347 if response.status_code().is_success() || response.status_code() == 404 {
348 tracing::debug!("Deleted profile {} for DID {} from search index", rkey, did);
349 } else {
350 let error_body = response.text().await?;
351 tracing::error!("Failed to delete profile from index: {}", error_body);
352 }
353
354 Ok(())
355 }
356
357 async fn index_lfg_profile(&self, did: &str, rkey: &str, record: Value) -> Result<()> {
358 let lfg: Lfg = serde_json::from_value(record)?;
359 let aturi = build_aturi(did, LFG_NSID, rkey);
360
361 // Extract coordinates from location
362 let (lat, lon) = lfg.get_coordinates().unwrap_or((0.0, 0.0));
363
364 // Delegate to SearchIndexManager for consistent indexing logic
365 self.event_index_manager
366 .index_lfg_profile(
367 &aturi,
368 did,
369 lat,
370 lon,
371 &lfg.tags,
372 &lfg.starts_at,
373 &lfg.ends_at,
374 &lfg.created_at,
375 lfg.active,
376 )
377 .await?;
378
379 tracing::debug!("Indexed LFG profile {} for DID {}", rkey, did);
380 Ok(())
381 }
382
383 async fn delete_lfg_profile(&self, did: &str, rkey: &str) -> Result<()> {
384 let aturi = build_aturi(did, LFG_NSID, rkey);
385
386 // Delegate to SearchIndexManager for consistent deletion logic
387 self.event_index_manager.delete_lfg_profile(&aturi).await?;
388
389 tracing::debug!(
390 "Deleted LFG profile {} for DID {} from search index",
391 rkey,
392 did
393 );
394 Ok(())
395 }
396
397 async fn ensure_identity_stored(&self, did: &str) -> Result<Document> {
398 // Check if we already have this identity
399 if let Some(document) = self.document_storage.get_document_by_did(did).await? {
400 return Ok(document);
401 }
402
403 let document = self.identity_resolver.resolve(did).await?;
404 self.document_storage
405 .store_document(document.clone())
406 .await?;
407 Ok(document)
408 }
409}