fork to do stuff
at main 298 lines 9.5 kB view raw
1use anyhow::{Context, Result}; 2use chrono::{DateTime, Utc}; 3use futures_util::StreamExt; 4use serde::{Deserialize, Serialize}; 5use std::{collections::HashMap, time::Duration}; 6use tokio::sync::mpsc; 7use tokio_tungstenite::{ 8 connect_async, 9 tungstenite::{client::IntoClientRequest, http::HeaderValue, Message}, 10}; 11use url::Url; 12 13use crate::tui::Message as TuiMessage; 14 15#[derive(Debug, Clone, Serialize, Deserialize)] 16pub struct JetstreamEvent { 17 #[serde(rename = "kind")] 18 pub kind: String, 19 #[serde(rename = "time_us")] 20 pub time_us: i64, 21 pub did: String, 22 pub commit: Option<CommitData>, 23} 24 25#[derive(Debug, Clone, Serialize, Deserialize)] 26pub struct CommitData { 27 pub rev: String, 28 pub operation: String, 29 pub collection: String, 30 pub rkey: String, 31 pub record: Option<serde_json::Value>, 32 pub cid: String, 33} 34 35#[derive(Debug, Clone, Serialize, Deserialize)] 36pub struct BlipRecord { 37 #[serde(rename = "$type")] 38 pub record_type: String, 39 pub content: String, 40 #[serde(rename = "createdAt")] 41 pub created_at: String, 42} 43 44pub struct JetstreamClient { 45 did_cache: HashMap<String, String>, // DID -> handle cache 46 own_did: Option<String>, // User's own DID to filter out 47} 48 49impl JetstreamClient { 50 pub fn new(own_did: Option<String>) -> Self { 51 Self { 52 did_cache: HashMap::new(), 53 own_did, 54 } 55 } 56 57 pub async fn connect_and_listen( 58 &mut self, 59 message_tx: mpsc::UnboundedSender<TuiMessage>, 60 ) -> Result<()> { 61 // Try simple connection first, then with collection filter 62 let urls = vec![ 63 "wss://jetstream2.us-west.bsky.network/subscribe", 64 "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip", 65 ]; 66 67 for (i, jetstream_url) in urls.iter().enumerate() { 68 // Send status to TUI instead of console 69 let status_msg = crate::tui::Message::new( 70 "system".to_string(), 71 format!("Trying connection {} of {}", i + 1, urls.len()), 72 false, 73 None, 74 ); 75 let _ = message_tx.send(status_msg); 76 77 loop { 78 match self 79 .try_connect_and_listen(&message_tx, jetstream_url) 80 .await 81 { 82 Ok(_) => { 83 return Ok(()); 84 } 85 Err(_e) => { 86 // If this is the last URL, retry it after a delay 87 if i == urls.len() - 1 { 88 let retry_msg = crate::tui::Message::new( 89 "system".to_string(), 90 "Connection failed, retrying in 5s...".to_string(), 91 false, 92 None, 93 ); 94 let _ = message_tx.send(retry_msg); 95 tokio::time::sleep(Duration::from_secs(5)).await; 96 } else { 97 // Try the next URL 98 break; 99 } 100 } 101 } 102 } 103 } 104 105 Ok(()) 106 } 107 108 async fn try_connect_and_listen( 109 &mut self, 110 message_tx: &mpsc::UnboundedSender<TuiMessage>, 111 url_str: &str, 112 ) -> Result<()> { 113 // Parse URL and create request with headers 114 let url = Url::parse(url_str)?; 115 let mut request = url.into_client_request()?; 116 117 // Add User-Agent header 118 request 119 .headers_mut() 120 .insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0")); 121 122 // Connect with timeout 123 let connect_future = connect_async(request); 124 let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future) 125 .await 126 .context("Connection timeout")? 127 .context("Failed to connect to jetstream")?; 128 129 // Send a connection success message to the TUI 130 let success_msg = crate::tui::Message::new( 131 "system".to_string(), 132 "Connected to jetstream! Listening for blips...".to_string(), 133 false, 134 None, 135 ); 136 let _ = message_tx.send(success_msg); 137 138 let (mut _write, mut read) = ws_stream.split(); 139 140 while let Some(msg) = read.next().await { 141 match msg { 142 Ok(Message::Text(text)) => { 143 // Silently ignore message handling errors 144 let _ = self.handle_message(&text, message_tx).await; 145 } 146 Ok(Message::Close(_)) => { 147 break; 148 } 149 Err(e) => { 150 return Err(anyhow::anyhow!("WebSocket error: {}", e)); 151 } 152 _ => { 153 // Ignore other message types (binary, ping, pong) 154 } 155 } 156 } 157 158 Ok(()) 159 } 160 161 async fn handle_message( 162 &mut self, 163 message: &str, 164 message_tx: &mpsc::UnboundedSender<TuiMessage>, 165 ) -> Result<()> { 166 // First, check if it's even a commit event using basic JSON parsing 167 let event_value: serde_json::Value = serde_json::from_str(message)?; 168 169 // Only process commit events 170 if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") { 171 return Ok(()); 172 } 173 174 // Check if it has a commit with the right collection 175 let commit = event_value.get("commit"); 176 if let Some(commit_obj) = commit { 177 if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") 178 { 179 return Ok(()); 180 } 181 182 // Skip delete operations 183 if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") { 184 return Ok(()); 185 } 186 } else { 187 return Ok(()); 188 } 189 190 // Now try to parse as our structured event 191 let event: JetstreamEvent = serde_json::from_str(message)?; 192 let commit = event.commit.as_ref().unwrap(); // Safe because we checked above 193 194 // Parse the blip record 195 let record_data = commit.record.as_ref(); 196 if record_data.is_none() { 197 return Ok(()); 198 } 199 200 let blip_record: BlipRecord = match serde_json::from_value(record_data.unwrap().clone()) { 201 Ok(record) => record, 202 Err(_) => return Ok(()), // Silently skip unparseable records 203 }; 204 205 let is_own = self.own_did.as_ref().is_some_and(|own| own == &event.did); 206 let handle = if is_own { 207 "you".into() 208 } else { 209 // Get or resolve the handle 210 self.resolve_did(&event.did).await 211 }; 212 213 // Create TUI message 214 let tui_message = TuiMessage::new( 215 handle, 216 blip_record.content, 217 is_own, 218 DateTime::parse_from_rfc3339(&blip_record.created_at) 219 .map(|dt| dt.with_timezone(&Utc)) 220 .ok(), // Parse RFC3339 → UTC, None if invalid (so current timestamp instead) 221 ); 222 223 // Send to TUI 224 if let Err(_) = message_tx.send(tui_message) { 225 // Channel closed, probably shutting down 226 return Err(anyhow::anyhow!("Message channel closed")); 227 } 228 229 Ok(()) 230 } 231 232 async fn resolve_did(&mut self, did: &str) -> String { 233 // Check cache first 234 if let Some(handle) = self.did_cache.get(did) { 235 return handle.clone(); 236 } 237 238 // Try to resolve the DID to a handle 239 let handle = match self.fetch_handle_for_did(did).await { 240 Ok(h) => h, 241 Err(_) => { 242 // Fallback to showing just the DID (truncated) 243 if did.len() > 20 { 244 format!("{}...", &did[..20]) 245 } else { 246 did.to_string() 247 } 248 } 249 }; 250 251 // Cache the result 252 self.did_cache.insert(did.to_string(), handle.clone()); 253 254 handle 255 } 256 257 async fn fetch_handle_for_did(&self, did: &str) -> Result<String> { 258 // Use the ATProto API to resolve DID to handle 259 let client = reqwest::Client::new(); 260 let url = format!( 261 "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", 262 did 263 ); 264 265 #[derive(Deserialize)] 266 struct ResolveResponse { 267 handle: String, 268 } 269 270 // Try a simpler approach - resolve via profile 271 let profile_url = format!( 272 "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", 273 did 274 ); 275 276 #[derive(Deserialize)] 277 struct ProfileResponse { 278 handle: String, 279 } 280 281 let response = client.get(&profile_url).send().await?; 282 283 if response.status().is_success() { 284 let profile: ProfileResponse = response.json().await?; 285 Ok(profile.handle) 286 } else { 287 Err(anyhow::anyhow!("Failed to resolve DID to handle")) 288 } 289 } 290} 291 292pub async fn start_jetstream_listener( 293 message_tx: mpsc::UnboundedSender<TuiMessage>, 294 own_did: Option<String>, 295) -> Result<()> { 296 let mut client = JetstreamClient::new(own_did); 297 client.connect_and_listen(message_tx).await 298}