use std::sync::Arc; use anyhow::Result; use atproto_identity::{resolve::IdentityResolver, traits::DidDocumentStorage}; use chrono::Duration; use sqlx::FromRow; use tokio::time::{Instant, sleep}; use tokio_util::sync::CancellationToken; use crate::storage::StoragePool; pub struct IdentityRefreshTaskConfig { pub sleep_interval: Duration, pub worker_id: String, } pub struct IdentityRefreshTask { pub config: IdentityRefreshTaskConfig, pub storage_pool: StoragePool, pub document_storage: std::sync::Arc, pub identity_resolver: Arc, pub cancellation_token: CancellationToken, } #[derive(FromRow)] struct ExpiredDidDocument { did: String, } impl IdentityRefreshTask { #[must_use] pub fn new( config: IdentityRefreshTaskConfig, storage_pool: StoragePool, document_storage: std::sync::Arc, identity_resolver: Arc, cancellation_token: CancellationToken, ) -> Self { Self { config, storage_pool, document_storage, identity_resolver, cancellation_token, } } /// Runs the identity refresh task as a long-running process /// /// # Errors /// Returns an error if the sleep interval cannot be converted, or if there's a problem /// processing the expired DID documents pub async fn run(&self) -> Result<()> { tracing::debug!("IdentityRefreshTask started"); let interval = self.config.sleep_interval.to_std()?; let sleeper = sleep(interval); tokio::pin!(sleeper); loop { tokio::select! { () = self.cancellation_token.cancelled() => { break; }, () = &mut sleeper => { if let Err(err) = self.process_expired_documents().await { tracing::error!("IdentityRefreshTask failed: {}", err); } sleeper.as_mut().reset(Instant::now() + interval); } } } tracing::info!("IdentityRefreshTask stopped"); Ok(()) } async fn process_expired_documents(&self) -> Result { // Find DID documents that have expired in a separate transaction let expired_docs = { let mut tx = self.storage_pool.begin().await?; let docs = sqlx::query_as::<_, ExpiredDidDocument>( "SELECT did FROM did_documents WHERE expires_at IS NOT NULL AND expires_at <= NOW() LIMIT 50" ) .fetch_all(tx.as_mut()) .await?; tx.commit().await?; docs }; let count = expired_docs.len() as i32; if count == 0 { return Ok(0); } tracing::info!(count = count, "processing expired DID documents"); for expired_doc in expired_docs { tracing::debug!(did = expired_doc.did, "refreshing expired DID document"); match self.refresh_did_document(&expired_doc.did).await { Ok(()) => { tracing::debug!(did = expired_doc.did, "successfully refreshed DID document"); } Err(err) => { tracing::warn!( did = expired_doc.did, error = ?err, "failed to refresh DID document, deleting from storage" ); // If we can't resolve the DID, delete it from storage if let Err(delete_err) = self .document_storage .delete_document_by_did(&expired_doc.did) .await { tracing::error!( did = expired_doc.did, error = ?delete_err, "failed to delete expired DID document" ); } } } } Ok(count) } async fn refresh_did_document(&self, did: &str) -> Result<()> { // Use the identity resolver to get the updated DID document let document = self.identity_resolver.resolve(did).await?; // Store the updated document using the DidDocumentStorage trait // This will reset the expires_at column based on the storage implementation self.document_storage.store_document(document).await?; Ok(()) } }