Noreposts Feed

Initial commit: NoReposts ATProto feed generator

Vitor Py Braga 09773d00

+1306
+9
.claude/settings.local.json
··· 1 + { 2 + "permissions": { 3 + "allow": [ 4 + "Read(//home/vitorpy/code/vitorpy.com/**)" 5 + ], 6 + "deny": [], 7 + "ask": [] 8 + } 9 + }
+12
.env.example
··· 1 + # Database configuration 2 + DATABASE_URL=sqlite:./feed.db 3 + 4 + # Server configuration 5 + PORT=3000 6 + 7 + # Feed generator configuration 8 + FEEDGEN_HOSTNAME=your-domain.com 9 + FEEDGEN_SERVICE_DID=did:web:your-domain.com 10 + 11 + # Jetstream configuration (optional, defaults to jetstream1.us-east.bsky.network) 12 + JETSTREAM_HOSTNAME=jetstream1.us-east.bsky.network
+91
.github/workflows/deploy.yml
name: Deploy Feed Generator to VPS

on:
  push:
    branches: [main]
  workflow_dispatch:

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Rust
        uses: dtolnay/rust-toolchain@stable

      - name: Cache cargo registry
        uses: actions/cache@v4
        with:
          path: ~/.cargo/registry
          key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }}

      - name: Cache cargo index
        uses: actions/cache@v4
        with:
          path: ~/.cargo/git
          key: ${{ runner.os }}-cargo-git-${{ hashFiles('**/Cargo.lock') }}

      - name: Cache target directory
        uses: actions/cache@v4
        with:
          path: target
          key: ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}

      - name: Build release binary
        run: cargo build --release

      - name: Setup SSH
        run: |
          mkdir -p ~/.ssh
          echo "${{ secrets.VPS_SSH_KEY }}" > ~/.ssh/deploy_key
          chmod 600 ~/.ssh/deploy_key
          ssh-keyscan -H ${{ secrets.VPS_HOST }} >> ~/.ssh/known_hosts

      - name: Create remote directory
        run: |
          ssh -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }} \
            "mkdir -p /var/www/noreposts-feed"

      # Stop the service BEFORE overwriting the binary: on Linux, writing to
      # a running executable fails with "Text file busy" (ETXTBSY), which
      # previously made the scp step fail on every deploy after the first.
      - name: Stop service
        run: |
          ssh -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }} \
            "systemctl stop noreposts-feed || true"

      - name: Deploy binary and migration
        run: |
          scp -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            target/release/following-no-reposts-feed \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }}:/var/www/noreposts-feed/

          scp -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            migrations/001_initial.sql \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }}:/var/www/noreposts-feed/

      - name: Deploy .env file
        run: |
          echo "DATABASE_URL=${{ secrets.DATABASE_URL }}" > .env.production
          echo "PORT=${{ secrets.PORT }}" >> .env.production
          echo "FEEDGEN_HOSTNAME=${{ secrets.FEEDGEN_HOSTNAME }}" >> .env.production
          echo "FEEDGEN_SERVICE_DID=${{ secrets.FEEDGEN_SERVICE_DID }}" >> .env.production
          echo "JETSTREAM_HOSTNAME=${{ secrets.JETSTREAM_HOSTNAME }}" >> .env.production

          scp -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            .env.production \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }}:/var/www/noreposts-feed/.env

      - name: Set permissions
        run: |
          ssh -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }} \
            "chmod +x /var/www/noreposts-feed/following-no-reposts-feed"

      - name: Start service
        run: |
          ssh -i ~/.ssh/deploy_key -o StrictHostKeyChecking=no \
            ${{ secrets.VPS_USER }}@${{ secrets.VPS_HOST }} \
            "systemctl start noreposts-feed && systemctl status noreposts-feed --no-pager || true"

      - name: Cleanup
        if: always()
        run: |
          rm -f ~/.ssh/deploy_key
          rm -f .env.production
