The smokesignal.events web application
1//! Storage-backed record resolver for ATProto records with network fallback.
2//!
3//! This module provides a `RecordResolver` implementation that:
4//! 1. First checks local storage for cached acceptance records
5//! 2. Falls back to fetching from the network via the identity's PDS
6//! 3. Caches fetched records in storage for future use
7
8use crate::storage::StoragePool;
9use crate::storage::acceptance::{acceptance_record_get, acceptance_record_upsert};
10use anyhow::{Context, anyhow};
11use atproto_identity::resolve::IdentityResolver;
12use atproto_record::aturi::ATURI;
13use std::str::FromStr;
14use std::sync::Arc;
15use tracing::{debug, warn};
16
17/// Record resolver that uses storage as a cache and falls back to network fetching.
18///
19/// This resolver attempts to retrieve acceptance records from local storage first.
20/// If not found, it resolves the DID to find the PDS endpoint, fetches the record from
21/// the network, and stores it for future use.
22pub struct StorageBackedRecordResolver {
23 http_client: reqwest::Client,
24 pub(crate) identity_resolver: Arc<dyn IdentityResolver>,
25 pool: StoragePool,
26}
27
28impl StorageBackedRecordResolver {
29 /// Create a new storage-backed record resolver.
30 pub fn new(
31 http_client: reqwest::Client,
32 identity_resolver: Arc<dyn IdentityResolver>,
33 pool: StoragePool,
34 ) -> Self {
35 Self {
36 http_client,
37 identity_resolver,
38 pool,
39 }
40 }
41}
42
43#[async_trait::async_trait]
44impl atproto_client::record_resolver::RecordResolver for StorageBackedRecordResolver {
45 async fn resolve<T>(&self, aturi: &str) -> anyhow::Result<T>
46 where
47 T: serde::de::DeserializeOwned + Send,
48 {
49 // Parse the AT-URI
50 let parsed = ATURI::from_str(aturi).map_err(|e| anyhow!("Invalid AT-URI: {}", e))?;
51
52 // Try to get from storage first (only for acceptance records)
53 if parsed.collection == "events.smokesignal.calendar.acceptance" {
54 debug!(aturi = %aturi, "Checking storage for acceptance record");
55
56 match acceptance_record_get(&self.pool, aturi).await {
57 Ok(Some(acceptance_record)) => {
58 debug!(aturi = %aturi, "Found acceptance record in storage cache");
59 // Deserialize the stored record
60 return serde_json::from_value(acceptance_record.record.0)
61 .map_err(|e| anyhow!("Failed to deserialize cached record: {}", e));
62 }
63 Ok(None) => {
64 debug!(aturi = %aturi, "Acceptance record not found in storage, fetching from network");
65 }
66 Err(e) => {
67 warn!(
68 aturi = %aturi,
69 error = %e,
70 "Failed to check storage for acceptance record, will fetch from network"
71 );
72 }
73 }
74 }
75
76 // Not in storage or not an acceptance record - fetch from network
77 debug!(aturi = %aturi, authority = %parsed.authority, "Resolving DID to fetch record from PDS");
78
79 // Resolve the DID to get the PDS endpoint
80 let document = self
81 .identity_resolver
82 .resolve(&parsed.authority)
83 .await
84 .with_context(|| format!("Failed to resolve DID: {}", parsed.authority))?;
85
86 // Find the PDS endpoint
87 let pds_endpoint = document
88 .service
89 .iter()
90 .find(|s| s.r#type == "AtprotoPersonalDataServer")
91 .map(|s| s.service_endpoint.as_str())
92 .ok_or_else(|| anyhow!("No PDS endpoint found for DID: {}", parsed.authority))?;
93
94 debug!(
95 aturi = %aturi,
96 pds_endpoint = %pds_endpoint,
97 "Fetching record from PDS"
98 );
99
100 // Fetch the record using the XRPC client
101 let response = atproto_client::com::atproto::repo::get_record(
102 &self.http_client,
103 &atproto_client::client::Auth::None,
104 pds_endpoint,
105 &parsed.authority,
106 &parsed.collection,
107 &parsed.record_key,
108 None,
109 )
110 .await
111 .with_context(|| format!("Failed to fetch record from PDS: {}", pds_endpoint))?;
112
113 match response {
114 atproto_client::com::atproto::repo::GetRecordResponse::Record {
115 value, cid, ..
116 } => {
117 // If this is an acceptance record, store it for future use
118 if parsed.collection == "events.smokesignal.calendar.acceptance" {
119 debug!(aturi = %aturi, "Caching acceptance record in storage");
120
121 // Store asynchronously, but don't fail if storage fails
122 if let Err(e) =
123 acceptance_record_upsert(&self.pool, aturi, &cid, &parsed.authority, &value)
124 .await
125 {
126 warn!(
127 aturi = %aturi,
128 error = %e,
129 "Failed to cache acceptance record in storage"
130 );
131 }
132 }
133
134 // Deserialize and return the record
135 serde_json::from_value(value)
136 .map_err(|e| anyhow!("Failed to deserialize record: {}", e))
137 }
138 atproto_client::com::atproto::repo::GetRecordResponse::Error(error) => {
139 Err(anyhow!("Failed to fetch record: {}", error.error_message()))
140 }
141 }
142 }
143}
144
145// Implement RecordResolver for &StorageBackedRecordResolver to allow passing by reference
146#[async_trait::async_trait]
147impl atproto_client::record_resolver::RecordResolver for &StorageBackedRecordResolver {
148 async fn resolve<T>(&self, aturi: &str) -> anyhow::Result<T>
149 where
150 T: serde::de::DeserializeOwned + Send,
151 {
152 // Delegate to the implementation for StorageBackedRecordResolver
153 (*self).resolve(aturi).await
154 }
155}