The smokesignal.events web application
at main 155 lines 6.1 kB view raw
1//! Storage-backed record resolver for ATProto records with network fallback. 2//! 3//! This module provides a `RecordResolver` implementation that: 4//! 1. First checks local storage for cached acceptance records 5//! 2. Falls back to fetching from the network via the identity's PDS 6//! 3. Caches fetched records in storage for future use 7 8use crate::storage::StoragePool; 9use crate::storage::acceptance::{acceptance_record_get, acceptance_record_upsert}; 10use anyhow::{Context, anyhow}; 11use atproto_identity::resolve::IdentityResolver; 12use atproto_record::aturi::ATURI; 13use std::str::FromStr; 14use std::sync::Arc; 15use tracing::{debug, warn}; 16 17/// Record resolver that uses storage as a cache and falls back to network fetching. 18/// 19/// This resolver attempts to retrieve acceptance records from local storage first. 20/// If not found, it resolves the DID to find the PDS endpoint, fetches the record from 21/// the network, and stores it for future use. 22pub struct StorageBackedRecordResolver { 23 http_client: reqwest::Client, 24 pub(crate) identity_resolver: Arc<dyn IdentityResolver>, 25 pool: StoragePool, 26} 27 28impl StorageBackedRecordResolver { 29 /// Create a new storage-backed record resolver. 30 pub fn new( 31 http_client: reqwest::Client, 32 identity_resolver: Arc<dyn IdentityResolver>, 33 pool: StoragePool, 34 ) -> Self { 35 Self { 36 http_client, 37 identity_resolver, 38 pool, 39 } 40 } 41} 42 43#[async_trait::async_trait] 44impl atproto_client::record_resolver::RecordResolver for StorageBackedRecordResolver { 45 async fn resolve<T>(&self, aturi: &str) -> anyhow::Result<T> 46 where 47 T: serde::de::DeserializeOwned + Send, 48 { 49 // Parse the AT-URI 50 let parsed = ATURI::from_str(aturi).map_err(|e| anyhow!("Invalid AT-URI: {}", e))?; 51 52 // Try to get from storage first (only for acceptance records) 53 if parsed.collection == "events.smokesignal.calendar.acceptance" { 54 debug!(aturi = %aturi, "Checking storage for acceptance record"); 55 56 match acceptance_record_get(&self.pool, aturi).await { 57 Ok(Some(acceptance_record)) => { 58 debug!(aturi = %aturi, "Found acceptance record in storage cache"); 59 // Deserialize the stored record 60 return serde_json::from_value(acceptance_record.record.0) 61 .map_err(|e| anyhow!("Failed to deserialize cached record: {}", e)); 62 } 63 Ok(None) => { 64 debug!(aturi = %aturi, "Acceptance record not found in storage, fetching from network"); 65 } 66 Err(e) => { 67 warn!( 68 aturi = %aturi, 69 error = %e, 70 "Failed to check storage for acceptance record, will fetch from network" 71 ); 72 } 73 } 74 } 75 76 // Not in storage or not an acceptance record - fetch from network 77 debug!(aturi = %aturi, authority = %parsed.authority, "Resolving DID to fetch record from PDS"); 78 79 // Resolve the DID to get the PDS endpoint 80 let document = self 81 .identity_resolver 82 .resolve(&parsed.authority) 83 .await 84 .with_context(|| format!("Failed to resolve DID: {}", parsed.authority))?; 85 86 // Find the PDS endpoint 87 let pds_endpoint = document 88 .service 89 .iter() 90 .find(|s| s.r#type == "AtprotoPersonalDataServer") 91 .map(|s| s.service_endpoint.as_str()) 92 .ok_or_else(|| anyhow!("No PDS endpoint found for DID: {}", parsed.authority))?; 93 94 debug!( 95 aturi = %aturi, 96 pds_endpoint = %pds_endpoint, 97 "Fetching record from PDS" 98 ); 99 100 // Fetch the record using the XRPC client 101 let response = atproto_client::com::atproto::repo::get_record( 102 &self.http_client, 103 &atproto_client::client::Auth::None, 104 pds_endpoint, 105 &parsed.authority, 106 &parsed.collection, 107 &parsed.record_key, 108 None, 109 ) 110 .await 111 .with_context(|| format!("Failed to fetch record from PDS: {}", pds_endpoint))?; 112 113 match response { 114 atproto_client::com::atproto::repo::GetRecordResponse::Record { 115 value, cid, .. 116 } => { 117 // If this is an acceptance record, store it for future use 118 if parsed.collection == "events.smokesignal.calendar.acceptance" { 119 debug!(aturi = %aturi, "Caching acceptance record in storage"); 120 121 // Store asynchronously, but don't fail if storage fails 122 if let Err(e) = 123 acceptance_record_upsert(&self.pool, aturi, &cid, &parsed.authority, &value) 124 .await 125 { 126 warn!( 127 aturi = %aturi, 128 error = %e, 129 "Failed to cache acceptance record in storage" 130 ); 131 } 132 } 133 134 // Deserialize and return the record 135 serde_json::from_value(value) 136 .map_err(|e| anyhow!("Failed to deserialize record: {}", e)) 137 } 138 atproto_client::com::atproto::repo::GetRecordResponse::Error(error) => { 139 Err(anyhow!("Failed to fetch record: {}", error.error_message())) 140 } 141 } 142 } 143} 144 145// Implement RecordResolver for &StorageBackedRecordResolver to allow passing by reference 146#[async_trait::async_trait] 147impl atproto_client::record_resolver::RecordResolver for &StorageBackedRecordResolver { 148 async fn resolve<T>(&self, aturi: &str) -> anyhow::Result<T> 149 where 150 T: serde::de::DeserializeOwned + Send, 151 { 152 // Delegate to the implementation for StorageBackedRecordResolver 153 (*self).resolve(aturi).await 154 } 155}