The smokesignal.events web application
1use anyhow::Result;
2use atproto_identity::resolve::IdentityResolver;
3use opensearch::{
4 DeleteParts, IndexParts, OpenSearch, SearchParts,
5 http::transport::Transport,
6 indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts},
7};
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use std::sync::Arc;
11
12use crate::atproto::lexicon::profile::Profile as ProfileRecord;
13use crate::atproto::utils::get_profile_hashtags;
14use crate::profile_index_errors::ProfileIndexError;
15use crate::storage::{StoragePool, identity_profile::handle_for_did, profile::profile_list};
16
/// Name of the OpenSearch index that holds profile documents.
const INDEX_NAME: &str = "smokesignal-profiles";
18
19#[derive(Debug, Serialize, Deserialize)]
20pub struct IndexedProfile {
21 pub aturi: String,
22 pub did: String,
23 pub handle: String,
24 pub display_name: String,
25 pub description: String,
26 pub tags: Vec<String>,
27 pub created_at: String,
28 pub updated_at: String,
29}
30
31#[derive(Debug, Serialize)]
32pub struct ProfileIndexStats {
33 pub total_documents: u64,
34 pub index_exists: bool,
35}
36
/// Creates, queries, and rebuilds the OpenSearch index that backs profile
/// listing and search.
pub struct ProfileIndexManager {
    // OpenSearch client, held behind an `Arc` so it can be shared cheaply.
    client: Arc<OpenSearch>,
}
40
41impl ProfileIndexManager {
42 pub fn new(endpoint: &str) -> Result<Self> {
43 let transport = Transport::single_node(endpoint)?;
44 let client = Arc::new(OpenSearch::new(transport));
45
46 Ok(Self { client })
47 }
48
49 /// Check if the index exists
50 pub async fn index_exists(&self) -> Result<bool> {
51 let response = self
52 .client
53 .indices()
54 .exists(IndicesExistsParts::Index(&[INDEX_NAME]))
55 .send()
56 .await?;
57
58 Ok(response.status_code().is_success())
59 }
60
61 /// Get statistics about the profile index
62 pub async fn get_stats(&self) -> Result<ProfileIndexStats> {
63 let index_exists = self.index_exists().await?;
64
65 let total_documents = if index_exists {
66 let search_body = json!({
67 "size": 0,
68 "query": {
69 "match_all": {}
70 }
71 });
72
73 let response = self
74 .client
75 .search(SearchParts::Index(&[INDEX_NAME]))
76 .body(search_body)
77 .send()
78 .await?;
79
80 if response.status_code().is_success() {
81 let body = response.json::<Value>().await?;
82 body["hits"]["total"]["value"].as_u64().unwrap_or(0)
83 } else {
84 0
85 }
86 } else {
87 0
88 };
89
90 Ok(ProfileIndexStats {
91 total_documents,
92 index_exists,
93 })
94 }
95
96 /// List indexed profiles with pagination
97 pub async fn list_indexed_profiles(
98 &self,
99 page: i64,
100 page_size: i64,
101 ) -> Result<(u64, Vec<IndexedProfile>)> {
102 // Check if index exists first
103 if !self.index_exists().await? {
104 // Return empty results if index doesn't exist
105 return Ok((0, vec![]));
106 }
107
108 let offset = (page - 1) * page_size;
109
110 let search_body = json!({
111 "query": {
112 "match_all": {}
113 },
114 "sort": [{ "updated_at": { "order": "desc" } }],
115 "from": offset,
116 "size": page_size + 1 // Fetch one more to know if there are more entries
117 });
118
119 let response = self
120 .client
121 .search(SearchParts::Index(&[INDEX_NAME]))
122 .body(search_body)
123 .send()
124 .await?;
125
126 if !response.status_code().is_success() {
127 return Err(ProfileIndexError::ListProfilesFailed.into());
128 }
129
130 let body = response.json::<Value>().await?;
131 let total = body["hits"]["total"]["value"].as_u64().unwrap_or(0);
132
133 let hits = body["hits"]["hits"]
134 .as_array()
135 .ok_or(ProfileIndexError::InvalidSearchResponse)?;
136
137 let profiles: Vec<IndexedProfile> = hits
138 .iter()
139 .filter_map(|hit| {
140 let source = &hit["_source"];
141 serde_json::from_value(source.clone()).ok()
142 })
143 .collect();
144
145 Ok((total, profiles))
146 }
147
148 /// Search indexed profiles across multiple fields
149 pub async fn search_indexed_profiles(
150 &self,
151 query: &str,
152 page: i64,
153 page_size: i64,
154 ) -> Result<(u64, Vec<IndexedProfile>)> {
155 // Check if index exists first
156 if !self.index_exists().await? {
157 // Return empty results if index doesn't exist
158 return Ok((0, vec![]));
159 }
160
161 let offset = (page - 1) * page_size;
162
163 let search_body = json!({
164 "query": {
165 "simple_query_string": {
166 "query": query,
167 "fields": [
168 "display_name^3",
169 "description^2",
170 "tags^2",
171 "handle",
172 "did"
173 ]
174 }
175 },
176 "sort": [
177 { "_score": { "order": "desc" } },
178 { "updated_at": { "order": "desc" } }
179 ],
180 "from": offset,
181 "size": page_size + 1
182 });
183
184 let response = self
185 .client
186 .search(SearchParts::Index(&[INDEX_NAME]))
187 .body(search_body)
188 .send()
189 .await?;
190
191 if !response.status_code().is_success() {
192 return Err(ProfileIndexError::SearchFailed.into());
193 }
194
195 let body = response.json::<Value>().await?;
196 let total = body["hits"]["total"]["value"].as_u64().unwrap_or(0);
197
198 let hits = body["hits"]["hits"]
199 .as_array()
200 .ok_or(ProfileIndexError::InvalidSearchResponse)?;
201
202 let profiles: Vec<IndexedProfile> = hits
203 .iter()
204 .filter_map(|hit| {
205 let source = &hit["_source"];
206 serde_json::from_value(source.clone()).ok()
207 })
208 .collect();
209
210 Ok((total, profiles))
211 }
212
213 /// Delete a single indexed profile by AT-URI
214 pub async fn delete_indexed_profile(&self, aturi: &str) -> Result<()> {
215 let response = self
216 .client
217 .delete(DeleteParts::IndexId(INDEX_NAME, aturi))
218 .send()
219 .await?;
220
221 if !response.status_code().is_success() && response.status_code() != 404 {
222 return Err(ProfileIndexError::DeleteProfileFailed.into());
223 }
224
225 Ok(())
226 }
227
228 /// Delete the entire index
229 pub async fn delete_index(&self) -> Result<()> {
230 let exists = self.index_exists().await?;
231
232 if !exists {
233 tracing::info!("Index {} does not exist, nothing to delete", INDEX_NAME);
234 return Ok(());
235 }
236
237 let response = self
238 .client
239 .indices()
240 .delete(IndicesDeleteParts::Index(&[INDEX_NAME]))
241 .send()
242 .await?;
243
244 if !response.status_code().is_success() {
245 let error_body = response.text().await?;
246 return Err(ProfileIndexError::IndexDeletionFailed { error_body }.into());
247 }
248
249 tracing::info!("Deleted OpenSearch index {}", INDEX_NAME);
250 Ok(())
251 }
252
253 /// Create the index with proper mappings
254 pub async fn create_index(&self) -> Result<()> {
255 let exists = self.index_exists().await?;
256
257 if exists {
258 tracing::info!("Index {} already exists", INDEX_NAME);
259 return Ok(());
260 }
261
262 let index_body = json!({
263 "mappings": {
264 "properties": {
265 "did": { "type": "keyword" },
266 "handle": { "type": "keyword" },
267 "display_name": { "type": "text" },
268 "description": { "type": "text" },
269 "tags": { "type": "keyword" },
270 "created_at": { "type": "date" },
271 "updated_at": { "type": "date" }
272 }
273 },
274 });
275
276 let response = self
277 .client
278 .indices()
279 .create(IndicesCreateParts::Index(INDEX_NAME))
280 .body(index_body)
281 .send()
282 .await?;
283
284 if !response.status_code().is_success() {
285 let error_body = response.text().await?;
286 return Err(ProfileIndexError::IndexCreationFailed { error_body }.into());
287 }
288
289 tracing::info!("Created OpenSearch index {}", INDEX_NAME);
290 Ok(())
291 }
292
293 /// Index a single profile
294 async fn index_profile(
295 &self,
296 pool: &StoragePool,
297 identity_resolver: Arc<dyn IdentityResolver>,
298 profile: &crate::storage::profile::model::Profile,
299 ) -> Result<()> {
300 // Parse the profile record
301 let profile_record: ProfileRecord = serde_json::from_value(profile.record.0.clone())?;
302
303 // Get the handle for the DID
304 let handle = match handle_for_did(pool, &profile.did).await {
305 Ok(identity_profile) => identity_profile.handle,
306 Err(_) => {
307 // Fallback to resolving the identity
308 match identity_resolver.resolve(&profile.did).await {
309 Ok(document) => document.handles().unwrap_or("unknown.handle").to_string(),
310 Err(_) => "unknown.handle".to_string(),
311 }
312 }
313 };
314
315 // Extract hashtags from facets
316 let tags = get_profile_hashtags(&profile_record);
317
318 let doc = json!({
319 "aturi": profile.aturi,
320 "did": profile.did,
321 "handle": handle,
322 "display_name": profile_record.display_name.unwrap_or_default(),
323 "description": profile_record.description.unwrap_or_default(),
324 "tags": tags,
325 "created_at": json!(chrono::Utc::now()),
326 "updated_at": json!(profile.updated_at.unwrap_or_else(chrono::Utc::now))
327 });
328
329 let response = self
330 .client
331 .index(IndexParts::IndexId(INDEX_NAME, &profile.aturi))
332 .body(doc)
333 .send()
334 .await?;
335
336 if !response.status_code().is_success() {
337 let error_body = response.text().await?;
338 tracing::error!("Failed to index profile {}: {}", profile.aturi, error_body);
339 return Err(ProfileIndexError::IndexProfileFailed.into());
340 }
341
342 Ok(())
343 }
344
345 /// Rebuild the entire index by re-indexing all profiles from the database
346 pub async fn rebuild_index(
347 &self,
348 pool: &StoragePool,
349 identity_resolver: Arc<dyn IdentityResolver>,
350 ) -> Result<u64> {
351 tracing::info!("Starting profile index rebuild");
352
353 // Step 1: Delete the old index
354 self.delete_index().await?;
355
356 // Step 2: Create a fresh index
357 self.create_index().await?;
358
359 // Step 3: Re-index all profiles from the database
360 let mut indexed_count = 0u64;
361 let page_size = 100i64;
362 let mut current_page = 1i64;
363
364 loop {
365 let (total_count, profiles) = profile_list(pool, current_page, page_size).await?;
366
367 if profiles.is_empty() {
368 break;
369 }
370
371 for profile in &profiles {
372 match self
373 .index_profile(pool, identity_resolver.clone(), profile)
374 .await
375 {
376 Ok(_) => {
377 indexed_count += 1;
378 tracing::debug!("Indexed profile {}", profile.aturi);
379 }
380 Err(err) => {
381 tracing::warn!("Failed to index profile {}: {}", profile.aturi, err);
382 }
383 }
384 }
385
386 // Check if we've processed all profiles
387 if profiles.len() <= page_size as usize {
388 break;
389 }
390
391 current_page += 1;
392
393 // Log progress
394 if current_page % 10 == 0 {
395 tracing::info!(
396 "Profile index rebuild progress: {} of {} profiles indexed",
397 indexed_count,
398 total_count
399 );
400 }
401 }
402
403 tracing::info!(
404 "Profile index rebuild complete: {} profiles indexed",
405 indexed_count
406 );
407 Ok(indexed_count)
408 }
409}