+25
.gitignore
··· 1 + # Rust 2 + /target/ 3 + Cargo.lock 4 + 5 + # Database 6 + *.db 7 + *.db-shm 8 + *.db-wal 9 + 10 + # Environment 11 + .env 12 + .env.production 13 + 14 + # IDE 15 + .idea/ 16 + .vscode/ 17 + *.swp 18 + *.swo 19 + *~ 20 + 21 + # macOS 22 + .DS_Store 23 + 24 + # Logs 25 + *.log
+45
Cargo.toml
··· 1 + [package] 2 + name = "following-no-reposts-feed" 3 + version = "0.1.0" 4 + edition = "2021" 5 + 6 + [dependencies] 7 + # Jetstream consumer 8 + atproto-jetstream = { version = "0.13", features = ["clap"] } 9 + 10 + # AT Protocol libraries 11 + atrium-api = "0.1" 12 + atrium-xrpc-client = "0.1" 13 + 14 + # Web server 15 + axum = "0.7" 16 + tower = "0.4" 17 + tower-http = { version = "0.5", features = ["cors", "trace"] } 18 + 19 + # Database 20 + sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "sqlite", "chrono", "uuid"] } 21 + 22 + # Async runtime 23 + tokio = { version = "1.0", features = ["full"] } 24 + 25 + # Serialization 26 + serde = { version = "1.0", features = ["derive"] } 27 + serde_json = "1.0" 28 + 29 + # Utilities 30 + anyhow = "1.0" 31 + tracing = "0.1" 32 + tracing-subscriber = "0.3" 33 + chrono = { version = "0.4", features = ["serde"] } 34 + uuid = { version = "1.0", features = ["v4", "serde"] } 35 + clap = { version = "4.0", features = ["derive"] } 36 + async-trait = "0.1" 37 + 38 + # JWT handling 39 + jsonwebtoken = "9.0" 40 + 41 + # HTTP client 42 + reqwest = { version = "0.11", features = ["json"] } 43 + 44 + # Environment 45 + dotenvy = "0.15"
+241
README.md
··· 1 + # Following No Reposts Feed Generator 2 + 3 + A Rust-based Bluesky feed generator that shows posts from people you follow, excluding all reposts. Built using Jetstream for efficient real-time data consumption. 4 + 5 + ## Features 6 + 7 + - **Efficient Jetstream Integration**: Uses Bluesky's Jetstream service for lightweight, filtered event consumption 8 + - **No Reposts**: Automatically filters out all reposts, showing only original posts 9 + - **Personalized**: Shows only posts from accounts you follow 10 + - **Real-time**: Updates in real-time as new posts and follows are created 11 + - **Memory Efficient**: Automatic cleanup of old posts (configurable retention period) 12 + - **Production Ready**: Includes proper JWT authentication, error handling, and logging 13 + 14 + ## Architecture 15 + 16 + The feed generator consists of several components: 17 + 18 + 1. **Jetstream Consumer**: Connects to Bluesky's Jetstream and consumes `app.bsky.feed.post` and `app.bsky.graph.follow` events 19 + 2. **Database Layer**: SQLite database for storing posts and follow relationships 20 + 3. **Web Server**: Axum-based HTTP server that implements the feed skeleton API 21 + 4. **Feed Algorithm**: Generates personalized feeds based on user follows 22 + 5. **Authentication**: JWT validation for personalized feeds 23 + 24 + ## Setup 25 + 26 + ### 1. Prerequisites 27 + 28 + Ensure you have Rust installed: 29 + 30 + ```fish 31 + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh 32 + ``` 33 + 34 + ### 2. Clone and Build 35 + 36 + ```fish 37 + git clone <your-repo> 38 + cd following-no-reposts-feed 39 + cargo build --release 40 + ``` 41 + 42 + ### 3. 
Configuration 43 + 44 + Copy the example environment file and configure it: 45 + 46 + ```fish 47 + cp .env.example .env 48 + nvim .env 49 + ``` 50 + 51 + Required environment variables: 52 + - `FEEDGEN_HOSTNAME`: Your domain where the feed will be hosted 53 + - `FEEDGEN_SERVICE_DID`: Your service DID (usually `did:web:your-domain.com`) 54 + - `DATABASE_URL`: SQLite database path 55 + - `PORT`: Server port (default: 3000) 56 + 57 + ### 4. Database Setup 58 + 59 + The database will be automatically migrated on startup, but you can run migrations manually: 60 + 61 + ```fish 62 + cargo install sqlx-cli 63 + sqlx migrate run 64 + ``` 65 + 66 + ### 5. Run the Feed Generator 67 + 68 + ```fish 69 + cargo run --release 70 + ``` 71 + 72 + Or with custom parameters: 73 + 74 + ```fish 75 + cargo run --release -- --port 3000 --hostname your-domain.com 76 + ``` 77 + 78 + ## Deployment 79 + 80 + ### 1. Build for Production 81 + 82 + ```fish 83 + cargo build --release 84 + ``` 85 + 86 + ### 2. Deploy to Your Server 87 + 88 + The binary needs to be accessible via HTTPS on port 443. You can use any reverse proxy (nginx, caddy, etc.) to handle TLS termination. 89 + 90 + Example nginx configuration: 91 + 92 + ```nginx 93 + server { 94 + listen 443 ssl; 95 + server_name your-domain.com; 96 + 97 + ssl_certificate /path/to/cert.pem; 98 + ssl_certificate_key /path/to/key.pem; 99 + 100 + location / { 101 + proxy_pass http://localhost:3000; 102 + proxy_set_header Host $host; 103 + proxy_set_header X-Real-IP $remote_addr; 104 + } 105 + } 106 + ``` 107 + 108 + ### 3. Publishing the Feed 109 + 110 + Use the Bluesky API to publish your feed generator. 
You'll need to create a feed generator record in your account: 111 + 112 + ```fish 113 + # Get your account DID 114 + curl "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=yourhandle.bsky.social" 115 + 116 + # Publish the feed (you'll need to implement this using atrium-api or similar) 117 + ``` 118 + 119 + ## API Endpoints 120 + 121 + ### GET /.well-known/did.json 122 + 123 + Returns the DID document for your feed generator service. 124 + 125 + ### GET /xrpc/app.bsky.feed.getFeedSkeleton 126 + 127 + Main feed endpoint that returns the skeleton of posts for the requesting user. 128 + 129 + Query parameters: 130 + - `feed`: The AT-URI of the feed (required) 131 + - `limit`: Number of posts to return (optional, max 100) 132 + - `cursor`: Pagination cursor (optional) 133 + 134 + Headers: 135 + - `Authorization`: Bearer JWT token for user authentication 136 + 137 + ## Performance 138 + 139 + ### Data Usage 140 + 141 + Using Jetstream significantly reduces bandwidth compared to the raw firehose: 142 + - **Jetstream**: ~850 MB/day for all posts (with compression) 143 + - **Raw Firehose**: 200+ GB/day during high activity periods 144 + 145 + ### Filtering Efficiency 146 + 147 + The feed generator only subscribes to relevant collections: 148 + - `app.bsky.feed.post` - for post creation/deletion 149 + - `app.bsky.graph.follow` - for follow relationships 150 + 151 + Reposts (`app.bsky.feed.repost`) are automatically excluded by not subscribing to that collection. 152 + 153 + ### Database Optimization 154 + 155 + - Automatic cleanup of posts older than 48 hours 156 + - Efficient indexes on author_did and indexed_at 157 + - Unique constraints on follow relationships 158 + 159 + ## Monitoring 160 + 161 + The application includes structured logging. Set `RUST_LOG=debug` for detailed logs. 
162 + 163 + Key metrics to monitor: 164 + - Database size growth 165 + - Jetstream connection health 166 + - Feed generation latency 167 + - Error rates 168 + 169 + ## Development 170 + 171 + ### Running Tests 172 + 173 + ```fish 174 + cargo test 175 + ``` 176 + 177 + ### Database Migrations 178 + 179 + Add new migrations in the `migrations/` directory: 180 + 181 + ```fish 182 + sqlx migrate add your_migration_name 183 + ``` 184 + 185 + ### Adding New Features 186 + 187 + The modular design makes it easy to extend: 188 + 189 + 1. **New Event Types**: Add handlers in `jetstream_consumer.rs` 190 + 2. **New Algorithms**: Implement new feed algorithms in `feed_algorithm.rs` 191 + 3. **Enhanced Auth**: Improve JWT validation in `auth.rs` 192 + 193 + ## Troubleshooting 194 + 195 + ### Common Issues 196 + 197 + 1. **Jetstream Connection Failures** 198 + - Check network connectivity 199 + - Verify Jetstream hostname in configuration 200 + - Monitor for rate limiting 201 + 202 + 2. **Database Locks** 203 + - Ensure proper connection pooling 204 + - Check for long-running transactions 205 + 206 + 3. **Authentication Errors** 207 + - Verify JWT implementation matches AT Protocol specs 208 + - Check DID resolution for user verification keys 209 + 210 + ### Debugging 211 + 212 + Enable debug logging: 213 + 214 + ```fish 215 + RUST_LOG=debug cargo run 216 + ``` 217 + 218 + Test the feed endpoint directly: 219 + 220 + ```fish 221 + curl "http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://your-did/app.bsky.feed.generator/following-no-reposts&limit=10" 222 + ``` 223 + 224 + ## License 225 + 226 + MIT License - see LICENSE file for details. 227 + 228 + ## Contributing 229 + 230 + 1. Fork the repository 231 + 2. Create a feature branch 232 + 3. Make your changes 233 + 4. Add tests if applicable 234 + 5. 
Submit a pull request 235 + 236 + ## Resources 237 + 238 + - [AT Protocol Documentation](https://atproto.com) 239 + - [Bluesky API Reference](https://docs.bsky.app) 240 + - [Jetstream Documentation](https://github.com/bluesky-social/jetstream) 241 + - [ATrium Rust Library](https://github.com/sugyan/atrium)
+51
deploy.sh
··· 1 + #!/bin/bash 2 + 3 + # Deployment script for noreposts-atproto-feed 4 + # Deploys Rust feed generator to Hetzner server 5 + # 6 + # NOTE: Nginx configuration is handled separately in the vitorpy.com repo 7 + 8 + set -e # Exit on any error 9 + 10 + # Configuration 11 + SERVER="root@167.235.24.234" 12 + REMOTE_DIR="/var/www/noreposts-feed" 13 + SERVICE_NAME="noreposts-feed" 14 + 15 + echo "🚀 Starting deployment to noreposts-feed..." 16 + 17 + # Step 1: Build the Rust binary 18 + echo "📦 Building Rust binary..." 19 + cargo build --release 20 + 21 + if [ $? -ne 0 ]; then 22 + echo "❌ Cargo build failed!" 23 + exit 1 24 + fi 25 + 26 + echo "✅ Rust build complete" 27 + 28 + # Step 2: Upload binary and SQL migration 29 + echo "📤 Uploading binary and files..." 30 + ssh $SERVER "mkdir -p $REMOTE_DIR" 31 + scp target/release/following-no-reposts-feed $SERVER:$REMOTE_DIR/ 32 + scp 001_initial.sql $SERVER:$REMOTE_DIR/ 33 + 34 + if [ $? -ne 0 ]; then 35 + echo "❌ Upload failed!" 36 + exit 1 37 + fi 38 + 39 + echo "✅ Files uploaded" 40 + 41 + # Step 3: Set correct permissions 42 + echo "🔒 Setting permissions..." 43 + ssh $SERVER "chmod +x $REMOTE_DIR/following-no-reposts-feed" 44 + 45 + # Step 4: Restart the service 46 + echo "🔄 Restarting service..." 47 + ssh $SERVER "systemctl restart $SERVICE_NAME" 48 + ssh $SERVER "systemctl status $SERVICE_NAME --no-pager" 49 + 50 + echo "✅ Deployment complete!" 51 + echo "🌐 Feed is live"
+23
migrations/001_initial.sql
··· 1 + CREATE TABLE IF NOT EXISTS posts ( 2 + uri TEXT PRIMARY KEY, 3 + cid TEXT NOT NULL, 4 + author_did TEXT NOT NULL, 5 + text TEXT NOT NULL, 6 + created_at TEXT NOT NULL, 7 + indexed_at TEXT NOT NULL 8 + ); 9 + 10 + CREATE INDEX IF NOT EXISTS idx_posts_author_indexed ON posts(author_did, indexed_at DESC); 11 + CREATE INDEX IF NOT EXISTS idx_posts_indexed ON posts(indexed_at DESC); 12 + 13 + CREATE TABLE IF NOT EXISTS follows ( 14 + uri TEXT PRIMARY KEY, 15 + follower_did TEXT NOT NULL, 16 + target_did TEXT NOT NULL, 17 + created_at TEXT NOT NULL, 18 + indexed_at TEXT NOT NULL 19 + ); 20 + 21 + CREATE INDEX IF NOT EXISTS idx_follows_follower ON follows(follower_did); 22 + CREATE INDEX IF NOT EXISTS idx_follows_target ON follows(target_did); 23 + CREATE UNIQUE INDEX IF NOT EXISTS idx_follows_unique ON follows(follower_did, target_did);
+65
src/auth.rs
··· 1 + use anyhow::{anyhow, Result}; 2 + use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; 3 + 4 + use crate::types::JwtClaims; 5 + 6 + pub fn validate_jwt(auth_header: &str, service_did: &str) -> Result<JwtClaims> { 7 + // Extract bearer token 8 + let token = auth_header 9 + .strip_prefix("Bearer ") 10 + .ok_or_else(|| anyhow!("Invalid authorization header format"))?; 11 + 12 + // For this example, we'll use a simplified JWT validation 13 + // In production, you'd need to: 14 + // 1. Fetch the user's DID document 15 + // 2. Extract their signing key 16 + // 3. Validate the signature with that key 17 + 18 + // For now, let's decode without verification (unsafe for production!) 19 + let mut validation = Validation::new(Algorithm::ES256); 20 + validation.insecure_disable_signature_validation(); 21 + validation.validate_exp = true; 22 + validation.set_audience(&[service_did]); 23 + 24 + // This is a placeholder - in production you need the actual signing key 25 + let decoding_key = DecodingKey::from_secret(b"placeholder"); 26 + 27 + let token_data = decode::<JwtClaims>(token, &decoding_key, &validation) 28 + .map_err(|e| anyhow!("JWT validation failed: {}", e))?; 29 + 30 + Ok(token_data.claims) 31 + } 32 + 33 + // Production implementation would need this: 34 + /* 35 + pub async fn validate_jwt_production(auth_header: &str, service_did: &str) -> Result<JwtClaims> { 36 + let token = auth_header 37 + .strip_prefix("Bearer ") 38 + .ok_or_else(|| anyhow!("Invalid authorization header format"))?; 39 + 40 + // 1. Decode JWT header to get the signing key ID 41 + let header = decode_header(token)?; 42 + 43 + // 2. Extract issuer DID from token payload (without verification) 44 + let mut validation = Validation::new(Algorithm::ES256K); 45 + validation.insecure_disable_signature_validation(); 46 + let temp_decode = decode::<JwtClaims>(token, &DecodingKey::from_secret(b"temp"), &validation)?; 47 + let issuer_did = temp_decode.claims.iss; 48 + 49 + // 3. 
Fetch DID document for the issuer 50 + let did_doc = fetch_did_document(&issuer_did).await?; 51 + 52 + // 4. Extract the appropriate verification key 53 + let verification_key = extract_verification_key(&did_doc, &header.kid)?; 54 + 55 + // 5. Validate the JWT with the real key 56 + let mut validation = Validation::new(Algorithm::ES256K); 57 + validation.validate_exp = true; 58 + validation.set_audience(&[service_did]); 59 + 60 + let decoding_key = DecodingKey::from_ec_pem(&verification_key)?; 61 + let token_data = decode::<JwtClaims>(token, &decoding_key, &validation)?; 62 + 63 + Ok(token_data.claims) 64 + } 65 + */
+145
src/database.rs
··· 1 + use anyhow::Result; 2 + use chrono::{DateTime, Utc}; 3 + use sqlx::{SqlitePool, Row}; 4 + 5 + use crate::types::{Follow, Post}; 6 + 7 + pub struct Database { 8 + pool: SqlitePool, 9 + } 10 + 11 + impl Database { 12 + pub async fn new(database_url: &str) -> Result<Self> { 13 + let pool = SqlitePool::connect(database_url).await?; 14 + Ok(Self { pool }) 15 + } 16 + 17 + pub async fn migrate(&self) -> Result<()> { 18 + sqlx::migrate!("./migrations").run(&self.pool).await?; 19 + Ok(()) 20 + } 21 + 22 + // Post operations 23 + pub async fn insert_post(&self, post: &Post) -> Result<()> { 24 + sqlx::query( 25 + r#" 26 + INSERT OR REPLACE INTO posts (uri, cid, author_did, text, created_at, indexed_at) 27 + VALUES (?, ?, ?, ?, ?, ?) 28 + "# 29 + ) 30 + .bind(&post.uri) 31 + .bind(&post.cid) 32 + .bind(&post.author_did) 33 + .bind(&post.text) 34 + .bind(post.created_at.to_rfc3339()) 35 + .bind(post.indexed_at.to_rfc3339()) 36 + .execute(&self.pool) 37 + .await?; 38 + Ok(()) 39 + } 40 + 41 + pub async fn delete_post(&self, uri: &str) -> Result<()> { 42 + sqlx::query("DELETE FROM posts WHERE uri = ?") 43 + .bind(uri) 44 + .execute(&self.pool) 45 + .await?; 46 + Ok(()) 47 + } 48 + 49 + // Follow operations 50 + pub async fn insert_follow(&self, follow: &Follow) -> Result<()> { 51 + sqlx::query( 52 + r#" 53 + INSERT OR REPLACE INTO follows (uri, follower_did, target_did, created_at, indexed_at) 54 + VALUES (?, ?, ?, ?, ?) 
55 + "# 56 + ) 57 + .bind(&follow.uri) 58 + .bind(&follow.follower_did) 59 + .bind(&follow.target_did) 60 + .bind(follow.created_at.to_rfc3339()) 61 + .bind(follow.indexed_at.to_rfc3339()) 62 + .execute(&self.pool) 63 + .await?; 64 + Ok(()) 65 + } 66 + 67 + pub async fn delete_follow(&self, uri: &str) -> Result<()> { 68 + sqlx::query("DELETE FROM follows WHERE uri = ?") 69 + .bind(uri) 70 + .execute(&self.pool) 71 + .await?; 72 + Ok(()) 73 + } 74 + 75 + // Feed generation queries 76 + pub async fn get_following_posts( 77 + &self, 78 + follower_did: &str, 79 + limit: i32, 80 + cursor: Option<&str>, 81 + ) -> Result<Vec<Post>> { 82 + let cursor_time = cursor 83 + .and_then(|c| DateTime::parse_from_rfc3339(c).ok()) 84 + .map(|dt| dt.with_timezone(&Utc)) 85 + .unwrap_or_else(Utc::now); 86 + 87 + let rows = sqlx::query( 88 + r#" 89 + SELECT p.uri, p.cid, p.author_did, p.text, p.created_at, p.indexed_at 90 + FROM posts p 91 + INNER JOIN follows f ON f.target_did = p.author_did 92 + WHERE f.follower_did = ? 93 + AND p.indexed_at < ? 94 + ORDER BY p.indexed_at DESC 95 + LIMIT ? 
96 + "# 97 + ) 98 + .bind(follower_did) 99 + .bind(cursor_time.to_rfc3339()) 100 + .bind(limit) 101 + .fetch_all(&self.pool) 102 + .await?; 103 + 104 + let mut posts = Vec::new(); 105 + for row in rows { 106 + let uri: String = row.try_get("uri")?; 107 + let cid: String = row.try_get("cid")?; 108 + let author_did: String = row.try_get("author_did")?; 109 + let text: String = row.try_get("text")?; 110 + let created_at_str: String = row.try_get("created_at")?; 111 + let indexed_at_str: String = row.try_get("indexed_at")?; 112 + 113 + posts.push(Post { 114 + uri, 115 + cid, 116 + author_did, 117 + text, 118 + created_at: DateTime::parse_from_rfc3339(&created_at_str)?.with_timezone(&Utc), 119 + indexed_at: DateTime::parse_from_rfc3339(&indexed_at_str)?.with_timezone(&Utc), 120 + }); 121 + } 122 + 123 + Ok(posts) 124 + } 125 + 126 + pub async fn cleanup_old_posts(&self, hours: i64) -> Result<()> { 127 + let cutoff = Utc::now() - chrono::Duration::hours(hours); 128 + sqlx::query("DELETE FROM posts WHERE indexed_at < ?") 129 + .bind(cutoff.to_rfc3339()) 130 + .execute(&self.pool) 131 + .await?; 132 + Ok(()) 133 + } 134 + 135 + pub async fn is_following(&self, follower_did: &str, target_did: &str) -> Result<bool> { 136 + let row = sqlx::query("SELECT COUNT(*) as count FROM follows WHERE follower_did = ? AND target_did = ?") 137 + .bind(follower_did) 138 + .bind(target_did) 139 + .fetch_one(&self.pool) 140 + .await?; 141 + 142 + let count: i64 = row.try_get("count")?; 143 + Ok(count > 0) 144 + } 145 + }
+110
src/feed_algorithm.rs
··· 1 + use anyhow::Result; 2 + use std::sync::Arc; 3 + use tracing::warn; 4 + 5 + use crate::{ 6 + database::Database, 7 + types::{FeedSkeletonResponse, SkeletonFeedPost}, 8 + }; 9 + 10 + pub struct FollowingNoRepostsFeed { 11 + db: Arc<Database>, 12 + } 13 + 14 + impl FollowingNoRepostsFeed { 15 + pub fn new(db: Arc<Database>) -> Self { 16 + Self { db } 17 + } 18 + 19 + pub async fn generate_feed( 20 + &self, 21 + requester_did: Option<String>, 22 + limit: Option<i32>, 23 + cursor: Option<String>, 24 + ) -> Result<FeedSkeletonResponse> { 25 + // Require authentication for this feed since it's personalized 26 + let follower_did = match requester_did { 27 + Some(did) => did, 28 + None => { 29 + warn!("Unauthenticated request to following feed"); 30 + return Ok(FeedSkeletonResponse { 31 + cursor: None, 32 + feed: vec![], 33 + }); 34 + } 35 + }; 36 + 37 + let limit = limit.unwrap_or(50).min(100); // Cap at 100 items 38 + 39 + // Get posts from accounts the user follows 40 + let posts = self 41 + .db 42 + .get_following_posts(&follower_did, limit, cursor.as_deref()) 43 + .await?; 44 + 45 + let feed_posts: Vec<SkeletonFeedPost> = posts 46 + .iter() 47 + .map(|post| SkeletonFeedPost { 48 + post: post.uri.clone(), 49 + }) 50 + .collect(); 51 + 52 + // Generate cursor for pagination 53 + let cursor = posts 54 + .last() 55 + .map(|post| post.indexed_at.to_rfc3339()); 56 + 57 + Ok(FeedSkeletonResponse { 58 + cursor, 59 + feed: feed_posts, 60 + }) 61 + } 62 + } 63 + 64 + #[cfg(test)] 65 + mod tests { 66 + use super::*; 67 + use crate::types::{Follow, Post}; 68 + use chrono::Utc; 69 + 70 + #[tokio::test] 71 + async fn test_feed_generation() -> Result<()> { 72 + let db = Arc::new(Database::new(":memory:").await?); 73 + db.migrate().await?; 74 + 75 + // Create test data 76 + let follower_did = "did:example:alice"; 77 + let target_did = "did:example:bob"; 78 + 79 + // Insert follow relationship 80 + let follow = Follow { 81 + uri: format!("at://{}/app.bsky.graph.follow/test", 
follower_did), 82 + follower_did: follower_did.to_string(), 83 + target_did: target_did.to_string(), 84 + created_at: Utc::now(), 85 + indexed_at: Utc::now(), 86 + }; 87 + db.insert_follow(&follow).await?; 88 + 89 + // Insert post from followed user 90 + let post = Post { 91 + uri: format!("at://{}/app.bsky.feed.post/test", target_did), 92 + cid: "test-cid".to_string(), 93 + author_did: target_did.to_string(), 94 + text: "Hello world!".to_string(), 95 + created_at: Utc::now(), 96 + indexed_at: Utc::now(), 97 + }; 98 + db.insert_post(&post).await?; 99 + 100 + let feed_algorithm = FollowingNoRepostsFeed::new(Arc::clone(&db)); 101 + let response = feed_algorithm 102 + .generate_feed(Some(follower_did.to_string()), Some(10), None) 103 + .await?; 104 + 105 + assert_eq!(response.feed.len(), 1); 106 + assert_eq!(response.feed[0].post, post.uri); 107 + 108 + Ok(()) 109 + } 110 + }
+199
src/jetstream_consumer.rs
··· 1 + use anyhow::Result; 2 + use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken}; 3 + use async_trait::async_trait; 4 + use chrono::{DateTime, Utc}; 5 + use std::sync::Arc; 6 + use tracing::{error, warn}; 7 + 8 + use crate::{database::Database, types::{Follow, Post}}; 9 + 10 + pub struct JetstreamEventHandler { 11 + db: Arc<Database>, 12 + } 13 + 14 + impl JetstreamEventHandler { 15 + pub fn new(db: Arc<Database>) -> Self { 16 + Self { db } 17 + } 18 + 19 + pub async fn start(&self, jetstream_hostname: String) -> Result<()> { 20 + let config = ConsumerTaskConfig { 21 + user_agent: "following-no-reposts-feed/1.0".to_string(), 22 + compression: true, 23 + jetstream_hostname, 24 + zstd_dictionary_location: String::new(), 25 + collections: vec![ 26 + "app.bsky.feed.post".to_string(), 27 + "app.bsky.graph.follow".to_string(), 28 + ], 29 + dids: vec![], 30 + cursor: None, 31 + max_message_size_bytes: None, 32 + require_hello: false, 33 + }; 34 + 35 + let consumer = Consumer::new(config); 36 + consumer.register_handler(Arc::new(self.clone())).await?; 37 + 38 + let cancellation_token = CancellationToken::new(); 39 + 40 + // Start cleanup task 41 + let db_cleanup = Arc::clone(&self.db); 42 + tokio::spawn(async move { 43 + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(3600)); // Every hour 44 + loop { 45 + interval.tick().await; 46 + if let Err(e) = db_cleanup.cleanup_old_posts(48).await { 47 + warn!("Failed to cleanup old posts: {}", e); 48 + } 49 + } 50 + }); 51 + 52 + consumer.run_background(cancellation_token).await?; 53 + Ok(()) 54 + } 55 + } 56 + 57 + impl Clone for JetstreamEventHandler { 58 + fn clone(&self) -> Self { 59 + Self { 60 + db: Arc::clone(&self.db), 61 + } 62 + } 63 + } 64 + 65 + #[async_trait] 66 + impl EventHandler for JetstreamEventHandler { 67 + async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> { 68 + if let JetstreamEvent::Commit { did, time_us: 
_, kind: _, commit } = event { 69 + match commit.collection.as_str() { 70 + "app.bsky.feed.post" => { 71 + self.handle_post_event(&did, &commit.collection, &commit.rkey, &commit.operation, Some(&commit.record), &commit.cid).await?; 72 + } 73 + "app.bsky.graph.follow" => { 74 + self.handle_follow_event(&did, &commit.collection, &commit.rkey, &commit.operation, Some(&commit.record)).await?; 75 + } 76 + _ => {} // Ignore other collections 77 + } 78 + } 79 + Ok(()) 80 + } 81 + 82 + fn handler_id(&self) -> String { 83 + "following-no-reposts-handler".to_string() 84 + } 85 + } 86 + 87 + impl JetstreamEventHandler { 88 + async fn handle_post_event( 89 + &self, 90 + did: &str, 91 + collection: &str, 92 + rkey: &str, 93 + operation: &str, 94 + record: Option<&serde_json::Value>, 95 + cid: &str, 96 + ) -> Result<()> { 97 + let uri = format!("at://{}/{}/{}", did, collection, rkey); 98 + 99 + match operation { 100 + "create" => { 101 + if let Some(record) = record { 102 + // Check if this is a repost by looking for a "subject" field 103 + if record.get("subject").is_some() { 104 + // This is a repost, skip it 105 + return Ok(()); 106 + } 107 + 108 + let text = record 109 + .get("text") 110 + .and_then(|v| v.as_str()) 111 + .unwrap_or("") 112 + .to_string(); 113 + 114 + let created_at_str = record 115 + .get("createdAt") 116 + .and_then(|v| v.as_str()) 117 + .unwrap_or(""); 118 + 119 + let created_at = DateTime::parse_from_rfc3339(created_at_str) 120 + .unwrap_or_else(|_| Utc::now().into()) 121 + .with_timezone(&Utc); 122 + 123 + let post = Post { 124 + uri, 125 + cid: cid.to_string(), 126 + author_did: did.to_string(), 127 + text, 128 + created_at, 129 + indexed_at: Utc::now(), 130 + }; 131 + 132 + if let Err(e) = self.db.insert_post(&post).await { 133 + error!("Failed to insert post: {}", e); 134 + } 135 + } 136 + } 137 + "delete" => { 138 + if let Err(e) = self.db.delete_post(&uri).await { 139 + error!("Failed to delete post: {}", e); 140 + } 141 + } 142 + _ => {} // Ignore 
updates 143 + } 144 + 145 + Ok(()) 146 + } 147 + 148 + async fn handle_follow_event( 149 + &self, 150 + did: &str, 151 + collection: &str, 152 + rkey: &str, 153 + operation: &str, 154 + record: Option<&serde_json::Value>, 155 + ) -> Result<()> { 156 + let uri = format!("at://{}/{}/{}", did, collection, rkey); 157 + 158 + match operation { 159 + "create" => { 160 + if let Some(record) = record { 161 + let target_did = record 162 + .get("subject") 163 + .and_then(|v| v.as_str()) 164 + .unwrap_or("") 165 + .to_string(); 166 + 167 + let created_at_str = record 168 + .get("createdAt") 169 + .and_then(|v| v.as_str()) 170 + .unwrap_or(""); 171 + 172 + let created_at = DateTime::parse_from_rfc3339(created_at_str) 173 + .unwrap_or_else(|_| Utc::now().into()) 174 + .with_timezone(&Utc); 175 + 176 + let follow = Follow { 177 + uri, 178 + follower_did: did.to_string(), 179 + target_did, 180 + created_at, 181 + indexed_at: Utc::now(), 182 + }; 183 + 184 + if let Err(e) = self.db.insert_follow(&follow).await { 185 + error!("Failed to insert follow: {}", e); 186 + } 187 + } 188 + } 189 + "delete" => { 190 + if let Err(e) = self.db.delete_follow(&uri).await { 191 + error!("Failed to delete follow: {}", e); 192 + } 193 + } 194 + _ => {} // Ignore updates 195 + } 196 + 197 + Ok(()) 198 + } 199 + }
+136
src/main.rs
··· 1 + use anyhow::Result; 2 + use axum::{ 3 + extract::{Query, State}, 4 + http::StatusCode, 5 + response::Json, 6 + routing::get, 7 + Router, 8 + }; 9 + use clap::Parser; 10 + use std::sync::Arc; 11 + use tokio::net::TcpListener; 12 + use tower_http::cors::CorsLayer; 13 + use tracing::{info, warn}; 14 + 15 + mod auth; 16 + mod database; 17 + mod feed_algorithm; 18 + mod jetstream_consumer; 19 + mod types; 20 + 21 + use crate::{ 22 + auth::validate_jwt, 23 + database::Database, 24 + feed_algorithm::FollowingNoRepostsFeed, 25 + jetstream_consumer::JetstreamEventHandler, 26 + types::*, 27 + }; 28 + 29 + #[derive(Parser)] 30 + #[command(name = "following-no-reposts-feed")] 31 + #[command(about = "A Bluesky feed generator for following without reposts")] 32 + struct Args { 33 + #[arg(long, env = "DATABASE_URL", default_value = "sqlite:./feed.db")] 34 + database_url: String, 35 + 36 + #[arg(long, env = "PORT", default_value = "3000")] 37 + port: u16, 38 + 39 + #[arg(long, env = "FEEDGEN_HOSTNAME")] 40 + hostname: String, 41 + 42 + #[arg(long, env = "FEEDGEN_SERVICE_DID")] 43 + service_did: String, 44 + 45 + #[arg(long, env = "JETSTREAM_HOSTNAME", default_value = "jetstream1.us-east.bsky.network")] 46 + jetstream_hostname: String, 47 + } 48 + 49 + #[derive(Clone)] 50 + struct AppState { 51 + db: Arc<Database>, 52 + service_did: String, 53 + } 54 + 55 + #[tokio::main] 56 + async fn main() -> Result<()> { 57 + tracing_subscriber::fmt::init(); 58 + dotenvy::dotenv().ok(); 59 + 60 + let args = Args::parse(); 61 + 62 + // Initialize database 63 + let db = Arc::new(Database::new(&args.database_url).await?); 64 + db.migrate().await?; 65 + 66 + let app_state = AppState { 67 + db: Arc::clone(&db), 68 + service_did: args.service_did.clone(), 69 + }; 70 + 71 + // Start Jetstream consumer 72 + let event_handler = JetstreamEventHandler::new(Arc::clone(&db)); 73 + tokio::spawn(async move { 74 + if let Err(e) = event_handler.start(args.jetstream_hostname).await { 75 + 
warn!("Jetstream consumer error: {}", e); 76 + } 77 + }); 78 + 79 + // Setup web server 80 + let app = Router::new() 81 + .route("/", get(root)) 82 + .route("/.well-known/did.json", get(did_document)) 83 + .route("/xrpc/app.bsky.feed.getFeedSkeleton", get(get_feed_skeleton)) 84 + .layer(CorsLayer::permissive()) 85 + .with_state(app_state); 86 + 87 + let listener = TcpListener::bind(format!("0.0.0.0:{}", args.port)).await?; 88 + info!("Feed generator listening on port {}", args.port); 89 + 90 + axum::serve(listener, app).await?; 91 + Ok(()) 92 + } 93 + 94 + async fn root() -> &'static str { 95 + "Following No Reposts Feed Generator" 96 + } 97 + 98 + async fn did_document(State(state): State<AppState>) -> Json<DidDocument> { 99 + Json(DidDocument { 100 + context: vec!["https://www.w3.org/ns/did/v1".to_string()], 101 + id: state.service_did.clone(), 102 + service: vec![ServiceEndpoint { 103 + id: "#bsky_fg".to_string(), 104 + service_type: "BskyFeedGenerator".to_string(), 105 + service_endpoint: format!("https://{}", std::env::var("FEEDGEN_HOSTNAME").unwrap_or_default()), 106 + }], 107 + }) 108 + } 109 + 110 + async fn get_feed_skeleton( 111 + Query(params): Query<FeedSkeletonParams>, 112 + State(state): State<AppState>, 113 + ) -> Result<Json<FeedSkeletonResponse>, StatusCode> { 114 + // Validate JWT if provided 115 + let requester_did = if let Some(auth_header) = params.auth { 116 + match validate_jwt(&auth_header, &state.service_did) { 117 + Ok(claims) => Some(claims.iss), 118 + Err(_) => return Err(StatusCode::UNAUTHORIZED), 119 + } 120 + } else { 121 + None 122 + }; 123 + 124 + let feed_algorithm = FollowingNoRepostsFeed::new(Arc::clone(&state.db)); 125 + 126 + match feed_algorithm 127 + .generate_feed(requester_did, params.limit, params.cursor) 128 + .await 129 + { 130 + Ok(response) => Ok(Json(response)), 131 + Err(e) => { 132 + warn!("Feed generation error: {}", e); 133 + Err(StatusCode::INTERNAL_SERVER_ERROR) 134 + } 135 + } 136 + }
+64
src/publish.rs
//! One-shot binary that publishes (or re-publishes) the feed generator
//! record into the operator's Bluesky repo, making the feed discoverable
//! at bsky.app. Run manually after deployment; requires the env vars
//! read below (typically via `.env`).

use anyhow::Result;
use atrium_api::{
    agent::atp_agent::{store::MemorySessionStore, AtpAgent},
    app::bsky::feed::generator::RecordData as FeedGeneratorRecord,
    com::atproto::repo::create_record::InputData as CreateRecordInput,
};
use atrium_xrpc_client::reqwest::ReqwestClient;
use serde_json::json;
use std::env;

#[tokio::main]
async fn main() -> Result<()> {
    dotenvy::dotenv().ok();

    // Required configuration; a missing var aborts with an error via `?`.
    let identifier = env::var("BLUESKY_IDENTIFIER")?;
    let password = env::var("BLUESKY_PASSWORD")?;
    // NOTE(review): `hostname` is read but never used below — the endpoint
    // is resolved from the service DID, not from this value. Confirm
    // whether the var is still required or should be dropped.
    let hostname = env::var("FEEDGEN_HOSTNAME")?;
    let service_did = env::var("FEEDGEN_SERVICE_DID")?;

    // In-memory session store: credentials are not persisted between runs.
    let agent = AtpAgent::new(
        ReqwestClient::new("https://bsky.social"),
        MemorySessionStore::default(),
    );

    // Login to Bluesky with an app password.
    agent.login(&identifier, &password).await?;

    // Create feed generator record.
    // NOTE(review): `created_at` is an RFC3339 String here; some
    // atrium-api versions expect `atrium_api::types::string::Datetime` —
    // verify against the pinned crate version.
    let record = FeedGeneratorRecord {
        avatar: None,
        created_at: chrono::Utc::now().to_rfc3339(),
        description: Some("A feed showing posts from people you follow, without any reposts. Clean, chronological timeline of original content only.".to_string()),
        description_facets: None,
        did: service_did.clone(),
        display_name: "Following (No Reposts)".to_string(),
        labels: None,
    };

    let input = CreateRecordInput {
        collection: "app.bsky.feed.generator".parse()?,
        record: record.into(),
        repo: agent.session().await?.did.clone(),
        // Fixed rkey: re-running this binary overwrites the same record
        // instead of creating duplicates.
        rkey: Some("following-no-reposts".to_string()),
        swap_commit: None,
        validate: Some(true),
    };

    let response = agent
        .service
        .com
        .atproto
        .repo
        .create_record(input.into())
        .await?;

    println!("Feed generator published successfully!");
    println!("URI: {}", response.uri);
    println!("CID: {}", response.cid);
    println!();
    println!("You can now access your feed at:");
    println!("https://bsky.app/profile/{}/feed/following-no-reposts", agent.session().await?.handle);

    Ok(())
}
+67
src/types.rs
//! Wire types (XRPC request/response, DID document) and internal data
//! models shared across the feed generator.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Query parameters of `app.bsky.feed.getFeedSkeleton`.
#[derive(Debug, Deserialize)]
pub struct FeedSkeletonParams {
    /// AT-URI of the requested feed.
    pub feed: String,
    /// Maximum number of posts to return; semantics left to the algorithm.
    pub limit: Option<i32>,
    /// Opaque pagination cursor from a previous response.
    pub cursor: Option<String>,
    // NOTE(review): this deserializes a query parameter literally named
    // "Authorization". The AppView sends the JWT in the HTTP header of the
    // same name, which a Query extractor never sees — confirm the handler
    // reads the header, or this field is effectively always None.
    #[serde(rename = "Authorization")]
    pub auth: Option<String>,
}

/// Response body of `app.bsky.feed.getFeedSkeleton`.
#[derive(Debug, Serialize)]
pub struct FeedSkeletonResponse {
    /// Cursor for the next page; None when the feed is exhausted.
    pub cursor: Option<String>,
    pub feed: Vec<SkeletonFeedPost>,
}

/// One skeleton entry: just the post's AT-URI; the AppView hydrates it.
#[derive(Debug, Serialize)]
pub struct SkeletonFeedPost {
    pub post: String,
}

/// Minimal did:web document served at /.well-known/did.json.
#[derive(Debug, Serialize)]
pub struct DidDocument {
    #[serde(rename = "@context")]
    pub context: Vec<String>,
    /// The service DID this generator identifies as.
    pub id: String,
    pub service: Vec<ServiceEndpoint>,
}

/// Service entry inside the DID document (type "BskyFeedGenerator").
#[derive(Debug, Serialize)]
pub struct ServiceEndpoint {
    pub id: String,
    #[serde(rename = "type")]
    pub service_type: String,
    #[serde(rename = "serviceEndpoint")]
    pub service_endpoint: String,
}

/// A post indexed from the firehose.
#[derive(Debug, Clone)]
pub struct Post {
    pub uri: String,
    pub cid: String,
    pub author_did: String,
    pub text: String,
    // created_at: author-claimed timestamp; indexed_at: when we saw it.
    pub created_at: DateTime<Utc>,
    pub indexed_at: DateTime<Utc>,
}

/// A follow relationship indexed from the firehose.
#[derive(Debug, Clone)]
pub struct Follow {
    pub uri: String,
    pub follower_did: String,
    pub target_did: String,
    pub created_at: DateTime<Utc>,
    pub indexed_at: DateTime<Utc>,
}


// JWT Claims
#[derive(Debug, Serialize, Deserialize)]
pub struct JwtClaims {
    pub iss: String, // issuer (user DID)
    pub aud: String, // audience (feed generator DID)
    pub exp: i64,    // expiration time
}
+23
systemd/noreposts-feed.service
# systemd unit for the NoReposts feed generator binary deployed by the
# GitHub Actions workflow to /var/www/noreposts-feed.

[Unit]
Description=NoReposts ATProto Feed Generator
After=network.target

[Service]
Type=simple
# NOTE(review): running as root undermines the hardening directives below;
# consider a dedicated unprivileged user (DynamicUser=yes or a created
# "noreposts" account) — verify file ownership in WorkingDirectory first.
User=root
WorkingDirectory=/var/www/noreposts-feed
ExecStart=/var/www/noreposts-feed/following-no-reposts-feed
# Always restart, with a 10s backoff, so transient DB/network failures recover.
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

# Environment file (DATABASE_URL, PORT, FEEDGEN_* — see .env.example)
EnvironmentFile=/var/www/noreposts-feed/.env

# Security hardening
NoNewPrivileges=true
PrivateTmp=true

[Install]
WantedBy=multi-user.target