A Rust CLI for publishing thought records. Designed to work with thought.stream.

Implement AT Protocol token refresh for seamless authentication

- Add refresh_session method to AtProtoClient using /xrpc/com.atproto.server.refreshSession
- Implement automatic 401 retry logic in publish_blip with token refresh
- Add session persistence to CredentialStore (saves access/refresh tokens)
- Update authentication flow to reuse stored sessions and validate on startup
- Add comprehensive error handling with timeouts and session cleanup
- Handle expired tokens transparently during long-running sessions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+206 -18
+98 -2
src/client.rs
··· 3 3 use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}}; 4 4 use serde::{Deserialize, Serialize}; 5 5 use serde_json::Value; 6 + use std::time::Duration; 6 7 7 8 use crate::credentials::Credentials; 8 9 ··· 51 52 cid: String, 52 53 } 53 54 55 + #[derive(Debug, Deserialize)] 56 + struct RefreshSessionResponse { 57 + #[serde(rename = "accessJwt")] 58 + access_jwt: String, 59 + #[serde(rename = "refreshJwt")] 60 + refresh_jwt: String, 61 + handle: String, 62 + did: String, 63 + } 64 + 54 65 impl AtProtoClient { 55 66 pub fn new(pds_uri: &str) -> Self { 56 67 Self { ··· 92 103 Ok(()) 93 104 } 94 105 95 - pub async fn publish_blip(&self, content: &str) -> Result<String> { 106 + pub async fn publish_blip(&mut self, content: &str) -> Result<String> { 107 + // Try the request, and if it fails with 401, refresh and retry once 108 + match self.try_publish_blip(content).await { 109 + Ok(uri) => Ok(uri), 110 + Err(e) => { 111 + // Check if this is a 401 error by examining the error message 112 + let error_msg = e.to_string(); 113 + if error_msg.contains("401") || error_msg.contains("Unauthorized") { 114 + // Try to refresh the session 115 + match self.refresh_session().await { 116 + Ok(_) => { 117 + // Retry the request with the new token 118 + self.try_publish_blip(content).await 119 + .context("Failed to publish blip after session refresh") 120 + } 121 + Err(refresh_err) => { 122 + // Session refresh failed - clear any stored session 123 + if let Ok(store) = crate::credentials::CredentialStore::new() { 124 + let _ = store.clear_session(); 125 + } 126 + Err(anyhow::anyhow!( 127 + "Authentication failed and session refresh failed: {}. Please run 'thought login' again.", 128 + refresh_err 129 + )) 130 + } 131 + } 132 + } else { 133 + Err(e) 134 + } 135 + } 136 + } 137 + } 138 + 139 + async fn try_publish_blip(&self, content: &str) -> Result<String> { 96 140 let session = self.session.as_ref() 97 141 .context("Not authenticated. Please run 'thought login' first.")?; 98 142 ··· 126 170 .post(&create_url) 127 171 .headers(headers) 128 172 .json(&request) 173 + .timeout(Duration::from_secs(30)) 129 174 .send() 130 175 .await 131 - .context("Failed to send create record request")?; 176 + .context("Failed to send create record request (timeout or network error)")?; 132 177 133 178 if !response.status().is_success() { 134 179 let status = response.status(); ··· 150 195 151 196 pub fn get_user_did(&self) -> Option<String> { 152 197 self.session.as_ref().map(|s| s.did.clone()) 198 + } 199 + 200 + pub async fn refresh_session(&mut self) -> Result<()> { 201 + let session = self.session.as_ref() 202 + .context("No session to refresh. Please login first.")?; 203 + 204 + let refresh_url = format!("{}/xrpc/com.atproto.server.refreshSession", self.base_url); 205 + 206 + let mut headers = HeaderMap::new(); 207 + headers.insert( 208 + AUTHORIZATION, 209 + HeaderValue::from_str(&format!("Bearer {}", session.refresh_jwt)) 210 + .context("Invalid refresh token for authorization header")?, 211 + ); 212 + 213 + let response = self.http_client 214 + .post(&refresh_url) 215 + .headers(headers) 216 + .timeout(Duration::from_secs(30)) 217 + .send() 218 + .await 219 + .context("Failed to send refresh session request (timeout or network error)")?; 220 + 221 + if !response.status().is_success() { 222 + let status = response.status(); 223 + let error_text = response.text().await.unwrap_or_default(); 224 + anyhow::bail!("Session refresh failed: {} - {}", status, error_text); 225 + } 226 + 227 + let refresh_response: RefreshSessionResponse = response 228 + .json() 229 + .await 230 + .context("Failed to parse refresh session response")?; 231 + 232 + let new_session = Session { 233 + access_jwt: refresh_response.access_jwt, 234 + refresh_jwt: refresh_response.refresh_jwt, 235 + handle: refresh_response.handle, 236 + did: refresh_response.did, 237 + }; 238 + 239 + self.session = Some(new_session); 240 + Ok(()) 241 + } 242 + 243 + pub fn get_session(&self) -> Option<&Session> { 244 + self.session.as_ref() 245 + } 246 + 247 + pub fn set_session(&mut self, session: Session) { 248 + self.session = Some(session); 153 249 } 154 250 }
+49
src/credentials.rs
··· 4 4 use std::fs; 5 5 use std::path::PathBuf; 6 6 7 + use crate::client::Session; 8 + 7 9 #[derive(Debug, Clone, Serialize, Deserialize)] 8 10 pub struct Credentials { 9 11 pub username: String, ··· 43 45 self.config_dir.join("credentials.json") 44 46 } 45 47 48 + fn session_path(&self) -> PathBuf { 49 + self.config_dir.join("session.json") 50 + } 51 + 46 52 pub fn store(&self, credentials: &Credentials) -> Result<()> { 47 53 let json = serde_json::to_string_pretty(credentials) 48 54 .context("Failed to serialize credentials")?; ··· 81 87 println!("No credentials found to clear"); 82 88 } 83 89 90 + self.clear_session()?; 91 + 84 92 Ok(()) 85 93 } 86 94 87 95 pub fn exists(&self) -> bool { 88 96 self.credentials_path().exists() 97 + } 98 + 99 + pub fn store_session(&self, session: &Session) -> Result<()> { 100 + let json = serde_json::to_string_pretty(session) 101 + .context("Failed to serialize session")?; 102 + 103 + fs::write(self.session_path(), json) 104 + .context("Failed to write session file")?; 105 + 106 + Ok(()) 107 + } 108 + 109 + pub fn load_session(&self) -> Result<Option<Session>> { 110 + let path = self.session_path(); 111 + 112 + if !path.exists() { 113 + return Ok(None); 114 + } 115 + 116 + let json = fs::read_to_string(&path) 117 + .context("Failed to read session file")?; 118 + 119 + let session: Session = serde_json::from_str(&json) 120 + .context("Failed to parse session file")?; 121 + 122 + Ok(Some(session)) 123 + } 124 + 125 + pub fn clear_session(&self) -> Result<()> { 126 + let path = self.session_path(); 127 + 128 + if path.exists() { 129 + fs::remove_file(&path) 130 + .context("Failed to remove session file")?; 131 + } 132 + 133 + Ok(()) 134 + } 135 + 136 + pub fn session_exists(&self) -> bool { 137 + self.session_path().exists() 89 138 } 90 139 }
+3 -3
src/interactive.rs
··· 6 6 use crate::jetstream; 7 7 use crate::tui; 8 8 9 - pub async fn run_interactive_mode(client: &AtProtoClient, use_tui: bool) -> Result<()> { 9 + pub async fn run_interactive_mode(client: &mut AtProtoClient, use_tui: bool) -> Result<()> { 10 10 if use_tui { 11 11 run_tui_mode(client).await 12 12 } else { ··· 14 14 } 15 15 } 16 16 17 - async fn run_tui_mode(client: &AtProtoClient) -> Result<()> { 17 + async fn run_tui_mode(client: &mut AtProtoClient) -> Result<()> { 18 18 // Create channel for messages from jetstream to TUI 19 19 let (message_tx, message_rx) = mpsc::unbounded_channel(); 20 20 ··· 33 33 tui::run_tui(client, message_rx).await 34 34 } 35 35 36 - async fn run_simple_repl(client: &AtProtoClient) -> Result<()> { 36 + async fn run_simple_repl(client: &mut AtProtoClient) -> Result<()> { 37 37 println!("Entering interactive mode. Type /help for commands."); 38 38 println!("Enter your thoughts line by line, or use commands starting with '/'"); 39 39 println!();
+54 -11
src/main.rs
··· 88 88 client.login(&credentials).await 89 89 .context("Login failed. Please check your credentials.")?; 90 90 91 - // Store credentials if login was successful 91 + // Store credentials and session if login was successful 92 92 store.store(&credentials)?; 93 + if let Some(session) = client.get_session() { 94 + store.store_session(session)?; 95 + } 93 96 94 97 Ok(()) 95 98 } ··· 101 104 } 102 105 103 106 async fn handle_publish(message: &str) -> Result<()> { 107 + let mut client = create_authenticated_client().await?; 108 + let uri = client.publish_blip(message).await?; 109 + 110 + // Update stored session if it was refreshed 104 111 let store = CredentialStore::new()?; 112 + if let Some(session) = client.get_session() { 113 + store.store_session(session)?; 114 + } 105 115 106 - let credentials = store.load()? 107 - .context("Not logged in. Please run 'thought login' first.")?; 116 + println!("Published: {}", uri); 117 + Ok(()) 118 + } 108 119 109 - let mut client = AtProtoClient::new(&credentials.pds_uri); 110 - client.login(&credentials).await?; 120 + async fn handle_interactive(use_tui: bool) -> Result<()> { 121 + let mut client = create_authenticated_client().await?; 122 + interactive::run_interactive_mode(&mut client, use_tui).await?; 111 123 112 - let uri = client.publish_blip(message).await?; 113 - println!("Published: {}", uri); 124 + // Update stored session after interactive mode ends in case it was refreshed 125 + let store = CredentialStore::new()?; 126 + if let Some(session) = client.get_session() { 127 + store.store_session(session)?; 128 + } 114 129 115 130 Ok(()) 116 131 } 117 132 118 - async fn handle_interactive(use_tui: bool) -> Result<()> { 133 + async fn create_authenticated_client() -> Result<AtProtoClient> { 119 134 let store = CredentialStore::new()?; 120 135 121 136 let credentials = store.load()? 122 137 .context("Not logged in. Please run 'thought login' first.")?; 123 138 124 139 let mut client = AtProtoClient::new(&credentials.pds_uri); 125 - client.login(&credentials).await?; 126 140 127 - interactive::run_interactive_mode(&client, use_tui).await?; 141 + // Try to load existing session first 142 + if let Some(session) = store.load_session()? { 143 + client.set_session(session); 144 + 145 + // Try to refresh the session to validate it's still good 146 + // If refresh fails, we'll fall back to full login 147 + if let Err(_) = client.refresh_session().await { 148 + // Session is invalid, clear it and perform full login 149 + store.clear_session()?; 150 + client.login(&credentials).await?; 151 + 152 + // Save the new session 153 + if let Some(session) = client.get_session() { 154 + store.store_session(session)?; 155 + } 156 + } else { 157 + // Session refresh succeeded, save the updated session 158 + if let Some(session) = client.get_session() { 159 + store.store_session(session)?; 160 + } 161 + } 162 + } else { 163 + // No existing session, perform full login 164 + client.login(&credentials).await?; 165 + 166 + // Save the new session 167 + if let Some(session) = client.get_session() { 168 + store.store_session(session)?; 169 + } 170 + } 128 171 129 - Ok(()) 172 + Ok(client) 130 173 } 131 174 132 175 #[tokio::main]
+2 -2
src/tui.rs
··· 186 186 } 187 187 188 188 pub async fn run_tui( 189 - client: &AtProtoClient, 189 + client: &mut AtProtoClient, 190 190 mut message_rx: mpsc::UnboundedReceiver<Message>, 191 191 ) -> Result<()> { 192 192 // Setup terminal ··· 222 222 async fn run_tui_loop( 223 223 terminal: &mut Terminal<CrosstermBackend<Stdout>>, 224 224 app: &mut TuiApp, 225 - client: &AtProtoClient, 225 + client: &mut AtProtoClient, 226 226 message_rx: &mut mpsc::UnboundedReceiver<Message>, 227 227 ) -> Result<()> { 228 228 loop {