fork to do stuff

Initial commit: Think CLI for stream.thought.blip

- Terminal UI for real-time blip streaming via ATProto jetstream
- Interactive posting and viewing with TUI interface
- Secure credential storage for Bluesky authentication
- Dual entry points: 'think' CLI and 'thought stream' TUI
- Handle resolution and duplicate message filtering
- Built with Rust, ratatui, and tokio-tungstenite

+1339
+47
.gitignore
··· 1 + # Generated by Cargo 2 + # will have compiled files and executables 3 + debug/ 4 + target/ 5 + 6 + # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries 7 + # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html 8 + Cargo.lock 9 + 10 + # These are backup files generated by rustfmt 11 + **/*.rs.bk 12 + 13 + # MSVC Windows builds of rustc generate these, which store debugging information 14 + *.pdb 15 + 16 + # IDE files 17 + .vscode/ 18 + .idea/ 19 + *.swp 20 + *.swo 21 + 22 + # OS generated files 23 + .DS_Store 24 + .DS_Store? 25 + ._* 26 + .Spotlight-V100 27 + .Trashes 28 + ehthumbs.db 29 + Thumbs.db 30 + 31 + # Logs 32 + *.log 33 + 34 + # Runtime test harness files 35 + testresults/ 36 + 37 + # Coverage reports 38 + tarpaulin-report.html 39 + lcov.info 40 + coverage/ 41 + 42 + # Profiling data 43 + *.profraw 44 + 45 + # Temporary files 46 + *~ 47 + *.tmp
+29
Cargo.toml
··· 1 + [package] 2 + name = "think" 3 + version = "0.1.0" 4 + edition = "2021" 5 + description = "CLI tool for publishing stream.thought.blip records to ATProto" 6 + 7 + [[bin]] 8 + name = "think" 9 + path = "src/main.rs" 10 + 11 + [[bin]] 12 + name = "thought" 13 + path = "src/thought.rs" 14 + 15 + [dependencies] 16 + clap = { version = "4.4", features = ["derive"] } 17 + tokio = { version = "1.0", features = ["full"] } 18 + reqwest = { version = "0.11", features = ["json"] } 19 + serde = { version = "1.0", features = ["derive"] } 20 + serde_json = "1.0" 21 + directories = "5.0" 22 + rpassword = "7.3" 23 + anyhow = "1.0" 24 + chrono = { version = "0.4", features = ["serde"] } 25 + ratatui = "0.28" 26 + crossterm = "0.27" 27 + tokio-tungstenite = { version = "0.21", features = ["native-tls"] } 28 + futures-util = "0.3" 29 + url = "2.5"
+154
README.md
··· 1 + # Think CLI 2 + 3 + A terminal UI for real-time streaming and posting to the ATProto/Bluesky network using `stream.thought.blip` records. 4 + 5 + ## Features 6 + 7 + - **Real-time TUI**: Live terminal interface showing all blips flowing through the network 8 + - **Interactive posting**: Post your thoughts directly from the TUI 9 + - **Jetstream integration**: Connect to the ATProto firehose for real-time updates 10 + - **Secure credentials**: Store your Bluesky credentials safely on your system 11 + - **Handle resolution**: Automatically resolve DIDs to readable handles 12 + - **Duplicate filtering**: Smart filtering to avoid showing your own posts twice 13 + 14 + ## Quick Start 15 + 16 + ### 1. Install 17 + ```bash 18 + cargo build --release 19 + ``` 20 + 21 + ### 2. Login 22 + ```bash 23 + ./target/release/think login 24 + ``` 25 + Enter your Bluesky handle/email and app password when prompted. 26 + 27 + ### 3. Stream Your Thoughts 28 + ```bash 29 + ./target/release/thought stream 30 + ``` 31 + This launches the full TUI experience where you can see all blips in real-time and post your own. 32 + 33 + ## Commands 34 + 35 + ### think 36 + The main CLI tool with multiple modes: 37 + 38 + ```bash 39 + # Login to Bluesky 40 + think login 41 + 42 + # Post a single blip 43 + think "Your thought here" 44 + 45 + # Enter simple REPL mode 46 + think interactive 47 + 48 + # Enter full TUI mode with live feed 49 + think interactive --tui 50 + 51 + # Logout and clear credentials 52 + think logout 53 + ``` 54 + 55 + ### thought 56 + Direct entry to the streaming TUI: 57 + 58 + ```bash 59 + # Launch the TUI directly (same as think interactive --tui) 60 + thought stream 61 + ``` 62 + 63 + ## TUI Interface 64 + 65 + The terminal UI shows: 66 + - **Messages area**: Scrollable feed of all blips with timestamps 67 + - **Status bar**: Connection status and message count 68 + - **Input field**: Type your thoughts and press Enter to post 69 + 70 + ### TUI Controls 71 + - **Type + Enter**: Post a blip 72 + - **↑/↓**: Scroll through message history 73 + - **PageUp/PageDown**: Fast scroll 74 + - **Esc or Ctrl+C**: Exit TUI 75 + 76 + ## Installation 77 + 78 + ### Prerequisites 79 + - Rust 1.70+ 80 + - A Bluesky account with an app password 81 + 82 + ### From Source 83 + ```bash 84 + git clone git@tangled.sh:cameron.pfiffer.org/think 85 + cd think 86 + cargo build --release 87 + ``` 88 + 89 + The binaries will be available at: 90 + - `target/release/think` 91 + - `target/release/thought` 92 + 93 + ### App Password Setup 94 + 1. Go to [Bluesky Settings](https://bsky.app/settings/app-passwords) 95 + 2. Generate a new app password 96 + 3. Use this password (not your main password) when running `think login` 97 + 98 + ## Configuration 99 + 100 + Credentials are securely stored in your system's configuration directory: 101 + - **macOS**: `~/Library/Application Support/com.thoughtstream.think/` 102 + - **Linux**: `~/.config/thoughtstream/think/` 103 + - **Windows**: `%APPDATA%\thoughtstream\think\` 104 + 105 + ## Architecture 106 + 107 + - **ATProto Integration**: Uses the ATProto API for authentication and posting 108 + - **Jetstream Connection**: WebSocket connection to the real-time firehose 109 + - **Custom Records**: Posts using the `stream.thought.blip` record type 110 + - **TUI Framework**: Built with ratatui for a responsive terminal interface 111 + 112 + ## Examples 113 + 114 + ### Post a quick thought 115 + ```bash 116 + think "Just built something cool!" 117 + ``` 118 + 119 + ### Start a streaming session 120 + ```bash 121 + thought stream 122 + # See all blips in real-time, post your own thoughts 123 + ``` 124 + 125 + ### Interactive REPL 126 + ```bash 127 + think interactive 128 + # Enter multiple thoughts line by line 129 + ``` 130 + 131 + ## Development 132 + 133 + ### Building 134 + ```bash 135 + cargo build 136 + ``` 137 + 138 + ### Running Tests 139 + ```bash 140 + cargo test 141 + ``` 142 + 143 + ### Linting 144 + ```bash 145 + cargo clippy 146 + ``` 147 + 148 + ## Contributing 149 + 150 + This is part of the larger `stream.thought` ecosystem for decentralized social communication on ATProto. 151 + 152 + ## License 153 + 154 + MIT License - see LICENSE file for details.
+154
src/client.rs
··· 1 + use anyhow::{Context, Result}; 2 + use chrono::Utc; 3 + use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}}; 4 + use serde::{Deserialize, Serialize}; 5 + use serde_json::Value; 6 + 7 + use crate::credentials::Credentials; 8 + 9 + #[derive(Debug, Clone, Serialize, Deserialize)] 10 + pub struct Session { 11 + #[serde(rename = "accessJwt")] 12 + pub access_jwt: String, 13 + #[serde(rename = "refreshJwt")] 14 + pub refresh_jwt: String, 15 + pub handle: String, 16 + pub did: String, 17 + } 18 + 19 + #[derive(Debug, Clone)] 20 + pub struct AtProtoClient { 21 + http_client: HttpClient, 22 + base_url: String, 23 + session: Option<Session>, 24 + } 25 + 26 + #[derive(Debug, Serialize)] 27 + struct LoginRequest { 28 + identifier: String, 29 + password: String, 30 + } 31 + 32 + #[derive(Debug, Serialize)] 33 + struct CreateRecordRequest { 34 + repo: String, 35 + collection: String, 36 + record: Value, 37 + } 38 + 39 + #[derive(Debug, Serialize)] 40 + struct BlipRecord { 41 + #[serde(rename = "$type")] 42 + record_type: String, 43 + content: String, 44 + #[serde(rename = "createdAt")] 45 + created_at: String, 46 + } 47 + 48 + #[derive(Debug, Deserialize)] 49 + struct CreateRecordResponse { 50 + uri: String, 51 + cid: String, 52 + } 53 + 54 + impl AtProtoClient { 55 + pub fn new(pds_uri: &str) -> Self { 56 + Self { 57 + http_client: HttpClient::new(), 58 + base_url: pds_uri.to_string(), 59 + session: None, 60 + } 61 + } 62 + 63 + pub async fn login(&mut self, credentials: &Credentials) -> Result<()> { 64 + let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url); 65 + 66 + let request = LoginRequest { 67 + identifier: credentials.username.clone(), 68 + password: credentials.password.clone(), 69 + }; 70 + 71 + let response = self.http_client 72 + .post(&login_url) 73 + .header("Content-Type", "application/json") 74 + .json(&request) 75 + .send() 76 + .await 77 + .context("Failed to send login request")?; 78 + 79 + if !response.status().is_success() { 80 + let status = response.status(); 81 + let error_text = response.text().await.unwrap_or_default(); 82 + anyhow::bail!("Login failed: {} - {}", status, error_text); 83 + } 84 + 85 + let session: Session = response 86 + .json() 87 + .await 88 + .context("Failed to parse login response")?; 89 + 90 + println!("Authenticated as: {}", session.handle); 91 + self.session = Some(session); 92 + Ok(()) 93 + } 94 + 95 + pub async fn publish_blip(&self, content: &str) -> Result<String> { 96 + let session = self.session.as_ref() 97 + .context("Not authenticated. Please run 'think login' first.")?; 98 + 99 + let record = BlipRecord { 100 + record_type: "stream.thought.blip".to_string(), 101 + content: content.to_string(), 102 + created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"), 103 + }; 104 + 105 + let request = CreateRecordRequest { 106 + repo: session.did.clone(), 107 + collection: "stream.thought.blip".to_string(), 108 + record: serde_json::to_value(&record) 109 + .context("Failed to serialize blip record")?, 110 + }; 111 + 112 + let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url); 113 + 114 + let mut headers = HeaderMap::new(); 115 + headers.insert( 116 + AUTHORIZATION, 117 + HeaderValue::from_str(&format!("Bearer {}", session.access_jwt)) 118 + .context("Invalid authorization header")?, 119 + ); 120 + headers.insert( 121 + "Content-Type", 122 + HeaderValue::from_static("application/json"), 123 + ); 124 + 125 + let response = self.http_client 126 + .post(&create_url) 127 + .headers(headers) 128 + .json(&request) 129 + .send() 130 + .await 131 + .context("Failed to send create record request")?; 132 + 133 + if !response.status().is_success() { 134 + let status = response.status(); 135 + let error_text = response.text().await.unwrap_or_default(); 136 + anyhow::bail!("Failed to publish blip: {} - {}", status, error_text); 137 + } 138 + 139 + let create_response: CreateRecordResponse = response 140 + .json() 141 + .await 142 + .context("Failed to parse create record response")?; 143 + 144 + Ok(create_response.uri) 145 + } 146 + 147 + pub fn is_authenticated(&self) -> bool { 148 + self.session.is_some() 149 + } 150 + 151 + pub fn get_user_did(&self) -> Option<String> { 152 + self.session.as_ref().map(|s| s.did.clone()) 153 + } 154 + }
+90
src/credentials.rs
··· 1 + use anyhow::{Context, Result}; 2 + use directories::ProjectDirs; 3 + use serde::{Deserialize, Serialize}; 4 + use std::fs; 5 + use std::path::PathBuf; 6 + 7 + #[derive(Debug, Clone, Serialize, Deserialize)] 8 + pub struct Credentials { 9 + pub username: String, 10 + pub password: String, 11 + pub pds_uri: String, 12 + } 13 + 14 + impl Credentials { 15 + pub fn new(username: String, password: String, pds_uri: String) -> Self { 16 + Self { 17 + username, 18 + password, 19 + pds_uri, 20 + } 21 + } 22 + } 23 + 24 + pub struct CredentialStore { 25 + config_dir: PathBuf, 26 + } 27 + 28 + impl CredentialStore { 29 + pub fn new() -> Result<Self> { 30 + let project_dirs = ProjectDirs::from("com", "thoughtstream", "think") 31 + .context("Failed to determine project directories")?; 32 + 33 + let config_dir = project_dirs.config_dir().to_path_buf(); 34 + 35 + // Create config directory if it doesn't exist 36 + fs::create_dir_all(&config_dir) 37 + .context("Failed to create config directory")?; 38 + 39 + Ok(Self { config_dir }) 40 + } 41 + 42 + fn credentials_path(&self) -> PathBuf { 43 + self.config_dir.join("credentials.json") 44 + } 45 + 46 + pub fn store(&self, credentials: &Credentials) -> Result<()> { 47 + let json = serde_json::to_string_pretty(credentials) 48 + .context("Failed to serialize credentials")?; 49 + 50 + fs::write(self.credentials_path(), json) 51 + .context("Failed to write credentials file")?; 52 + 53 + println!("Credentials stored successfully"); 54 + Ok(()) 55 + } 56 + 57 + pub fn load(&self) -> Result<Option<Credentials>> { 58 + let path = self.credentials_path(); 59 + 60 + if !path.exists() { 61 + return Ok(None); 62 + } 63 + 64 + let json = fs::read_to_string(&path) 65 + .context("Failed to read credentials file")?; 66 + 67 + let credentials: Credentials = serde_json::from_str(&json) 68 + .context("Failed to parse credentials file")?; 69 + 70 + Ok(Some(credentials)) 71 + } 72 + 73 + pub fn clear(&self) -> Result<()> { 74 + let path = self.credentials_path(); 75 + 76 + if path.exists() { 77 + fs::remove_file(&path) 78 + .context("Failed to remove credentials file")?; 79 + println!("Credentials cleared successfully"); 80 + } else { 81 + println!("No credentials found to clear"); 82 + } 83 + 84 + Ok(()) 85 + } 86 + 87 + pub fn exists(&self) -> bool { 88 + self.credentials_path().exists() 89 + } 90 + }
+93
src/interactive.rs
··· 1 + use anyhow::Result; 2 + use std::io::{self, Write}; 3 + use tokio::sync::mpsc; 4 + 5 + use crate::client::AtProtoClient; 6 + use crate::jetstream; 7 + use crate::tui; 8 + 9 + pub async fn run_interactive_mode(client: &AtProtoClient, use_tui: bool) -> Result<()> { 10 + if use_tui { 11 + run_tui_mode(client).await 12 + } else { 13 + run_simple_repl(client).await 14 + } 15 + } 16 + 17 + async fn run_tui_mode(client: &AtProtoClient) -> Result<()> { 18 + // Create channel for messages from jetstream to TUI 19 + let (message_tx, message_rx) = mpsc::unbounded_channel(); 20 + 21 + // Get user's DID to filter out our own messages 22 + let own_did = client.get_user_did(); 23 + 24 + // Start jetstream listener in background 25 + let jetstream_tx = message_tx.clone(); 26 + tokio::spawn(async move { 27 + if let Err(e) = jetstream::start_jetstream_listener(jetstream_tx, own_did).await { 28 + eprintln!("Jetstream listener error: {}", e); 29 + } 30 + }); 31 + 32 + // Run the TUI 33 + tui::run_tui(client, message_rx).await 34 + } 35 + 36 + async fn run_simple_repl(client: &AtProtoClient) -> Result<()> { 37 + println!("Entering interactive mode. Type /help for commands."); 38 + println!("Enter your thoughts line by line, or use commands starting with '/'"); 39 + println!(); 40 + 41 + loop { 42 + print!("> "); 43 + io::stdout().flush()?; 44 + 45 + let mut input = String::new(); 46 + io::stdin().read_line(&mut input)?; 47 + 48 + let input = input.trim(); 49 + 50 + // Skip empty lines 51 + if input.is_empty() { 52 + continue; 53 + } 54 + 55 + // Handle commands 56 + if input.starts_with('/') { 57 + match input { 58 + "/exit" | "/quit" => { 59 + println!("Goodbye!"); 60 + break; 61 + } 62 + "/help" => { 63 + show_help(); 64 + continue; 65 + } 66 + _ => { 67 + println!("Unknown command: {}. Type /help for available commands.", input); 68 + continue; 69 + } 70 + } 71 + } 72 + 73 + // Publish the blip 74 + match client.publish_blip(input).await { 75 + Ok(uri) => { 76 + println!("✅ Published: {}", uri); 77 + } 78 + Err(e) => { 79 + println!("❌ Failed to publish: {}", e); 80 + } 81 + } 82 + } 83 + 84 + Ok(()) 85 + } 86 + 87 + fn show_help() { 88 + println!("Interactive mode commands:"); 89 + println!(" /help - Show this help message"); 90 + println!(" /exit, /quit - Exit interactive mode"); 91 + println!(); 92 + println!("Just type your message and press Enter to publish a blip."); 93 + }
+287
src/jetstream.rs
··· 1 + use anyhow::{Context, Result}; 2 + use futures_util::StreamExt; 3 + use serde::{Deserialize, Serialize}; 4 + use std::{collections::HashMap, time::Duration}; 5 + use tokio::sync::mpsc; 6 + use tokio_tungstenite::{ 7 + connect_async, 8 + tungstenite::{ 9 + client::IntoClientRequest, 10 + http::HeaderValue, 11 + Message, 12 + }, 13 + }; 14 + use url::Url; 15 + 16 + use crate::tui::Message as TuiMessage; 17 + 18 + #[derive(Debug, Clone, Serialize, Deserialize)] 19 + pub struct JetstreamEvent { 20 + #[serde(rename = "kind")] 21 + pub kind: String, 22 + #[serde(rename = "time_us")] 23 + pub time_us: i64, 24 + pub did: String, 25 + pub commit: Option<CommitData>, 26 + } 27 + 28 + #[derive(Debug, Clone, Serialize, Deserialize)] 29 + pub struct CommitData { 30 + pub rev: String, 31 + pub operation: String, 32 + pub collection: String, 33 + pub rkey: String, 34 + pub record: Option<serde_json::Value>, 35 + pub cid: String, 36 + } 37 + 38 + #[derive(Debug, Clone, Serialize, Deserialize)] 39 + pub struct BlipRecord { 40 + #[serde(rename = "$type")] 41 + pub record_type: String, 42 + pub content: String, 43 + #[serde(rename = "createdAt")] 44 + pub created_at: String, 45 + } 46 + 47 + pub struct JetstreamClient { 48 + did_cache: HashMap<String, String>, // DID -> handle cache 49 + own_did: Option<String>, // User's own DID to filter out 50 + } 51 + 52 + impl JetstreamClient { 53 + pub fn new(own_did: Option<String>) -> Self { 54 + Self { 55 + did_cache: HashMap::new(), 56 + own_did, 57 + } 58 + } 59 + 60 + pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> { 61 + // Try simple connection first, then with collection filter 62 + let urls = vec![ 63 + "wss://jetstream2.us-west.bsky.network/subscribe", 64 + "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip" 65 + ]; 66 + 67 + for (i, jetstream_url) in urls.iter().enumerate() { 68 + // Send status to TUI instead of console 69 + let status_msg = crate::tui::Message::new( 70 + "system".to_string(), 71 + format!("Trying connection {} of {}", i + 1, urls.len()), 72 + false, 73 + ); 74 + let _ = message_tx.send(status_msg); 75 + 76 + loop { 77 + match self.try_connect_and_listen(&message_tx, jetstream_url).await { 78 + Ok(_) => { 79 + return Ok(()); 80 + } 81 + Err(_e) => { 82 + // If this is the last URL, retry it after a delay 83 + if i == urls.len() - 1 { 84 + let retry_msg = crate::tui::Message::new( 85 + "system".to_string(), 86 + "Connection failed, retrying in 5s...".to_string(), 87 + false, 88 + ); 89 + let _ = message_tx.send(retry_msg); 90 + tokio::time::sleep(Duration::from_secs(5)).await; 91 + } else { 92 + // Try the next URL 93 + break; 94 + } 95 + } 96 + } 97 + } 98 + } 99 + 100 + Ok(()) 101 + } 102 + 103 + async fn try_connect_and_listen( 104 + &mut self, 105 + message_tx: &mpsc::UnboundedSender<TuiMessage>, 106 + url_str: &str, 107 + ) -> Result<()> { 108 + // Parse URL and create request with headers 109 + let url = Url::parse(url_str)?; 110 + let mut request = url.into_client_request()?; 111 + 112 + // Add User-Agent header 113 + request.headers_mut().insert( 114 + "User-Agent", 115 + HeaderValue::from_static("think-cli/0.1.0") 116 + ); 117 + 118 + // Connect with timeout 119 + let connect_future = connect_async(request); 120 + let (ws_stream, _response) = tokio::time::timeout( 121 + Duration::from_secs(10), 122 + connect_future 123 + ).await 124 + .context("Connection timeout")? 125 + .context("Failed to connect to jetstream")?; 126 + 127 + // Send a connection success message to the TUI 128 + let success_msg = crate::tui::Message::new( 129 + "system".to_string(), 130 + "Connected to jetstream! Listening for blips...".to_string(), 131 + false, 132 + ); 133 + let _ = message_tx.send(success_msg); 134 + 135 + let (mut _write, mut read) = ws_stream.split(); 136 + 137 + while let Some(msg) = read.next().await { 138 + match msg { 139 + Ok(Message::Text(text)) => { 140 + // Silently ignore message handling errors 141 + let _ = self.handle_message(&text, message_tx).await; 142 + } 143 + Ok(Message::Close(_)) => { 144 + break; 145 + } 146 + Err(e) => { 147 + return Err(anyhow::anyhow!("WebSocket error: {}", e)); 148 + } 149 + _ => { 150 + // Ignore other message types (binary, ping, pong) 151 + } 152 + } 153 + } 154 + 155 + Ok(()) 156 + } 157 + 158 + async fn handle_message( 159 + &mut self, 160 + message: &str, 161 + message_tx: &mpsc::UnboundedSender<TuiMessage>, 162 + ) -> Result<()> { 163 + // First, check if it's even a commit event using basic JSON parsing 164 + let event_value: serde_json::Value = serde_json::from_str(message)?; 165 + 166 + // Only process commit events 167 + if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") { 168 + return Ok(()); 169 + } 170 + 171 + // Check if it has a commit with the right collection 172 + let commit = event_value.get("commit"); 173 + if let Some(commit_obj) = commit { 174 + if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") { 175 + return Ok(()); 176 + } 177 + 178 + // Skip delete operations 179 + if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") { 180 + return Ok(()); 181 + } 182 + } else { 183 + return Ok(()); 184 + } 185 + 186 + // Now try to parse as our structured event 187 + let event: JetstreamEvent = serde_json::from_str(message)?; 188 + let commit = event.commit.as_ref().unwrap(); // Safe because we checked above 189 + 190 + // Skip messages from our own DID 191 + if let Some(ref own_did) = self.own_did { 192 + if &event.did == own_did { 193 + return Ok(()); 194 + } 195 + } 196 + 197 + // Parse the blip record 198 + let record_data = commit.record.as_ref(); 199 + if record_data.is_none() { 200 + return Ok(()); 201 + } 202 + 203 + let blip_record: BlipRecord = match serde_json::from_value(record_data.unwrap().clone()) { 204 + Ok(record) => record, 205 + Err(_) => return Ok(()), // Silently skip unparseable records 206 + }; 207 + 208 + // Get or resolve the handle 209 + let handle = self.resolve_did(&event.did).await; 210 + 211 + // Create TUI message 212 + let tui_message = TuiMessage::new( 213 + handle, 214 + blip_record.content, 215 + false, // Not our own message 216 + ); 217 + 218 + // Send to TUI 219 + if let Err(_) = message_tx.send(tui_message) { 220 + // Channel closed, probably shutting down 221 + return Err(anyhow::anyhow!("Message channel closed")); 222 + } 223 + 224 + Ok(()) 225 + } 226 + 227 + async fn resolve_did(&mut self, did: &str) -> String { 228 + // Check cache first 229 + if let Some(handle) = self.did_cache.get(did) { 230 + return handle.clone(); 231 + } 232 + 233 + // Try to resolve the DID to a handle 234 + let handle = match self.fetch_handle_for_did(did).await { 235 + Ok(h) => h, 236 + Err(_) => { 237 + // Fallback to showing just the DID (truncated) 238 + if did.len() > 20 { 239 + format!("{}...", &did[..20]) 240 + } else { 241 + did.to_string() 242 + } 243 + } 244 + }; 245 + 246 + // Cache the result 247 + self.did_cache.insert(did.to_string(), handle.clone()); 248 + 249 + handle 250 + } 251 + 252 + async fn fetch_handle_for_did(&self, did: &str) -> Result<String> { 253 + // Use the ATProto API to resolve DID to handle 254 + let client = reqwest::Client::new(); 255 + let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did); 256 + 257 + #[derive(Deserialize)] 258 + struct ResolveResponse { 259 + handle: String, 260 + } 261 + 262 + // Try a simpler approach - resolve via profile 263 + let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did); 264 + 265 + #[derive(Deserialize)] 266 + struct ProfileResponse { 267 + handle: String, 268 + } 269 + 270 + let response = client 271 + .get(&profile_url) 272 + .send() 273 + .await?; 274 + 275 + if response.status().is_success() { 276 + let profile: ProfileResponse = response.json().await?; 277 + Ok(profile.handle) 278 + } else { 279 + Err(anyhow::anyhow!("Failed to resolve DID to handle")) 280 + } 281 + } 282 + } 283 + 284 + pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> { 285 + let mut client = JetstreamClient::new(own_did); 286 + client.connect_and_listen(message_tx).await 287 + }
+153
src/main.rs
··· 1 + mod client; 2 + mod credentials; 3 + mod interactive; 4 + mod jetstream; 5 + mod tui; 6 + 7 + use anyhow::{Context, Result}; 8 + use clap::{Parser, Subcommand}; 9 + use std::io::{self, Write}; 10 + 11 + use client::AtProtoClient; 12 + use credentials::{CredentialStore, Credentials}; 13 + 14 + #[derive(Parser)] 15 + #[command(name = "think")] 16 + #[command(about = "CLI tool for publishing stream.thought.blip records")] 17 + #[command(version = "0.1.0")] 18 + struct Cli { 19 + #[command(subcommand)] 20 + command: Option<Commands>, 21 + 22 + /// Message to publish (if no subcommand is provided) 23 + message: Option<String>, 24 + } 25 + 26 + #[derive(Subcommand)] 27 + enum Commands { 28 + /// Login to Bluesky with credentials 29 + Login, 30 + /// Logout and clear stored credentials 31 + Logout, 32 + /// Enter interactive mode for rapid posting 33 + Interactive { 34 + /// Use TUI interface with live message feed 35 + #[arg(long)] 36 + tui: bool, 37 + }, 38 + } 39 + 40 + fn prompt_for_input(prompt: &str) -> Result<String> { 41 + print!("{}", prompt); 42 + io::stdout().flush().context("Failed to flush stdout")?; 43 + 44 + let mut input = String::new(); 45 + io::stdin() 46 + .read_line(&mut input) 47 + .context("Failed to read input")?; 48 + 49 + Ok(input.trim().to_string()) 50 + } 51 + 52 + async fn handle_login() -> Result<()> { 53 + let store = CredentialStore::new()?; 54 + 55 + if store.exists() { 56 + println!("You are already logged in. Use 'think logout' to clear credentials first."); 57 + return Ok(()); 58 + } 59 + 60 + println!("Login to Bluesky"); 61 + println!("================"); 62 + 63 + let username = prompt_for_input("Username (handle or email): ")?; 64 + if username.is_empty() { 65 + anyhow::bail!("Username cannot be empty"); 66 + } 67 + 68 + let password = rpassword::prompt_password("App Password: ") 69 + .context("Failed to read password")?; 70 + if password.is_empty() { 71 + anyhow::bail!("Password cannot be empty"); 72 + } 73 + 74 + let pds_uri = prompt_for_input("PDS URI (press Enter for https://bsky.social): ")?; 75 + let pds_uri = if pds_uri.is_empty() { 76 + "https://bsky.social".to_string() 77 + } else { 78 + pds_uri 79 + }; 80 + 81 + // Test the credentials by attempting to authenticate 82 + println!("Testing credentials..."); 83 + let credentials = Credentials::new(username, password, pds_uri); 84 + let mut client = AtProtoClient::new(&credentials.pds_uri); 85 + 86 + client.login(&credentials).await 87 + .context("Login failed. Please check your credentials.")?; 88 + 89 + // Store credentials if login was successful 90 + store.store(&credentials)?; 91 + 92 + Ok(()) 93 + } 94 + 95 + async fn handle_logout() -> Result<()> { 96 + let store = CredentialStore::new()?; 97 + store.clear()?; 98 + Ok(()) 99 + } 100 + 101 + async fn handle_publish(message: &str) -> Result<()> { 102 + let store = CredentialStore::new()?; 103 + 104 + let credentials = store.load()? 105 + .context("Not logged in. Please run 'think login' first.")?; 106 + 107 + let mut client = AtProtoClient::new(&credentials.pds_uri); 108 + client.login(&credentials).await?; 109 + 110 + let uri = client.publish_blip(message).await?; 111 + println!("Published: {}", uri); 112 + 113 + Ok(()) 114 + } 115 + 116 + async fn handle_interactive(use_tui: bool) -> Result<()> { 117 + let store = CredentialStore::new()?; 118 + 119 + let credentials = store.load()? 120 + .context("Not logged in. Please run 'think login' first.")?; 121 + 122 + let mut client = AtProtoClient::new(&credentials.pds_uri); 123 + client.login(&credentials).await?; 124 + 125 + interactive::run_interactive_mode(&client, use_tui).await?; 126 + 127 + Ok(()) 128 + } 129 + 130 + #[tokio::main] 131 + async fn main() -> Result<()> { 132 + let cli = Cli::parse(); 133 + 134 + match cli.command { 135 + Some(Commands::Login) => handle_login().await?, 136 + Some(Commands::Logout) => handle_logout().await?, 137 + Some(Commands::Interactive { tui }) => handle_interactive(tui).await?, 138 + None => { 139 + if let Some(message) = cli.message { 140 + handle_publish(&message).await?; 141 + } else { 142 + println!("Usage:"); 143 + println!(" think login # Login to Bluesky"); 144 + println!(" think logout # Logout and clear credentials"); 145 + println!(" think interactive # Enter simple REPL mode"); 146 + println!(" think interactive --tui # Enter TUI mode with live feed"); 147 + println!(" think \"message\" # Publish a blip"); 148 + } 149 + } 150 + } 151 + 152 + Ok(()) 153 + }
+49
src/thought.rs
··· 1 + mod client; 2 + mod credentials; 3 + mod interactive; 4 + mod jetstream; 5 + mod tui; 6 + 7 + use anyhow::{Context, Result}; 8 + use clap::Parser; 9 + 10 + use client::AtProtoClient; 11 + use credentials::CredentialStore; 12 + 13 + #[derive(Parser)] 14 + #[command(name = "thought")] 15 + #[command(about = "Stream your thoughts to the bluesky firehose", long_about = None)] 16 + #[command(version = "0.1.0")] 17 + struct Cli { 18 + /// Subcommand 19 + #[command(subcommand)] 20 + command: Option<Commands>, 21 + } 22 + 23 + #[derive(clap::Subcommand)] 24 + enum Commands { 25 + /// Enter the thought stream (TUI mode) 26 + Stream, 27 + } 28 + 29 + #[tokio::main] 30 + async fn main() -> Result<()> { 31 + let cli = Cli::parse(); 32 + 33 + match cli.command { 34 + Some(Commands::Stream) | None => { 35 + // Always launch TUI mode 36 + let store = CredentialStore::new()?; 37 + 38 + let credentials = store.load()? 39 + .context("Not logged in. Please run 'think login' first to set up your credentials.")?; 40 + 41 + let mut client = AtProtoClient::new(&credentials.pds_uri); 42 + client.login(&credentials).await?; 43 + 44 + interactive::run_interactive_mode(&client, true).await?; 45 + } 46 + } 47 + 48 + Ok(()) 49 + }
+283
src/tui.rs
··· 1 + use anyhow::Result; 2 + use chrono::{DateTime, Utc}; 3 + use crossterm::{ 4 + event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind}, 5 + execute, 6 + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, 7 + }; 8 + use ratatui::{ 9 + backend::CrosstermBackend, 10 + layout::{Constraint, Direction, Layout}, 11 + style::{Color, Modifier, Style}, 12 + text::{Line, Span}, 13 + widgets::{Block, Borders, List, ListItem, Paragraph}, 14 + Frame, Terminal, 15 + }; 16 + use std::{ 17 + io::{self, Stdout}, 18 + time::Duration, 19 + }; 20 + use tokio::sync::mpsc; 21 + 22 + use crate::client::AtProtoClient; 23 + 24 + #[derive(Debug, Clone)] 25 + pub struct Message { 26 + pub handle: String, 27 + pub content: String, 28 + pub timestamp: DateTime<Utc>, 29 + pub is_own: bool, 30 + } 31 + 32 + impl Message { 33 + pub fn new(handle: String, content: String, is_own: bool) -> Self { 34 + Self { 35 + handle, 36 + content, 37 + timestamp: Utc::now(), 38 + is_own, 39 + } 40 + } 41 + 42 + pub fn format_display(&self) -> String { 43 + let time_str = self.timestamp.format("%H:%M:%S").to_string(); 44 + format!("[{}] {}: {}", time_str, self.handle, self.content) 45 + } 46 + } 47 + 48 + pub struct TuiApp { 49 + messages: Vec<Message>, 50 + input: String, 51 + scroll_offset: usize, 52 + status: String, 53 + message_count: usize, 54 + connected: bool, 55 + should_quit: bool, 56 + } 57 + 58 + impl TuiApp { 59 + pub fn new() -> Self { 60 + Self { 61 + messages: Vec::new(), 62 + input: String::new(), 63 + scroll_offset: 0, 64 + status: "Connecting...".to_string(), 65 + message_count: 0, 66 + connected: false, 67 + should_quit: false, 68 + } 69 + } 70 + 71 + pub fn add_message(&mut self, message: Message) { 72 + self.messages.push(message); 73 + self.message_count += 1; 74 + 75 + // Keep only last 1000 messages 76 + if self.messages.len() > 1000 { 77 + self.messages.remove(0); 78 + } 79 + 80 + // Auto-scroll to bottom unless user is scrolling up 81 + if self.scroll_offset == 0 { 82 + self.scroll_offset = 0; // Stay at bottom 83 + } 84 + } 85 + 86 + pub fn set_connection_status(&mut self, connected: bool) { 87 + self.connected = connected; 88 + self.status = if connected { 89 + format!("Connected • {} messages", self.message_count) 90 + } else { 91 + "Reconnecting...".to_string() 92 + }; 93 + } 94 + 95 + pub fn handle_input(&mut self, key: KeyCode) -> Option<String> { 96 + match key { 97 + KeyCode::Enter => { 98 + if !self.input.is_empty() { 99 + let message = self.input.clone(); 100 + self.input.clear(); 101 + return Some(message); 102 + } 103 + } 104 + KeyCode::Char(c) => { 105 + self.input.push(c); 106 + } 107 + KeyCode::Backspace => { 108 + self.input.pop(); 109 + } 110 + KeyCode::Up => { 111 + if self.scroll_offset < self.messages.len().saturating_sub(1) { 112 + self.scroll_offset += 1; 113 + } 114 + } 115 + KeyCode::Down => { 116 + if self.scroll_offset > 0 { 117 + self.scroll_offset -= 1; 118 + } 119 + } 120 + KeyCode::PageUp => { 121 + self.scroll_offset = (self.scroll_offset + 10).min(self.messages.len().saturating_sub(1)); 122 + } 123 + KeyCode::PageDown => { 124 + self.scroll_offset = self.scroll_offset.saturating_sub(10); 125 + } 126 + KeyCode::Esc => { 127 + self.should_quit = true; 128 + } 129 + _ => {} 130 + } 131 + None 132 + } 133 + 134 + pub fn should_quit(&self) -> bool { 135 + self.should_quit 136 + } 137 + 138 + pub fn draw(&self, frame: &mut Frame) { 139 + let vertical = Layout::default() 140 + .direction(Direction::Vertical) 141 + .constraints([ 142 + Constraint::Min(0), // Messages area 143 + Constraint::Length(3), // Status area 144 + Constraint::Length(3), // Input area 145 + ]) 146 + .split(frame.size()); 147 + 148 + // Render messages 149 + let visible_messages: Vec<ListItem> = self.messages 150 + .iter() 151 + .rev() 152 + .skip(self.scroll_offset) 153 + .take(vertical[0].height as usize - 2) // Account for borders 154 + .map(|msg| { 155 + let style = if msg.is_own { 156 + Style::default().fg(Color::Green).add_modifier(Modifier::BOLD) 157 + } else { 158 + Style::default().fg(Color::White) 159 + }; 160 + 161 + ListItem::new(Line::from(Span::styled(msg.format_display(), style))) 162 + }) 163 + .collect::<Vec<_>>() 164 + .into_iter() 165 + .rev() 166 + .collect(); 167 + 168 + let messages_list = List::new(visible_messages) 169 + .block(Block::default().borders(Borders::ALL).title("Messages")); 170 + frame.render_widget(messages_list, vertical[0]); 171 + 172 + // Render status 173 + let status_style = if self.connected { 174 + Style::default().fg(Color::Green) 175 + } else { 176 + Style::default().fg(Color::Yellow) 177 + }; 178 + 179 + let status_paragraph = Paragraph::new(self.status.clone()) 180 + .style(status_style) 181 + .block(Block::default().borders(Borders::ALL).title("Status")); 182 + frame.render_widget(status_paragraph, vertical[1]); 183 + 184 + // Render input 185 + let input_paragraph = Paragraph::new(self.input.clone()) 186 + .block(Block::default().borders(Borders::ALL).title("Input (Esc to quit)")); 187 + frame.render_widget(input_paragraph, vertical[2]); 188 + } 189 + } 190 + 191 + pub async fn run_tui( 192 + client: &AtProtoClient, 193 + mut message_rx: mpsc::UnboundedReceiver<Message>, 194 + ) -> Result<()> { 195 + // Setup terminal 196 + enable_raw_mode()?; 197 + let mut stdout = io::stdout(); 198 + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; 199 + let backend = CrosstermBackend::new(stdout); 200 + let mut terminal = Terminal::new(backend)?; 201 + 202 + let mut app = TuiApp::new(); 203 + 204 + // Add welcome message 205 + app.add_message(Message::new( 206 + "system".to_string(), 207 + "Welcome to Think TUI! Connecting to jetstream...".to_string(), 208 + false, 209 + )); 210 + 211 + let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await; 212 + 213 + // Restore terminal 214 + disable_raw_mode()?; 215 + execute!( 216 + terminal.backend_mut(), 217 + LeaveAlternateScreen, 218 + DisableMouseCapture 219 + )?; 220 + terminal.show_cursor()?; 221 + 222 + result 223 + } 224 + 225 + async fn run_tui_loop( 226 + terminal: &mut Terminal<CrosstermBackend<Stdout>>, 227 + app: &mut TuiApp, 228 + client: &AtProtoClient, 229 + message_rx: &mut mpsc::UnboundedReceiver<Message>, 230 + ) -> Result<()> { 231 + loop { 232 + // Draw the UI 233 + terminal.draw(|f| app.draw(f))?; 234 + 235 + // Handle events with a timeout so we can check for messages 236 + if event::poll(Duration::from_millis(100))? { 237 + if let Event::Key(key) = event::read()? { 238 + if key.kind == KeyEventKind::Press { 239 + // Handle Ctrl+C 240 + if matches!(key.code, KeyCode::Char('c')) && key.modifiers.contains(crossterm::event::KeyModifiers::CONTROL) { 241 + break; 242 + } 243 + 244 + // Handle other input 245 + if let Some(message) = app.handle_input(key.code) { 246 + // Publish the message 247 + match client.publish_blip(&message).await { 248 + Ok(_) => { 249 + // Add our own message to the display 250 + app.add_message(Message::new( 251 + "you".to_string(), 252 + message, 253 + true, 254 + )); 255 + } 256 + Err(e) => { 257 + // Add error message 258 + app.add_message(Message::new( 259 + "error".to_string(), 260 + format!("Failed to publish: {}", e), 261 + false, 262 + )); 263 + } 264 + } 265 + } 266 + } 267 + } 268 + } 269 + 270 + // Check for new messages from jetstream 271 + while let Ok(message) = message_rx.try_recv() { 272 + app.add_message(message); 273 + app.set_connection_status(true); 274 + } 275 + 276 + // Check if we should quit 277 + if app.should_quit() { 278 + break; 279 + } 280 + } 281 + 282 + Ok(()) 283 + }