The smokesignal.events web application
Search indexer task (main branch): maintains the OpenSearch indexes for events, profiles, and LFG profiles.
1use anyhow::Result; 2use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage}; 3use atproto_record::lexicon::community::lexicon::calendar::event::NSID as LexiconCommunityEventNSID; 4use opensearch::{ 5 DeleteParts, IndexParts, OpenSearch, 6 http::transport::Transport, 7 indices::{IndicesCreateParts, IndicesExistsParts}, 8}; 9use serde_json::{Value, json}; 10use std::sync::Arc; 11 12use crate::atproto::lexicon::lfg::{Lfg, NSID as LFG_NSID}; 13use crate::atproto::lexicon::profile::{NSID as PROFILE_NSID, Profile}; 14use crate::atproto::utils::get_profile_hashtags; 15use crate::search_index::SearchIndexManager; 16use crate::storage::StoragePool; 17use crate::storage::event::event_get; 18use crate::task_search_indexer_errors::SearchIndexerError; 19 20/// Build an AT URI with pre-allocated capacity to avoid format! overhead. 21#[inline] 22fn build_aturi(did: &str, nsid: &str, rkey: &str) -> String { 23 let capacity = 7 + did.len() + nsid.len() + rkey.len(); 24 let mut uri = String::with_capacity(capacity); 25 uri.push_str("at://"); 26 uri.push_str(did); 27 uri.push('/'); 28 uri.push_str(nsid); 29 uri.push('/'); 30 uri.push_str(rkey); 31 uri 32} 33 34const EVENTS_INDEX_NAME: &str = "smokesignal-events"; 35const PROFILES_INDEX_NAME: &str = "smokesignal-profiles"; 36const LFG_INDEX_NAME: &str = "smokesignal-lfg-profile"; 37 38pub struct SearchIndexer { 39 client: Arc<OpenSearch>, 40 pool: StoragePool, 41 identity_resolver: Arc<dyn IdentityResolver>, 42 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>, 43 event_index_manager: SearchIndexManager, 44} 45 46impl SearchIndexer { 47 /// Create a new SearchIndexer. 
48 /// 49 /// # Arguments 50 /// 51 /// * `endpoint` - OpenSearch endpoint URL 52 /// * `pool` - Database connection pool for fetching events 53 /// * `identity_resolver` - Resolver for DID identities 54 /// * `document_storage` - Storage for DID documents 55 pub async fn new( 56 endpoint: &str, 57 pool: StoragePool, 58 identity_resolver: Arc<dyn IdentityResolver>, 59 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>, 60 ) -> Result<Self> { 61 let transport = Transport::single_node(endpoint)?; 62 let client = Arc::new(OpenSearch::new(transport)); 63 let event_index_manager = SearchIndexManager::new(endpoint)?; 64 65 let indexer = Self { 66 client, 67 pool, 68 identity_resolver, 69 document_storage, 70 event_index_manager, 71 }; 72 73 indexer.ensure_index().await?; 74 75 Ok(indexer) 76 } 77 78 async fn ensure_index(&self) -> Result<()> { 79 // Ensure events index 80 self.ensure_events_index().await?; 81 // Ensure profiles index 82 self.ensure_profiles_index().await?; 83 // Ensure LFG profiles index 84 self.ensure_lfg_profiles_index().await?; 85 Ok(()) 86 } 87 88 async fn ensure_events_index(&self) -> Result<()> { 89 let exists_response = self 90 .client 91 .indices() 92 .exists(IndicesExistsParts::Index(&[EVENTS_INDEX_NAME])) 93 .send() 94 .await?; 95 96 if exists_response.status_code().is_success() { 97 tracing::info!("OpenSearch index {} already exists", EVENTS_INDEX_NAME); 98 return Ok(()); 99 } 100 101 let index_body = json!({ 102 "mappings": { 103 "properties": { 104 "did": { "type": "keyword" }, 105 "handle": { "type": "keyword" }, 106 "name": { "type": "text" }, 107 "description": { "type": "text" }, 108 "tags": { "type": "keyword" }, 109 "location_cids": { "type": "keyword" }, 110 "locations_geo": { "type": "geo_point" }, 111 "start_time": { "type": "date" }, 112 "end_time": { "type": "date" }, 113 "created_at": { "type": "date" }, 114 "updated_at": { "type": "date" } 115 } 116 }, 117 }); 118 119 let response = self 120 .client 121 .indices() 122 
.create(IndicesCreateParts::Index(EVENTS_INDEX_NAME)) 123 .body(index_body) 124 .send() 125 .await?; 126 127 if response.status_code().is_success() { 128 tracing::info!("Created OpenSearch index {}", EVENTS_INDEX_NAME); 129 } else { 130 let error_body = response.text().await?; 131 return Err(SearchIndexerError::IndexCreationFailed { error_body }.into()); 132 } 133 134 Ok(()) 135 } 136 137 async fn ensure_profiles_index(&self) -> Result<()> { 138 let exists_response = self 139 .client 140 .indices() 141 .exists(IndicesExistsParts::Index(&[PROFILES_INDEX_NAME])) 142 .send() 143 .await?; 144 145 if exists_response.status_code().is_success() { 146 tracing::info!("OpenSearch index {} already exists", PROFILES_INDEX_NAME); 147 return Ok(()); 148 } 149 150 let index_body = json!({ 151 "mappings": { 152 "properties": { 153 "did": { "type": "keyword" }, 154 "handle": { "type": "keyword" }, 155 "display_name": { "type": "text" }, 156 "description": { "type": "text" }, 157 "tags": { "type": "keyword" }, 158 "created_at": { "type": "date" }, 159 "updated_at": { "type": "date" } 160 } 161 }, 162 }); 163 164 let response = self 165 .client 166 .indices() 167 .create(IndicesCreateParts::Index(PROFILES_INDEX_NAME)) 168 .body(index_body) 169 .send() 170 .await?; 171 172 if response.status_code().is_success() { 173 tracing::info!("Created OpenSearch index {}", PROFILES_INDEX_NAME); 174 } else { 175 let error_body = response.text().await?; 176 return Err(SearchIndexerError::IndexCreationFailed { error_body }.into()); 177 } 178 179 Ok(()) 180 } 181 182 async fn ensure_lfg_profiles_index(&self) -> Result<()> { 183 let exists_response = self 184 .client 185 .indices() 186 .exists(IndicesExistsParts::Index(&[LFG_INDEX_NAME])) 187 .send() 188 .await?; 189 190 if exists_response.status_code().is_success() { 191 tracing::info!("OpenSearch index {} already exists", LFG_INDEX_NAME); 192 return Ok(()); 193 } 194 195 let index_body = json!({ 196 "mappings": { 197 "properties": { 198 "aturi": { 
"type": "keyword" }, 199 "did": { "type": "keyword" }, 200 "location": { "type": "geo_point" }, 201 "tags": { "type": "keyword" }, 202 "starts_at": { "type": "date" }, 203 "ends_at": { "type": "date" }, 204 "active": { "type": "boolean" }, 205 "created_at": { "type": "date" } 206 } 207 }, 208 }); 209 210 let response = self 211 .client 212 .indices() 213 .create(IndicesCreateParts::Index(LFG_INDEX_NAME)) 214 .body(index_body) 215 .send() 216 .await?; 217 218 if response.status_code().is_success() { 219 tracing::info!("Created OpenSearch index {}", LFG_INDEX_NAME); 220 } else { 221 let error_body = response.text().await?; 222 return Err(SearchIndexerError::IndexCreationFailed { error_body }.into()); 223 } 224 225 Ok(()) 226 } 227 228 /// Index a commit event (create or update). 229 /// 230 /// Dispatches to the appropriate indexer based on collection type. 231 /// Takes ownership of `record` to avoid cloning during deserialization. 232 pub async fn index_commit( 233 &self, 234 did: &str, 235 collection: &str, 236 rkey: &str, 237 record: Value, 238 ) -> Result<()> { 239 match collection { 240 "community.lexicon.calendar.event" => self.index_event(did, rkey, record).await, 241 c if c == PROFILE_NSID => self.index_profile(did, rkey, record).await, 242 c if c == LFG_NSID => self.index_lfg_profile(did, rkey, record).await, 243 _ => Ok(()), 244 } 245 } 246 247 /// Delete a record from the search index. 248 /// 249 /// Dispatches to the appropriate delete method based on collection type. 
250 pub async fn delete_record(&self, did: &str, collection: &str, rkey: &str) -> Result<()> { 251 match collection { 252 "community.lexicon.calendar.event" => self.delete_event(did, rkey).await, 253 c if c == PROFILE_NSID => self.delete_profile(did, rkey).await, 254 c if c == LFG_NSID => self.delete_lfg_profile(did, rkey).await, 255 _ => Ok(()), 256 } 257 } 258 259 async fn index_event(&self, did: &str, rkey: &str, _record: Value) -> Result<()> { 260 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey); 261 262 // Fetch the event from the database and delegate to SearchIndexManager 263 // This ensures we use the same indexing logic as the web handlers 264 match event_get(&self.pool, &aturi).await { 265 Ok(event) => { 266 self.event_index_manager 267 .index_event(&self.pool, self.identity_resolver.clone(), &event) 268 .await?; 269 tracing::debug!("Indexed event {} for DID {}", rkey, did); 270 } 271 Err(err) => { 272 // Event might not be in the database yet if content fetcher hasn't processed it 273 tracing::warn!( 274 "Could not fetch event {} for indexing: {}. 
It may be indexed on next update.", 275 aturi, 276 err 277 ); 278 } 279 } 280 281 Ok(()) 282 } 283 284 async fn delete_event(&self, did: &str, rkey: &str) -> Result<()> { 285 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey); 286 287 // Delegate to SearchIndexManager for consistent deletion logic 288 self.event_index_manager 289 .delete_indexed_event(&aturi) 290 .await?; 291 292 tracing::debug!("Deleted event {} for DID {} from search index", rkey, did); 293 Ok(()) 294 } 295 296 async fn index_profile(&self, did: &str, rkey: &str, record: Value) -> Result<()> { 297 let profile: Profile = serde_json::from_value(record)?; 298 299 let document = self.ensure_identity_stored(did).await?; 300 let handle = document.handles().unwrap_or("invalid.handle"); 301 302 let aturi = build_aturi(did, PROFILE_NSID, rkey); 303 304 // Extract fields from the Profile struct 305 let display_name = profile.display_name.as_deref().unwrap_or(""); 306 let description = profile.description.as_deref().unwrap_or(""); 307 308 // Extract hashtags from facets 309 let tags = get_profile_hashtags(&profile); 310 311 let doc = json!({ 312 "did": did, 313 "handle": handle, 314 "display_name": display_name, 315 "description": description, 316 "tags": tags, 317 "created_at": json!(chrono::Utc::now()), 318 "updated_at": json!(chrono::Utc::now()) 319 }); 320 321 let response = self 322 .client 323 .index(IndexParts::IndexId(PROFILES_INDEX_NAME, &aturi)) 324 .body(doc) 325 .send() 326 .await?; 327 328 if response.status_code().is_success() { 329 tracing::debug!("Indexed profile {} for DID {}", rkey, did); 330 } else { 331 let error_body = response.text().await?; 332 tracing::error!("Failed to index profile: {}", error_body); 333 } 334 335 Ok(()) 336 } 337 338 async fn delete_profile(&self, did: &str, rkey: &str) -> Result<()> { 339 let aturi = build_aturi(did, PROFILE_NSID, rkey); 340 341 let response = self 342 .client 343 .delete(DeleteParts::IndexId(PROFILES_INDEX_NAME, &aturi)) 344 .send() 
345 .await?; 346 347 if response.status_code().is_success() || response.status_code() == 404 { 348 tracing::debug!("Deleted profile {} for DID {} from search index", rkey, did); 349 } else { 350 let error_body = response.text().await?; 351 tracing::error!("Failed to delete profile from index: {}", error_body); 352 } 353 354 Ok(()) 355 } 356 357 async fn index_lfg_profile(&self, did: &str, rkey: &str, record: Value) -> Result<()> { 358 let lfg: Lfg = serde_json::from_value(record)?; 359 let aturi = build_aturi(did, LFG_NSID, rkey); 360 361 // Extract coordinates from location 362 let (lat, lon) = lfg.get_coordinates().unwrap_or((0.0, 0.0)); 363 364 // Delegate to SearchIndexManager for consistent indexing logic 365 self.event_index_manager 366 .index_lfg_profile( 367 &aturi, 368 did, 369 lat, 370 lon, 371 &lfg.tags, 372 &lfg.starts_at, 373 &lfg.ends_at, 374 &lfg.created_at, 375 lfg.active, 376 ) 377 .await?; 378 379 tracing::debug!("Indexed LFG profile {} for DID {}", rkey, did); 380 Ok(()) 381 } 382 383 async fn delete_lfg_profile(&self, did: &str, rkey: &str) -> Result<()> { 384 let aturi = build_aturi(did, LFG_NSID, rkey); 385 386 // Delegate to SearchIndexManager for consistent deletion logic 387 self.event_index_manager.delete_lfg_profile(&aturi).await?; 388 389 tracing::debug!( 390 "Deleted LFG profile {} for DID {} from search index", 391 rkey, 392 did 393 ); 394 Ok(()) 395 } 396 397 async fn ensure_identity_stored(&self, did: &str) -> Result<Document> { 398 // Check if we already have this identity 399 if let Some(document) = self.document_storage.get_document_by_did(did).await? { 400 return Ok(document); 401 } 402 403 let document = self.identity_resolver.resolve(did).await?; 404 self.document_storage 405 .store_document(document.clone()) 406 .await?; 407 Ok(document) 408 } 409}