//! LFG (Looking For Group) cleanup background task. //! //! This task runs periodically to deactivate expired LFG records in both //! the database and OpenSearch index. use anyhow::Result; use chrono::{Duration, Utc}; use tokio::time::{Instant, sleep}; use tokio_util::sync::CancellationToken; use crate::atproto::lexicon::lfg::NSID; use crate::search_index::SearchIndexManager; use crate::storage::StoragePool; /// Configuration for the LFG cleanup task. pub struct LfgCleanupTaskConfig { /// How often to run the cleanup (default: 1 hour) pub sleep_interval: Duration, } impl Default for LfgCleanupTaskConfig { fn default() -> Self { Self { sleep_interval: Duration::hours(1), } } } /// Background task that deactivates expired LFG records. pub struct LfgCleanupTask { pub config: LfgCleanupTaskConfig, pub storage_pool: StoragePool, pub search_index: Option, pub cancellation_token: CancellationToken, } impl LfgCleanupTask { /// Creates a new LFG cleanup task. #[must_use] pub fn new( config: LfgCleanupTaskConfig, storage_pool: StoragePool, search_index: Option, cancellation_token: CancellationToken, ) -> Self { Self { config, storage_pool, search_index, cancellation_token, } } /// Runs the LFG cleanup task as a long-running process. /// /// This task: /// 1. Deactivates expired LFG records in the database /// 2. Updates the OpenSearch index to reflect expired records /// /// # Errors /// Returns an error if the sleep interval cannot be converted, or if there's /// a problem cleaning up expired records. pub async fn run(&self) -> Result<()> { tracing::info!("LfgCleanupTask started"); let interval = self.config.sleep_interval.to_std()?; let sleeper = sleep(interval); tokio::pin!(sleeper); loop { tokio::select! { () = self.cancellation_token.cancelled() => { break; }, () = &mut sleeper => { if let Err(err) = self.cleanup_expired_lfg_records().await { tracing::error!("LfgCleanupTask failed: {}", err); } sleeper.as_mut().reset(Instant::now() + interval); } } } tracing::info!("LfgCleanupTask stopped"); Ok(()) } /// Cleanup expired LFG records. async fn cleanup_expired_lfg_records(&self) -> Result<()> { let now = Utc::now(); tracing::debug!("Starting cleanup of expired LFG records"); // Step 1: Update expired records in the database let db_result = self.deactivate_expired_in_database(&now).await?; // Step 2: Update expired records in OpenSearch let os_result = self.deactivate_expired_in_opensearch().await?; if db_result > 0 || os_result > 0 { tracing::info!( database_updated = db_result, opensearch_updated = os_result, "Cleaned up expired LFG records" ); } else { tracing::debug!("No expired LFG records to clean up"); } Ok(()) } /// Deactivate expired LFG records in the database. async fn deactivate_expired_in_database(&self, now: &chrono::DateTime) -> Result { // Query for active LFG records that have expired let result = sqlx::query( r#" UPDATE atproto_records SET record = jsonb_set(record, '{active}', 'false') WHERE collection = $1 AND (record->>'active')::boolean = true AND (record->>'endsAt')::timestamptz < $2 "#, ) .bind(NSID) .bind(now) .execute(&self.storage_pool) .await?; Ok(result.rows_affected()) } /// Deactivate expired LFG profiles in OpenSearch. async fn deactivate_expired_in_opensearch(&self) -> Result { let Some(ref search_index) = self.search_index else { return Ok(0); }; match search_index.deactivate_expired_lfg_profiles().await { Ok(count) => Ok(count), Err(err) => { tracing::warn!( "Failed to deactivate expired LFG profiles in OpenSearch: {}", err ); Ok(0) } } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_default_config() { let config = LfgCleanupTaskConfig::default(); assert_eq!(config.sleep_interval, Duration::hours(1)); } }