The smokesignal.events web application
1//! LFG (Looking For Group) cleanup background task.
2//!
3//! This task runs periodically to deactivate expired LFG records in both
4//! the database and OpenSearch index.
5
6use anyhow::Result;
7use chrono::{Duration, Utc};
8use tokio::time::{Instant, sleep};
9use tokio_util::sync::CancellationToken;
10
11use crate::atproto::lexicon::lfg::NSID;
12use crate::search_index::SearchIndexManager;
13use crate::storage::StoragePool;
14
15/// Configuration for the LFG cleanup task.
16pub struct LfgCleanupTaskConfig {
17 /// How often to run the cleanup (default: 1 hour)
18 pub sleep_interval: Duration,
19}
20
21impl Default for LfgCleanupTaskConfig {
22 fn default() -> Self {
23 Self {
24 sleep_interval: Duration::hours(1),
25 }
26 }
27}
28
29/// Background task that deactivates expired LFG records.
30pub struct LfgCleanupTask {
31 pub config: LfgCleanupTaskConfig,
32 pub storage_pool: StoragePool,
33 pub search_index: Option<SearchIndexManager>,
34 pub cancellation_token: CancellationToken,
35}
36
37impl LfgCleanupTask {
38 /// Creates a new LFG cleanup task.
39 #[must_use]
40 pub fn new(
41 config: LfgCleanupTaskConfig,
42 storage_pool: StoragePool,
43 search_index: Option<SearchIndexManager>,
44 cancellation_token: CancellationToken,
45 ) -> Self {
46 Self {
47 config,
48 storage_pool,
49 search_index,
50 cancellation_token,
51 }
52 }
53
54 /// Runs the LFG cleanup task as a long-running process.
55 ///
56 /// This task:
57 /// 1. Deactivates expired LFG records in the database
58 /// 2. Updates the OpenSearch index to reflect expired records
59 ///
60 /// # Errors
61 /// Returns an error if the sleep interval cannot be converted, or if there's
62 /// a problem cleaning up expired records.
63 pub async fn run(&self) -> Result<()> {
64 tracing::info!("LfgCleanupTask started");
65
66 let interval = self.config.sleep_interval.to_std()?;
67
68 let sleeper = sleep(interval);
69 tokio::pin!(sleeper);
70
71 loop {
72 tokio::select! {
73 () = self.cancellation_token.cancelled() => {
74 break;
75 },
76 () = &mut sleeper => {
77 if let Err(err) = self.cleanup_expired_lfg_records().await {
78 tracing::error!("LfgCleanupTask failed: {}", err);
79 }
80 sleeper.as_mut().reset(Instant::now() + interval);
81 }
82 }
83 }
84
85 tracing::info!("LfgCleanupTask stopped");
86
87 Ok(())
88 }
89
90 /// Cleanup expired LFG records.
91 async fn cleanup_expired_lfg_records(&self) -> Result<()> {
92 let now = Utc::now();
93
94 tracing::debug!("Starting cleanup of expired LFG records");
95
96 // Step 1: Update expired records in the database
97 let db_result = self.deactivate_expired_in_database(&now).await?;
98
99 // Step 2: Update expired records in OpenSearch
100 let os_result = self.deactivate_expired_in_opensearch().await?;
101
102 if db_result > 0 || os_result > 0 {
103 tracing::info!(
104 database_updated = db_result,
105 opensearch_updated = os_result,
106 "Cleaned up expired LFG records"
107 );
108 } else {
109 tracing::debug!("No expired LFG records to clean up");
110 }
111
112 Ok(())
113 }
114
115 /// Deactivate expired LFG records in the database.
116 async fn deactivate_expired_in_database(&self, now: &chrono::DateTime<Utc>) -> Result<u64> {
117 // Query for active LFG records that have expired
118 let result = sqlx::query(
119 r#"
120 UPDATE atproto_records
121 SET record = jsonb_set(record, '{active}', 'false')
122 WHERE collection = $1
123 AND (record->>'active')::boolean = true
124 AND (record->>'endsAt')::timestamptz < $2
125 "#,
126 )
127 .bind(NSID)
128 .bind(now)
129 .execute(&self.storage_pool)
130 .await?;
131
132 Ok(result.rows_affected())
133 }
134
135 /// Deactivate expired LFG profiles in OpenSearch.
136 async fn deactivate_expired_in_opensearch(&self) -> Result<u64> {
137 let Some(ref search_index) = self.search_index else {
138 return Ok(0);
139 };
140
141 match search_index.deactivate_expired_lfg_profiles().await {
142 Ok(count) => Ok(count),
143 Err(err) => {
144 tracing::warn!(
145 "Failed to deactivate expired LFG profiles in OpenSearch: {}",
146 err
147 );
148 Ok(0)
149 }
150 }
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration schedules one cleanup pass per hour.
    #[test]
    fn test_default_config() {
        let LfgCleanupTaskConfig { sleep_interval } = LfgCleanupTaskConfig::default();
        let expected = Duration::hours(1);

        assert_eq!(sleep_interval, expected);
    }
}
163}