// Noreposts Feed
1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use futures::StreamExt;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio_tungstenite::tungstenite::Message;
8use tracing::{error, info, warn};
9
10use crate::{
11 database::Database,
12 types::{Follow, Post},
13};
14
15pub struct JetstreamEventHandler {
16 db: Arc<Database>,
17}
18
19impl JetstreamEventHandler {
20 pub fn new(db: Arc<Database>) -> Self {
21 Self { db }
22 }
23
24 pub async fn start(&self, jetstream_hostname: String) -> Result<()> {
25 let wanted_collections =
26 "wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow";
27 let ws_url = format!(
28 "wss://{}/subscribe?{}",
29 jetstream_hostname, wanted_collections
30 );
31
32 info!("Connecting to Jetstream at {}", ws_url);
33
34 loop {
35 match tokio_tungstenite::connect_async(&ws_url).await {
36 Ok((mut socket, _response)) => {
37 info!("Connected to Jetstream successfully");
38
39 while let Some(msg) = socket.next().await {
40 match msg {
41 Ok(Message::Text(text)) => {
42 if let Err(e) = self.handle_message(&text).await {
43 error!("Error handling message: {}", e);
44 }
45 }
46 Ok(Message::Close(_)) => {
47 warn!("Jetstream connection closed");
48 break;
49 }
50 Err(e) => {
51 error!("WebSocket error: {}", e);
52 break;
53 }
54 _ => {}
55 }
56 }
57 }
58 Err(e) => {
59 error!(
60 "Failed to connect to Jetstream: {}. Reconnecting in 5 seconds...",
61 e
62 );
63 }
64 }
65
66 tokio::time::sleep(Duration::from_secs(5)).await;
67 }
68 }
69
70 async fn handle_message(&self, message: &str) -> Result<()> {
71 let event: JetstreamEvent = serde_json::from_str(message)?;
72
73 match event {
74 JetstreamEvent::Commit { did, commit, .. } => {
75 info!(
76 "Received commit event: did={}, collection={}, operation={}",
77 did, commit.collection, commit.operation
78 );
79
80 match commit.collection.as_str() {
81 "app.bsky.feed.post" => {
82 self.handle_post_event(&did, &commit).await?;
83 }
84 "app.bsky.graph.follow" => {
85 self.handle_follow_event(&did, &commit).await?;
86 }
87 _ => {}
88 }
89 }
90 JetstreamEvent::Account { did, .. } => {
91 info!("Received account event: did={}", did);
92 }
93 JetstreamEvent::Identity { did, .. } => {
94 info!("Received identity event: did={}", did);
95 }
96 }
97
98 Ok(())
99 }
100
101 async fn handle_post_event(&self, did: &str, commit: &JetstreamCommit) -> Result<()> {
102 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
103
104 match commit.operation.as_str() {
105 "create" => {
106 if let Some(record) = &commit.record {
107 // Check if this is a repost by looking for a "subject" field
108 if record.get("subject").is_some() {
109 // This is a repost, skip it
110 return Ok(());
111 }
112
113 let text = record
114 .get("text")
115 .and_then(|v| v.as_str())
116 .unwrap_or("")
117 .to_string();
118
119 let created_at_str = record
120 .get("createdAt")
121 .and_then(|v| v.as_str())
122 .unwrap_or("");
123
124 let created_at = DateTime::parse_from_rfc3339(created_at_str)
125 .unwrap_or_else(|_| Utc::now().into())
126 .with_timezone(&Utc);
127
128 let cid = commit.cid.as_ref().unwrap_or(&String::new()).clone();
129
130 let post = Post {
131 uri: uri.clone(),
132 cid,
133 author_did: did.to_string(),
134 text: text.clone(),
135 created_at,
136 indexed_at: Utc::now(),
137 };
138
139 if let Err(e) = self.db.insert_post(&post).await {
140 error!("Failed to insert post: {}", e);
141 } else {
142 info!("Inserted post: {} by {}", uri, did);
143 }
144 }
145 }
146 "delete" => {
147 if let Err(e) = self.db.delete_post(&uri).await {
148 error!("Failed to delete post: {}", e);
149 } else {
150 info!("Deleted post: {}", uri);
151 }
152 }
153 _ => {} // Ignore updates
154 }
155
156 Ok(())
157 }
158
159 async fn handle_follow_event(&self, did: &str, commit: &JetstreamCommit) -> Result<()> {
160 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
161
162 match commit.operation.as_str() {
163 "create" => {
164 if let Some(record) = &commit.record {
165 let target_did = record
166 .get("subject")
167 .and_then(|v| v.as_str())
168 .unwrap_or("")
169 .to_string();
170
171 let created_at_str = record
172 .get("createdAt")
173 .and_then(|v| v.as_str())
174 .unwrap_or("");
175
176 let created_at = DateTime::parse_from_rfc3339(created_at_str)
177 .unwrap_or_else(|_| Utc::now().into())
178 .with_timezone(&Utc);
179
180 let follow = Follow {
181 uri: uri.clone(),
182 follower_did: did.to_string(),
183 target_did: target_did.clone(),
184 created_at,
185 indexed_at: Utc::now(),
186 };
187
188 if let Err(e) = self.db.insert_follow(&follow).await {
189 error!("Failed to insert follow: {}", e);
190 } else {
191 info!("Inserted follow: {} -> {}", did, target_did);
192 }
193 }
194 }
195 "delete" => {
196 if let Err(e) = self.db.delete_follow(&uri).await {
197 error!("Failed to delete follow: {}", e);
198 } else {
199 info!("Deleted follow: {}", uri);
200 }
201 }
202 _ => {} // Ignore updates
203 }
204
205 Ok(())
206 }
207}
208
209impl Clone for JetstreamEventHandler {
210 fn clone(&self) -> Self {
211 Self {
212 db: Arc::clone(&self.db),
213 }
214 }
215}
216
217#[derive(Debug, Deserialize, Serialize)]
218#[serde(tag = "kind")]
219enum JetstreamEvent {
220 #[serde(rename = "commit")]
221 Commit {
222 did: String,
223 time_us: i64,
224 commit: JetstreamCommit,
225 },
226 #[serde(rename = "account")]
227 Account {
228 did: String,
229 time_us: i64,
230 account: serde_json::Value,
231 },
232 #[serde(rename = "identity")]
233 Identity {
234 did: String,
235 time_us: i64,
236 identity: serde_json::Value,
237 },
238}
239
240#[derive(Debug, Deserialize, Serialize)]
241struct JetstreamCommit {
242 rev: String,
243 operation: String,
244 collection: String,
245 rkey: String,
246 #[serde(skip_serializing_if = "Option::is_none")]
247 record: Option<serde_json::Value>,
248 #[serde(skip_serializing_if = "Option::is_none")]
249 cid: Option<String>,
250}