fork to do stuff
1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use futures_util::StreamExt;
4use serde::{Deserialize, Serialize};
5use std::{collections::HashMap, time::Duration};
6use tokio::sync::mpsc;
7use tokio_tungstenite::{
8 connect_async,
9 tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
10};
11use url::Url;
12
13use crate::tui::Message as TuiMessage;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct JetstreamEvent {
17 #[serde(rename = "kind")]
18 pub kind: String,
19 #[serde(rename = "time_us")]
20 pub time_us: i64,
21 pub did: String,
22 pub commit: Option<CommitData>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct CommitData {
27 pub rev: String,
28 pub operation: String,
29 pub collection: String,
30 pub rkey: String,
31 pub record: Option<serde_json::Value>,
32 pub cid: String,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct BlipRecord {
37 #[serde(rename = "$type")]
38 pub record_type: String,
39 pub content: String,
40 #[serde(rename = "createdAt")]
41 pub created_at: String,
42}
43
44pub struct JetstreamClient {
45 did_cache: HashMap<String, String>, // DID -> handle cache
46 own_did: Option<String>, // User's own DID to filter out
47}
48
49impl JetstreamClient {
50 pub fn new(own_did: Option<String>) -> Self {
51 Self {
52 did_cache: HashMap::new(),
53 own_did,
54 }
55 }
56
57 pub async fn connect_and_listen(
58 &mut self,
59 message_tx: mpsc::UnboundedSender<TuiMessage>,
60 ) -> Result<()> {
61 // Try simple connection first, then with collection filter
62 let urls = vec![
63 "wss://jetstream2.us-west.bsky.network/subscribe",
64 "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip",
65 ];
66
67 for (i, jetstream_url) in urls.iter().enumerate() {
68 // Send status to TUI instead of console
69 let status_msg = crate::tui::Message::new(
70 "system".to_string(),
71 format!("Trying connection {} of {}", i + 1, urls.len()),
72 false,
73 None,
74 );
75 let _ = message_tx.send(status_msg);
76
77 loop {
78 match self
79 .try_connect_and_listen(&message_tx, jetstream_url)
80 .await
81 {
82 Ok(_) => {
83 return Ok(());
84 }
85 Err(_e) => {
86 // If this is the last URL, retry it after a delay
87 if i == urls.len() - 1 {
88 let retry_msg = crate::tui::Message::new(
89 "system".to_string(),
90 "Connection failed, retrying in 5s...".to_string(),
91 false,
92 None,
93 );
94 let _ = message_tx.send(retry_msg);
95 tokio::time::sleep(Duration::from_secs(5)).await;
96 } else {
97 // Try the next URL
98 break;
99 }
100 }
101 }
102 }
103 }
104
105 Ok(())
106 }
107
108 async fn try_connect_and_listen(
109 &mut self,
110 message_tx: &mpsc::UnboundedSender<TuiMessage>,
111 url_str: &str,
112 ) -> Result<()> {
113 // Parse URL and create request with headers
114 let url = Url::parse(url_str)?;
115 let mut request = url.into_client_request()?;
116
117 // Add User-Agent header
118 request
119 .headers_mut()
120 .insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0"));
121
122 // Connect with timeout
123 let connect_future = connect_async(request);
124 let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future)
125 .await
126 .context("Connection timeout")?
127 .context("Failed to connect to jetstream")?;
128
129 // Send a connection success message to the TUI
130 let success_msg = crate::tui::Message::new(
131 "system".to_string(),
132 "Connected to jetstream! Listening for blips...".to_string(),
133 false,
134 None,
135 );
136 let _ = message_tx.send(success_msg);
137
138 let (mut _write, mut read) = ws_stream.split();
139
140 while let Some(msg) = read.next().await {
141 match msg {
142 Ok(Message::Text(text)) => {
143 // Silently ignore message handling errors
144 let _ = self.handle_message(&text, message_tx).await;
145 }
146 Ok(Message::Close(_)) => {
147 break;
148 }
149 Err(e) => {
150 return Err(anyhow::anyhow!("WebSocket error: {}", e));
151 }
152 _ => {
153 // Ignore other message types (binary, ping, pong)
154 }
155 }
156 }
157
158 Ok(())
159 }
160
161 async fn handle_message(
162 &mut self,
163 message: &str,
164 message_tx: &mpsc::UnboundedSender<TuiMessage>,
165 ) -> Result<()> {
166 // First, check if it's even a commit event using basic JSON parsing
167 let event_value: serde_json::Value = serde_json::from_str(message)?;
168
169 // Only process commit events
170 if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") {
171 return Ok(());
172 }
173
174 // Check if it has a commit with the right collection
175 let commit = event_value.get("commit");
176 if let Some(commit_obj) = commit {
177 if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip")
178 {
179 return Ok(());
180 }
181
182 // Skip delete operations
183 if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") {
184 return Ok(());
185 }
186 } else {
187 return Ok(());
188 }
189
190 // Now try to parse as our structured event
191 let event: JetstreamEvent = serde_json::from_str(message)?;
192 let commit = event.commit.as_ref().unwrap(); // Safe because we checked above
193
194 // Parse the blip record
195 let record_data = commit.record.as_ref();
196 if record_data.is_none() {
197 return Ok(());
198 }
199
200 let blip_record: BlipRecord = match serde_json::from_value(record_data.unwrap().clone()) {
201 Ok(record) => record,
202 Err(_) => return Ok(()), // Silently skip unparseable records
203 };
204
205 let is_own = self.own_did.as_ref().is_some_and(|own| own == &event.did);
206 let handle = if is_own {
207 "you".into()
208 } else {
209 // Get or resolve the handle
210 self.resolve_did(&event.did).await
211 };
212
213 // Create TUI message
214 let tui_message = TuiMessage::new(
215 handle,
216 blip_record.content,
217 is_own,
218 DateTime::parse_from_rfc3339(&blip_record.created_at)
219 .map(|dt| dt.with_timezone(&Utc))
220 .ok(), // Parse RFC3339 → UTC, None if invalid (so current timestamp instead)
221 );
222
223 // Send to TUI
224 if let Err(_) = message_tx.send(tui_message) {
225 // Channel closed, probably shutting down
226 return Err(anyhow::anyhow!("Message channel closed"));
227 }
228
229 Ok(())
230 }
231
232 async fn resolve_did(&mut self, did: &str) -> String {
233 // Check cache first
234 if let Some(handle) = self.did_cache.get(did) {
235 return handle.clone();
236 }
237
238 // Try to resolve the DID to a handle
239 let handle = match self.fetch_handle_for_did(did).await {
240 Ok(h) => h,
241 Err(_) => {
242 // Fallback to showing just the DID (truncated)
243 if did.len() > 20 {
244 format!("{}...", &did[..20])
245 } else {
246 did.to_string()
247 }
248 }
249 };
250
251 // Cache the result
252 self.did_cache.insert(did.to_string(), handle.clone());
253
254 handle
255 }
256
257 async fn fetch_handle_for_did(&self, did: &str) -> Result<String> {
258 // Use the ATProto API to resolve DID to handle
259 let client = reqwest::Client::new();
260 let url = format!(
261 "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}",
262 did
263 );
264
265 #[derive(Deserialize)]
266 struct ResolveResponse {
267 handle: String,
268 }
269
270 // Try a simpler approach - resolve via profile
271 let profile_url = format!(
272 "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}",
273 did
274 );
275
276 #[derive(Deserialize)]
277 struct ProfileResponse {
278 handle: String,
279 }
280
281 let response = client.get(&profile_url).send().await?;
282
283 if response.status().is_success() {
284 let profile: ProfileResponse = response.json().await?;
285 Ok(profile.handle)
286 } else {
287 Err(anyhow::anyhow!("Failed to resolve DID to handle"))
288 }
289 }
290}
291
292pub async fn start_jetstream_listener(
293 message_tx: mpsc::UnboundedSender<TuiMessage>,
294 own_did: Option<String>,
295) -> Result<()> {
296 let mut client = JetstreamClient::new(own_did);
297 client.connect_and_listen(message_tx).await
298}