QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
···245245 /// When set, the root handler will serve files from this directory.
246246 /// Default: "www" (relative to working directory)
247247 pub static_files_dir: String,
248248+249249+ /// Enable Jetstream consumer for AT Protocol events.
250250+ /// When enabled, the service will consume Account and Identity events
251251+ /// to maintain cache consistency.
252252+ /// Default: false
253253+ pub jetstream_enabled: bool,
254254+255255+ /// Jetstream WebSocket hostname for consuming AT Protocol events.
256256+ /// Example: "jetstream.atproto.tools" or "jetstream1.us-west.bsky.network"
257257+ /// Default: "jetstream.atproto.tools"
258258+ pub jetstream_hostname: String,
248259}
249260250261impl Config {
···327338 proactive_refresh_enabled: parse_env("PROACTIVE_REFRESH_ENABLED", false)?,
328339 proactive_refresh_threshold: parse_env("PROACTIVE_REFRESH_THRESHOLD", 0.8)?,
329340 static_files_dir: get_env_or_default("STATIC_FILES_DIR", Some("www")).unwrap(),
341341+ jetstream_enabled: parse_env("JETSTREAM_ENABLED", false)?,
342342+ jetstream_hostname: get_env_or_default("JETSTREAM_HOSTNAME", Some("jetstream.atproto.tools")).unwrap(),
330343 };
331344332345 // Calculate the Cache-Control header value if enabled
+354
src/jetstream_handler.rs
···11+//! Jetstream event handler for QuickDID
22+//!
33+//! This module provides the event handler for processing AT Protocol Jetstream events,
44+//! specifically handling Account and Identity events to maintain cache consistency.
55+66+use crate::handle_resolver::HandleResolver;
77+use crate::metrics::MetricsPublisher;
88+use anyhow::Result;
99+use atproto_jetstream::{EventHandler, JetstreamEvent};
1010+use std::sync::Arc;
1111+use tracing::{debug, info, warn};
1212+1313+/// Jetstream event handler for QuickDID
1414+///
1515+/// This handler processes AT Protocol events from the Jetstream firehose to keep
1616+/// the handle resolver cache in sync with the network state.
1717+///
1818+/// # Event Processing
1919+///
2020+/// ## Account Events
2121+/// - When an account is marked as "deleted" or "deactivated", the DID is purged from the cache
2222+/// - Metrics are tracked for successful and failed purge operations
2323+///
2424+/// ## Identity Events
2525+/// - When an identity event contains a handle, the handle-to-DID mapping is updated
2626+/// - When an identity event lacks a handle (indicating removal), the DID is purged
2727+/// - Metrics are tracked for successful and failed update/purge operations
2828+///
2929+/// # Example
3030+///
3131+/// ```no_run
3232+/// use quickdid::jetstream_handler::QuickDidEventHandler;
3333+/// use quickdid::handle_resolver::HandleResolver;
3434+/// use quickdid::metrics::MetricsPublisher;
3535+/// use std::sync::Arc;
3636+///
3737+/// # async fn example(resolver: Arc<dyn HandleResolver>, metrics: Arc<dyn MetricsPublisher>) {
3838+/// let handler = QuickDidEventHandler::new(resolver, metrics);
3939+/// // Register with a JetstreamConsumer
4040+/// # }
4141+/// ```
4242+pub struct QuickDidEventHandler {
4343+ resolver: Arc<dyn HandleResolver>,
4444+ metrics: Arc<dyn MetricsPublisher>,
4545+}
4646+4747+impl QuickDidEventHandler {
4848+ /// Create a new Jetstream event handler
4949+ ///
5050+ /// # Arguments
5151+ ///
5252+ /// * `resolver` - The handle resolver to use for cache operations
5353+ /// * `metrics` - The metrics publisher for tracking event processing
5454+ pub fn new(resolver: Arc<dyn HandleResolver>, metrics: Arc<dyn MetricsPublisher>) -> Self {
5555+ Self { resolver, metrics }
5656+ }
5757+}
5858+5959+#[async_trait::async_trait]
6060+impl EventHandler for QuickDidEventHandler {
6161+ fn handler_id(&self) -> String {
6262+ "quickdid_handler".to_string()
6363+ }
6464+6565+ async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
6666+ match event {
6767+ JetstreamEvent::Account { did, kind, .. } => {
6868+ // If account kind is "deleted" or "deactivated", purge the DID
6969+ if kind == "deleted" || kind == "deactivated" {
7070+ info!(did = %did, kind = %kind, "Purging account");
7171+ match self.resolver.purge(&did).await {
7272+ Ok(()) => {
7373+ self.metrics.incr("jetstream.account.purged").await;
7474+ }
7575+ Err(e) => {
7676+ warn!(did = %did, error = ?e, "Failed to purge DID");
7777+ self.metrics.incr("jetstream.account.purge_error").await;
7878+ }
7979+ }
8080+ }
8181+ self.metrics.incr("jetstream.account.processed").await;
8282+ }
8383+ JetstreamEvent::Identity { did, identity, .. } => {
8484+ // Extract handle from identity JSON if available
8585+ if !identity.is_null() {
8686+ if let Some(handle_value) = identity.get("handle") {
8787+ if let Some(handle) = handle_value.as_str() {
8888+ info!(handle = %handle, did = %did, "Updating identity mapping");
8989+ match self.resolver.set(handle, &did).await {
9090+ Ok(()) => {
9191+ self.metrics.incr("jetstream.identity.updated").await;
9292+ }
9393+ Err(e) => {
9494+ warn!(handle = %handle, did = %did, error = ?e, "Failed to update mapping");
9595+ self.metrics.incr("jetstream.identity.update_error").await;
9696+ }
9797+ }
9898+ } else {
9999+ // No handle or invalid handle, purge the DID
100100+ info!(did = %did, "Purging identity without valid handle");
101101+ match self.resolver.purge(&did).await {
102102+ Ok(()) => {
103103+ self.metrics.incr("jetstream.identity.purged").await;
104104+ }
105105+ Err(e) => {
106106+ warn!(did = %did, error = ?e, "Failed to purge DID");
107107+ self.metrics.incr("jetstream.identity.purge_error").await;
108108+ }
109109+ }
110110+ }
111111+ } else {
112112+ // No handle field, purge the DID
113113+ info!(did = %did, "Purging identity without handle field");
114114+ match self.resolver.purge(&did).await {
115115+ Ok(()) => {
116116+ self.metrics.incr("jetstream.identity.purged").await;
117117+ }
118118+ Err(e) => {
119119+ warn!(did = %did, error = ?e, "Failed to purge DID");
120120+ self.metrics.incr("jetstream.identity.purge_error").await;
121121+ }
122122+ }
123123+ }
124124+ } else {
125125+ // Null identity means removed, purge the DID
126126+ info!(did = %did, "Purging identity with null info");
127127+ match self.resolver.purge(&did).await {
128128+ Ok(()) => {
129129+ self.metrics.incr("jetstream.identity.purged").await;
130130+ }
131131+ Err(e) => {
132132+ warn!(did = %did, error = ?e, "Failed to purge DID");
133133+ self.metrics.incr("jetstream.identity.purge_error").await;
134134+ }
135135+ }
136136+ }
137137+ self.metrics.incr("jetstream.identity.processed").await;
138138+ }
139139+ _ => {
140140+ // Other event types we don't care about
141141+ debug!("Ignoring unhandled Jetstream event type");
142142+ }
143143+ }
144144+ Ok(())
145145+ }
146146+}
147147+148148+#[cfg(test)]
149149+mod tests {
150150+ use super::*;
151151+ use crate::handle_resolver::HandleResolverError;
152152+ use crate::metrics::NoOpMetricsPublisher;
153153+ use async_trait::async_trait;
154154+ use serde_json::json;
155155+156156+ /// Mock resolver for testing
157157+ struct MockResolver {
158158+ purge_called: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
159159+ set_called: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>>,
160160+ }
161161+162162+ impl MockResolver {
163163+ fn new() -> Self {
164164+ Self {
165165+ purge_called: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
166166+ set_called: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
167167+ }
168168+ }
169169+170170+ fn get_purge_calls(&self) -> Vec<String> {
171171+ self.purge_called.lock().unwrap().clone()
172172+ }
173173+174174+ fn get_set_calls(&self) -> Vec<(String, String)> {
175175+ self.set_called.lock().unwrap().clone()
176176+ }
177177+ }
178178+179179+ #[async_trait]
180180+ impl HandleResolver for MockResolver {
181181+ async fn resolve(&self, _handle: &str) -> Result<(String, u64), HandleResolverError> {
182182+ unimplemented!("Not needed for tests")
183183+ }
184184+185185+ async fn purge(&self, subject: &str) -> Result<(), HandleResolverError> {
186186+ self.purge_called.lock().unwrap().push(subject.to_string());
187187+ Ok(())
188188+ }
189189+190190+ async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> {
191191+ self.set_called
192192+ .lock()
193193+ .unwrap()
194194+ .push((handle.to_string(), did.to_string()));
195195+ Ok(())
196196+ }
197197+ }
198198+199199+ #[tokio::test]
200200+ async fn test_account_deleted_event() {
201201+ let resolver = Arc::new(MockResolver::new());
202202+ let metrics = Arc::new(NoOpMetricsPublisher::new());
203203+ let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
204204+205205+ // Create a deleted account event
206206+ let event = JetstreamEvent::Account {
207207+ did: "did:plc:test123".to_string(),
208208+ kind: "deleted".to_string(),
209209+ time_us: 0,
210210+ identity: json!(null),
211211+ };
212212+213213+ handler.handle_event(event).await.unwrap();
214214+215215+ // Verify the DID was purged
216216+ let purge_calls = resolver.get_purge_calls();
217217+ assert_eq!(purge_calls.len(), 1);
218218+ assert_eq!(purge_calls[0], "did:plc:test123");
219219+ }
220220+221221+ #[tokio::test]
222222+ async fn test_account_deactivated_event() {
223223+ let resolver = Arc::new(MockResolver::new());
224224+ let metrics = Arc::new(NoOpMetricsPublisher::new());
225225+ let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
226226+227227+ // Create a deactivated account event
228228+ let event = JetstreamEvent::Account {
229229+ did: "did:plc:test456".to_string(),
230230+ kind: "deactivated".to_string(),
231231+ time_us: 0,
232232+ identity: json!(null),
233233+ };
234234+235235+ handler.handle_event(event).await.unwrap();
236236+237237+ // Verify the DID was purged
238238+ let purge_calls = resolver.get_purge_calls();
239239+ assert_eq!(purge_calls.len(), 1);
240240+ assert_eq!(purge_calls[0], "did:plc:test456");
241241+ }
242242+243243+ #[tokio::test]
244244+ async fn test_account_active_event() {
245245+ let resolver = Arc::new(MockResolver::new());
246246+ let metrics = Arc::new(NoOpMetricsPublisher::new());
247247+ let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
248248+249249+ // Create an active account event (should not purge)
250250+ let event = JetstreamEvent::Account {
251251+ did: "did:plc:test789".to_string(),
252252+ kind: "active".to_string(),
253253+ time_us: 0,
254254+ identity: json!(null),
255255+ };
256256+257257+ handler.handle_event(event).await.unwrap();
258258+259259+ // Verify the DID was NOT purged
260260+ let purge_calls = resolver.get_purge_calls();
261261+ assert_eq!(purge_calls.len(), 0);
262262+ }
263263+264264+ #[tokio::test]
265265+ async fn test_identity_with_handle_event() {
266266+ let resolver = Arc::new(MockResolver::new());
267267+ let metrics = Arc::new(NoOpMetricsPublisher::new());
268268+ let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
269269+270270+ // Create an identity event with a handle
271271+ let event = JetstreamEvent::Identity {
272272+ did: "did:plc:testuser".to_string(),
273273+ kind: "update".to_string(),
274274+ time_us: 0,
275275+ identity: json!({
276276+ "handle": "alice.bsky.social"
277277+ }),
278278+ };
279279+280280+ handler.handle_event(event).await.unwrap();
281281+282282+ // Verify the set method was called
283283+ let set_calls = resolver.get_set_calls();
284284+ assert_eq!(set_calls.len(), 1);
285285+ assert_eq!(set_calls[0], ("alice.bsky.social".to_string(), "did:plc:testuser".to_string()));
286286+287287+ // Verify no purge was called
288288+ let purge_calls = resolver.get_purge_calls();
289289+ assert_eq!(purge_calls.len(), 0);
290290+ }
291291+292292+ #[tokio::test]
293293+ async fn test_identity_without_handle_event() {
294294+ let resolver = Arc::new(MockResolver::new());
295295+ let metrics = Arc::new(NoOpMetricsPublisher::new());
296296+ let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
297297+298298+ // Create an identity event without a handle field
299299+ let event = JetstreamEvent::Identity {
300300+ did: "did:plc:nohandle".to_string(),
301301+ kind: "update".to_string(),
302302+ time_us: 0,
303303+ identity: json!({
304304+ "other_field": "value"
305305+ }),
306306+ };
307307+308308+ handler.handle_event(event).await.unwrap();
309309+310310+ // Verify the DID was purged
311311+ let purge_calls = resolver.get_purge_calls();
312312+ assert_eq!(purge_calls.len(), 1);
313313+ assert_eq!(purge_calls[0], "did:plc:nohandle");
314314+315315+ // Verify set was not called
316316+ let set_calls = resolver.get_set_calls();
317317+ assert_eq!(set_calls.len(), 0);
318318+ }
319319+320320+ #[tokio::test]
321321+ async fn test_identity_with_null_identity() {
322322+ let resolver = Arc::new(MockResolver::new());
323323+ let metrics = Arc::new(NoOpMetricsPublisher::new());
324324+ let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
325325+326326+ // Create an identity event with null identity
327327+ let event = JetstreamEvent::Identity {
328328+ did: "did:plc:nullidentity".to_string(),
329329+ kind: "delete".to_string(),
330330+ time_us: 0,
331331+ identity: json!(null),
332332+ };
333333+334334+ handler.handle_event(event).await.unwrap();
335335+336336+ // Verify the DID was purged
337337+ let purge_calls = resolver.get_purge_calls();
338338+ assert_eq!(purge_calls.len(), 1);
339339+ assert_eq!(purge_calls[0], "did:plc:nullidentity");
340340+341341+ // Verify set was not called
342342+ let set_calls = resolver.get_set_calls();
343343+ assert_eq!(set_calls.len(), 0);
344344+ }
345345+346346+ #[tokio::test]
347347+ async fn test_handler_id() {
348348+ let resolver = Arc::new(MockResolver::new());
349349+ let metrics = Arc::new(NoOpMetricsPublisher::new());
350350+ let handler = QuickDidEventHandler::new(resolver, metrics);
351351+352352+ assert_eq!(handler.handler_id(), "quickdid_handler");
353353+ }
354354+}
+1
src/lib.rs
···22pub mod config; // Config and Args needed by binary
33pub mod handle_resolver; // Only traits and factory functions exposed
44pub mod http; // Only create_router exposed
55+pub mod jetstream_handler; // Jetstream event handler for AT Protocol events
5667// Semi-public modules - needed by binary but with limited exposure
78pub mod cache; // Only create_redis_pool exposed