Noreposts Feed
at main 250 lines 8.2 kB view raw
1use anyhow::Result; 2use chrono::{DateTime, Utc}; 3use futures::StreamExt; 4use serde::{Deserialize, Serialize}; 5use std::sync::Arc; 6use std::time::Duration; 7use tokio_tungstenite::tungstenite::Message; 8use tracing::{error, info, warn}; 9 10use crate::{ 11 database::Database, 12 types::{Follow, Post}, 13}; 14 15pub struct JetstreamEventHandler { 16 db: Arc<Database>, 17} 18 19impl JetstreamEventHandler { 20 pub fn new(db: Arc<Database>) -> Self { 21 Self { db } 22 } 23 24 pub async fn start(&self, jetstream_hostname: String) -> Result<()> { 25 let wanted_collections = 26 "wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow"; 27 let ws_url = format!( 28 "wss://{}/subscribe?{}", 29 jetstream_hostname, wanted_collections 30 ); 31 32 info!("Connecting to Jetstream at {}", ws_url); 33 34 loop { 35 match tokio_tungstenite::connect_async(&ws_url).await { 36 Ok((mut socket, _response)) => { 37 info!("Connected to Jetstream successfully"); 38 39 while let Some(msg) = socket.next().await { 40 match msg { 41 Ok(Message::Text(text)) => { 42 if let Err(e) = self.handle_message(&text).await { 43 error!("Error handling message: {}", e); 44 } 45 } 46 Ok(Message::Close(_)) => { 47 warn!("Jetstream connection closed"); 48 break; 49 } 50 Err(e) => { 51 error!("WebSocket error: {}", e); 52 break; 53 } 54 _ => {} 55 } 56 } 57 } 58 Err(e) => { 59 error!( 60 "Failed to connect to Jetstream: {}. Reconnecting in 5 seconds...", 61 e 62 ); 63 } 64 } 65 66 tokio::time::sleep(Duration::from_secs(5)).await; 67 } 68 } 69 70 async fn handle_message(&self, message: &str) -> Result<()> { 71 let event: JetstreamEvent = serde_json::from_str(message)?; 72 73 match event { 74 JetstreamEvent::Commit { did, commit, .. } => { 75 info!( 76 "Received commit event: did={}, collection={}, operation={}", 77 did, commit.collection, commit.operation 78 ); 79 80 match commit.collection.as_str() { 81 "app.bsky.feed.post" => { 82 self.handle_post_event(&did, &commit).await?; 83 } 84 "app.bsky.graph.follow" => { 85 self.handle_follow_event(&did, &commit).await?; 86 } 87 _ => {} 88 } 89 } 90 JetstreamEvent::Account { did, .. } => { 91 info!("Received account event: did={}", did); 92 } 93 JetstreamEvent::Identity { did, .. } => { 94 info!("Received identity event: did={}", did); 95 } 96 } 97 98 Ok(()) 99 } 100 101 async fn handle_post_event(&self, did: &str, commit: &JetstreamCommit) -> Result<()> { 102 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 103 104 match commit.operation.as_str() { 105 "create" => { 106 if let Some(record) = &commit.record { 107 // Check if this is a repost by looking for a "subject" field 108 if record.get("subject").is_some() { 109 // This is a repost, skip it 110 return Ok(()); 111 } 112 113 let text = record 114 .get("text") 115 .and_then(|v| v.as_str()) 116 .unwrap_or("") 117 .to_string(); 118 119 let created_at_str = record 120 .get("createdAt") 121 .and_then(|v| v.as_str()) 122 .unwrap_or(""); 123 124 let created_at = DateTime::parse_from_rfc3339(created_at_str) 125 .unwrap_or_else(|_| Utc::now().into()) 126 .with_timezone(&Utc); 127 128 let cid = commit.cid.as_ref().unwrap_or(&String::new()).clone(); 129 130 let post = Post { 131 uri: uri.clone(), 132 cid, 133 author_did: did.to_string(), 134 text: text.clone(), 135 created_at, 136 indexed_at: Utc::now(), 137 }; 138 139 if let Err(e) = self.db.insert_post(&post).await { 140 error!("Failed to insert post: {}", e); 141 } else { 142 info!("Inserted post: {} by {}", uri, did); 143 } 144 } 145 } 146 "delete" => { 147 if let Err(e) = self.db.delete_post(&uri).await { 148 error!("Failed to delete post: {}", e); 149 } else { 150 info!("Deleted post: {}", uri); 151 } 152 } 153 _ => {} // Ignore updates 154 } 155 156 Ok(()) 157 } 158 159 async fn handle_follow_event(&self, did: &str, commit: &JetstreamCommit) -> Result<()> { 160 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 161 162 match commit.operation.as_str() { 163 "create" => { 164 if let Some(record) = &commit.record { 165 let target_did = record 166 .get("subject") 167 .and_then(|v| v.as_str()) 168 .unwrap_or("") 169 .to_string(); 170 171 let created_at_str = record 172 .get("createdAt") 173 .and_then(|v| v.as_str()) 174 .unwrap_or(""); 175 176 let created_at = DateTime::parse_from_rfc3339(created_at_str) 177 .unwrap_or_else(|_| Utc::now().into()) 178 .with_timezone(&Utc); 179 180 let follow = Follow { 181 uri: uri.clone(), 182 follower_did: did.to_string(), 183 target_did: target_did.clone(), 184 created_at, 185 indexed_at: Utc::now(), 186 }; 187 188 if let Err(e) = self.db.insert_follow(&follow).await { 189 error!("Failed to insert follow: {}", e); 190 } else { 191 info!("Inserted follow: {} -> {}", did, target_did); 192 } 193 } 194 } 195 "delete" => { 196 if let Err(e) = self.db.delete_follow(&uri).await { 197 error!("Failed to delete follow: {}", e); 198 } else { 199 info!("Deleted follow: {}", uri); 200 } 201 } 202 _ => {} // Ignore updates 203 } 204 205 Ok(()) 206 } 207} 208 209impl Clone for JetstreamEventHandler { 210 fn clone(&self) -> Self { 211 Self { 212 db: Arc::clone(&self.db), 213 } 214 } 215} 216 217#[derive(Debug, Deserialize, Serialize)] 218#[serde(tag = "kind")] 219enum JetstreamEvent { 220 #[serde(rename = "commit")] 221 Commit { 222 did: String, 223 time_us: i64, 224 commit: JetstreamCommit, 225 }, 226 #[serde(rename = "account")] 227 Account { 228 did: String, 229 time_us: i64, 230 account: serde_json::Value, 231 }, 232 #[serde(rename = "identity")] 233 Identity { 234 did: String, 235 time_us: i64, 236 identity: serde_json::Value, 237 }, 238} 239 240#[derive(Debug, Deserialize, Serialize)] 241struct JetstreamCommit { 242 rev: String, 243 operation: String, 244 collection: String, 245 rkey: String, 246 #[serde(skip_serializing_if = "Option::is_none")] 247 record: Option<serde_json::Value>, 248 #[serde(skip_serializing_if = "Option::is_none")] 249 cid: Option<String>, 250}