use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use crate::errors::BlahgError; use anyhow::Result; use async_trait::async_trait; use atproto_jetstream::{EventHandler, JetstreamEvent}; use tokio::sync::Mutex; use tokio::sync::mpsc; /// Receiver for `BlahgEvent` instances from the jetstream consumer. pub type BlahgEventReceiver = mpsc::UnboundedReceiver; /// ATProtocol events relevant to the blog application. #[derive(Debug, Clone)] pub enum BlahgEvent { /// A record was committed to the ATProtocol network. Commit { /// The DID of the record author did: String, /// The collection name collection: String, /// The record key rkey: String, /// The content identifier cid: String, /// The record data record: serde_json::Value, }, /// A record was deleted from the ATProtocol network. Delete { /// The DID of the record author did: String, /// The collection name collection: String, /// The record key rkey: String, }, } /// Handler for processing ATProtocol events and converting them to `BlahgEvent` instances. pub struct BlahgEventHandler { id: String, event_sender: mpsc::UnboundedSender, } impl BlahgEventHandler { fn new(id: String, event_sender: mpsc::UnboundedSender) -> Self { Self { id, event_sender } } } #[async_trait] impl EventHandler for BlahgEventHandler { async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { let award_event = match event { JetstreamEvent::Commit { did, commit, .. } => { // Filter for supported collection types match commit.collection.as_str() { "tools.smokesignal.blahg.content.post" | "app.bsky.feed.post" | "app.bsky.feed.like" | "community.lexicon.interaction.like" => BlahgEvent::Commit { did, collection: commit.collection, rkey: commit.rkey, cid: commit.cid, record: commit.record, }, _ => return Ok(()), } } JetstreamEvent::Delete { did, commit, .. } => { // Filter for supported collection types match commit.collection.as_str() { "tools.smokesignal.blahg.content.post" | "app.bsky.feed.post" | "app.bsky.feed.like" | "community.lexicon.interaction.like" => BlahgEvent::Delete { did, collection: commit.collection, rkey: commit.rkey, }, _ => return Ok(()), } } JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => { return Ok(()); } }; if let Err(err) = self.event_sender.send(award_event) { let blahg_error = BlahgError::ConsumerQueueSendFailed { details: err.to_string(), }; tracing::error!(?blahg_error); } Ok(()) } fn handler_id(&self) -> String { self.id.clone() } } /// Cursor writer handler that periodically writes the latest time_us to a file pub struct CursorWriterHandler { id: String, cursor_path: String, last_time_us: Arc, last_write: Arc>, write_interval: Duration, } impl CursorWriterHandler { fn new(id: String, cursor_path: String) -> Self { Self { id, cursor_path, last_time_us: Arc::new(AtomicU64::new(0)), last_write: Arc::new(Mutex::new(Instant::now())), write_interval: Duration::from_secs(30), } } async fn maybe_write_cursor(&self) -> Result<()> { let current_time_us = self.last_time_us.load(Ordering::Relaxed); if current_time_us == 0 { return Ok(()); } let mut last_write = self.last_write.lock().await; if last_write.elapsed() >= self.write_interval { tokio::fs::write(&self.cursor_path, current_time_us.to_string()).await?; *last_write = Instant::now(); tracing::debug!("Wrote cursor to {}: {}", self.cursor_path, current_time_us); } Ok(()) } } #[async_trait] impl EventHandler for CursorWriterHandler { async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { // Extract time_us from any event type let time_us = match &event { JetstreamEvent::Commit { time_us, .. } => *time_us, JetstreamEvent::Delete { time_us, .. } => *time_us, JetstreamEvent::Identity { time_us, .. } => *time_us, JetstreamEvent::Account { time_us, .. } => *time_us, }; // Update the latest time_us self.last_time_us.store(time_us, Ordering::Relaxed); // Try to write the cursor periodically if let Err(err) = self.maybe_write_cursor().await { tracing::warn!("Failed to write cursor: {}", err); } Ok(()) } fn handler_id(&self) -> String { self.id.clone() } } /// Consumer for creating ATProtocol event handlers. pub struct Consumer {} impl Consumer { /// Create a new Blahg event handler that processes ATProtocol events. pub fn create_blahg_handler(&self) -> (Arc, BlahgEventReceiver) { let (sender, receiver) = mpsc::unbounded_channel(); let handler = Arc::new(BlahgEventHandler::new( "blagh-processor".to_string(), sender, )); (handler, receiver) } /// Create a new cursor writer handler that persists jetstream cursor position. pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc { Arc::new(CursorWriterHandler::new( "cursor-writer".to_string(), cursor_path, )) } }