An ATProtocol-powered blogging engine.
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::{Duration, Instant};
4
5use crate::errors::BlahgError;
6use anyhow::Result;
7use async_trait::async_trait;
8use atproto_jetstream::{EventHandler, JetstreamEvent};
9use tokio::sync::Mutex;
10use tokio::sync::mpsc;
11
12/// Receiver for `BlahgEvent` instances from the jetstream consumer.
13pub type BlahgEventReceiver = mpsc::UnboundedReceiver<BlahgEvent>;
14
15/// ATProtocol events relevant to the blog application.
16#[derive(Debug, Clone)]
17pub enum BlahgEvent {
18 /// A record was committed to the ATProtocol network.
19 Commit {
20 /// The DID of the record author
21 did: String,
22 /// The collection name
23 collection: String,
24 /// The record key
25 rkey: String,
26 /// The content identifier
27 cid: String,
28 /// The record data
29 record: serde_json::Value,
30 },
31 /// A record was deleted from the ATProtocol network.
32 Delete {
33 /// The DID of the record author
34 did: String,
35 /// The collection name
36 collection: String,
37 /// The record key
38 rkey: String,
39 },
40}
41
42/// Handler for processing ATProtocol events and converting them to `BlahgEvent` instances.
43pub struct BlahgEventHandler {
44 id: String,
45 event_sender: mpsc::UnboundedSender<BlahgEvent>,
46}
47
48impl BlahgEventHandler {
49 fn new(id: String, event_sender: mpsc::UnboundedSender<BlahgEvent>) -> Self {
50 Self { id, event_sender }
51 }
52}
53
54#[async_trait]
55impl EventHandler for BlahgEventHandler {
56 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
57 let award_event = match event {
58 JetstreamEvent::Commit { did, commit, .. } => {
59 // Filter for supported collection types
60 match commit.collection.as_str() {
61 "tools.smokesignal.blahg.content.post"
62 | "app.bsky.feed.post"
63 | "app.bsky.feed.like"
64 | "community.lexicon.interaction.like" => BlahgEvent::Commit {
65 did,
66 collection: commit.collection,
67 rkey: commit.rkey,
68 cid: commit.cid,
69 record: commit.record,
70 },
71 _ => return Ok(()),
72 }
73 }
74 JetstreamEvent::Delete { did, commit, .. } => {
75 // Filter for supported collection types
76 match commit.collection.as_str() {
77 "tools.smokesignal.blahg.content.post"
78 | "app.bsky.feed.post"
79 | "app.bsky.feed.like"
80 | "community.lexicon.interaction.like" => BlahgEvent::Delete {
81 did,
82 collection: commit.collection,
83 rkey: commit.rkey,
84 },
85 _ => return Ok(()),
86 }
87 }
88 JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => {
89 return Ok(());
90 }
91 };
92
93 if let Err(err) = self.event_sender.send(award_event) {
94 let blahg_error = BlahgError::ConsumerQueueSendFailed {
95 details: err.to_string(),
96 };
97 tracing::error!(?blahg_error);
98 }
99
100 Ok(())
101 }
102
103 fn handler_id(&self) -> String {
104 self.id.clone()
105 }
106}
107
108/// Cursor writer handler that periodically writes the latest time_us to a file
109pub struct CursorWriterHandler {
110 id: String,
111 cursor_path: String,
112 last_time_us: Arc<AtomicU64>,
113 last_write: Arc<Mutex<Instant>>,
114 write_interval: Duration,
115}
116
117impl CursorWriterHandler {
118 fn new(id: String, cursor_path: String) -> Self {
119 Self {
120 id,
121 cursor_path,
122 last_time_us: Arc::new(AtomicU64::new(0)),
123 last_write: Arc::new(Mutex::new(Instant::now())),
124 write_interval: Duration::from_secs(30),
125 }
126 }
127
128 async fn maybe_write_cursor(&self) -> Result<()> {
129 let current_time_us = self.last_time_us.load(Ordering::Relaxed);
130 if current_time_us == 0 {
131 return Ok(());
132 }
133
134 let mut last_write = self.last_write.lock().await;
135 if last_write.elapsed() >= self.write_interval {
136 tokio::fs::write(&self.cursor_path, current_time_us.to_string()).await?;
137 *last_write = Instant::now();
138 tracing::debug!("Wrote cursor to {}: {}", self.cursor_path, current_time_us);
139 }
140 Ok(())
141 }
142}
143
144#[async_trait]
145impl EventHandler for CursorWriterHandler {
146 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
147 // Extract time_us from any event type
148 let time_us = match &event {
149 JetstreamEvent::Commit { time_us, .. } => *time_us,
150 JetstreamEvent::Delete { time_us, .. } => *time_us,
151 JetstreamEvent::Identity { time_us, .. } => *time_us,
152 JetstreamEvent::Account { time_us, .. } => *time_us,
153 };
154
155 // Update the latest time_us
156 self.last_time_us.store(time_us, Ordering::Relaxed);
157
158 // Try to write the cursor periodically
159 if let Err(err) = self.maybe_write_cursor().await {
160 tracing::warn!("Failed to write cursor: {}", err);
161 }
162
163 Ok(())
164 }
165
166 fn handler_id(&self) -> String {
167 self.id.clone()
168 }
169}
170
/// Factory for the ATProtocol event handlers defined in this module.
///
/// Holds no state of its own.
pub struct Consumer {}
173
174impl Consumer {
175 /// Create a new Blahg event handler that processes ATProtocol events.
176 pub fn create_blahg_handler(&self) -> (Arc<BlahgEventHandler>, BlahgEventReceiver) {
177 let (sender, receiver) = mpsc::unbounded_channel();
178 let handler = Arc::new(BlahgEventHandler::new(
179 "blagh-processor".to_string(),
180 sender,
181 ));
182 (handler, receiver)
183 }
184
185 /// Create a new cursor writer handler that persists jetstream cursor position.
186 pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc<CursorWriterHandler> {
187 Arc::new(CursorWriterHandler::new(
188 "cursor-writer".to_string(),
189 cursor_path,
190 ))
191 }
192}