An ATProtocol powered blogging engine.
at main 192 lines 6.3 kB view raw
1use std::sync::Arc; 2use std::sync::atomic::{AtomicU64, Ordering}; 3use std::time::{Duration, Instant}; 4 5use crate::errors::BlahgError; 6use anyhow::Result; 7use async_trait::async_trait; 8use atproto_jetstream::{EventHandler, JetstreamEvent}; 9use tokio::sync::Mutex; 10use tokio::sync::mpsc; 11 12/// Receiver for `BlahgEvent` instances from the jetstream consumer. 13pub type BlahgEventReceiver = mpsc::UnboundedReceiver<BlahgEvent>; 14 15/// ATProtocol events relevant to the blog application. 16#[derive(Debug, Clone)] 17pub enum BlahgEvent { 18 /// A record was committed to the ATProtocol network. 19 Commit { 20 /// The DID of the record author 21 did: String, 22 /// The collection name 23 collection: String, 24 /// The record key 25 rkey: String, 26 /// The content identifier 27 cid: String, 28 /// The record data 29 record: serde_json::Value, 30 }, 31 /// A record was deleted from the ATProtocol network. 32 Delete { 33 /// The DID of the record author 34 did: String, 35 /// The collection name 36 collection: String, 37 /// The record key 38 rkey: String, 39 }, 40} 41 42/// Handler for processing ATProtocol events and converting them to `BlahgEvent` instances. 43pub struct BlahgEventHandler { 44 id: String, 45 event_sender: mpsc::UnboundedSender<BlahgEvent>, 46} 47 48impl BlahgEventHandler { 49 fn new(id: String, event_sender: mpsc::UnboundedSender<BlahgEvent>) -> Self { 50 Self { id, event_sender } 51 } 52} 53 54#[async_trait] 55impl EventHandler for BlahgEventHandler { 56 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 57 let award_event = match event { 58 JetstreamEvent::Commit { did, commit, .. } => { 59 // Filter for supported collection types 60 match commit.collection.as_str() { 61 "tools.smokesignal.blahg.content.post" 62 | "app.bsky.feed.post" 63 | "app.bsky.feed.like" 64 | "community.lexicon.interaction.like" => BlahgEvent::Commit { 65 did, 66 collection: commit.collection, 67 rkey: commit.rkey, 68 cid: commit.cid, 69 record: commit.record, 70 }, 71 _ => return Ok(()), 72 } 73 } 74 JetstreamEvent::Delete { did, commit, .. } => { 75 // Filter for supported collection types 76 match commit.collection.as_str() { 77 "tools.smokesignal.blahg.content.post" 78 | "app.bsky.feed.post" 79 | "app.bsky.feed.like" 80 | "community.lexicon.interaction.like" => BlahgEvent::Delete { 81 did, 82 collection: commit.collection, 83 rkey: commit.rkey, 84 }, 85 _ => return Ok(()), 86 } 87 } 88 JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => { 89 return Ok(()); 90 } 91 }; 92 93 if let Err(err) = self.event_sender.send(award_event) { 94 let blahg_error = BlahgError::ConsumerQueueSendFailed { 95 details: err.to_string(), 96 }; 97 tracing::error!(?blahg_error); 98 } 99 100 Ok(()) 101 } 102 103 fn handler_id(&self) -> String { 104 self.id.clone() 105 } 106} 107 108/// Cursor writer handler that periodically writes the latest time_us to a file 109pub struct CursorWriterHandler { 110 id: String, 111 cursor_path: String, 112 last_time_us: Arc<AtomicU64>, 113 last_write: Arc<Mutex<Instant>>, 114 write_interval: Duration, 115} 116 117impl CursorWriterHandler { 118 fn new(id: String, cursor_path: String) -> Self { 119 Self { 120 id, 121 cursor_path, 122 last_time_us: Arc::new(AtomicU64::new(0)), 123 last_write: Arc::new(Mutex::new(Instant::now())), 124 write_interval: Duration::from_secs(30), 125 } 126 } 127 128 async fn maybe_write_cursor(&self) -> Result<()> { 129 let current_time_us = self.last_time_us.load(Ordering::Relaxed); 130 if current_time_us == 0 { 131 return Ok(()); 132 } 133 134 let mut last_write = self.last_write.lock().await; 135 if last_write.elapsed() >= self.write_interval { 136 tokio::fs::write(&self.cursor_path, current_time_us.to_string()).await?; 137 *last_write = Instant::now(); 138 tracing::debug!("Wrote cursor to {}: {}", self.cursor_path, current_time_us); 139 } 140 Ok(()) 141 } 142} 143 144#[async_trait] 145impl EventHandler for CursorWriterHandler { 146 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 147 // Extract time_us from any event type 148 let time_us = match &event { 149 JetstreamEvent::Commit { time_us, .. } => *time_us, 150 JetstreamEvent::Delete { time_us, .. } => *time_us, 151 JetstreamEvent::Identity { time_us, .. } => *time_us, 152 JetstreamEvent::Account { time_us, .. } => *time_us, 153 }; 154 155 // Update the latest time_us 156 self.last_time_us.store(time_us, Ordering::Relaxed); 157 158 // Try to write the cursor periodically 159 if let Err(err) = self.maybe_write_cursor().await { 160 tracing::warn!("Failed to write cursor: {}", err); 161 } 162 163 Ok(()) 164 } 165 166 fn handler_id(&self) -> String { 167 self.id.clone() 168 } 169} 170 171/// Consumer for creating ATProtocol event handlers. 172pub struct Consumer {} 173 174impl Consumer { 175 /// Create a new Blahg event handler that processes ATProtocol events. 176 pub fn create_blahg_handler(&self) -> (Arc<BlahgEventHandler>, BlahgEventReceiver) { 177 let (sender, receiver) = mpsc::unbounded_channel(); 178 let handler = Arc::new(BlahgEventHandler::new( 179 "blagh-processor".to_string(), 180 sender, 181 )); 182 (handler, receiver) 183 } 184 185 /// Create a new cursor writer handler that persists jetstream cursor position. 186 pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc<CursorWriterHandler> { 187 Arc::new(CursorWriterHandler::new( 188 "cursor-writer".to_string(), 189 cursor_path, 190 )) 191 } 192}