The smokesignal.events web application
1use anyhow::Result;
2use chrono::Duration;
3use tokio::time::{Instant, sleep};
4use tokio_util::sync::CancellationToken;
5
6use crate::storage::StoragePool;
7
8pub struct OAuthRequestsCleanupTaskConfig {
9 pub sleep_interval: Duration,
10}
11
12pub struct OAuthRequestsCleanupTask {
13 pub config: OAuthRequestsCleanupTaskConfig,
14 pub storage_pool: StoragePool,
15 pub cancellation_token: CancellationToken,
16}
17
18impl OAuthRequestsCleanupTask {
19 #[must_use]
20 pub fn new(
21 config: OAuthRequestsCleanupTaskConfig,
22 storage_pool: StoragePool,
23 cancellation_token: CancellationToken,
24 ) -> Self {
25 Self {
26 config,
27 storage_pool,
28 cancellation_token,
29 }
30 }
31
32 /// Runs the OAuth requests cleanup task as a long-running process
33 ///
34 /// # Errors
35 /// Returns an error if the sleep interval cannot be converted, or if there's a problem
36 /// cleaning up expired requests
37 pub async fn run(&self) -> Result<()> {
38 tracing::debug!("OAuthRequestsCleanupTask started");
39
40 let interval = self.config.sleep_interval.to_std()?;
41
42 let sleeper = sleep(interval);
43 tokio::pin!(sleeper);
44
45 loop {
46 tokio::select! {
47 () = self.cancellation_token.cancelled() => {
48 break;
49 },
50 () = &mut sleeper => {
51 if let Err(err) = self.cleanup_expired_requests().await {
52 tracing::error!("OAuthRequestsCleanupTask failed: {}", err);
53 }
54 sleeper.as_mut().reset(Instant::now() + interval);
55 }
56 }
57 }
58
59 tracing::info!("OAuthRequestsCleanupTask stopped");
60
61 Ok(())
62 }
63
64 async fn cleanup_expired_requests(&self) -> Result<()> {
65 let now = chrono::Utc::now();
66
67 tracing::debug!("Starting cleanup of expired OAuth requests");
68
69 let result = sqlx::query("DELETE FROM atproto_oauth_requests WHERE expires_at < $1")
70 .bind(now)
71 .execute(&self.storage_pool)
72 .await?;
73
74 let deleted_count = result.rows_affected();
75
76 if deleted_count > 0 {
77 tracing::info!(
78 deleted_count = deleted_count,
79 "Cleaned up expired OAuth requests"
80 );
81 } else {
82 tracing::debug!("No expired OAuth requests to clean up");
83 }
84
85 Ok(())
86 }
87}