The smokesignal.events web application
at main 409 lines 12 kB view raw
1use anyhow::Result; 2use atproto_identity::resolve::IdentityResolver; 3use opensearch::{ 4 DeleteParts, IndexParts, OpenSearch, SearchParts, 5 http::transport::Transport, 6 indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, 7}; 8use serde::{Deserialize, Serialize}; 9use serde_json::{Value, json}; 10use std::sync::Arc; 11 12use crate::atproto::lexicon::profile::Profile as ProfileRecord; 13use crate::atproto::utils::get_profile_hashtags; 14use crate::profile_index_errors::ProfileIndexError; 15use crate::storage::{StoragePool, identity_profile::handle_for_did, profile::profile_list}; 16 17const INDEX_NAME: &str = "smokesignal-profiles"; 18 19#[derive(Debug, Serialize, Deserialize)] 20pub struct IndexedProfile { 21 pub aturi: String, 22 pub did: String, 23 pub handle: String, 24 pub display_name: String, 25 pub description: String, 26 pub tags: Vec<String>, 27 pub created_at: String, 28 pub updated_at: String, 29} 30 31#[derive(Debug, Serialize)] 32pub struct ProfileIndexStats { 33 pub total_documents: u64, 34 pub index_exists: bool, 35} 36 37pub struct ProfileIndexManager { 38 client: Arc<OpenSearch>, 39} 40 41impl ProfileIndexManager { 42 pub fn new(endpoint: &str) -> Result<Self> { 43 let transport = Transport::single_node(endpoint)?; 44 let client = Arc::new(OpenSearch::new(transport)); 45 46 Ok(Self { client }) 47 } 48 49 /// Check if the index exists 50 pub async fn index_exists(&self) -> Result<bool> { 51 let response = self 52 .client 53 .indices() 54 .exists(IndicesExistsParts::Index(&[INDEX_NAME])) 55 .send() 56 .await?; 57 58 Ok(response.status_code().is_success()) 59 } 60 61 /// Get statistics about the profile index 62 pub async fn get_stats(&self) -> Result<ProfileIndexStats> { 63 let index_exists = self.index_exists().await?; 64 65 let total_documents = if index_exists { 66 let search_body = json!({ 67 "size": 0, 68 "query": { 69 "match_all": {} 70 } 71 }); 72 73 let response = self 74 .client 75 .search(SearchParts::Index(&[INDEX_NAME])) 76 .body(search_body) 77 .send() 78 .await?; 79 80 if response.status_code().is_success() { 81 let body = response.json::<Value>().await?; 82 body["hits"]["total"]["value"].as_u64().unwrap_or(0) 83 } else { 84 0 85 } 86 } else { 87 0 88 }; 89 90 Ok(ProfileIndexStats { 91 total_documents, 92 index_exists, 93 }) 94 } 95 96 /// List indexed profiles with pagination 97 pub async fn list_indexed_profiles( 98 &self, 99 page: i64, 100 page_size: i64, 101 ) -> Result<(u64, Vec<IndexedProfile>)> { 102 // Check if index exists first 103 if !self.index_exists().await? { 104 // Return empty results if index doesn't exist 105 return Ok((0, vec![])); 106 } 107 108 let offset = (page - 1) * page_size; 109 110 let search_body = json!({ 111 "query": { 112 "match_all": {} 113 }, 114 "sort": [{ "updated_at": { "order": "desc" } }], 115 "from": offset, 116 "size": page_size + 1 // Fetch one more to know if there are more entries 117 }); 118 119 let response = self 120 .client 121 .search(SearchParts::Index(&[INDEX_NAME])) 122 .body(search_body) 123 .send() 124 .await?; 125 126 if !response.status_code().is_success() { 127 return Err(ProfileIndexError::ListProfilesFailed.into()); 128 } 129 130 let body = response.json::<Value>().await?; 131 let total = body["hits"]["total"]["value"].as_u64().unwrap_or(0); 132 133 let hits = body["hits"]["hits"] 134 .as_array() 135 .ok_or(ProfileIndexError::InvalidSearchResponse)?; 136 137 let profiles: Vec<IndexedProfile> = hits 138 .iter() 139 .filter_map(|hit| { 140 let source = &hit["_source"]; 141 serde_json::from_value(source.clone()).ok() 142 }) 143 .collect(); 144 145 Ok((total, profiles)) 146 } 147 148 /// Search indexed profiles across multiple fields 149 pub async fn search_indexed_profiles( 150 &self, 151 query: &str, 152 page: i64, 153 page_size: i64, 154 ) -> Result<(u64, Vec<IndexedProfile>)> { 155 // Check if index exists first 156 if !self.index_exists().await? { 157 // Return empty results if index doesn't exist 158 return Ok((0, vec![])); 159 } 160 161 let offset = (page - 1) * page_size; 162 163 let search_body = json!({ 164 "query": { 165 "simple_query_string": { 166 "query": query, 167 "fields": [ 168 "display_name^3", 169 "description^2", 170 "tags^2", 171 "handle", 172 "did" 173 ] 174 } 175 }, 176 "sort": [ 177 { "_score": { "order": "desc" } }, 178 { "updated_at": { "order": "desc" } } 179 ], 180 "from": offset, 181 "size": page_size + 1 182 }); 183 184 let response = self 185 .client 186 .search(SearchParts::Index(&[INDEX_NAME])) 187 .body(search_body) 188 .send() 189 .await?; 190 191 if !response.status_code().is_success() { 192 return Err(ProfileIndexError::SearchFailed.into()); 193 } 194 195 let body = response.json::<Value>().await?; 196 let total = body["hits"]["total"]["value"].as_u64().unwrap_or(0); 197 198 let hits = body["hits"]["hits"] 199 .as_array() 200 .ok_or(ProfileIndexError::InvalidSearchResponse)?; 201 202 let profiles: Vec<IndexedProfile> = hits 203 .iter() 204 .filter_map(|hit| { 205 let source = &hit["_source"]; 206 serde_json::from_value(source.clone()).ok() 207 }) 208 .collect(); 209 210 Ok((total, profiles)) 211 } 212 213 /// Delete a single indexed profile by AT-URI 214 pub async fn delete_indexed_profile(&self, aturi: &str) -> Result<()> { 215 let response = self 216 .client 217 .delete(DeleteParts::IndexId(INDEX_NAME, aturi)) 218 .send() 219 .await?; 220 221 if !response.status_code().is_success() && response.status_code() != 404 { 222 return Err(ProfileIndexError::DeleteProfileFailed.into()); 223 } 224 225 Ok(()) 226 } 227 228 /// Delete the entire index 229 pub async fn delete_index(&self) -> Result<()> { 230 let exists = self.index_exists().await?; 231 232 if !exists { 233 tracing::info!("Index {} does not exist, nothing to delete", INDEX_NAME); 234 return Ok(()); 235 } 236 237 let response = self 238 .client 239 .indices() 240 .delete(IndicesDeleteParts::Index(&[INDEX_NAME])) 241 .send() 242 .await?; 243 244 if !response.status_code().is_success() { 245 let error_body = response.text().await?; 246 return Err(ProfileIndexError::IndexDeletionFailed { error_body }.into()); 247 } 248 249 tracing::info!("Deleted OpenSearch index {}", INDEX_NAME); 250 Ok(()) 251 } 252 253 /// Create the index with proper mappings 254 pub async fn create_index(&self) -> Result<()> { 255 let exists = self.index_exists().await?; 256 257 if exists { 258 tracing::info!("Index {} already exists", INDEX_NAME); 259 return Ok(()); 260 } 261 262 let index_body = json!({ 263 "mappings": { 264 "properties": { 265 "did": { "type": "keyword" }, 266 "handle": { "type": "keyword" }, 267 "display_name": { "type": "text" }, 268 "description": { "type": "text" }, 269 "tags": { "type": "keyword" }, 270 "created_at": { "type": "date" }, 271 "updated_at": { "type": "date" } 272 } 273 }, 274 }); 275 276 let response = self 277 .client 278 .indices() 279 .create(IndicesCreateParts::Index(INDEX_NAME)) 280 .body(index_body) 281 .send() 282 .await?; 283 284 if !response.status_code().is_success() { 285 let error_body = response.text().await?; 286 return Err(ProfileIndexError::IndexCreationFailed { error_body }.into()); 287 } 288 289 tracing::info!("Created OpenSearch index {}", INDEX_NAME); 290 Ok(()) 291 } 292 293 /// Index a single profile 294 async fn index_profile( 295 &self, 296 pool: &StoragePool, 297 identity_resolver: Arc<dyn IdentityResolver>, 298 profile: &crate::storage::profile::model::Profile, 299 ) -> Result<()> { 300 // Parse the profile record 301 let profile_record: ProfileRecord = serde_json::from_value(profile.record.0.clone())?; 302 303 // Get the handle for the DID 304 let handle = match handle_for_did(pool, &profile.did).await { 305 Ok(identity_profile) => identity_profile.handle, 306 Err(_) => { 307 // Fallback to resolving the identity 308 match identity_resolver.resolve(&profile.did).await { 309 Ok(document) => document.handles().unwrap_or("unknown.handle").to_string(), 310 Err(_) => "unknown.handle".to_string(), 311 } 312 } 313 }; 314 315 // Extract hashtags from facets 316 let tags = get_profile_hashtags(&profile_record); 317 318 let doc = json!({ 319 "aturi": profile.aturi, 320 "did": profile.did, 321 "handle": handle, 322 "display_name": profile_record.display_name.unwrap_or_default(), 323 "description": profile_record.description.unwrap_or_default(), 324 "tags": tags, 325 "created_at": json!(chrono::Utc::now()), 326 "updated_at": json!(profile.updated_at.unwrap_or_else(chrono::Utc::now)) 327 }); 328 329 let response = self 330 .client 331 .index(IndexParts::IndexId(INDEX_NAME, &profile.aturi)) 332 .body(doc) 333 .send() 334 .await?; 335 336 if !response.status_code().is_success() { 337 let error_body = response.text().await?; 338 tracing::error!("Failed to index profile {}: {}", profile.aturi, error_body); 339 return Err(ProfileIndexError::IndexProfileFailed.into()); 340 } 341 342 Ok(()) 343 } 344 345 /// Rebuild the entire index by re-indexing all profiles from the database 346 pub async fn rebuild_index( 347 &self, 348 pool: &StoragePool, 349 identity_resolver: Arc<dyn IdentityResolver>, 350 ) -> Result<u64> { 351 tracing::info!("Starting profile index rebuild"); 352 353 // Step 1: Delete the old index 354 self.delete_index().await?; 355 356 // Step 2: Create a fresh index 357 self.create_index().await?; 358 359 // Step 3: Re-index all profiles from the database 360 let mut indexed_count = 0u64; 361 let page_size = 100i64; 362 let mut current_page = 1i64; 363 364 loop { 365 let (total_count, profiles) = profile_list(pool, current_page, page_size).await?; 366 367 if profiles.is_empty() { 368 break; 369 } 370 371 for profile in &profiles { 372 match self 373 .index_profile(pool, identity_resolver.clone(), profile) 374 .await 375 { 376 Ok(_) => { 377 indexed_count += 1; 378 tracing::debug!("Indexed profile {}", profile.aturi); 379 } 380 Err(err) => { 381 tracing::warn!("Failed to index profile {}: {}", profile.aturi, err); 382 } 383 } 384 } 385 386 // Check if we've processed all profiles 387 if profiles.len() <= page_size as usize { 388 break; 389 } 390 391 current_page += 1; 392 393 // Log progress 394 if current_page % 10 == 0 { 395 tracing::info!( 396 "Profile index rebuild progress: {} of {} profiles indexed", 397 indexed_count, 398 total_count 399 ); 400 } 401 } 402 403 tracing::info!( 404 "Profile index rebuild complete: {} profiles indexed", 405 indexed_count 406 ); 407 Ok(indexed_count) 408 } 409}