···1use anyhow::Result;
2use atproto_client::client::get_dpop_json_with_headers;
03use http::HeaderMap;
4use reqwest::Client;
056use crate::atproto::auth::create_dpop_auth_from_aip_session;
7use crate::config::OAuthBackendConfig;
···9use crate::http::errors::LoginError;
10use crate::http::errors::web_error::WebError;
11use crate::http::middleware_auth::Auth;
0001213/// Result of checking if an AIP session is ready for AT Protocol operations.
14pub(crate) enum AipSessionStatus {
···82 }
83}
8400000000000000000000000000000000000000000000000000000000000000000000000085/// Check if the current AIP session is ready for AT Protocol operations.
86///
87/// This calls the AIP ready endpoint to validate the access token.
088/// For PDS sessions, this is a no-op (returns NotAip).
89pub(crate) async fn require_valid_aip_session(
90 web_context: &WebContext,
···101 return Ok(AipSessionStatus::Stale);
102 }
10300000000000104 // Get AIP hostname from config
105 let aip_hostname = match &web_context.config.oauth_backend {
106 OAuthBackendConfig::AIP { hostname, .. } => hostname,
···119 tracing::warn!(?e, "AIP ready check failed");
120 WebError::InternalError
121 })?;
000122123 if is_ready {
124 Ok(AipSessionStatus::Ready)
···1use anyhow::Result;
2use atproto_client::client::get_dpop_json_with_headers;
3+use deadpool_redis::redis::AsyncCommands;
4use http::HeaderMap;
5use reqwest::Client;
6+use sha2::{Digest, Sha256};
78use crate::atproto::auth::create_dpop_auth_from_aip_session;
9use crate::config::OAuthBackendConfig;
···11use crate::http::errors::LoginError;
12use crate::http::errors::web_error::WebError;
13use crate::http::middleware_auth::Auth;
/// TTL for AIP session ready cache entries (5 minutes).
///
/// Declared as `u64` (rather than a signed type) because a TTL in seconds is
/// inherently non-negative and `redis`'s `set_ex` takes its expiry as `u64`,
/// so no signed-to-unsigned cast is needed at the call site.
const AIP_SESSION_READY_CACHE_TTL_SECS: u64 = 300;
1718/// Result of checking if an AIP session is ready for AT Protocol operations.
19pub(crate) enum AipSessionStatus {
···87 }
88}
8990+/// Generate a cache key for AIP session ready status.
91+///
92+/// Uses a SHA-256 hash of the access token to avoid storing raw tokens in Redis.
93+fn aip_session_cache_key(access_token: &str) -> String {
94+ let mut hasher = Sha256::new();
95+ hasher.update(access_token.as_bytes());
96+ let hash = hasher.finalize();
97+ format!("aip_session_ready:{:x}", hash)
98+}
99+100+/// Check Redis cache for AIP session ready status.
101+///
102+/// Returns `Some(true)` if cached as ready, `Some(false)` if cached as stale,
103+/// or `None` on cache miss or error.
104+async fn get_cached_aip_session_status(
105+ web_context: &WebContext,
106+ access_token: &str,
107+) -> Option<bool> {
108+ let cache_key = aip_session_cache_key(access_token);
109+110+ let mut conn = match web_context.cache_pool.get().await {
111+ Ok(conn) => conn,
112+ Err(e) => {
113+ tracing::debug!(?e, "Failed to get Redis connection for AIP session cache");
114+ return None;
115+ }
116+ };
117+118+ match conn.get::<_, Option<String>>(&cache_key).await {
119+ Ok(Some(value)) => {
120+ tracing::debug!(cache_key = %cache_key, value = %value, "AIP session cache hit");
121+ Some(value == "ready")
122+ }
123+ Ok(None) => {
124+ tracing::debug!(cache_key = %cache_key, "AIP session cache miss");
125+ None
126+ }
127+ Err(e) => {
128+ tracing::debug!(?e, "Redis error reading AIP session cache");
129+ None
130+ }
131+ }
132+}
133+134+/// Cache AIP session ready status in Redis with TTL.
135+async fn cache_aip_session_status(web_context: &WebContext, access_token: &str, is_ready: bool) {
136+ let cache_key = aip_session_cache_key(access_token);
137+ let value = if is_ready { "ready" } else { "stale" };
138+139+ let mut conn = match web_context.cache_pool.get().await {
140+ Ok(conn) => conn,
141+ Err(e) => {
142+ tracing::debug!(?e, "Failed to get Redis connection for AIP session cache write");
143+ return;
144+ }
145+ };
146+147+ if let Err(e) = conn
148+ .set_ex::<_, _, ()>(&cache_key, value, AIP_SESSION_READY_CACHE_TTL_SECS as u64)
149+ .await
150+ {
151+ tracing::debug!(?e, "Failed to cache AIP session status");
152+ } else {
153+ tracing::debug!(
154+ cache_key = %cache_key,
155+ value = %value,
156+ ttl_secs = AIP_SESSION_READY_CACHE_TTL_SECS,
157+ "Cached AIP session status"
158+ );
159+ }
160+}
161+162/// Check if the current AIP session is ready for AT Protocol operations.
163///
164/// This calls the AIP ready endpoint to validate the access token.
165+/// Results are cached in Redis for 5 minutes to avoid repeated validation.
166/// For PDS sessions, this is a no-op (returns NotAip).
167pub(crate) async fn require_valid_aip_session(
168 web_context: &WebContext,
···179 return Ok(AipSessionStatus::Stale);
180 }
181182+ // Check Redis cache first
183+ if let Some(cached_ready) =
184+ get_cached_aip_session_status(web_context, access_token).await
185+ {
186+ return if cached_ready {
187+ Ok(AipSessionStatus::Ready)
188+ } else {
189+ Ok(AipSessionStatus::Stale)
190+ };
191+ }
192+193 // Get AIP hostname from config
194 let aip_hostname = match &web_context.config.oauth_backend {
195 OAuthBackendConfig::AIP { hostname, .. } => hostname,
···208 tracing::warn!(?e, "AIP ready check failed");
209 WebError::InternalError
210 })?;
211+212+ // Cache the result
213+ cache_aip_session_status(web_context, access_token, is_ready).await;
214215 if is_ready {
216 Ok(AipSessionStatus::Ready)
use thiserror::Error;

/// Represents errors that can occur during LFG (Looking For Group) operations.
///
/// These errors are typically triggered during validation of user-submitted
/// LFG creation forms or during LFG record operations.
///
/// NOTE(review): the `error-smokesignal-lfg-N` prefixes in the display strings
/// look like stable, user-visible error codes — presumably referenced by
/// localization or client-side handling; confirm before renumbering them.
#[derive(Debug, Error)]
pub(crate) enum LfgError {
    /// Error when the location is not provided.
    ///
    /// This error occurs when a user attempts to create an LFG record without
    /// selecting a location on the map.
    #[error("error-smokesignal-lfg-1 Location not set")]
    LocationNotSet,

    /// Error when the coordinates are invalid.
    ///
    /// This error occurs when the provided latitude or longitude
    /// values are not valid numbers or are out of range.
    #[error("error-smokesignal-lfg-2 Invalid coordinates: {0}")]
    InvalidCoordinates(String),

    /// Error when no tags are provided.
    ///
    /// This error occurs when a user attempts to create an LFG record without
    /// specifying at least one interest tag.
    #[error("error-smokesignal-lfg-3 Tags required (at least one)")]
    TagsRequired,

    /// Error when too many tags are provided.
    ///
    /// This error occurs when a user attempts to create an LFG record with
    /// more than the maximum allowed number of tags (10).
    #[error("error-smokesignal-lfg-4 Too many tags (maximum 10)")]
    TooManyTags,

    /// Error when an invalid duration is specified.
    ///
    /// This error occurs when the provided duration value is not one of
    /// the allowed options (6, 12, 24, 48, or 72 hours).
    #[error("error-smokesignal-lfg-5 Invalid duration")]
    InvalidDuration,

    /// Error when the PDS record creation fails.
    ///
    /// This error occurs when the AT Protocol server returns an error
    /// during LFG record creation.
    #[error("error-smokesignal-lfg-6 Failed to create PDS record: {message}")]
    PdsRecordCreationFailed { message: String },

    /// Error when no active LFG record is found.
    ///
    /// This error occurs when attempting to perform operations that
    /// require an active LFG record (e.g., deactivation, viewing matches).
    #[error("error-smokesignal-lfg-7 No active LFG record found")]
    NoActiveRecord,

    /// Error when user already has an active LFG record.
    ///
    /// This error occurs when a user attempts to create a new LFG record
    /// while they already have an active one. Users must deactivate their
    /// existing record before creating a new one.
    #[error("error-smokesignal-lfg-8 Active LFG record already exists")]
    ActiveRecordExists,

    /// Error when deactivation fails.
    ///
    /// This error occurs when the attempt to deactivate an LFG record
    /// fails due to a server or network error.
    #[error("error-smokesignal-lfg-9 Failed to deactivate LFG record: {message}")]
    DeactivationFailed { message: String },

    /// Error when a tag is invalid.
    ///
    /// This error occurs when a provided tag is empty or exceeds
    /// the maximum allowed length.
    #[error("error-smokesignal-lfg-10 Invalid tag: {0}")]
    InvalidTag(String),
}
+2
src/http/errors/mod.rs
···8pub mod delete_event_errors;
9pub mod event_view_errors;
10pub mod import_error;
011pub mod login_error;
12pub mod profile_import_error;
13pub mod middleware_errors;
···21pub(crate) use delete_event_errors::DeleteEventError;
22pub(crate) use event_view_errors::EventViewError;
23pub(crate) use import_error::ImportError;
024pub(crate) use login_error::LoginError;
25pub(crate) use profile_import_error::ProfileImportError;
26pub(crate) use middleware_errors::WebSessionError;
···8pub mod delete_event_errors;
9pub mod event_view_errors;
10pub mod import_error;
11+pub mod lfg_error;
12pub mod login_error;
13pub mod profile_import_error;
14pub mod middleware_errors;
···22pub(crate) use delete_event_errors::DeleteEventError;
23pub(crate) use event_view_errors::EventViewError;
24pub(crate) use import_error::ImportError;
25+pub(crate) use lfg_error::LfgError;
26pub(crate) use login_error::LoginError;
27pub(crate) use profile_import_error::ProfileImportError;
28pub(crate) use middleware_errors::WebSessionError;
+8
src/http/errors/web_error.rs
···18use super::create_event_errors::CreateEventError;
19use super::event_view_errors::EventViewError;
20use super::import_error::ImportError;
021use super::login_error::LoginError;
22use super::middleware_errors::MiddlewareAuthError;
23use super::url_error::UrlError;
···158 /// such as avatar/banner uploads or AT Protocol record operations.
159 #[error(transparent)]
160 BlobError(#[from] BlobError),
0000000161162 /// The AIP session has expired and the user must re-authenticate.
163 ///
···18use super::create_event_errors::CreateEventError;
19use super::event_view_errors::EventViewError;
20use super::import_error::ImportError;
21+use super::lfg_error::LfgError;
22use super::login_error::LoginError;
23use super::middleware_errors::MiddlewareAuthError;
24use super::url_error::UrlError;
···159 /// such as avatar/banner uploads or AT Protocol record operations.
160 #[error(transparent)]
161 BlobError(#[from] BlobError),
162+163+ /// Looking For Group (LFG) errors.
164+ ///
165+ /// This error occurs when there are issues with LFG operations,
166+ /// such as creating, viewing, or deactivating LFG records.
167+ #[error(transparent)]
168+ LfgError(#[from] LfgError),
169170 /// The AIP session has expired and the user must re-authenticate.
171 ///
//! LFG form constants and validation utilities.
//!
//! This module provides constants for the Looking For Group (LFG) feature.

/// Allowed duration options in hours for LFG records.
pub(crate) const ALLOWED_DURATIONS: [u32; 5] = [6, 12, 24, 48, 72];

/// Default duration in hours for new LFG records.
pub(crate) const DEFAULT_DURATION_HOURS: u32 = 48;

/// Maximum number of tags allowed per LFG record.
pub(crate) const MAX_TAGS: usize = 10;

/// Maximum length of a single tag.
pub(crate) const MAX_TAG_LENGTH: usize = 64;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_allowed_durations() {
        // Every supported option is present, in the documented order.
        assert_eq!(ALLOWED_DURATIONS, [6, 12, 24, 48, 72]);
        // Unsupported values are rejected by membership checks.
        for hours in [1u32, 100] {
            assert!(!ALLOWED_DURATIONS.contains(&hours));
        }
    }

    #[test]
    fn test_default_duration() {
        // The default must itself be an allowed duration.
        assert!(ALLOWED_DURATIONS.contains(&DEFAULT_DURATION_HOURS));
        assert_eq!(DEFAULT_DURATION_HOURS, 48);
    }

    #[test]
    fn test_max_tags() {
        assert_eq!(MAX_TAGS, 10);
    }

    #[test]
    fn test_max_tag_length() {
        assert_eq!(MAX_TAG_LENGTH, 64);
    }
}
+3
src/http/mod.rs
···38pub mod handle_finalize_acceptance;
39pub mod handle_geo_aggregation;
40pub mod handle_health;
0041pub mod handle_host_meta;
42pub mod handle_import;
43pub mod handle_index;
···67pub mod handle_xrpc_search_events;
68pub mod handler_mcp;
69pub mod import_utils;
070pub mod location_edit_status;
71pub mod macros;
72pub mod middleware_auth;
···38pub mod handle_finalize_acceptance;
39pub mod handle_geo_aggregation;
40pub mod handle_health;
41+pub mod handle_lfg;
42+pub mod h3_utils;
43pub mod handle_host_meta;
44pub mod handle_import;
45pub mod handle_index;
···69pub mod handle_xrpc_search_events;
70pub mod handler_mcp;
71pub mod import_utils;
72+pub mod lfg_form;
73pub mod location_edit_status;
74pub mod macros;
75pub mod middleware_auth;
+11-1
src/http/server.rs
···66 handle_finalize_acceptance::handle_finalize_acceptance,
67 handle_geo_aggregation::handle_geo_aggregation,
68 handle_health::{handle_alive, handle_ready, handle_started},
000069 handle_host_meta::handle_host_meta,
70 handle_import::{handle_import, handle_import_submit},
71 handle_index::handle_index,
···155 post(handle_xrpc_link_attestation),
156 )
157 // API endpoints
158- .route("/api/geo-aggregation", get(handle_geo_aggregation));
000000159160 // Add OAuth metadata route only for AT Protocol backend
161 if matches!(
···28pub mod storage;
29pub mod tap_processor;
30pub mod task_identity_refresh;
031pub mod task_oauth_requests_cleanup;
32pub mod task_search_indexer;
33pub mod task_search_indexer_errors;
···28pub mod storage;
29pub mod tap_processor;
30pub mod task_identity_refresh;
31+pub mod task_lfg_cleanup;
32pub mod task_oauth_requests_cleanup;
33pub mod task_search_indexer;
34pub mod task_search_indexer_errors;
+472-1
src/search_index.rs
···361 }
362363 /// Index a single event
364- async fn index_event(
365 &self,
366 pool: &StoragePool,
367 identity_resolver: Arc<dyn IdentityResolver>,
···931932 Ok(buckets)
933 }
000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000934}
935936#[cfg(test)]
···1+//! Storage module for LFG (Looking For Group) records.
2+//!
3+//! This module provides query helpers for LFG records stored in the `atproto_records` table.
4+//! For writes, use `atproto_record_upsert()` and `atproto_record_delete()` from the
5+//! `atproto_record` module.
6+//!
7+//! Records are stored as `AtprotoRecord` and can be deserialized to `Lfg` using:
8+//! ```ignore
9+//! let lfg: Lfg = serde_json::from_value(record.record.0.clone())?;
10+//! ```
11+12+use super::atproto_record::AtprotoRecord;
13+use super::errors::StorageError;
14+use super::StoragePool;
15+use crate::atproto::lexicon::lfg::NSID;
/// Get the active LFG record for a DID from atproto_records.
///
/// Returns the most recent active LFG record for the given DID, or None if
/// no active record exists.
///
/// A record counts as "active" only when its JSONB payload has
/// `active = true` AND its `endsAt` timestamp is still in the future;
/// among matches, the most recently indexed row wins.
pub async fn lfg_get_active_by_did(
    pool: &StoragePool,
    did: &str,
) -> Result<Option<AtprotoRecord>, StorageError> {
    // `record` is a JSONB column: `->>` extracts a field as text, which is
    // then cast to boolean / timestamptz for the activity filters.
    let record = sqlx::query_as::<_, AtprotoRecord>(
        r#"
        SELECT aturi, did, cid, collection, indexed_at, record
        FROM atproto_records
        WHERE did = $1
        AND collection = $2
        AND (record->>'active')::boolean = true
        AND (record->>'endsAt')::timestamptz > NOW()
        ORDER BY indexed_at DESC
        LIMIT 1
        "#,
    )
    .bind(did)
    .bind(NSID)
    .fetch_optional(pool)
    .await
    .map_err(StorageError::UnableToExecuteQuery)?;

    Ok(record)
}
/// Get an LFG record by AT-URI from atproto_records.
///
/// The collection filter guards against returning a non-LFG record that
/// happens to share the AT-URI. Returns `None` when no such record exists.
pub async fn lfg_get_by_aturi(
    pool: &StoragePool,
    aturi: &str,
) -> Result<Option<AtprotoRecord>, StorageError> {
    let record = sqlx::query_as::<_, AtprotoRecord>(
        r#"
        SELECT aturi, did, cid, collection, indexed_at, record
        FROM atproto_records
        WHERE aturi = $1
        AND collection = $2
        "#,
    )
    .bind(aturi)
    .bind(NSID)
    .fetch_optional(pool)
    .await
    .map_err(StorageError::UnableToExecuteQuery)?;

    Ok(record)
}
/// Get all LFG records for a DID (including inactive/expired).
///
/// Used for tag history lookups.
///
/// Results are ordered newest-first by `indexed_at` and capped at `limit`
/// rows; callers choose the limit (no default is applied here).
pub async fn lfg_get_all_by_did(
    pool: &StoragePool,
    did: &str,
    limit: i64,
) -> Result<Vec<AtprotoRecord>, StorageError> {
    let records = sqlx::query_as::<_, AtprotoRecord>(
        r#"
        SELECT aturi, did, cid, collection, indexed_at, record
        FROM atproto_records
        WHERE did = $1
        AND collection = $2
        ORDER BY indexed_at DESC
        LIMIT $3
        "#,
    )
    .bind(did)
    .bind(NSID)
    .bind(limit)
    .fetch_all(pool)
    .await
    .map_err(StorageError::UnableToExecuteQuery)?;

    Ok(records)
}
+1
src/storage/mod.rs
···8pub mod errors;
9pub mod event;
10pub mod identity_profile;
011pub mod notification;
12pub mod oauth;
13pub mod private_event_content;
···8pub mod errors;
9pub mod event;
10pub mod identity_profile;
11+pub mod lfg;
12pub mod notification;
13pub mod oauth;
14pub mod private_event_content;
···1+//! LFG (Looking For Group) cleanup background task.
2+//!
3+//! This task runs periodically to deactivate expired LFG records in both
4+//! the database and OpenSearch index.
5+6+use anyhow::Result;
7+use chrono::{Duration, Utc};
8+use tokio::time::{Instant, sleep};
9+use tokio_util::sync::CancellationToken;
10+11+use crate::search_index::SearchIndexManager;
12+use crate::storage::StoragePool;
13+use crate::atproto::lexicon::lfg::NSID;
/// Configuration for the LFG cleanup task.
pub struct LfgCleanupTaskConfig {
    /// How often to run the cleanup (default: 1 hour).
    /// Note this is a `chrono::Duration`; it is converted to
    /// `std::time::Duration` when the task starts.
    pub sleep_interval: Duration,
}

impl Default for LfgCleanupTaskConfig {
    /// Default configuration: run the cleanup once per hour.
    fn default() -> Self {
        Self {
            sleep_interval: Duration::hours(1),
        }
    }
}
/// Background task that deactivates expired LFG records.
pub struct LfgCleanupTask {
    /// Scheduling configuration (cleanup cadence).
    pub config: LfgCleanupTaskConfig,
    /// Database pool used for the SQL-side deactivation of expired records.
    pub storage_pool: StoragePool,
    /// Optional search index; when `None`, the OpenSearch cleanup step is skipped.
    pub search_index: Option<SearchIndexManager>,
    /// Token used to stop the long-running loop gracefully.
    pub cancellation_token: CancellationToken,
}
impl LfgCleanupTask {
    /// Creates a new LFG cleanup task.
    #[must_use]
    pub fn new(
        config: LfgCleanupTaskConfig,
        storage_pool: StoragePool,
        search_index: Option<SearchIndexManager>,
        cancellation_token: CancellationToken,
    ) -> Self {
        Self {
            config,
            storage_pool,
            search_index,
            cancellation_token,
        }
    }

    /// Runs the LFG cleanup task as a long-running process.
    ///
    /// This task:
    /// 1. Deactivates expired LFG records in the database
    /// 2. Updates the OpenSearch index to reflect expired records
    ///
    /// The loop waits `config.sleep_interval` between passes and exits
    /// cleanly when `cancellation_token` is cancelled.
    ///
    /// NOTE(review): the sleeper is armed *before* the loop, so the first
    /// cleanup pass only happens after one full interval (default 1 hour)
    /// from startup — confirm that delay is intended.
    ///
    /// # Errors
    /// Returns an error if the sleep interval cannot be converted, or if there's
    /// a problem cleaning up expired records.
    pub async fn run(&self) -> Result<()> {
        tracing::info!("LfgCleanupTask started");

        // chrono::Duration -> std::time::Duration; errors only for a
        // negative configured interval.
        let interval = self.config.sleep_interval.to_std()?;

        let sleeper = sleep(interval);
        tokio::pin!(sleeper);

        loop {
            tokio::select! {
                () = self.cancellation_token.cancelled() => {
                    break;
                },
                () = &mut sleeper => {
                    // A failed pass is logged and retried on the next tick
                    // rather than terminating the task.
                    if let Err(err) = self.cleanup_expired_lfg_records().await {
                        tracing::error!("LfgCleanupTask failed: {}", err);
                    }
                    sleeper.as_mut().reset(Instant::now() + interval);
                }
            }
        }

        tracing::info!("LfgCleanupTask stopped");

        Ok(())
    }

    /// Cleanup expired LFG records.
    ///
    /// Runs the database update first, then the OpenSearch update; logs an
    /// info-level summary only when at least one record was touched.
    async fn cleanup_expired_lfg_records(&self) -> Result<()> {
        let now = Utc::now();

        tracing::debug!("Starting cleanup of expired LFG records");

        // Step 1: Update expired records in the database
        let db_result = self.deactivate_expired_in_database(&now).await?;

        // Step 2: Update expired records in OpenSearch
        let os_result = self.deactivate_expired_in_opensearch().await?;

        if db_result > 0 || os_result > 0 {
            tracing::info!(
                database_updated = db_result,
                opensearch_updated = os_result,
                "Cleaned up expired LFG records"
            );
        } else {
            tracing::debug!("No expired LFG records to clean up");
        }

        Ok(())
    }

    /// Deactivate expired LFG records in the database.
    ///
    /// Flips the JSONB `active` flag to `false` for every LFG row whose
    /// `endsAt` timestamp is earlier than `now`; returns the number of rows
    /// updated.
    ///
    /// NOTE(review): this mutates only the locally indexed JSONB copy in
    /// `atproto_records`; the canonical record on the user's PDS is not
    /// touched — confirm that divergence is intended.
    async fn deactivate_expired_in_database(
        &self,
        now: &chrono::DateTime<Utc>,
    ) -> Result<u64> {
        // Query for active LFG records that have expired
        let result = sqlx::query(
            r#"
            UPDATE atproto_records
            SET record = jsonb_set(record, '{active}', 'false')
            WHERE collection = $1
            AND (record->>'active')::boolean = true
            AND (record->>'endsAt')::timestamptz < $2
            "#,
        )
        .bind(NSID)
        .bind(now)
        .execute(&self.storage_pool)
        .await?;

        Ok(result.rows_affected())
    }

    /// Deactivate expired LFG profiles in OpenSearch.
    ///
    /// No-op (returns 0) when no search index is configured. OpenSearch
    /// failures are deliberately downgraded to a warning and reported as 0
    /// so a search outage never aborts the database-side cleanup.
    async fn deactivate_expired_in_opensearch(&self) -> Result<u64> {
        let Some(ref search_index) = self.search_index else {
            return Ok(0);
        };

        match search_index.deactivate_expired_lfg_profiles().await {
            Ok(count) => Ok(count),
            Err(err) => {
                tracing::warn!("Failed to deactivate expired LFG profiles in OpenSearch: {}", err);
                Ok(0)
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The out-of-the-box cleanup cadence should be one hour.
    #[test]
    fn test_default_config() {
        let LfgCleanupTaskConfig { sleep_interval } = LfgCleanupTaskConfig::default();
        assert_eq!(sleep_interval, Duration::hours(1));
    }
}
+120-124
src/task_search_indexer.rs
···1use anyhow::Result;
2-use atproto_attestation::create_dagbor_cid;
3use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage};
4-use atproto_record::lexicon::app::bsky::richtext::facet::{Facet, FacetFeature};
5use atproto_record::lexicon::community::lexicon::calendar::event::NSID as LexiconCommunityEventNSID;
6use opensearch::{
7 DeleteParts, IndexParts, OpenSearch,
8 http::transport::Transport,
9 indices::{IndicesCreateParts, IndicesExistsParts},
10};
11-use serde::Deserialize;
12use serde_json::{Value, json};
13use std::sync::Arc;
14015use crate::atproto::lexicon::profile::{Profile, NSID as PROFILE_NSID};
16use crate::atproto::utils::get_profile_hashtags;
00017use crate::task_search_indexer_errors::SearchIndexerError;
1819/// Build an AT URI with pre-allocated capacity to avoid format! overhead.
···30 uri
31}
3233-/// Generate a DAG-CBOR CID from a JSON value.
34-///
35-/// Creates a CIDv1 with DAG-CBOR codec (0x71) and SHA-256 hash (0x12),
36-/// following the AT Protocol specification for content addressing.
37-fn generate_location_cid(value: &Value) -> Result<String, SearchIndexerError> {
38- let cid = create_dagbor_cid(value).map_err(|e| SearchIndexerError::CidGenerationFailed {
39- error: e.to_string(),
40- })?;
41- Ok(cid.to_string())
42-}
43-44-/// A lightweight event struct for search indexing.
45-///
46-/// Uses serde_json::Value for locations to avoid deserialization errors
47-/// when event data contains location types not supported by the LocationOrRef enum.
48-#[derive(Deserialize)]
49-struct IndexableEvent {
50- name: String,
51- description: String,
52- #[serde(rename = "createdAt")]
53- created_at: chrono::DateTime<chrono::Utc>,
54- #[serde(rename = "startsAt")]
55- starts_at: Option<chrono::DateTime<chrono::Utc>>,
56- #[serde(rename = "endsAt")]
57- ends_at: Option<chrono::DateTime<chrono::Utc>>,
58- #[serde(rename = "descriptionFacets")]
59- facets: Option<Vec<Facet>>,
60- /// Locations stored as raw JSON values for CID generation.
61- #[serde(default)]
62- locations: Vec<Value>,
63-}
64-65-impl IndexableEvent {
66- /// Extract hashtags from the event's facets
67- fn get_hashtags(&self) -> Vec<String> {
68- self.facets
69- .as_ref()
70- .map(|facets| {
71- facets
72- .iter()
73- .flat_map(|facet| {
74- facet.features.iter().filter_map(|feature| {
75- if let FacetFeature::Tag(tag) = feature {
76- Some(tag.tag.clone())
77- } else {
78- None
79- }
80- })
81- })
82- .collect()
83- })
84- .unwrap_or_default()
85- }
86-}
87-88const EVENTS_INDEX_NAME: &str = "smokesignal-events";
89const PROFILES_INDEX_NAME: &str = "smokesignal-profiles";
09091pub struct SearchIndexer {
92 client: Arc<OpenSearch>,
093 identity_resolver: Arc<dyn IdentityResolver>,
94 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
095}
9697impl SearchIndexer {
···100 /// # Arguments
101 ///
102 /// * `endpoint` - OpenSearch endpoint URL
0103 /// * `identity_resolver` - Resolver for DID identities
104 /// * `document_storage` - Storage for DID documents
105 pub async fn new(
106 endpoint: &str,
0107 identity_resolver: Arc<dyn IdentityResolver>,
108 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
109 ) -> Result<Self> {
110 let transport = Transport::single_node(endpoint)?;
111 let client = Arc::new(OpenSearch::new(transport));
0112113 let indexer = Self {
114 client,
0115 identity_resolver,
116 document_storage,
0117 };
118119 indexer.ensure_index().await?;
···126 self.ensure_events_index().await?;
127 // Ensure profiles index
128 self.ensure_profiles_index().await?;
00129 Ok(())
130 }
131···151 "description": { "type": "text" },
152 "tags": { "type": "keyword" },
153 "location_cids": { "type": "keyword" },
0154 "start_time": { "type": "date" },
155 "end_time": { "type": "date" },
156 "created_at": { "type": "date" },
···222 Ok(())
223 }
2240000000000000000000000000000000000000000000000225 /// Index a commit event (create or update).
226 ///
227 /// Dispatches to the appropriate indexer based on collection type.
···236 match collection {
237 "community.lexicon.calendar.event" => self.index_event(did, rkey, record).await,
238 c if c == PROFILE_NSID => self.index_profile(did, rkey, record).await,
0239 _ => Ok(()),
240 }
241 }
···247 match collection {
248 "community.lexicon.calendar.event" => self.delete_event(did, rkey).await,
249 c if c == PROFILE_NSID => self.delete_profile(did, rkey).await,
0250 _ => Ok(()),
251 }
252 }
253254- async fn index_event(&self, did: &str, rkey: &str, record: Value) -> Result<()> {
255- let event: IndexableEvent = serde_json::from_value(record)?;
256-257- let document = self.ensure_identity_stored(did).await?;
258- let handle = document.handles().unwrap_or("invalid.handle");
259-260 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
261262- // Extract fields from the IndexableEvent struct
263- let name = &event.name;
264- let description = &event.description;
265- let created_at = &event.created_at;
266- let starts_at = &event.starts_at;
267- let ends_at = &event.ends_at;
268-269- // Extract hashtags from facets
270- let tags = event.get_hashtags();
271-272- // Generate CIDs for each location
273- let location_cids: Vec<String> = event
274- .locations
275- .iter()
276- .filter_map(|loc| generate_location_cid(loc).ok())
277- .collect();
278-279- let mut doc = json!({
280- "did": did,
281- "handle": handle,
282- "name": name,
283- "description": description,
284- "tags": tags,
285- "location_cids": location_cids,
286- "created_at": json!(created_at),
287- "updated_at": json!(chrono::Utc::now())
288- });
289-290- // Add optional time fields
291- if let Some(start) = starts_at {
292- doc["start_time"] = json!(start);
293- }
294- if let Some(end) = ends_at {
295- doc["end_time"] = json!(end);
296- }
297-298- let response = self
299- .client
300- .index(IndexParts::IndexId(EVENTS_INDEX_NAME, &aturi))
301- .body(doc)
302- .send()
303- .await?;
304-305- if response.status_code().is_success() {
306- tracing::debug!("Indexed event {} for DID {}", rkey, did);
307- } else {
308- let error_body = response.text().await?;
309- tracing::error!("Failed to index event: {}", error_body);
310 }
311312 Ok(())
···315 async fn delete_event(&self, did: &str, rkey: &str) -> Result<()> {
316 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
317318- let response = self
319- .client
320- .delete(DeleteParts::IndexId(EVENTS_INDEX_NAME, &aturi))
321- .send()
322- .await?;
323324- if response.status_code().is_success() || response.status_code() == 404 {
325- tracing::debug!("Deleted event {} for DID {} from search index", rkey, did);
326- } else {
327- let error_body = response.text().await?;
328- tracing::error!("Failed to delete event from index: {}", error_body);
329- }
330-331 Ok(())
332 }
333···389 tracing::error!("Failed to delete profile from index: {}", error_body);
390 }
391000000000000000000000000000000000000392 Ok(())
393 }
394
···1use anyhow::Result;
02use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage};
03use atproto_record::lexicon::community::lexicon::calendar::event::NSID as LexiconCommunityEventNSID;
4use opensearch::{
5 DeleteParts, IndexParts, OpenSearch,
6 http::transport::Transport,
7 indices::{IndicesCreateParts, IndicesExistsParts},
8};
09use serde_json::{Value, json};
10use std::sync::Arc;
1112+use crate::atproto::lexicon::lfg::{Lfg, NSID as LFG_NSID};
13use crate::atproto::lexicon::profile::{Profile, NSID as PROFILE_NSID};
14use crate::atproto::utils::get_profile_hashtags;
15+use crate::search_index::SearchIndexManager;
16+use crate::storage::event::event_get;
17+use crate::storage::StoragePool;
18use crate::task_search_indexer_errors::SearchIndexerError;
1920/// Build an AT URI with pre-allocated capacity to avoid format! overhead.
···31 uri
32}
33000000000000000000000000000000000000000000000000000000034const EVENTS_INDEX_NAME: &str = "smokesignal-events";
35const PROFILES_INDEX_NAME: &str = "smokesignal-profiles";
36+const LFG_INDEX_NAME: &str = "smokesignal-lfg-profile";
/// Indexes AT Protocol records into OpenSearch (events, profiles, LFG profiles).
pub struct SearchIndexer {
    /// Shared OpenSearch client used for all index/delete operations.
    client: Arc<OpenSearch>,
    /// Database pool used to fetch canonical event rows before indexing.
    pool: StoragePool,
    /// Resolver for DID identities (passed through to the index manager).
    identity_resolver: Arc<dyn IdentityResolver>,
    /// Storage for DID documents.
    document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
    /// Shared event-indexing logic, so firehose indexing matches the web handlers.
    event_index_manager: SearchIndexManager,
}
4546impl SearchIndexer {
···49 /// # Arguments
50 ///
51 /// * `endpoint` - OpenSearch endpoint URL
52+ /// * `pool` - Database connection pool for fetching events
53 /// * `identity_resolver` - Resolver for DID identities
54 /// * `document_storage` - Storage for DID documents
55 pub async fn new(
56 endpoint: &str,
57+ pool: StoragePool,
58 identity_resolver: Arc<dyn IdentityResolver>,
59 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
60 ) -> Result<Self> {
61 let transport = Transport::single_node(endpoint)?;
62 let client = Arc::new(OpenSearch::new(transport));
63+ let event_index_manager = SearchIndexManager::new(endpoint)?;
6465 let indexer = Self {
66 client,
67+ pool,
68 identity_resolver,
69 document_storage,
70+ event_index_manager,
71 };
7273 indexer.ensure_index().await?;
···80 self.ensure_events_index().await?;
81 // Ensure profiles index
82 self.ensure_profiles_index().await?;
83+ // Ensure LFG profiles index
84+ self.ensure_lfg_profiles_index().await?;
85 Ok(())
86 }
87···107 "description": { "type": "text" },
108 "tags": { "type": "keyword" },
109 "location_cids": { "type": "keyword" },
110+ "locations_geo": { "type": "geo_point" },
111 "start_time": { "type": "date" },
112 "end_time": { "type": "date" },
113 "created_at": { "type": "date" },
···179 Ok(())
180 }
    /// Ensure the LFG profiles OpenSearch index exists, creating it if needed.
    ///
    /// Performs an existence check first; any success status short-circuits
    /// with an info log. Any non-success status (e.g. 404) falls through to
    /// index creation.
    ///
    /// # Errors
    /// Returns `SearchIndexerError::IndexCreationFailed` when OpenSearch
    /// rejects the index creation request, or a transport error from either
    /// call.
    async fn ensure_lfg_profiles_index(&self) -> Result<()> {
        let exists_response = self
            .client
            .indices()
            .exists(IndicesExistsParts::Index(&[LFG_INDEX_NAME]))
            .send()
            .await?;

        if exists_response.status_code().is_success() {
            tracing::info!("OpenSearch index {} already exists", LFG_INDEX_NAME);
            return Ok(());
        }

        // Mappings: exact-match keyword fields for ids/tags, date fields for
        // the activity window, and `location` as a geo_point — presumably to
        // support geo queries; confirm against the search/query code.
        let index_body = json!({
            "mappings": {
                "properties": {
                    "aturi": { "type": "keyword" },
                    "did": { "type": "keyword" },
                    "location": { "type": "geo_point" },
                    "tags": { "type": "keyword" },
                    "starts_at": { "type": "date" },
                    "ends_at": { "type": "date" },
                    "active": { "type": "boolean" },
                    "created_at": { "type": "date" }
                }
            },
        });

        let response = self
            .client
            .indices()
            .create(IndicesCreateParts::Index(LFG_INDEX_NAME))
            .body(index_body)
            .send()
            .await?;

        if response.status_code().is_success() {
            tracing::info!("Created OpenSearch index {}", LFG_INDEX_NAME);
        } else {
            let error_body = response.text().await?;
            return Err(SearchIndexerError::IndexCreationFailed { error_body }.into());
        }

        Ok(())
    }
