tangled
alpha
login
or
join now
rubberducky.guru
/
thought-stream-cli
forked from
cameron.stream/thought-stream-cli
0
fork
atom
fork to do stuff
0
fork
atom
overview
issues
pulls
pipelines
added multiple terminal sync using global timestamps
azom.dev
6 months ago
f4dc75d2
d6c9023c
+138
-77
3 changed files
expand all
collapse all
unified
split
src
client.rs
jetstream.rs
tui.rs
+20
-15
src/client.rs
···
1
use anyhow::{Context, Result};
2
use chrono::Utc;
3
-
use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}};
0
0
0
4
use serde::{Deserialize, Serialize};
5
use serde_json::Value;
6
···
62
63
pub async fn login(&mut self, credentials: &Credentials) -> Result<()> {
64
let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url);
65
-
66
let request = LoginRequest {
67
identifier: credentials.username.clone(),
68
password: credentials.password.clone(),
69
};
70
71
-
let response = self.http_client
0
72
.post(&login_url)
73
.header("Content-Type", "application/json")
74
.json(&request)
···
93
}
94
95
pub async fn publish_blip(&self, content: &str) -> Result<String> {
96
-
let session = self.session.as_ref()
0
0
97
.context("Not authenticated. Please run 'thought login' first.")?;
98
0
0
99
let record = BlipRecord {
100
record_type: "stream.thought.blip".to_string(),
101
content: content.to_string(),
102
-
created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"),
103
};
104
105
let request = CreateRecordRequest {
106
repo: session.did.clone(),
107
collection: "stream.thought.blip".to_string(),
108
-
record: serde_json::to_value(&record)
109
-
.context("Failed to serialize blip record")?,
110
};
111
112
let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url);
113
-
114
let mut headers = HeaderMap::new();
115
headers.insert(
116
AUTHORIZATION,
117
HeaderValue::from_str(&format!("Bearer {}", session.access_jwt))
118
.context("Invalid authorization header")?,
119
);
120
-
headers.insert(
121
-
"Content-Type",
122
-
HeaderValue::from_static("application/json"),
123
-
);
124
125
-
let response = self.http_client
0
126
.post(&create_url)
127
.headers(headers)
128
.json(&request)
···
141
.await
142
.context("Failed to parse create record response")?;
143
144
-
Ok(create_response.uri)
145
}
146
147
pub fn is_authenticated(&self) -> bool {
···
151
pub fn get_user_did(&self) -> Option<String> {
152
self.session.as_ref().map(|s| s.did.clone())
153
}
154
-
}
···
1
use anyhow::{Context, Result};
2
use chrono::Utc;
3
+
use reqwest::{
4
+
header::{HeaderMap, HeaderValue, AUTHORIZATION},
5
+
Client as HttpClient,
6
+
};
7
use serde::{Deserialize, Serialize};
8
use serde_json::Value;
9
···
65
66
pub async fn login(&mut self, credentials: &Credentials) -> Result<()> {
67
let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url);
68
+
69
let request = LoginRequest {
70
identifier: credentials.username.clone(),
71
password: credentials.password.clone(),
72
};
73
74
+
let response = self
75
+
.http_client
76
.post(&login_url)
77
.header("Content-Type", "application/json")
78
.json(&request)
···
97
}
98
99
pub async fn publish_blip(&self, content: &str) -> Result<String> {
100
+
let session = self
101
+
.session
102
+
.as_ref()
103
.context("Not authenticated. Please run 'thought login' first.")?;
104
105
+
let timestamp = Utc::now().to_rfc3339().replace("+00:00", "Z");
106
+
107
let record = BlipRecord {
108
record_type: "stream.thought.blip".to_string(),
109
content: content.to_string(),
110
+
created_at: timestamp.clone(),
111
};
112
113
let request = CreateRecordRequest {
114
repo: session.did.clone(),
115
collection: "stream.thought.blip".to_string(),
116
+
record: serde_json::to_value(&record).context("Failed to serialize blip record")?,
0
117
};
118
119
let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url);
120
+
121
let mut headers = HeaderMap::new();
122
headers.insert(
123
AUTHORIZATION,
124
HeaderValue::from_str(&format!("Bearer {}", session.access_jwt))
125
.context("Invalid authorization header")?,
126
);
127
+
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
0
0
0
128
129
+
let response = self
130
+
.http_client
131
.post(&create_url)
132
.headers(headers)
133
.json(&request)
···
146
.await
147
.context("Failed to parse create record response")?;
148
149
+
Ok(timestamp)
150
}
151
152
pub fn is_authenticated(&self) -> bool {
···
156
pub fn get_user_did(&self) -> Option<String> {
157
self.session.as_ref().map(|s| s.did.clone())
158
}
159
+
}
+66
-43
src/jetstream.rs
···
1
use anyhow::{Context, Result};
0
2
use futures_util::StreamExt;
3
use serde::{Deserialize, Serialize};
4
use std::{collections::HashMap, time::Duration};
5
use tokio::sync::mpsc;
6
use tokio_tungstenite::{
7
connect_async,
8
-
tungstenite::{
9
-
client::IntoClientRequest,
10
-
http::HeaderValue,
11
-
Message,
12
-
},
13
};
14
use url::Url;
15
···
46
47
pub struct JetstreamClient {
48
did_cache: HashMap<String, String>, // DID -> handle cache
49
-
own_did: Option<String>, // User's own DID to filter out
50
}
51
52
impl JetstreamClient {
···
57
}
58
}
59
60
-
pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> {
0
0
0
61
// Try simple connection first, then with collection filter
62
let urls = vec![
63
"wss://jetstream2.us-west.bsky.network/subscribe",
64
-
"wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip"
65
];
66
-
67
for (i, jetstream_url) in urls.iter().enumerate() {
68
// Send status to TUI instead of console
69
let status_msg = crate::tui::Message::new(
70
"system".to_string(),
71
format!("Trying connection {} of {}", i + 1, urls.len()),
72
false,
0
73
);
74
let _ = message_tx.send(status_msg);
75
-
76
loop {
77
-
match self.try_connect_and_listen(&message_tx, jetstream_url).await {
0
0
0
78
Ok(_) => {
79
return Ok(());
80
}
···
85
"system".to_string(),
86
"Connection failed, retrying in 5s...".to_string(),
87
false,
0
88
);
89
let _ = message_tx.send(retry_msg);
90
tokio::time::sleep(Duration::from_secs(5)).await;
···
96
}
97
}
98
}
99
-
100
Ok(())
101
}
102
···
108
// Parse URL and create request with headers
109
let url = Url::parse(url_str)?;
110
let mut request = url.into_client_request()?;
111
-
112
// Add User-Agent header
113
-
request.headers_mut().insert(
114
-
"User-Agent",
115
-
HeaderValue::from_static("think-cli/0.1.0")
116
-
);
117
-
118
// Connect with timeout
119
let connect_future = connect_async(request);
120
-
let (ws_stream, _response) = tokio::time::timeout(
121
-
Duration::from_secs(10),
122
-
connect_future
123
-
).await
124
.context("Connection timeout")?
125
.context("Failed to connect to jetstream")?;
126
···
129
"system".to_string(),
130
"Connected to jetstream! Listening for blips...".to_string(),
131
false,
0
132
);
133
let _ = message_tx.send(success_msg);
134
···
162
) -> Result<()> {
163
// First, check if it's even a commit event using basic JSON parsing
164
let event_value: serde_json::Value = serde_json::from_str(message)?;
165
-
166
// Only process commit events
167
if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") {
168
return Ok(());
169
}
170
-
171
// Check if it has a commit with the right collection
172
let commit = event_value.get("commit");
173
if let Some(commit_obj) = commit {
174
-
if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") {
0
175
return Ok(());
176
}
177
-
178
// Skip delete operations
179
if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") {
180
return Ok(());
···
188
let commit = event.commit.as_ref().unwrap(); // Safe because we checked above
189
190
// Skip messages from our own DID
191
-
if let Some(ref own_did) = self.own_did {
192
-
if &event.did == own_did {
193
-
return Ok(());
194
-
}
195
-
}
196
197
// Parse the blip record
198
let record_data = commit.record.as_ref();
···
206
};
207
208
// Get or resolve the handle
209
-
let handle = self.resolve_did(&event.did).await;
0
0
0
0
0
0
0
0
210
211
// Create TUI message
212
let tui_message = TuiMessage::new(
213
handle,
214
blip_record.content,
215
-
false, // Not our own message
0
0
0
0
0
216
);
217
218
// Send to TUI
···
252
async fn fetch_handle_for_did(&self, did: &str) -> Result<String> {
253
// Use the ATProto API to resolve DID to handle
254
let client = reqwest::Client::new();
255
-
let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did);
256
-
0
0
0
257
#[derive(Deserialize)]
258
struct ResolveResponse {
259
handle: String,
260
}
261
262
// Try a simpler approach - resolve via profile
263
-
let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did);
264
-
0
0
0
265
#[derive(Deserialize)]
266
struct ProfileResponse {
267
handle: String,
268
}
269
270
-
let response = client
271
-
.get(&profile_url)
272
-
.send()
273
-
.await?;
274
275
if response.status().is_success() {
276
let profile: ProfileResponse = response.json().await?;
···
281
}
282
}
283
284
-
pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> {
0
0
0
285
let mut client = JetstreamClient::new(own_did);
286
client.connect_and_listen(message_tx).await
287
-
}
···
1
use anyhow::{Context, Result};
2
+
use chrono::{DateTime, Utc};
3
use futures_util::StreamExt;
4
use serde::{Deserialize, Serialize};
5
use std::{collections::HashMap, time::Duration};
6
use tokio::sync::mpsc;
7
use tokio_tungstenite::{
8
connect_async,
9
+
tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
0
0
0
0
10
};
11
use url::Url;
12
···
43
44
pub struct JetstreamClient {
45
did_cache: HashMap<String, String>, // DID -> handle cache
46
+
own_did: Option<String>, // User's own DID to filter out
47
}
48
49
impl JetstreamClient {
···
54
}
55
}
56
57
+
pub async fn connect_and_listen(
58
+
&mut self,
59
+
message_tx: mpsc::UnboundedSender<TuiMessage>,
60
+
) -> Result<()> {
61
// Try simple connection first, then with collection filter
62
let urls = vec![
63
"wss://jetstream2.us-west.bsky.network/subscribe",
64
+
"wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip",
65
];
66
+
67
for (i, jetstream_url) in urls.iter().enumerate() {
68
// Send status to TUI instead of console
69
let status_msg = crate::tui::Message::new(
70
"system".to_string(),
71
format!("Trying connection {} of {}", i + 1, urls.len()),
72
false,
73
+
None,
74
);
75
let _ = message_tx.send(status_msg);
76
+
77
loop {
78
+
match self
79
+
.try_connect_and_listen(&message_tx, jetstream_url)
80
+
.await
81
+
{
82
Ok(_) => {
83
return Ok(());
84
}
···
89
"system".to_string(),
90
"Connection failed, retrying in 5s...".to_string(),
91
false,
92
+
None,
93
);
94
let _ = message_tx.send(retry_msg);
95
tokio::time::sleep(Duration::from_secs(5)).await;
···
101
}
102
}
103
}
104
+
105
Ok(())
106
}
107
···
113
// Parse URL and create request with headers
114
let url = Url::parse(url_str)?;
115
let mut request = url.into_client_request()?;
116
+
117
// Add User-Agent header
118
+
request
119
+
.headers_mut()
120
+
.insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0"));
121
+
0
122
// Connect with timeout
123
let connect_future = connect_async(request);
124
+
let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future)
125
+
.await
0
0
126
.context("Connection timeout")?
127
.context("Failed to connect to jetstream")?;
128
···
131
"system".to_string(),
132
"Connected to jetstream! Listening for blips...".to_string(),
133
false,
134
+
None,
135
);
136
let _ = message_tx.send(success_msg);
137
···
165
) -> Result<()> {
166
// First, check if it's even a commit event using basic JSON parsing
167
let event_value: serde_json::Value = serde_json::from_str(message)?;
168
+
169
// Only process commit events
170
if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") {
171
return Ok(());
172
}
173
+
174
// Check if it has a commit with the right collection
175
let commit = event_value.get("commit");
176
if let Some(commit_obj) = commit {
177
+
if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip")
178
+
{
179
return Ok(());
180
}
181
+
182
// Skip delete operations
183
if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") {
184
return Ok(());
···
192
let commit = event.commit.as_ref().unwrap(); // Safe because we checked above
193
194
// Skip messages from our own DID
195
+
// if let Some(ref own_did) = self.own_did {
196
+
// if &event.did == own_did {
197
+
// return Ok(());
198
+
// }
199
+
// }
200
201
// Parse the blip record
202
let record_data = commit.record.as_ref();
···
210
};
211
212
// Get or resolve the handle
213
+
let mut handle = self.resolve_did(&event.did).await;
214
+
let mut is_own = false;
215
+
216
+
if let Some(ref own_did) = self.own_did {
217
+
if &event.did == own_did {
218
+
is_own = true;
219
+
handle = String::from("you");
220
+
}
221
+
}
222
223
// Create TUI message
224
let tui_message = TuiMessage::new(
225
handle,
226
blip_record.content,
227
+
is_own, // Not our own message
228
+
Some(
229
+
DateTime::parse_from_rfc3339(&blip_record.created_at)
230
+
.unwrap()
231
+
.with_timezone(&Utc),
232
+
),
233
);
234
235
// Send to TUI
···
269
async fn fetch_handle_for_did(&self, did: &str) -> Result<String> {
270
// Use the ATProto API to resolve DID to handle
271
let client = reqwest::Client::new();
272
+
let url = format!(
273
+
"https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}",
274
+
did
275
+
);
276
+
277
#[derive(Deserialize)]
278
struct ResolveResponse {
279
handle: String,
280
}
281
282
// Try a simpler approach - resolve via profile
283
+
let profile_url = format!(
284
+
"https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}",
285
+
did
286
+
);
287
+
288
#[derive(Deserialize)]
289
struct ProfileResponse {
290
handle: String,
291
}
292
293
+
let response = client.get(&profile_url).send().await?;
0
0
0
294
295
if response.status().is_success() {
296
let profile: ProfileResponse = response.json().await?;
···
301
}
302
}
303
304
+
pub async fn start_jetstream_listener(
305
+
message_tx: mpsc::UnboundedSender<TuiMessage>,
306
+
own_did: Option<String>,
307
+
) -> Result<()> {
308
let mut client = JetstreamClient::new(own_did);
309
client.connect_and_listen(message_tx).await
310
+
}
+52
-19
src/tui.rs
···
21
22
use crate::client::AtProtoClient;
23
24
-
#[derive(Debug, Clone)]
25
pub struct Message {
26
pub handle: String,
27
pub content: String,
···
30
}
31
32
impl Message {
33
-
pub fn new(handle: String, content: String, is_own: bool) -> Self {
0
0
0
0
0
34
Self {
35
handle,
36
content,
37
-
timestamp: Utc::now(),
38
is_own,
39
}
40
}
···
71
pub fn add_message(&mut self, message: Message) {
72
self.messages.push(message);
73
self.message_count += 1;
74
-
75
// Keep only last 1000 messages
76
if self.messages.len() > 1000 {
77
self.messages.remove(0);
78
}
79
-
80
// Auto-scroll to bottom unless user is scrolling up
81
if self.scroll_offset == 0 {
82
self.scroll_offset = 0; // Stay at bottom
···
139
let vertical = Layout::default()
140
.direction(Direction::Vertical)
141
.constraints([
142
-
Constraint::Min(0), // Messages area
143
-
Constraint::Length(3), // Status area
144
-
Constraint::Length(3), // Input area
145
])
146
.split(frame.area());
147
148
// Render messages
149
let mut message_lines = Vec::new();
150
-
151
// Convert messages to styled lines in reverse chronological order (newest first)
152
for msg in self.messages.iter().rev() {
153
let style = if msg.is_own {
154
-
Style::default().fg(Color::Green).add_modifier(Modifier::BOLD)
0
0
155
} else {
156
Style::default().fg(Color::White)
157
};
158
-
159
message_lines.push(Line::from(Span::styled(msg.format_display(), style)));
160
}
161
-
162
let messages_text = Text::from(message_lines);
163
let messages_paragraph = Paragraph::new(messages_text)
164
.block(Block::default().borders(Borders::ALL).title("Messages"))
···
172
} else {
173
Style::default().fg(Color::Yellow)
174
};
175
-
176
let status_paragraph = Paragraph::new(self.status.clone())
177
.style(status_style)
178
.block(Block::default().borders(Borders::ALL).title("Status"));
179
frame.render_widget(status_paragraph, vertical[1]);
180
181
// Render input
182
-
let input_paragraph = Paragraph::new(self.input.clone())
183
-
.block(Block::default().borders(Borders::ALL).title("Input (Esc to quit)"));
0
0
0
184
frame.render_widget(input_paragraph, vertical[2]);
185
}
186
}
···
197
let mut terminal = Terminal::new(backend)?;
198
199
let mut app = TuiApp::new();
200
-
201
// Add welcome message
202
app.add_message(Message::new(
203
"system".to_string(),
204
"Welcome to Think TUI! Connecting to jetstream...".to_string(),
205
false,
0
206
));
207
208
let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await;
···
234
if let Event::Key(key) = event::read()? {
235
if key.kind == KeyEventKind::Press {
236
// Handle Ctrl+C
237
-
if matches!(key.code, KeyCode::Char('c')) && key.modifiers.contains(crossterm::event::KeyModifiers::CONTROL) {
0
0
0
0
238
break;
239
}
240
···
242
if let Some(message) = app.handle_input(key.code) {
243
// Publish the message
244
match client.publish_blip(&message).await {
245
-
Ok(_) => {
246
// Add our own message to the display
247
app.add_message(Message::new(
248
"you".to_string(),
249
message,
250
true,
0
0
0
0
0
251
));
252
}
253
Err(e) => {
···
256
"error".to_string(),
257
format!("Failed to publish: {}", e),
258
false,
0
259
));
260
}
261
}
···
266
267
// Check for new messages from jetstream
268
while let Ok(message) = message_rx.try_recv() {
0
0
0
0
0
0
0
0
0
0
0
0
269
app.add_message(message);
270
app.set_connection_status(true);
271
}
···
277
}
278
279
Ok(())
280
-
}
···
21
22
use crate::client::AtProtoClient;
23
24
+
#[derive(Debug, Clone, PartialEq)]
25
pub struct Message {
26
pub handle: String,
27
pub content: String,
···
30
}
31
32
impl Message {
33
+
pub fn new(
34
+
handle: String,
35
+
content: String,
36
+
is_own: bool,
37
+
timestamp: Option<DateTime<Utc>>,
38
+
) -> Self {
39
Self {
40
handle,
41
content,
42
+
timestamp: timestamp.unwrap_or_else(Utc::now),
43
is_own,
44
}
45
}
···
76
pub fn add_message(&mut self, message: Message) {
77
self.messages.push(message);
78
self.message_count += 1;
79
+
80
// Keep only last 1000 messages
81
if self.messages.len() > 1000 {
82
self.messages.remove(0);
83
}
84
+
85
// Auto-scroll to bottom unless user is scrolling up
86
if self.scroll_offset == 0 {
87
self.scroll_offset = 0; // Stay at bottom
···
144
let vertical = Layout::default()
145
.direction(Direction::Vertical)
146
.constraints([
147
+
Constraint::Min(0), // Messages area
148
+
Constraint::Length(3), // Status area
149
+
Constraint::Length(3), // Input area
150
])
151
.split(frame.area());
152
153
// Render messages
154
let mut message_lines = Vec::new();
155
+
156
// Convert messages to styled lines in reverse chronological order (newest first)
157
for msg in self.messages.iter().rev() {
158
let style = if msg.is_own {
159
+
Style::default()
160
+
.fg(Color::Green)
161
+
.add_modifier(Modifier::BOLD)
162
} else {
163
Style::default().fg(Color::White)
164
};
165
+
166
message_lines.push(Line::from(Span::styled(msg.format_display(), style)));
167
}
168
+
169
let messages_text = Text::from(message_lines);
170
let messages_paragraph = Paragraph::new(messages_text)
171
.block(Block::default().borders(Borders::ALL).title("Messages"))
···
179
} else {
180
Style::default().fg(Color::Yellow)
181
};
182
+
183
let status_paragraph = Paragraph::new(self.status.clone())
184
.style(status_style)
185
.block(Block::default().borders(Borders::ALL).title("Status"));
186
frame.render_widget(status_paragraph, vertical[1]);
187
188
// Render input
189
+
let input_paragraph = Paragraph::new(self.input.clone()).block(
190
+
Block::default()
191
+
.borders(Borders::ALL)
192
+
.title("Input (Esc to quit)"),
193
+
);
194
frame.render_widget(input_paragraph, vertical[2]);
195
}
196
}
···
207
let mut terminal = Terminal::new(backend)?;
208
209
let mut app = TuiApp::new();
210
+
211
// Add welcome message
212
app.add_message(Message::new(
213
"system".to_string(),
214
"Welcome to Think TUI! Connecting to jetstream...".to_string(),
215
false,
216
+
None,
217
));
218
219
let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await;
···
245
if let Event::Key(key) = event::read()? {
246
if key.kind == KeyEventKind::Press {
247
// Handle Ctrl+C
248
+
if matches!(key.code, KeyCode::Char('c'))
249
+
&& key
250
+
.modifiers
251
+
.contains(crossterm::event::KeyModifiers::CONTROL)
252
+
{
253
break;
254
}
255
···
257
if let Some(message) = app.handle_input(key.code) {
258
// Publish the message
259
match client.publish_blip(&message).await {
260
+
Ok(t) => {
261
// Add our own message to the display
262
app.add_message(Message::new(
263
"you".to_string(),
264
message,
265
true,
266
+
Some(
267
+
DateTime::parse_from_rfc3339(&t)
268
+
.unwrap()
269
+
.with_timezone(&Utc),
270
+
),
271
));
272
}
273
Err(e) => {
···
276
"error".to_string(),
277
format!("Failed to publish: {}", e),
278
false,
279
+
None,
280
));
281
}
282
}
···
287
288
// Check for new messages from jetstream
289
while let Ok(message) = message_rx.try_recv() {
290
+
// find most recent is_own message and see if it's already there (you posted it)
291
+
let duplicate = app
292
+
.messages
293
+
.iter()
294
+
.rev()
295
+
.find(|m| m.is_own)
296
+
.is_some_and(|m| m.timestamp == message.timestamp);
297
+
// TODO: some messages getting flagged as duplicates if it's the same content as previous separate message from you
298
+
299
+
if duplicate {
300
+
continue; // skip this while iteration
301
+
}
302
app.add_message(message);
303
app.set_connection_status(true);
304
}
···
310
}
311
312
Ok(())
313
+
}