The smokesignal.events web application
at main 163 lines 4.8 kB view raw
1//! LFG (Looking For Group) cleanup background task. 2//! 3//! This task runs periodically to deactivate expired LFG records in both 4//! the database and OpenSearch index. 5 6use anyhow::Result; 7use chrono::{Duration, Utc}; 8use tokio::time::{Instant, sleep}; 9use tokio_util::sync::CancellationToken; 10 11use crate::atproto::lexicon::lfg::NSID; 12use crate::search_index::SearchIndexManager; 13use crate::storage::StoragePool; 14 15/// Configuration for the LFG cleanup task. 16pub struct LfgCleanupTaskConfig { 17 /// How often to run the cleanup (default: 1 hour) 18 pub sleep_interval: Duration, 19} 20 21impl Default for LfgCleanupTaskConfig { 22 fn default() -> Self { 23 Self { 24 sleep_interval: Duration::hours(1), 25 } 26 } 27} 28 29/// Background task that deactivates expired LFG records. 30pub struct LfgCleanupTask { 31 pub config: LfgCleanupTaskConfig, 32 pub storage_pool: StoragePool, 33 pub search_index: Option<SearchIndexManager>, 34 pub cancellation_token: CancellationToken, 35} 36 37impl LfgCleanupTask { 38 /// Creates a new LFG cleanup task. 39 #[must_use] 40 pub fn new( 41 config: LfgCleanupTaskConfig, 42 storage_pool: StoragePool, 43 search_index: Option<SearchIndexManager>, 44 cancellation_token: CancellationToken, 45 ) -> Self { 46 Self { 47 config, 48 storage_pool, 49 search_index, 50 cancellation_token, 51 } 52 } 53 54 /// Runs the LFG cleanup task as a long-running process. 55 /// 56 /// This task: 57 /// 1. Deactivates expired LFG records in the database 58 /// 2. Updates the OpenSearch index to reflect expired records 59 /// 60 /// # Errors 61 /// Returns an error if the sleep interval cannot be converted, or if there's 62 /// a problem cleaning up expired records. 63 pub async fn run(&self) -> Result<()> { 64 tracing::info!("LfgCleanupTask started"); 65 66 let interval = self.config.sleep_interval.to_std()?; 67 68 let sleeper = sleep(interval); 69 tokio::pin!(sleeper); 70 71 loop { 72 tokio::select! { 73 () = self.cancellation_token.cancelled() => { 74 break; 75 }, 76 () = &mut sleeper => { 77 if let Err(err) = self.cleanup_expired_lfg_records().await { 78 tracing::error!("LfgCleanupTask failed: {}", err); 79 } 80 sleeper.as_mut().reset(Instant::now() + interval); 81 } 82 } 83 } 84 85 tracing::info!("LfgCleanupTask stopped"); 86 87 Ok(()) 88 } 89 90 /// Cleanup expired LFG records. 91 async fn cleanup_expired_lfg_records(&self) -> Result<()> { 92 let now = Utc::now(); 93 94 tracing::debug!("Starting cleanup of expired LFG records"); 95 96 // Step 1: Update expired records in the database 97 let db_result = self.deactivate_expired_in_database(&now).await?; 98 99 // Step 2: Update expired records in OpenSearch 100 let os_result = self.deactivate_expired_in_opensearch().await?; 101 102 if db_result > 0 || os_result > 0 { 103 tracing::info!( 104 database_updated = db_result, 105 opensearch_updated = os_result, 106 "Cleaned up expired LFG records" 107 ); 108 } else { 109 tracing::debug!("No expired LFG records to clean up"); 110 } 111 112 Ok(()) 113 } 114 115 /// Deactivate expired LFG records in the database. 116 async fn deactivate_expired_in_database(&self, now: &chrono::DateTime<Utc>) -> Result<u64> { 117 // Query for active LFG records that have expired 118 let result = sqlx::query( 119 r#" 120 UPDATE atproto_records 121 SET record = jsonb_set(record, '{active}', 'false') 122 WHERE collection = $1 123 AND (record->>'active')::boolean = true 124 AND (record->>'endsAt')::timestamptz < $2 125 "#, 126 ) 127 .bind(NSID) 128 .bind(now) 129 .execute(&self.storage_pool) 130 .await?; 131 132 Ok(result.rows_affected()) 133 } 134 135 /// Deactivate expired LFG profiles in OpenSearch. 136 async fn deactivate_expired_in_opensearch(&self) -> Result<u64> { 137 let Some(ref search_index) = self.search_index else { 138 return Ok(0); 139 }; 140 141 match search_index.deactivate_expired_lfg_profiles().await { 142 Ok(count) => Ok(count), 143 Err(err) => { 144 tracing::warn!( 145 "Failed to deactivate expired LFG profiles in OpenSearch: {}", 146 err 147 ); 148 Ok(0) 149 } 150 } 151 } 152} 153 154#[cfg(test)] 155mod tests { 156 use super::*; 157 158 #[test] 159 fn test_default_config() { 160 let config = LfgCleanupTaskConfig::default(); 161 assert_eq!(config.sleep_interval, Duration::hours(1)); 162 } 163}