227+228 /// Index a commit event (create or update).
229 ///
230 /// Dispatches to the appropriate indexer based on collection type.
···239 match collection {
240 "community.lexicon.calendar.event" => self.index_event(did, rkey, record).await,
241 c if c == PROFILE_NSID => self.index_profile(did, rkey, record).await,
242+ c if c == LFG_NSID => self.index_lfg_profile(did, rkey, record).await,
243 _ => Ok(()),
244 }
245 }
···251 match collection {
252 "community.lexicon.calendar.event" => self.delete_event(did, rkey).await,
253 c if c == PROFILE_NSID => self.delete_profile(did, rkey).await,
254+ c if c == LFG_NSID => self.delete_lfg_profile(did, rkey).await,
255 _ => Ok(()),
256 }
257 }
258259+ async fn index_event(&self, did: &str, rkey: &str, _record: Value) -> Result<()> {
00000260 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
261262+ // Fetch the event from the database and delegate to SearchIndexManager
263+ // This ensures we use the same indexing logic as the web handlers
264+ match event_get(&self.pool, &aturi).await {
265+ Ok(event) => {
266+ self.event_index_manager
267+ .index_event(&self.pool, self.identity_resolver.clone(), &event)
268+ .await?;
269+ tracing::debug!("Indexed event {} for DID {}", rkey, did);
270+ }
271+ Err(err) => {
272+ // Event might not be in the database yet if content fetcher hasn't processed it
273+ tracing::warn!(
274+ "Could not fetch event {} for indexing: {}. It may be indexed on next update.",
275+ aturi,
276+ err
277+ );
278+ }
0000000000000000000000000000000279 }
280281 Ok(())
···284 async fn delete_event(&self, did: &str, rkey: &str) -> Result<()> {
285 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
286287+ // Delegate to SearchIndexManager for consistent deletion logic
288+ self.event_index_manager.delete_indexed_event(&aturi).await?;
000289290+ tracing::debug!("Deleted event {} for DID {} from search index", rkey, did);
000000291 Ok(())
292 }
293···349 tracing::error!("Failed to delete profile from index: {}", error_body);
350 }
351352+ Ok(())
353+ }
354+355+ async fn index_lfg_profile(&self, did: &str, rkey: &str, record: Value) -> Result<()> {
356+ let lfg: Lfg = serde_json::from_value(record)?;
357+ let aturi = build_aturi(did, LFG_NSID, rkey);
 359+        // Extract coordinates from location. NOTE(review): unwrap_or((0.0, 0.0))
 360+        // indexes records with a missing location at "Null Island" (0,0 is a
 361+        // valid geo_point) — confirm this is intended, or skip indexing instead.
 362+        let (lat, lon) = lfg.get_coordinates().unwrap_or((0.0, 0.0));
361+362+ // Delegate to SearchIndexManager for consistent indexing logic
363+ self.event_index_manager
364+ .index_lfg_profile(
365+ &aturi,
366+ did,
367+ lat,
368+ lon,
369+ &lfg.tags,
370+ &lfg.starts_at,
371+ &lfg.ends_at,
372+ &lfg.created_at,
373+ lfg.active,
374+ )
375+ .await?;
376+377+ tracing::debug!("Indexed LFG profile {} for DID {}", rkey, did);
378+ Ok(())
379+ }
380+381+ async fn delete_lfg_profile(&self, did: &str, rkey: &str) -> Result<()> {
382+ let aturi = build_aturi(did, LFG_NSID, rkey);
383+384+ // Delegate to SearchIndexManager for consistent deletion logic
385+ self.event_index_manager.delete_lfg_profile(&aturi).await?;
386+387+ tracing::debug!("Deleted LFG profile {} for DID {} from search index", rkey, did);
388 Ok(())
389 }
390
-7
src/task_search_indexer_errors.rs
···12 /// and the operation fails with a server error response.
13 #[error("error-smokesignal-search-indexer-1 Failed to create index: {error_body}")]
14 IndexCreationFailed { error_body: String },
15-16- /// Error when CID generation fails.
17- ///
18- /// This error occurs when serializing location data to DAG-CBOR
19- /// or generating the multihash for a CID fails.
20- #[error("error-smokesignal-search-indexer-2 Failed to generate CID: {error}")]
21- CidGenerationFailed { error: String },
22}
···12 /// and the operation fails with a server error response.
13 #[error("error-smokesignal-search-indexer-1 Failed to create index: {error_body}")]
14 IndexCreationFailed { error_body: String },
000000015}
+24-1
templates/en-us/create_event.alpine.html
···768769 init() {
770 // Load existing location data from server if present
771- {% if location_form.location_country %}
000000000000772 this.formData.locations.push({
773 country: {{ location_form.location_country | tojson }},
774 postal_code: {% if location_form.location_postal_code %}{{ location_form.location_postal_code | tojson }}{% else %}null{% endif %},
···777 street: {% if location_form.location_street %}{{ location_form.location_street | tojson }}{% else %}null{% endif %},
778 name: {% if location_form.location_name %}{{ location_form.location_name | tojson }}{% else %}null{% endif %},
779 });
00000000000780 {% endif %}
781782 // Load existing links from server if present
···768769 init() {
770 // Load existing location data from server if present
771+ {% if event_locations %}
772+ {% for loc in event_locations %}
773+ this.formData.locations.push({
774+ country: {{ loc.country | tojson }},
775+ postal_code: {{ loc.postal_code | tojson }},
776+ region: {{ loc.region | tojson }},
777+ locality: {{ loc.locality | tojson }},
778+ street: {{ loc.street | tojson }},
779+ name: {{ loc.name | tojson }},
780+ });
781+ {% endfor %}
782+ {% elif location_form.location_country %}
783+ // Fallback to old single-location format
784 this.formData.locations.push({
785 country: {{ location_form.location_country | tojson }},
786 postal_code: {% if location_form.location_postal_code %}{{ location_form.location_postal_code | tojson }}{% else %}null{% endif %},
···789 street: {% if location_form.location_street %}{{ location_form.location_street | tojson }}{% else %}null{% endif %},
790 name: {% if location_form.location_name %}{{ location_form.location_name | tojson }}{% else %}null{% endif %},
791 });
792+ {% endif %}
793+794+ // Load existing geo locations from server if present
795+ {% if event_geo_locations %}
796+ {% for geo in event_geo_locations %}
797+ this.formData.geo_locations.push({
798+ latitude: {{ geo.latitude | tojson }},
799+ longitude: {{ geo.longitude | tojson }},
800+ name: {{ geo.name | tojson }},
801+ });
802+ {% endfor %}
803 {% endif %}
804805 // Load existing links from server if present
···1+{% extends "en-us/base.html" %}
2+{% block title %}Looking For Group - Smoke Signal{% endblock %}
3+{% block head %}
4+<meta name="description" content="Find activity partners in your area with Looking For Group">
5+<meta property="og:title" content="Looking For Group">
6+<meta property="og:description" content="Find activity partners in your area with Looking For Group">
7+<meta property="og:site_name" content="Smoke Signal" />
8+<meta property="og:type" content="website" />
9+<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
10+<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
11+<script src="https://unpkg.com/h3-js@4"></script>
12+{% endblock %}
13+{% block content %}
14+{% include 'en-us/lfg_form.common.html' %}
15+{% endblock %}
···1+{% extends "en-us/base.html" %}
2+{% block title %}Looking For Group - Smoke Signal{% endblock %}
3+{% block head %}
4+<meta name="description" content="Find activity partners in your area with Looking For Group">
5+<meta property="og:title" content="Looking For Group">
6+<meta property="og:description" content="Find activity partners in your area with Looking For Group">
7+<meta property="og:site_name" content="Smoke Signal" />
8+<meta property="og:type" content="website" />
9+<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
10+<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
11+<script src="https://unpkg.com/h3-js@4"></script>
12+{% endblock %}
13+{% block content %}
14+{% include 'en-us/lfg_matches.common.html' %}
15+{% endblock %}