//! Jetstream consumer for AT Protocol badge award events. //! //! Handles real-time processing of badge award events from Jetstream, //! with event queuing and cursor management for reliable consumption. use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use crate::errors::ShowcaseError; use anyhow::Result; use async_trait::async_trait; use atproto_jetstream::{EventHandler, JetstreamEvent}; use tokio::sync::Mutex; use tokio::sync::mpsc; /// Type alias for badge event receiver to hide implementation details pub type BadgeEventReceiver = mpsc::UnboundedReceiver; /// Badge event types from Jetstream #[derive(Debug, Clone)] pub enum AwardEvent { /// Record commit event for a badge award. Commit { /// DID of the record owner. did: String, /// Record key within the collection. rkey: String, /// Content identifier of the record. cid: String, /// The complete record data. record: serde_json::Value, }, /// Record deletion event for a badge award. Delete { /// DID of the record owner. did: String, /// Record key within the collection. rkey: String, }, } /// Event handler that publishes badge events to an in-memory queue pub struct AwardEventHandler { id: String, event_sender: mpsc::UnboundedSender, } impl AwardEventHandler { fn new(id: String, event_sender: mpsc::UnboundedSender) -> Self { Self { id, event_sender } } } #[async_trait] impl EventHandler for AwardEventHandler { async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { let award_event = match event { JetstreamEvent::Commit { did, commit, .. } => { if commit.collection != "community.lexicon.badge.award" { return Ok(()); } AwardEvent::Commit { did, rkey: commit.rkey, cid: commit.cid, record: commit.record, } } JetstreamEvent::Delete { did, commit, .. } => { if commit.collection != "community.lexicon.badge.award" { return Ok(()); } AwardEvent::Delete { did, rkey: commit.rkey, } } JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => { return Ok(()); } }; if let Err(err) = self.event_sender.send(award_event) { let showcase_error = ShowcaseError::ConsumerQueueSendFailed { details: err.to_string(), }; tracing::error!(?showcase_error); } Ok(()) } fn handler_id(&self) -> String { self.id.clone() } } /// Cursor writer handler that periodically writes the latest time_us to a file pub struct CursorWriterHandler { id: String, cursor_path: String, last_time_us: Arc, last_write: Arc>, write_interval: Duration, } impl CursorWriterHandler { fn new(id: String, cursor_path: String) -> Self { Self { id, cursor_path, last_time_us: Arc::new(AtomicU64::new(0)), last_write: Arc::new(Mutex::new(Instant::now())), write_interval: Duration::from_secs(30), } } async fn maybe_write_cursor(&self) -> Result<()> { let current_time_us = self.last_time_us.load(Ordering::Relaxed); if current_time_us == 0 { return Ok(()); } let mut last_write = self.last_write.lock().await; if last_write.elapsed() >= self.write_interval { tokio::fs::write(&self.cursor_path, current_time_us.to_string()).await?; *last_write = Instant::now(); tracing::debug!("Wrote cursor to {}: {}", self.cursor_path, current_time_us); } Ok(()) } } #[async_trait] impl EventHandler for CursorWriterHandler { async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { // Extract time_us from any event type let time_us = match &event { JetstreamEvent::Commit { time_us, .. } => *time_us, JetstreamEvent::Delete { time_us, .. } => *time_us, JetstreamEvent::Identity { time_us, .. } => *time_us, JetstreamEvent::Account { time_us, .. } => *time_us, }; // Update the latest time_us self.last_time_us.store(time_us, Ordering::Relaxed); // Try to write the cursor periodically if let Err(err) = self.maybe_write_cursor().await { tracing::warn!("Failed to write cursor: {}", err); } Ok(()) } fn handler_id(&self) -> String { self.id.clone() } } /// Consumer factory for creating event handlers and queue receivers pub struct Consumer {} impl Consumer { /// Create a badge event handler and return the receiver for processing pub fn create_badge_handler(&self) -> (Arc, BadgeEventReceiver) { let (sender, receiver) = mpsc::unbounded_channel(); let handler = Arc::new(AwardEventHandler::new( "badge-processor".to_string(), sender, )); (handler, receiver) } /// Create a cursor writer handler for the given cursor path pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc { Arc::new(CursorWriterHandler::new( "cursor-writer".to_string(), cursor_path, )) } }