Noreposts Feed

Add enhanced logging to track Jetstream consumer activity

+26 -9
+16 -6
src/jetstream_consumer.rs
··· 3 use async_trait::async_trait; 4 use chrono::{DateTime, Utc}; 5 use std::sync::Arc; 6 - use tracing::{error, warn}; 7 8 use crate::{database::Database, types::{Follow, Post}}; 9 ··· 17 } 18 19 pub async fn start(&self, jetstream_hostname: String) -> Result<()> { 20 let config = ConsumerTaskConfig { 21 user_agent: "following-no-reposts-feed/1.0".to_string(), 22 compression: true, 23 - jetstream_hostname, 24 zstd_dictionary_location: String::new(), 25 collections: vec![ 26 "app.bsky.feed.post".to_string(), ··· 33 }; 34 35 let consumer = Consumer::new(config); 36 consumer.register_handler(Arc::new(self.clone())).await?; 37 38 let cancellation_token = CancellationToken::new(); 39 40 // Start cleanup task 41 let db_cleanup = Arc::clone(&self.db); ··· 68 if let JetstreamEvent::Commit { did, time_us: _, kind: _, commit } = event { 69 match commit.collection.as_str() { 70 "app.bsky.feed.post" => { 71 self.handle_post_event(&did, &commit.collection, &commit.rkey, &commit.operation, Some(&commit.record), &commit.cid).await?; 72 } 73 "app.bsky.graph.follow" => { 74 self.handle_follow_event(&did, &commit.collection, &commit.rkey, &commit.operation, Some(&commit.record)).await?; 75 } 76 _ => {} // Ignore other collections ··· 121 .with_timezone(&Utc); 122 123 let post = Post { 124 - uri, 125 cid: cid.to_string(), 126 author_did: did.to_string(), 127 - text, 128 created_at, 129 indexed_at: Utc::now(), 130 }; 131 132 if let Err(e) = self.db.insert_post(&post).await { 133 error!("Failed to insert post: {}", e); 134 } 135 } 136 } ··· 174 .with_timezone(&Utc); 175 176 let follow = Follow { 177 - uri, 178 follower_did: did.to_string(), 179 - target_did, 180 created_at, 181 indexed_at: Utc::now(), 182 }; 183 184 if let Err(e) = self.db.insert_follow(&follow).await { 185 error!("Failed to insert follow: {}", e); 186 } 187 } 188 }
··· 3 use async_trait::async_trait; 4 use chrono::{DateTime, Utc}; 5 use std::sync::Arc; 6 + use tracing::{error, warn, info, debug}; 7 8 use crate::{database::Database, types::{Follow, Post}}; 9 ··· 17 } 18 19 pub async fn start(&self, jetstream_hostname: String) -> Result<()> { 20 + info!("Starting Jetstream consumer, connecting to {}", jetstream_hostname); 21 + 22 let config = ConsumerTaskConfig { 23 user_agent: "following-no-reposts-feed/1.0".to_string(), 24 compression: true, 25 + jetstream_hostname: jetstream_hostname.clone(), 26 zstd_dictionary_location: String::new(), 27 collections: vec![ 28 "app.bsky.feed.post".to_string(), ··· 35 }; 36 37 let consumer = Consumer::new(config); 38 + info!("Registering event handler..."); 39 consumer.register_handler(Arc::new(self.clone())).await?; 40 41 let cancellation_token = CancellationToken::new(); 42 + info!("Starting Jetstream consumer background task..."); 43 44 // Start cleanup task 45 let db_cleanup = Arc::clone(&self.db); ··· 72 if let JetstreamEvent::Commit { did, time_us: _, kind: _, commit } = event { 73 match commit.collection.as_str() { 74 "app.bsky.feed.post" => { 75 + debug!("Received post event from {}", did); 76 self.handle_post_event(&did, &commit.collection, &commit.rkey, &commit.operation, Some(&commit.record), &commit.cid).await?; 77 } 78 "app.bsky.graph.follow" => { 79 + info!("Received follow event: {} -> target", did); 80 self.handle_follow_event(&did, &commit.collection, &commit.rkey, &commit.operation, Some(&commit.record)).await?; 81 } 82 _ => {} // Ignore other collections ··· 127 .with_timezone(&Utc); 128 129 let post = Post { 130 + uri: uri.clone(), 131 cid: cid.to_string(), 132 author_did: did.to_string(), 133 + text: text.clone(), 134 created_at, 135 indexed_at: Utc::now(), 136 }; 137 138 if let Err(e) = self.db.insert_post(&post).await { 139 error!("Failed to insert post: {}", e); 140 + } else { 141 + debug!("Inserted post: {} by {}", uri, did); 142 } 143 } 144 } ··· 182 .with_timezone(&Utc); 183 184 let follow = Follow { 185 + uri: uri.clone(), 186 follower_did: did.to_string(), 187 + target_did: target_did.clone(), 188 created_at, 189 indexed_at: Utc::now(), 190 }; 191 192 if let Err(e) = self.db.insert_follow(&follow).await { 193 error!("Failed to insert follow: {}", e); 194 + } else { 195 + info!("Inserted follow: {} -> {}", did, target_did); 196 } 197 } 198 }
+10 -3
src/publish.rs
··· 93 record, 94 }; 95 96 - client 97 .post(format!("{}/xrpc/com.atproto.repo.putRecord", pds_url)) 98 .header("Authorization", format!("Bearer {}", login_response.access_jwt)) 99 .json(&put_request) 100 .send() 101 - .await? 102 - .error_for_status()?; 103 104 println!("\n✅ Feed published successfully!"); 105 println!("🔗 Feed AT-URI: at://{}/app.bsky.feed.generator/{}", login_response.did, record_name);
··· 93 record, 94 }; 95 96 + let response = client 97 .post(format!("{}/xrpc/com.atproto.repo.putRecord", pds_url)) 98 .header("Authorization", format!("Bearer {}", login_response.access_jwt)) 99 .json(&put_request) 100 .send() 101 + .await?; 102 + 103 + if !response.status().is_success() { 104 + let error_text = response.text().await?; 105 + eprintln!("Error response: {}", error_text); 106 + return Err(anyhow!("Failed to publish feed: {}", error_text)); 107 + } 108 + 109 + response.error_for_status()?; 110 111 println!("\n✅ Feed published successfully!"); 112 println!("🔗 Feed AT-URI: at://{}/app.bsky.feed.generator/{}", login_response.did, record_name);