//! Storage-backed record resolver for ATProto records with network fallback. //! //! This module provides a `RecordResolver` implementation that: //! 1. First checks local storage for cached acceptance records //! 2. Falls back to fetching from the network via the identity's PDS //! 3. Caches fetched records in storage for future use use crate::storage::StoragePool; use crate::storage::acceptance::{acceptance_record_get, acceptance_record_upsert}; use anyhow::{Context, anyhow}; use atproto_identity::resolve::IdentityResolver; use atproto_record::aturi::ATURI; use std::str::FromStr; use std::sync::Arc; use tracing::{debug, warn}; /// Record resolver that uses storage as a cache and falls back to network fetching. /// /// This resolver attempts to retrieve acceptance records from local storage first. /// If not found, it resolves the DID to find the PDS endpoint, fetches the record from /// the network, and stores it for future use. pub struct StorageBackedRecordResolver { http_client: reqwest::Client, pub(crate) identity_resolver: Arc, pool: StoragePool, } impl StorageBackedRecordResolver { /// Create a new storage-backed record resolver. pub fn new( http_client: reqwest::Client, identity_resolver: Arc, pool: StoragePool, ) -> Self { Self { http_client, identity_resolver, pool, } } } #[async_trait::async_trait] impl atproto_client::record_resolver::RecordResolver for StorageBackedRecordResolver { async fn resolve(&self, aturi: &str) -> anyhow::Result where T: serde::de::DeserializeOwned + Send, { // Parse the AT-URI let parsed = ATURI::from_str(aturi).map_err(|e| anyhow!("Invalid AT-URI: {}", e))?; // Try to get from storage first (only for acceptance records) if parsed.collection == "events.smokesignal.calendar.acceptance" { debug!(aturi = %aturi, "Checking storage for acceptance record"); match acceptance_record_get(&self.pool, aturi).await { Ok(Some(acceptance_record)) => { debug!(aturi = %aturi, "Found acceptance record in storage cache"); // Deserialize the stored record return serde_json::from_value(acceptance_record.record.0) .map_err(|e| anyhow!("Failed to deserialize cached record: {}", e)); } Ok(None) => { debug!(aturi = %aturi, "Acceptance record not found in storage, fetching from network"); } Err(e) => { warn!( aturi = %aturi, error = %e, "Failed to check storage for acceptance record, will fetch from network" ); } } } // Not in storage or not an acceptance record - fetch from network debug!(aturi = %aturi, authority = %parsed.authority, "Resolving DID to fetch record from PDS"); // Resolve the DID to get the PDS endpoint let document = self .identity_resolver .resolve(&parsed.authority) .await .with_context(|| format!("Failed to resolve DID: {}", parsed.authority))?; // Find the PDS endpoint let pds_endpoint = document .service .iter() .find(|s| s.r#type == "AtprotoPersonalDataServer") .map(|s| s.service_endpoint.as_str()) .ok_or_else(|| anyhow!("No PDS endpoint found for DID: {}", parsed.authority))?; debug!( aturi = %aturi, pds_endpoint = %pds_endpoint, "Fetching record from PDS" ); // Fetch the record using the XRPC client let response = atproto_client::com::atproto::repo::get_record( &self.http_client, &atproto_client::client::Auth::None, pds_endpoint, &parsed.authority, &parsed.collection, &parsed.record_key, None, ) .await .with_context(|| format!("Failed to fetch record from PDS: {}", pds_endpoint))?; match response { atproto_client::com::atproto::repo::GetRecordResponse::Record { value, cid, .. } => { // If this is an acceptance record, store it for future use if parsed.collection == "events.smokesignal.calendar.acceptance" { debug!(aturi = %aturi, "Caching acceptance record in storage"); // Store asynchronously, but don't fail if storage fails if let Err(e) = acceptance_record_upsert(&self.pool, aturi, &cid, &parsed.authority, &value) .await { warn!( aturi = %aturi, error = %e, "Failed to cache acceptance record in storage" ); } } // Deserialize and return the record serde_json::from_value(value) .map_err(|e| anyhow!("Failed to deserialize record: {}", e)) } atproto_client::com::atproto::repo::GetRecordResponse::Error(error) => { Err(anyhow!("Failed to fetch record: {}", error.error_message())) } } } } // Implement RecordResolver for &StorageBackedRecordResolver to allow passing by reference #[async_trait::async_trait] impl atproto_client::record_resolver::RecordResolver for &StorageBackedRecordResolver { async fn resolve(&self, aturi: &str) -> anyhow::Result where T: serde::de::DeserializeOwned + Send, { // Delegate to the implementation for StorageBackedRecordResolver (*self).resolve(aturi).await } }