The smokesignal.events web application
at main 144 lines 4.6 kB view raw
1use std::sync::Arc; 2 3use anyhow::Result; 4use atproto_identity::{resolve::IdentityResolver, traits::DidDocumentStorage}; 5use chrono::Duration; 6use sqlx::FromRow; 7use tokio::time::{Instant, sleep}; 8use tokio_util::sync::CancellationToken; 9 10use crate::storage::StoragePool; 11 12pub struct IdentityRefreshTaskConfig { 13 pub sleep_interval: Duration, 14 pub worker_id: String, 15} 16 17pub struct IdentityRefreshTask { 18 pub config: IdentityRefreshTaskConfig, 19 pub storage_pool: StoragePool, 20 pub document_storage: std::sync::Arc<dyn DidDocumentStorage>, 21 pub identity_resolver: Arc<dyn IdentityResolver>, 22 pub cancellation_token: CancellationToken, 23} 24 25#[derive(FromRow)] 26struct ExpiredDidDocument { 27 did: String, 28} 29 30impl IdentityRefreshTask { 31 #[must_use] 32 pub fn new( 33 config: IdentityRefreshTaskConfig, 34 storage_pool: StoragePool, 35 document_storage: std::sync::Arc<dyn DidDocumentStorage>, 36 identity_resolver: Arc<dyn IdentityResolver>, 37 cancellation_token: CancellationToken, 38 ) -> Self { 39 Self { 40 config, 41 storage_pool, 42 document_storage, 43 identity_resolver, 44 cancellation_token, 45 } 46 } 47 48 /// Runs the identity refresh task as a long-running process 49 /// 50 /// # Errors 51 /// Returns an error if the sleep interval cannot be converted, or if there's a problem 52 /// processing the expired DID documents 53 pub async fn run(&self) -> Result<()> { 54 tracing::debug!("IdentityRefreshTask started"); 55 56 let interval = self.config.sleep_interval.to_std()?; 57 58 let sleeper = sleep(interval); 59 tokio::pin!(sleeper); 60 61 loop { 62 tokio::select! { 63 () = self.cancellation_token.cancelled() => { 64 break; 65 }, 66 () = &mut sleeper => { 67 if let Err(err) = self.process_expired_documents().await { 68 tracing::error!("IdentityRefreshTask failed: {}", err); 69 } 70 sleeper.as_mut().reset(Instant::now() + interval); 71 } 72 } 73 } 74 75 tracing::info!("IdentityRefreshTask stopped"); 76 77 Ok(()) 78 } 79 80 async fn process_expired_documents(&self) -> Result<i32> { 81 // Find DID documents that have expired in a separate transaction 82 let expired_docs = { 83 let mut tx = self.storage_pool.begin().await?; 84 let docs = sqlx::query_as::<_, ExpiredDidDocument>( 85 "SELECT did FROM did_documents WHERE expires_at IS NOT NULL AND expires_at <= NOW() LIMIT 50" 86 ) 87 .fetch_all(tx.as_mut()) 88 .await?; 89 tx.commit().await?; 90 docs 91 }; 92 93 let count = expired_docs.len() as i32; 94 95 if count == 0 { 96 return Ok(0); 97 } 98 99 tracing::info!(count = count, "processing expired DID documents"); 100 101 for expired_doc in expired_docs { 102 tracing::debug!(did = expired_doc.did, "refreshing expired DID document"); 103 104 match self.refresh_did_document(&expired_doc.did).await { 105 Ok(()) => { 106 tracing::debug!(did = expired_doc.did, "successfully refreshed DID document"); 107 } 108 Err(err) => { 109 tracing::warn!( 110 did = expired_doc.did, 111 error = ?err, 112 "failed to refresh DID document, deleting from storage" 113 ); 114 115 // If we can't resolve the DID, delete it from storage 116 if let Err(delete_err) = self 117 .document_storage 118 .delete_document_by_did(&expired_doc.did) 119 .await 120 { 121 tracing::error!( 122 did = expired_doc.did, 123 error = ?delete_err, 124 "failed to delete expired DID document" 125 ); 126 } 127 } 128 } 129 } 130 131 Ok(count) 132 } 133 134 async fn refresh_did_document(&self, did: &str) -> Result<()> { 135 // Use the identity resolver to get the updated DID document 136 let document = self.identity_resolver.resolve(did).await?; 137 138 // Store the updated document using the DidDocumentStorage trait 139 // This will reset the expires_at column based on the storage implementation 140 self.document_storage.store_document(document).await?; 141 142 Ok(()) 143 } 144}