The smokesignal.events web application
1use std::sync::Arc;
2
3use anyhow::Result;
4use atproto_identity::{resolve::IdentityResolver, traits::DidDocumentStorage};
5use chrono::Duration;
6use sqlx::FromRow;
7use tokio::time::{Instant, sleep};
8use tokio_util::sync::CancellationToken;
9
10use crate::storage::StoragePool;
11
12pub struct IdentityRefreshTaskConfig {
13 pub sleep_interval: Duration,
14 pub worker_id: String,
15}
16
17pub struct IdentityRefreshTask {
18 pub config: IdentityRefreshTaskConfig,
19 pub storage_pool: StoragePool,
20 pub document_storage: std::sync::Arc<dyn DidDocumentStorage>,
21 pub identity_resolver: Arc<dyn IdentityResolver>,
22 pub cancellation_token: CancellationToken,
23}
24
25#[derive(FromRow)]
26struct ExpiredDidDocument {
27 did: String,
28}
29
30impl IdentityRefreshTask {
31 #[must_use]
32 pub fn new(
33 config: IdentityRefreshTaskConfig,
34 storage_pool: StoragePool,
35 document_storage: std::sync::Arc<dyn DidDocumentStorage>,
36 identity_resolver: Arc<dyn IdentityResolver>,
37 cancellation_token: CancellationToken,
38 ) -> Self {
39 Self {
40 config,
41 storage_pool,
42 document_storage,
43 identity_resolver,
44 cancellation_token,
45 }
46 }
47
48 /// Runs the identity refresh task as a long-running process
49 ///
50 /// # Errors
51 /// Returns an error if the sleep interval cannot be converted, or if there's a problem
52 /// processing the expired DID documents
53 pub async fn run(&self) -> Result<()> {
54 tracing::debug!("IdentityRefreshTask started");
55
56 let interval = self.config.sleep_interval.to_std()?;
57
58 let sleeper = sleep(interval);
59 tokio::pin!(sleeper);
60
61 loop {
62 tokio::select! {
63 () = self.cancellation_token.cancelled() => {
64 break;
65 },
66 () = &mut sleeper => {
67 if let Err(err) = self.process_expired_documents().await {
68 tracing::error!("IdentityRefreshTask failed: {}", err);
69 }
70 sleeper.as_mut().reset(Instant::now() + interval);
71 }
72 }
73 }
74
75 tracing::info!("IdentityRefreshTask stopped");
76
77 Ok(())
78 }
79
80 async fn process_expired_documents(&self) -> Result<i32> {
81 // Find DID documents that have expired in a separate transaction
82 let expired_docs = {
83 let mut tx = self.storage_pool.begin().await?;
84 let docs = sqlx::query_as::<_, ExpiredDidDocument>(
85 "SELECT did FROM did_documents WHERE expires_at IS NOT NULL AND expires_at <= NOW() LIMIT 50"
86 )
87 .fetch_all(tx.as_mut())
88 .await?;
89 tx.commit().await?;
90 docs
91 };
92
93 let count = expired_docs.len() as i32;
94
95 if count == 0 {
96 return Ok(0);
97 }
98
99 tracing::info!(count = count, "processing expired DID documents");
100
101 for expired_doc in expired_docs {
102 tracing::debug!(did = expired_doc.did, "refreshing expired DID document");
103
104 match self.refresh_did_document(&expired_doc.did).await {
105 Ok(()) => {
106 tracing::debug!(did = expired_doc.did, "successfully refreshed DID document");
107 }
108 Err(err) => {
109 tracing::warn!(
110 did = expired_doc.did,
111 error = ?err,
112 "failed to refresh DID document, deleting from storage"
113 );
114
115 // If we can't resolve the DID, delete it from storage
116 if let Err(delete_err) = self
117 .document_storage
118 .delete_document_by_did(&expired_doc.did)
119 .await
120 {
121 tracing::error!(
122 did = expired_doc.did,
123 error = ?delete_err,
124 "failed to delete expired DID document"
125 );
126 }
127 }
128 }
129 }
130
131 Ok(count)
132 }
133
134 async fn refresh_did_document(&self, did: &str) -> Result<()> {
135 // Use the identity resolver to get the updated DID document
136 let document = self.identity_resolver.resolve(did).await?;
137
138 // Store the updated document using the DidDocumentStorage trait
139 // This will reset the expires_at column based on the storage implementation
140 self.document_storage.store_document(document).await?;
141
142 Ok(())
143 }
144}