Noreposts Feed

Setup pre-commit hooks with rustfmt and clippy

- Add .pre-commit-config.yaml with fmt and clippy hooks
- Fix all clippy warnings (use first().copied(), allow dead_code)
- Remove unused imports
- Format all code with rustfmt

+257 -134
+8
.pre-commit-config.yaml
···
··· 1 + repos: 2 + - repo: https://github.com/doublify/pre-commit-rust 3 + rev: v1.0 4 + hooks: 5 + - id: fmt 6 + args: ['--all', '--', '--check'] 7 + - id: clippy 8 + args: ['--all-targets', '--all-features', '--', '-D', 'warnings']
+47 -21
src/admin_socket.rs
··· 3 use std::sync::Arc; 4 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 5 use tokio::net::{UnixListener, UnixStream}; 6 - use tracing::{info, warn, error}; 7 8 use crate::{backfill, database::Database}; 9 ··· 57 let mut line = String::new(); 58 59 writer.write_all(b"Feed Generator Admin Console\n").await?; 60 - writer.write_all(b"Commands: backfill <did>, stats, help, quit\n> ").await?; 61 writer.flush().await?; 62 63 loop { ··· 77 78 let parts: Vec<&str> = command.split_whitespace().collect(); 79 80 - match parts.get(0).map(|s| *s) { 81 Some("backfill") => { 82 if let Some(did) = parts.get(1) { 83 - writer.write_all(format!("Starting backfill for {}...\n", did).as_bytes()).await?; 84 writer.flush().await?; 85 86 // First backfill follows 87 match backfill::backfill_follows(Arc::clone(&db), did).await { 88 Ok(_) => { 89 - writer.write_all(b"Follows backfilled successfully\n").await?; 90 } 91 Err(e) => { 92 - writer.write_all(format!("Follow backfill failed: {}\n", e).as_bytes()).await?; 93 writer.write_all(b"> ").await?; 94 writer.flush().await?; 95 continue; ··· 105 writer.write_all(b"Posts backfilled successfully\n").await?; 106 } 107 Err(e) => { 108 - writer.write_all(format!("Post backfill failed: {}\n", e).as_bytes()).await?; 109 } 110 } 111 } else { 112 writer.write_all(b"Usage: backfill <did>\n").await?; 113 } 114 } 115 - Some("stats") => { 116 - match get_stats(&db).await { 117 - Ok(stats) => { 118 - writer.write_all(stats.as_bytes()).await?; 119 - } 120 - Err(e) => { 121 - writer.write_all(format!("Failed to get stats: {}\n", e).as_bytes()).await?; 122 - } 123 } 124 - } 125 Some("help") => { 126 writer.write_all(b"Available commands:\n").await?; 127 - writer.write_all(b" backfill <did> - Backfill follows and posts for a user\n").await?; 128 - writer.write_all(b" stats - Show database statistics\n").await?; 129 - writer.write_all(b" help - Show this help message\n").await?; 130 - writer.write_all(b" quit - Close connection\n").await?; 131 } 132 Some("quit") | Some("exit") => { 133 writer.write_all(b"Goodbye!\n").await?; ··· 135 break; 136 } 137 _ => { 138 - writer.write_all(format!("Unknown command: {}. Type 'help' for available commands.\n", command).as_bytes()).await?; 139 } 140 } 141
··· 3 use std::sync::Arc; 4 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 5 use tokio::net::{UnixListener, UnixStream}; 6 + use tracing::{error, info, warn}; 7 8 use crate::{backfill, database::Database}; 9 ··· 57 let mut line = String::new(); 58 59 writer.write_all(b"Feed Generator Admin Console\n").await?; 60 + writer 61 + .write_all(b"Commands: backfill <did>, stats, help, quit\n> ") 62 + .await?; 63 writer.flush().await?; 64 65 loop { ··· 79 80 let parts: Vec<&str> = command.split_whitespace().collect(); 81 82 + match parts.first().copied() { 83 Some("backfill") => { 84 if let Some(did) = parts.get(1) { 85 + writer 86 + .write_all(format!("Starting backfill for {}...\n", did).as_bytes()) 87 + .await?; 88 writer.flush().await?; 89 90 // First backfill follows 91 match backfill::backfill_follows(Arc::clone(&db), did).await { 92 Ok(_) => { 93 + writer 94 + .write_all(b"Follows backfilled successfully\n") 95 + .await?; 96 } 97 Err(e) => { 98 + writer 99 + .write_all(format!("Follow backfill failed: {}\n", e).as_bytes()) 100 + .await?; 101 writer.write_all(b"> ").await?; 102 writer.flush().await?; 103 continue; ··· 113 writer.write_all(b"Posts backfilled successfully\n").await?; 114 } 115 Err(e) => { 116 + writer 117 + .write_all(format!("Post backfill failed: {}\n", e).as_bytes()) 118 + .await?; 119 } 120 } 121 } else { 122 writer.write_all(b"Usage: backfill <did>\n").await?; 123 } 124 } 125 + Some("stats") => match get_stats(&db).await { 126 + Ok(stats) => { 127 + writer.write_all(stats.as_bytes()).await?; 128 } 129 + Err(e) => { 130 + writer 131 + .write_all(format!("Failed to get stats: {}\n", e).as_bytes()) 132 + .await?; 133 + } 134 + }, 135 Some("help") => { 136 writer.write_all(b"Available commands:\n").await?; 137 + writer 138 + .write_all(b" backfill <did> - Backfill follows and posts for a user\n") 139 + .await?; 140 + writer 141 + .write_all(b" stats - Show database statistics\n") 142 + .await?; 143 + writer 144 + .write_all(b" help - Show this help message\n") 145 + .await?; 146 + writer 147 + .write_all(b" quit - Close connection\n") 148 + .await?; 149 } 150 Some("quit") | Some("exit") => { 151 writer.write_all(b"Goodbye!\n").await?; ··· 153 break; 154 } 155 _ => { 156 + writer 157 + .write_all( 158 + format!( 159 + "Unknown command: {}. Type 'help' for available commands.\n", 160 + command 161 + ) 162 + .as_bytes(), 163 + ) 164 + .await?; 165 } 166 } 167
+42 -32
src/auth.rs
··· 1 use anyhow::{anyhow, Result}; 2 use jwt_compact::UntrustedToken; 3 - use serde::Deserialize; 4 use tracing::{debug, warn}; 5 6 use crate::types::JwtClaims; 7 8 - // Empty custom claims - all required fields are in standard JWT claims 9 - #[derive(Debug, Deserialize)] 10 - struct EmptyCustomClaims {} 11 - 12 - // Standard JWT claims structure as expected by jwt-compact 13 - #[derive(Debug, Deserialize)] 14 - struct StandardClaims { 15 - #[serde(rename = "iss")] 16 - issuer: Option<String>, 17 - #[serde(rename = "aud")] 18 - audience: Option<String>, 19 - #[serde(rename = "exp")] 20 - expiration: Option<i64>, 21 - } 22 23 pub fn validate_jwt(token: &str, service_did: &str) -> Result<JwtClaims> { 24 // Token should already have "Bearer " prefix stripped by caller ··· 26 debug!("Expected audience: {}", service_did); 27 28 // Parse the untrusted token to extract claims without verification 29 - let untrusted = UntrustedToken::new(token) 30 - .map_err(|e| { 31 - warn!("Failed to parse JWT: {}", e); 32 - anyhow!("Invalid JWT format: {}", e) 33 - })?; 34 35 // First, try to deserialize as raw JSON to see the actual structure 36 - let claims_wrapper = untrusted.deserialize_claims_unchecked::<serde_json::Value>() 37 .map_err(|e| { 38 warn!("Failed to deserialize JWT claims: {}", e); 39 anyhow!("Invalid JWT claims: {}", e) ··· 42 debug!("Raw JWT claims: {:?}", claims_wrapper); 43 44 // Extract the actual claims from the Value 45 - let iss = claims_wrapper.custom.get("iss") 46 .and_then(|v| v.as_str()) 47 .ok_or_else(|| anyhow!("Missing 'iss' claim"))? 48 .to_string(); 49 50 - let aud = claims_wrapper.custom.get("aud") 51 .and_then(|v| v.as_str()) 52 .ok_or_else(|| anyhow!("Missing 'aud' claim"))? 53 .to_string(); 54 55 - let exp = claims_wrapper.custom.get("exp") 56 .and_then(|v| v.as_i64()) 57 .or_else(|| claims_wrapper.expiration.map(|ts| ts.timestamp())) 58 .ok_or_else(|| anyhow!("Missing 'exp' claim"))?; 59 60 - debug!("JWT claims extracted - issuer: {}, audience: {}, exp: {}", iss, aud, exp); 61 62 // Validate audience 63 if aud != service_did { 64 - warn!("JWT audience mismatch: expected {}, got {}", service_did, aud); 65 return Err(anyhow!("Invalid JWT audience")); 66 } 67 ··· 95 96 // 1. Decode JWT header to get the signing key ID 97 let header = decode_header(token)?; 98 - 99 // 2. Extract issuer DID from token payload (without verification) 100 let mut validation = Validation::new(Algorithm::ES256K); 101 validation.insecure_disable_signature_validation(); 102 let temp_decode = decode::<JwtClaims>(token, &DecodingKey::from_secret(b"temp"), &validation)?; 103 let issuer_did = temp_decode.claims.iss; 104 - 105 // 3. Fetch DID document for the issuer 106 let did_doc = fetch_did_document(&issuer_did).await?; 107 - 108 // 4. Extract the appropriate verification key 109 let verification_key = extract_verification_key(&did_doc, &header.kid)?; 110 - 111 // 5. Validate the JWT with the real key 112 let mut validation = Validation::new(Algorithm::ES256K); 113 validation.validate_exp = true; 114 validation.set_audience(&[service_did]); 115 - 116 let decoding_key = DecodingKey::from_ec_pem(&verification_key)?; 117 let token_data = decode::<JwtClaims>(token, &decoding_key, &validation)?; 118 - 119 Ok(token_data.claims) 120 } 121 */
··· 1 use anyhow::{anyhow, Result}; 2 use jwt_compact::UntrustedToken; 3 use tracing::{debug, warn}; 4 5 use crate::types::JwtClaims; 6 7 + // Unused structs kept for reference if needed in future 8 + // #[derive(Debug, Deserialize)] 9 + // struct EmptyCustomClaims {} 10 + // 11 + // #[derive(Debug, Deserialize)] 12 + // struct StandardClaims { 13 + // #[serde(rename = "iss")] 14 + // issuer: Option<String>, 15 + // #[serde(rename = "aud")] 16 + // audience: Option<String>, 17 + // #[serde(rename = "exp")] 18 + // expiration: Option<i64>, 19 + // } 20 21 pub fn validate_jwt(token: &str, service_did: &str) -> Result<JwtClaims> { 22 // Token should already have "Bearer " prefix stripped by caller ··· 24 debug!("Expected audience: {}", service_did); 25 26 // Parse the untrusted token to extract claims without verification 27 + let untrusted = UntrustedToken::new(token).map_err(|e| { 28 + warn!("Failed to parse JWT: {}", e); 29 + anyhow!("Invalid JWT format: {}", e) 30 + })?; 31 32 // First, try to deserialize as raw JSON to see the actual structure 33 + let claims_wrapper = untrusted 34 + .deserialize_claims_unchecked::<serde_json::Value>() 35 .map_err(|e| { 36 warn!("Failed to deserialize JWT claims: {}", e); 37 anyhow!("Invalid JWT claims: {}", e) ··· 40 debug!("Raw JWT claims: {:?}", claims_wrapper); 41 42 // Extract the actual claims from the Value 43 + let iss = claims_wrapper 44 + .custom 45 + .get("iss") 46 .and_then(|v| v.as_str()) 47 .ok_or_else(|| anyhow!("Missing 'iss' claim"))? 48 .to_string(); 49 50 + let aud = claims_wrapper 51 + .custom 52 + .get("aud") 53 .and_then(|v| v.as_str()) 54 .ok_or_else(|| anyhow!("Missing 'aud' claim"))? 55 .to_string(); 56 57 + let exp = claims_wrapper 58 + .custom 59 + .get("exp") 60 .and_then(|v| v.as_i64()) 61 .or_else(|| claims_wrapper.expiration.map(|ts| ts.timestamp())) 62 .ok_or_else(|| anyhow!("Missing 'exp' claim"))?; 63 64 + debug!( 65 + "JWT claims extracted - issuer: {}, audience: {}, exp: {}", 66 + iss, aud, exp 67 + ); 68 69 // Validate audience 70 if aud != service_did { 71 + warn!( 72 + "JWT audience mismatch: expected {}, got {}", 73 + service_did, aud 74 + ); 75 return Err(anyhow!("Invalid JWT audience")); 76 } 77 ··· 105 106 // 1. Decode JWT header to get the signing key ID 107 let header = decode_header(token)?; 108 + 109 // 2. Extract issuer DID from token payload (without verification) 110 let mut validation = Validation::new(Algorithm::ES256K); 111 validation.insecure_disable_signature_validation(); 112 let temp_decode = decode::<JwtClaims>(token, &DecodingKey::from_secret(b"temp"), &validation)?; 113 let issuer_did = temp_decode.claims.iss; 114 + 115 // 3. Fetch DID document for the issuer 116 let did_doc = fetch_did_document(&issuer_did).await?; 117 + 118 // 4. Extract the appropriate verification key 119 let verification_key = extract_verification_key(&did_doc, &header.kid)?; 120 + 121 // 5. Validate the JWT with the real key 122 let mut validation = Validation::new(Algorithm::ES256K); 123 validation.validate_exp = true; 124 validation.set_audience(&[service_did]); 125 + 126 let decoding_key = DecodingKey::from_ec_pem(&verification_key)?; 127 let token_data = decode::<JwtClaims>(token, &decoding_key, &validation)?; 128 + 129 Ok(token_data.claims) 130 } 131 */
+35 -18
src/backfill.rs
··· 2 use chrono::{DateTime, Utc}; 3 use sqlx::Row; 4 use std::sync::Arc; 5 - use tracing::{info, warn, debug}; 6 7 - use crate::{database::Database, types::{Follow, Post}}; 8 9 pub async fn backfill_follows(db: Arc<Database>, user_did: &str) -> Result<()> { 10 info!("Starting backfill of follows for {}", user_did); ··· 14 let mut total_follows = 0; 15 16 loop { 17 - let mut url = format!("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor={}&limit=100", user_did); 18 if let Some(ref c) = cursor { 19 url.push_str(&format!("&cursor={}", c)); 20 } 21 22 - let response: serde_json::Value = client.get(&url) 23 - .send() 24 - .await? 25 - .json() 26 - .await?; 27 28 let follows = response["follows"].as_array(); 29 if follows.is_none() { ··· 37 } 38 39 let follow_record = Follow { 40 - uri: format!("at://{}/app.bsky.graph.follow/{}", user_did, uuid::Uuid::new_v4()), 41 follower_did: user_did.to_string(), 42 target_did: target_did.to_string(), 43 created_at: chrono::Utc::now(), ··· 69 let mut fetched = 0; 70 71 loop { 72 - let mut url = format!("https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed?actor={}&limit=100", target_did); 73 if let Some(ref c) = cursor { 74 url.push_str(&format!("&cursor={}", c)); 75 } 76 77 - let response: serde_json::Value = client.get(&url) 78 - .send() 79 - .await? 80 - .json() 81 - .await?; 82 83 let feed = response["feed"].as_array(); 84 if feed.is_none() { ··· 128 129 fetched += 1; 130 if fetched >= limit { 131 - debug!("Backfilled {} posts for {} (limit reached)", total_posts, target_did); 132 return Ok(()); 133 } 134 } ··· 143 Ok(()) 144 } 145 146 - pub async fn backfill_posts_for_follows(db: Arc<Database>, user_did: &str, posts_per_user: usize) -> Result<()> { 147 info!("Starting backfill of posts for {}'s follows", user_did); 148 149 // Get all follows for this user ··· 158 for (idx, row) in follows.iter().enumerate() { 159 let target_did: String = row.try_get("target_did")?; 160 161 - debug!("Backfilling posts from {} ({}/{})", target_did, idx + 1, total_follows); 162 163 if let Err(e) = backfill_posts(Arc::clone(&db), &target_did, posts_per_user).await { 164 warn!("Failed to backfill posts from {}: {}", target_did, e);
··· 2 use chrono::{DateTime, Utc}; 3 use sqlx::Row; 4 use std::sync::Arc; 5 + use tracing::{debug, info, warn}; 6 7 + use crate::{ 8 + database::Database, 9 + types::{Follow, Post}, 10 + }; 11 12 pub async fn backfill_follows(db: Arc<Database>, user_did: &str) -> Result<()> { 13 info!("Starting backfill of follows for {}", user_did); ··· 17 let mut total_follows = 0; 18 19 loop { 20 + let mut url = format!( 21 + "https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor={}&limit=100", 22 + user_did 23 + ); 24 if let Some(ref c) = cursor { 25 url.push_str(&format!("&cursor={}", c)); 26 } 27 28 + let response: serde_json::Value = client.get(&url).send().await?.json().await?; 29 30 let follows = response["follows"].as_array(); 31 if follows.is_none() { ··· 39 } 40 41 let follow_record = Follow { 42 + uri: format!( 43 + "at://{}/app.bsky.graph.follow/{}", 44 + user_did, 45 + uuid::Uuid::new_v4() 46 + ), 47 follower_did: user_did.to_string(), 48 target_did: target_did.to_string(), 49 created_at: chrono::Utc::now(), ··· 75 let mut fetched = 0; 76 77 loop { 78 + let mut url = format!( 79 + "https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed?actor={}&limit=100", 80 + target_did 81 + ); 82 if let Some(ref c) = cursor { 83 url.push_str(&format!("&cursor={}", c)); 84 } 85 86 + let response: serde_json::Value = client.get(&url).send().await?.json().await?; 87 88 let feed = response["feed"].as_array(); 89 if feed.is_none() { ··· 133 134 fetched += 1; 135 if fetched >= limit { 136 + debug!( 137 + "Backfilled {} posts for {} (limit reached)", 138 + total_posts, target_did 139 + ); 140 return Ok(()); 141 } 142 } ··· 151 Ok(()) 152 } 153 154 + pub async fn backfill_posts_for_follows( 155 + db: Arc<Database>, 156 + user_did: &str, 157 + posts_per_user: usize, 158 + ) -> Result<()> { 159 info!("Starting backfill of posts for {}'s follows", user_did); 160 161 // Get all follows for this user ··· 170 for (idx, row) in follows.iter().enumerate() { 171 let target_did: String = row.try_get("target_did")?; 172 173 + debug!( 174 + "Backfilling posts from {} ({}/{})", 175 + target_did, 176 + idx + 1, 177 + total_follows 178 + ); 179 180 if let Err(e) = backfill_posts(Arc::clone(&db), &target_did, posts_per_user).await { 181 warn!("Failed to backfill posts from {}: {}", target_did, e);
+13 -9
src/database.rs
··· 1 use anyhow::Result; 2 use chrono::{DateTime, Utc}; 3 - use sqlx::{SqlitePool, Row}; 4 5 use crate::types::{Follow, Post}; 6 ··· 25 r#" 26 INSERT OR REPLACE INTO posts (uri, cid, author_did, text, created_at, indexed_at) 27 VALUES (?, ?, ?, ?, ?, ?) 28 - "# 29 ) 30 .bind(&post.uri) 31 .bind(&post.cid) ··· 52 r#" 53 INSERT OR REPLACE INTO follows (uri, follower_did, target_did, created_at, indexed_at) 54 VALUES (?, ?, ?, ?, ?) 55 - "# 56 ) 57 .bind(&follow.uri) 58 .bind(&follow.follower_did) ··· 93 AND p.created_at < ? 94 ORDER BY p.created_at DESC 95 LIMIT ? 96 - "# 97 ) 98 .bind(follower_did) 99 .bind(cursor_time.to_rfc3339()) ··· 132 Ok(()) 133 } 134 135 pub async fn is_following(&self, follower_did: &str, target_did: &str) -> Result<bool> { 136 - let row = sqlx::query("SELECT COUNT(*) as count FROM follows WHERE follower_did = ? AND target_did = ?") 137 - .bind(follower_did) 138 - .bind(target_did) 139 - .fetch_one(&self.pool) 140 - .await?; 141 142 let count: i64 = row.try_get("count")?; 143 Ok(count > 0)
··· 1 use anyhow::Result; 2 use chrono::{DateTime, Utc}; 3 + use sqlx::{Row, SqlitePool}; 4 5 use crate::types::{Follow, Post}; 6 ··· 25 r#" 26 INSERT OR REPLACE INTO posts (uri, cid, author_did, text, created_at, indexed_at) 27 VALUES (?, ?, ?, ?, ?, ?) 28 + "#, 29 ) 30 .bind(&post.uri) 31 .bind(&post.cid) ··· 52 r#" 53 INSERT OR REPLACE INTO follows (uri, follower_did, target_did, created_at, indexed_at) 54 VALUES (?, ?, ?, ?, ?) 55 + "#, 56 ) 57 .bind(&follow.uri) 58 .bind(&follow.follower_did) ··· 93 AND p.created_at < ? 94 ORDER BY p.created_at DESC 95 LIMIT ? 96 + "#, 97 ) 98 .bind(follower_did) 99 .bind(cursor_time.to_rfc3339()) ··· 132 Ok(()) 133 } 134 135 + // Unused but kept for potential future use 136 + #[allow(dead_code)] 137 pub async fn is_following(&self, follower_did: &str, target_did: &str) -> Result<bool> { 138 + let row = sqlx::query( 139 + "SELECT COUNT(*) as count FROM follows WHERE follower_did = ? AND target_did = ?", 140 + ) 141 + .bind(follower_did) 142 + .bind(target_did) 143 + .fetch_one(&self.pool) 144 + .await?; 145 146 let count: i64 = row.try_get("count")?; 147 Ok(count > 0)
+1 -3
src/feed_algorithm.rs
··· 50 .collect(); 51 52 // Generate cursor for pagination (use created_at for chronological order) 53 - let cursor = posts 54 - .last() 55 - .map(|post| post.created_at.to_rfc3339()); 56 57 Ok(FeedSkeletonResponse { 58 cursor,
··· 50 .collect(); 51 52 // Generate cursor for pagination (use created_at for chronological order) 53 + let cursor = posts.last().map(|post| post.created_at.to_rfc3339()); 54 55 Ok(FeedSkeletonResponse { 56 cursor,
+18 -6
src/jetstream_consumer.rs
··· 8 use tracing::{error, info, warn}; 9 use url::Url; 10 11 - use crate::{database::Database, types::{Follow, Post}}; 12 13 pub struct JetstreamEventHandler { 14 db: Arc<Database>, ··· 20 } 21 22 pub async fn start(&self, jetstream_hostname: String) -> Result<()> { 23 - let wanted_collections = "wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow"; 24 - let ws_url = format!("wss://{}/subscribe?{}", jetstream_hostname, wanted_collections); 25 26 info!("Connecting to Jetstream at {}", ws_url); 27 ··· 50 } 51 } 52 Err(e) => { 53 - error!("Failed to connect to Jetstream: {}. Reconnecting in 5 seconds...", e); 54 } 55 } 56 ··· 63 64 match event { 65 JetstreamEvent::Commit { did, commit, .. } => { 66 - info!("Received commit event: did={}, collection={}, operation={}", 67 - did, commit.collection, commit.operation); 68 69 match commit.collection.as_str() { 70 "app.bsky.feed.post" => {
··· 8 use tracing::{error, info, warn}; 9 use url::Url; 10 11 + use crate::{ 12 + database::Database, 13 + types::{Follow, Post}, 14 + }; 15 16 pub struct JetstreamEventHandler { 17 db: Arc<Database>, ··· 23 } 24 25 pub async fn start(&self, jetstream_hostname: String) -> Result<()> { 26 + let wanted_collections = 27 + "wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow"; 28 + let ws_url = format!( 29 + "wss://{}/subscribe?{}", 30 + jetstream_hostname, wanted_collections 31 + ); 32 33 info!("Connecting to Jetstream at {}", ws_url); 34 ··· 57 } 58 } 59 Err(e) => { 60 + error!( 61 + "Failed to connect to Jetstream: {}. Reconnecting in 5 seconds...", 62 + e 63 + ); 64 } 65 } 66 ··· 73 74 match event { 75 JetstreamEvent::Commit { did, commit, .. } => { 76 + info!( 77 + "Received commit event: did={}, collection={}, operation={}", 78 + did, commit.collection, commit.operation 79 + ); 80 81 match commit.collection.as_str() { 82 "app.bsky.feed.post" => {
+75 -38
src/main.rs
··· 1 use anyhow::Result; 2 use axum::{ 3 extract::{Query, State}, 4 - http::{StatusCode, HeaderMap}, 5 - response::{Json, IntoResponse, Response}, 6 routing::get, 7 Router, 8 }; ··· 23 mod types; 24 25 use crate::{ 26 - admin_socket::AdminSocket, 27 - auth::validate_jwt, 28 - database::Database, 29 - feed_algorithm::FollowingNoRepostsFeed, 30 - jetstream_consumer::JetstreamEventHandler, 31 - types::*, 32 }; 33 34 #[derive(Parser)] ··· 50 #[arg(long, env = "FEEDGEN_SERVICE_DID")] 51 service_did: Option<String>, 52 53 - #[arg(long, env = "JETSTREAM_HOSTNAME", default_value = "jetstream1.us-east.bsky.network")] 54 jetstream_hostname: String, 55 56 - #[arg(long, env = "ADMIN_SOCKET", default_value = "/var/run/noreposts-feed.sock")] 57 admin_socket: String, 58 } 59 ··· 84 } 85 86 // Default to serve mode 87 - let service_did = args.service_did 88 .or_else(|| args.hostname.clone().map(|h| format!("did:web:{}", h))) 89 .expect("FEEDGEN_SERVICE_DID or FEEDGEN_HOSTNAME must be set"); 90 ··· 124 loop { 125 info!("Starting Jetstream consumer..."); 126 if let Err(e) = event_handler.start(jetstream_hostname.clone()).await { 127 - warn!("Jetstream consumer error: {}. Reconnecting in 5 seconds...", e); 128 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 129 } else { 130 // Consumer stopped without error, wait before restarting ··· 138 let app = Router::new() 139 .route("/", get(root)) 140 .route("/.well-known/did.json", get(did_document)) 141 - .route("/xrpc/app.bsky.feed.getFeedSkeleton", get(get_feed_skeleton)) 142 .layer(CorsLayer::permissive()) 143 .with_state(app_state); 144 ··· 160 service: vec![ServiceEndpoint { 161 id: "#bsky_fg".to_string(), 162 service_type: "BskyFeedGenerator".to_string(), 163 - service_endpoint: format!("https://{}", std::env::var("FEEDGEN_HOSTNAME").unwrap_or_default()), 164 }], 165 }) 166 } ··· 181 StatusCode::UNAUTHORIZED, 182 Json(types::ErrorResponse { 183 error: "AuthenticationRequired".to_string(), 184 - message: "This feed shows posts from accounts you follow and requires authentication".to_string(), 185 - }) 186 - ).into_response(); 187 } 188 }; 189 ··· 196 Json(types::ErrorResponse { 197 error: "AuthenticationRequired".to_string(), 198 message: "Invalid authorization header format".to_string(), 199 - }) 200 - ).into_response(); 201 } 202 }; 203 ··· 209 Ok(claims) => { 210 info!("Authenticated request from DID: {}", claims.iss); 211 claims.iss 212 - }, 213 Err(e) => { 214 warn!("JWT validation failed: {}", e); 215 return ( ··· 217 Json(types::ErrorResponse { 218 error: "AuthenticationRequired".to_string(), 219 message: format!("JWT validation failed: {}", e), 220 - }) 221 - ).into_response(); 222 } 223 }; 224 ··· 227 let requester_did_clone = requester_did.clone(); 228 tokio::spawn(async move { 229 // Check if we have any follows for this user 230 - let has_follows = sqlx::query("SELECT COUNT(*) as count FROM follows WHERE follower_did = ?") 231 - .bind(&requester_did_clone) 232 - .fetch_one(&db_for_backfill.pool) 233 - .await 234 - .ok() 235 - .and_then(|row| row.try_get::<i64, _>("count").ok()) 236 - .unwrap_or(0); 237 238 if has_follows == 0 { 239 - info!("No follows found for {}, triggering backfill", requester_did_clone); 240 241 // First backfill follows 242 - if let Err(e) = backfill::backfill_follows(Arc::clone(&db_for_backfill), &requester_did_clone).await { 243 warn!("Follow backfill failed for {}: {}", requester_did_clone, e); 244 return; 245 } 246 247 // Then backfill recent posts from each follow (10 posts per user) 248 info!("Starting post backfill for {}", requester_did_clone); 249 - if let Err(e) = backfill::backfill_posts_for_follows(Arc::clone(&db_for_backfill), &requester_did_clone, 10).await { 250 warn!("Post backfill failed for {}: {}", requester_did_clone, e); 251 } 252 } ··· 254 255 let feed_algorithm = FollowingNoRepostsFeed::new(Arc::clone(&state.db)); 256 257 - info!("Generating feed for requester: {}, limit: {:?}, cursor: {:?}", 258 - requester_did, params.limit, params.cursor); 259 260 match feed_algorithm 261 .generate_feed(Some(requester_did), params.limit, params.cursor) 262 .await 263 { 264 Ok(response) => { 265 - info!("Successfully generated feed with {} posts", response.feed.len()); 266 Json(response).into_response() 267 - }, 268 Err(e) => { 269 warn!("Feed generation error: {}", e); 270 ( ··· 272 Json(types::ErrorResponse { 273 error: "InternalServerError".to_string(), 274 message: format!("Failed to generate feed: {}", e), 275 - }) 276 - ).into_response() 277 } 278 } 279 }
··· 1 use anyhow::Result; 2 use axum::{ 3 extract::{Query, State}, 4 + http::{HeaderMap, StatusCode}, 5 + response::{IntoResponse, Json, Response}, 6 routing::get, 7 Router, 8 }; ··· 23 mod types; 24 25 use crate::{ 26 + admin_socket::AdminSocket, auth::validate_jwt, database::Database, 27 + feed_algorithm::FollowingNoRepostsFeed, jetstream_consumer::JetstreamEventHandler, types::*, 28 }; 29 30 #[derive(Parser)] ··· 46 #[arg(long, env = "FEEDGEN_SERVICE_DID")] 47 service_did: Option<String>, 48 49 + #[arg( 50 + long, 51 + env = "JETSTREAM_HOSTNAME", 52 + default_value = "jetstream1.us-east.bsky.network" 53 + )] 54 jetstream_hostname: String, 55 56 + #[arg( 57 + long, 58 + env = "ADMIN_SOCKET", 59 + default_value = "/var/run/noreposts-feed.sock" 60 + )] 61 admin_socket: String, 62 } 63 ··· 88 } 89 90 // Default to serve mode 91 + let service_did = args 92 + .service_did 93 .or_else(|| args.hostname.clone().map(|h| format!("did:web:{}", h))) 94 .expect("FEEDGEN_SERVICE_DID or FEEDGEN_HOSTNAME must be set"); 95 ··· 129 loop { 130 info!("Starting Jetstream consumer..."); 131 if let Err(e) = event_handler.start(jetstream_hostname.clone()).await { 132 + warn!( 133 + "Jetstream consumer error: {}. Reconnecting in 5 seconds...", 134 + e 135 + ); 136 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 137 } else { 138 // Consumer stopped without error, wait before restarting ··· 146 let app = Router::new() 147 .route("/", get(root)) 148 .route("/.well-known/did.json", get(did_document)) 149 + .route( 150 + "/xrpc/app.bsky.feed.getFeedSkeleton", 151 + get(get_feed_skeleton), 152 + ) 153 .layer(CorsLayer::permissive()) 154 .with_state(app_state); 155 ··· 171 service: vec![ServiceEndpoint { 172 id: "#bsky_fg".to_string(), 173 service_type: "BskyFeedGenerator".to_string(), 174 + service_endpoint: format!( 175 + "https://{}", 176 + std::env::var("FEEDGEN_HOSTNAME").unwrap_or_default() 177 + ), 178 }], 179 }) 180 } ··· 195 StatusCode::UNAUTHORIZED, 196 Json(types::ErrorResponse { 197 error: "AuthenticationRequired".to_string(), 198 + message: 199 + "This feed shows posts from accounts you follow and requires authentication" 200 + .to_string(), 201 + }), 202 + ) 203 + .into_response(); 204 } 205 }; 206 ··· 213 Json(types::ErrorResponse { 214 error: "AuthenticationRequired".to_string(), 215 message: "Invalid authorization header format".to_string(), 216 + }), 217 + ) 218 + .into_response(); 219 } 220 }; 221 ··· 227 Ok(claims) => { 228 info!("Authenticated request from DID: {}", claims.iss); 229 claims.iss 230 + } 231 Err(e) => { 232 warn!("JWT validation failed: {}", e); 233 return ( ··· 235 Json(types::ErrorResponse { 236 error: "AuthenticationRequired".to_string(), 237 message: format!("JWT validation failed: {}", e), 238 + }), 239 + ) 240 + .into_response(); 241 } 242 }; 243 ··· 246 let requester_did_clone = requester_did.clone(); 247 tokio::spawn(async move { 248 // Check if we have any follows for this user 249 + let has_follows = 250 + sqlx::query("SELECT COUNT(*) as count FROM follows WHERE follower_did = ?") 251 + .bind(&requester_did_clone) 252 + .fetch_one(&db_for_backfill.pool) 253 + .await 254 + .ok() 255 + .and_then(|row| row.try_get::<i64, _>("count").ok()) 256 + .unwrap_or(0); 257 258 if has_follows == 0 { 259 + info!( 260 + "No follows found for {}, triggering backfill", 261 + requester_did_clone 262 + ); 263 264 // First backfill follows 265 + if let Err(e) = 266 + backfill::backfill_follows(Arc::clone(&db_for_backfill), &requester_did_clone).await 267 + { 268 warn!("Follow backfill failed for {}: {}", requester_did_clone, e); 269 return; 270 } 271 272 // Then backfill recent posts from each follow (10 posts per user) 273 info!("Starting post backfill for {}", requester_did_clone); 274 + if let Err(e) = backfill::backfill_posts_for_follows( 275 + Arc::clone(&db_for_backfill), 276 + &requester_did_clone, 277 + 10, 278 + ) 279 + .await 280 + { 281 warn!("Post backfill failed for {}: {}", requester_did_clone, e); 282 } 283 } ··· 285 286 let feed_algorithm = FollowingNoRepostsFeed::new(Arc::clone(&state.db)); 287 288 + info!( 289 + "Generating feed for requester: {}, limit: {:?}, cursor: {:?}", 290 + requester_did, params.limit, params.cursor 291 + ); 292 293 match feed_algorithm 294 .generate_feed(Some(requester_did), params.limit, params.cursor) 295 .await 296 { 297 Ok(response) => { 298 + info!( 299 + "Successfully generated feed with {} posts", 300 + response.feed.len() 301 + ); 302 Json(response).into_response() 303 + } 304 Err(e) => { 305 warn!("Feed generation error: {}", e); 306 ( ··· 308 Json(types::ErrorResponse { 309 error: "InternalServerError".to_string(), 310 message: format!("Failed to generate feed: {}", e), 311 + }), 312 + ) 313 + .into_response() 314 } 315 } 316 }
+18 -6
src/publish.rs
··· 52 dotenvy::dotenv().ok(); 53 let feedgen_service_did = std::env::var("FEEDGEN_SERVICE_DID") 54 .or_else(|_| { 55 - std::env::var("FEEDGEN_HOSTNAME") 56 - .map(|hostname| format!("did:web:{}", hostname)) 57 }) 58 .map_err(|_| anyhow!("Please set FEEDGEN_SERVICE_DID or FEEDGEN_HOSTNAME in .env file"))?; 59 ··· 81 record_type: "app.bsky.feed.generator".to_string(), 82 did: feedgen_service_did, 83 display_name, 84 - description: if description.is_empty() { None } else { Some(description) }, 85 created_at: chrono::Utc::now().to_rfc3339(), 86 }; 87 ··· 95 96 let response = client 97 .post(format!("{}/xrpc/com.atproto.repo.putRecord", pds_url)) 98 - .header("Authorization", format!("Bearer {}", login_response.access_jwt)) 99 .json(&put_request) 100 .send() 101 .await?; ··· 109 response.error_for_status()?; 110 111 println!("\n✅ Feed published successfully!"); 112 - println!("🔗 Feed AT-URI: at://{}/app.bsky.feed.generator/{}", login_response.did, record_name); 113 println!("\n🌐 You can view your feed at:"); 114 - println!(" https://bsky.app/profile/{}/feed/{}", login_response.handle, record_name); 115 println!("\nYou can now find and share your feed in the Bluesky app!"); 116 117 Ok(())
··· 52 dotenvy::dotenv().ok(); 53 let feedgen_service_did = std::env::var("FEEDGEN_SERVICE_DID") 54 .or_else(|_| { 55 + std::env::var("FEEDGEN_HOSTNAME").map(|hostname| format!("did:web:{}", hostname)) 56 }) 57 .map_err(|_| anyhow!("Please set FEEDGEN_SERVICE_DID or FEEDGEN_HOSTNAME in .env file"))?; 58 ··· 80 record_type: "app.bsky.feed.generator".to_string(), 81 did: feedgen_service_did, 82 display_name, 83 + description: if description.is_empty() { 84 + None 85 + } else { 86 + Some(description) 87 + }, 88 created_at: chrono::Utc::now().to_rfc3339(), 89 }; 90 ··· 98 99 let response = client 100 .post(format!("{}/xrpc/com.atproto.repo.putRecord", pds_url)) 101 + .header( 102 + "Authorization", 103 + format!("Bearer {}", login_response.access_jwt), 104 + ) 105 .json(&put_request) 106 .send() 107 .await?; ··· 115 response.error_for_status()?; 116 117 println!("\n✅ Feed published successfully!"); 118 + println!( 119 + "🔗 Feed AT-URI: at://{}/app.bsky.feed.generator/{}", 120 + login_response.did, record_name 121 + ); 122 println!("\n🌐 You can view your feed at:"); 123 + println!( 124 + " https://bsky.app/profile/{}/feed/{}", 125 + login_response.handle, record_name 126 + ); 127 println!("\nYou can now find and share your feed in the Bluesky app!"); 128 129 Ok(())
-1
src/types.rs
··· 55 pub indexed_at: DateTime<Utc>, 56 } 57 58 - 59 // JWT Claims 60 #[derive(Debug, Serialize, Deserialize)] 61 pub struct JwtClaims {
··· 55 pub indexed_at: DateTime<Utc>, 56 } 57 58 // JWT Claims 59 #[derive(Debug, Serialize, Deserialize)] 60 pub struct JwtClaims {