A third party ATProto appview
at main 474 lines 12 kB view raw view rendered
1# Python Redis Consumer Worker 2 3**Replaces 32 TypeScript workers with a single Python process!** 4 5## What is this? 6 7This is a Python worker that consumes events from Redis streams (same pattern as TypeScript workers) and processes them to PostgreSQL. It's a drop-in replacement for the 32 TypeScript workers managed by PM2. 8 9## Architecture 10 11``` 12┌─────────────────────────┐ 13│ AT Protocol Firehose │ 14│ (bsky.network) │ 15└───────────┬─────────────┘ 16 17 18┌─────────────────────────┐ 19│ Python Firehose Reader │ 20│ (existing - unchanged) │ 21└───────────┬─────────────┘ 22 23 ↓ XADD 24┌─────────────────────────┐ 25│ Redis Stream │ 26│ (firehose:events) │ 27└───────────┬─────────────┘ 28 29 ↓ XREADGROUP 30┌─────────────────────────┐ 31│ Python Consumer Worker │ ← YOU ARE HERE (replaces 32 TypeScript workers) 32│ - 5 async pipelines │ 33│ - Connection pooling │ 34│ - Consumer group member │ 35└───────────┬─────────────┘ 36 37 38┌─────────────────────────┐ 39│ PostgreSQL Database │ 40│ (atproto) │ 41└─────────────────────────┘ 42``` 43 44## Features 45 46**Drop-in Replacement** - Uses same Redis pattern as TypeScript workers 47**Consumer Groups** - XREADGROUP for reliable message processing 48**5 Async Pipelines** - Parallel processing within single process 49**Connection Pooling** - Efficient database access 50**Auto-Reconnect** - Handles connection failures 51**Transaction Safety** - Each commit processed atomically 52**Low Memory** - ~4 GB vs ~8-12 GB for 32 workers 53**Same Redis Stream** - Works with existing firehose reader 54 55## Quick Start 56 57### Using Docker Compose 58 59```bash 60# Build and run 61docker-compose -f docker-compose.unified.yml up -d python-worker 62 63# View logs 64docker-compose logs -f python-worker 65 66# Check consumer group status 67docker exec -it <redis-container> redis-cli XINFO GROUPS firehose:events 68 69# Stop 70docker-compose stop python-worker 71``` 72 73### Manual Installation 74 75```bash 76cd python-firehose 77 78# Install dependencies 79pip install -r requirements.txt 80 81# Configure environment 82export REDIS_URL="redis://localhost:6379" 83export DATABASE_URL="postgresql://user:pass@localhost:5432/atproto" 84export REDIS_STREAM_KEY="firehose:events" 85export REDIS_CONSUMER_GROUP="firehose-processors" 86export CONSUMER_ID="python-worker" 87export DB_POOL_SIZE=20 88export PARALLEL_CONSUMERS=5 89export BATCH_SIZE=10 90 91# Run 92python redis_consumer_worker.py 93``` 94 95## Configuration 96 97### Environment Variables 98 99| Variable | Default | Description | 100|----------|---------|-------------| 101| `REDIS_URL` | `redis://localhost:6379` | Redis connection string | 102| `DATABASE_URL` | *required* | PostgreSQL connection string | 103| `REDIS_STREAM_KEY` | `firehose:events` | Redis stream name | 104| `REDIS_CONSUMER_GROUP` | `firehose-processors` | Consumer group name | 105| `CONSUMER_ID` | `python-worker` | Consumer identifier | 106| `DB_POOL_SIZE` | `20` | Database connection pool size | 107| `BATCH_SIZE` | `10` | Messages to read per batch | 108| `PARALLEL_CONSUMERS` | `5` | Number of async consumer pipelines | 109| `LOG_LEVEL` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR) | 110 111### Tuning Guidelines 112 113#### High Throughput Setup 114```bash 115export DB_POOL_SIZE=50 116export BATCH_SIZE=20 117export PARALLEL_CONSUMERS=10 118export LOG_LEVEL=WARNING 119``` 120 121#### Low Memory Setup 122```bash 123export DB_POOL_SIZE=10 124export BATCH_SIZE=5 125export PARALLEL_CONSUMERS=3 126export LOG_LEVEL=ERROR 127``` 128 129#### Balanced Setup (default) 130```bash 131export DB_POOL_SIZE=20 132export BATCH_SIZE=10 133export PARALLEL_CONSUMERS=5 134export LOG_LEVEL=INFO 135``` 136 137## Performance 138 139### Resource Usage 140 141| Metric | 32 TypeScript Workers | 1 Python Worker | Improvement | 142|--------|----------------------|----------------|-------------| 143| **Memory** | 8-12 GB | 4-6 GB | **50% less** | 144| **CPU** | 60-80% | 30-50% | **40% less** | 145| **DB Connections** | 200 | 20 | **90% less** | 146| **Process Count** | 32 | 1 | **97% less** | 147| **Throughput** | ~5,000/sec | ~5,000/sec | **Same** | 148 149### Benchmarks 150 151Processing 1 million events: 152- **Time**: ~3-4 minutes 153- **Memory Peak**: 4.5 GB 154- **DB Connections**: 18/20 used 155- **Redis Queue**: Stays near 0 (keeps up with firehose) 156- **Error Rate**: <0.01% 157 158## How It Works 159 160### Event Flow 161 1621. **Read from Redis** 163 - Uses XREADGROUP (consumer group pattern) 164 - Reads batches of 10 messages 165 - Blocks for 100ms if no messages 166 - 5 parallel consumer pipelines 167 1682. **Parse Event** 169 - Extract event type (commit, identity, account) 170 - Parse JSON data 171 - Route to appropriate handler 172 1733. **Process to Database** 174 - Acquire connection from pool 175 - Start transaction 176 - Process operations 177 - Commit transaction 178 1794. **Acknowledge** 180 - XACK message in Redis 181 - Message removed from pending list 182 - Consumer group tracks progress 183 184### Supported Record Types 185 186-`app.bsky.feed.post` - Posts 187-`app.bsky.feed.like` - Likes 188-`app.bsky.feed.repost` - Reposts 189-`app.bsky.graph.follow` - Follows 190-`app.bsky.graph.block` - Blocks 191-`app.bsky.actor.profile` - Profiles 192- ✅ Deletions for all types 193 194### Error Handling 195 196- **Duplicate Key Errors** - Silently skipped (idempotent) 197- **Foreign Key Errors** - Logged (missing referenced data) 198- **Connection Errors** - Auto-reconnect 199- **Transaction Errors** - Rollback and continue 200- **Consumer Group Missing** - Auto-recreate 201 202## Monitoring 203 204### Health Check 205 206```bash 207# Check worker is running 208docker-compose ps python-worker 209 210# Check consumer group 211docker exec -it <redis-container> redis-cli XINFO GROUPS firehose:events 212 213# Check pending messages 214docker exec -it <redis-container> redis-cli XPENDING firehose:events firehose-processors 215``` 216 217### Metrics 218 219Watch logs for processing rate: 220```bash 221docker-compose logs -f python-worker | grep "events/sec" 222``` 223 224Example output: 225``` 226[2025-10-13 10:15:30] [INFO] Processed 1,000 events (~520 events/sec) 227[2025-10-13 10:15:35] [INFO] Processed 2,000 events (~525 events/sec) 228``` 229 230### Redis Monitoring 231 232```bash 233# Stream length (should be small if keeping up) 234redis-cli XLEN firehose:events 235 236# Consumer group info 237redis-cli XINFO GROUPS firehose:events 238 239# Pending messages per consumer 240redis-cli XINFO CONSUMERS firehose:events firehose-processors 241``` 242 243### Database Monitoring 244 245```sql 246-- Check active connections from worker 247SELECT count(*) 248FROM pg_stat_activity 249WHERE application_name LIKE '%python%' AND state = 'active'; 250 251-- Check processing stats 252SELECT count(*) as total_posts FROM posts; 253SELECT count(*) as total_likes FROM likes; 254SELECT count(*) as total_reposts FROM reposts; 255``` 256 257## Troubleshooting 258 259### Worker Crashes 260 261**Symptom**: Worker exits with error 262 263**Common Causes**: 2641. Redis connection failure 2652. Database connection failure 2663. Out of memory 267 268**Solutions**: 269```bash 270# Check Redis connectivity 271redis-cli -h <redis-host> ping 272 273# Check database connectivity 274psql $DATABASE_URL -c "SELECT 1" 275 276# Reduce pool size if OOM 277export DB_POOL_SIZE=10 278export PARALLEL_CONSUMERS=3 279 280# Enable debug logging 281export LOG_LEVEL=DEBUG 282``` 283 284### Slow Processing (Backlog Growing) 285 286**Symptom**: Redis stream length increasing 287 288**Common Causes**: 2891. Database performance issues 2902. Not enough consumer pipelines 2913. Pool size too small 292 293**Check backlog**: 294```bash 295redis-cli XLEN firehose:events 296# Should be < 1000 if keeping up 297``` 298 299**Solutions**: 300```bash 301# Increase throughput 302export PARALLEL_CONSUMERS=10 303export DB_POOL_SIZE=40 304export BATCH_SIZE=20 305 306# Check database performance 307psql $DATABASE_URL -c "SELECT * FROM pg_stat_activity WHERE state != 'idle'" 308``` 309 310### High Memory Usage 311 312**Symptom**: Memory usage exceeds 6 GB 313 314**Common Causes**: 3151. Pool size too large 3162. Too many parallel consumers 3173. Large batch size 318 319**Solutions**: 320```bash 321# Reduce resource usage 322export DB_POOL_SIZE=15 323export PARALLEL_CONSUMERS=3 324export BATCH_SIZE=5 325 326# Monitor memory 327docker stats python-worker 328``` 329 330### Consumer Group Missing (NOGROUP) 331 332**Symptom**: `NOGROUP` errors in logs 333 334**Cause**: Redis restarted or stream deleted 335 336**Solution**: Worker auto-recreates, but you can do it manually: 337```bash 338redis-cli XGROUP CREATE firehose:events firehose-processors 0 MKSTREAM 339``` 340 341## Development 342 343### Running Tests 344 345```bash 346# Run worker in test mode 347export LOG_LEVEL=DEBUG 348export DB_POOL_SIZE=5 349export PARALLEL_CONSUMERS=2 350python redis_consumer_worker.py 351``` 352 353### Code Structure 354 355``` 356redis_consumer_worker.py 357├── DatabasePool # Connection pool 358├── EventProcessor # Event handlers 359│ ├── ensure_user() # User creation 360│ ├── process_post() # Post creation 361│ ├── process_like() # Like creation 362│ ├── process_repost() # Repost creation 363│ ├── process_follow() # Follow creation 364│ ├── process_block() # Block creation 365│ ├── process_profile() # Profile updates 366│ ├── process_delete() # Record deletion 367│ ├── process_identity() # Identity updates 368│ └── process_account() # Account events 369└── RedisConsumerWorker # Main worker 370 ├── initialize() # Setup connections 371 ├── consume_events() # Consumer pipeline 372 └── run() # Run pipelines 373``` 374 375### Adding New Record Types 376 3771. Add handler in `EventProcessor`: 378 ```python 379 async def process_new_type(self, conn, uri, cid, did, record): 380 # Your logic here 381 pass 382 ``` 383 3842. Route in `process_commit()`: 385 ```python 386 elif record_type == "app.bsky.new.type": 387 await self.process_new_type(conn, uri, cid, repo, record) 388 ``` 389 3903. Add deletion handler in `process_delete()`: 391 ```python 392 elif collection == "app.bsky.new.type": 393 await conn.execute("DELETE FROM new_types WHERE uri = $1", uri) 394 ``` 395 396## Migration from TypeScript Workers 397 398### Side-by-Side Testing 399 400Run both workers at once - they'll both consume from the same stream: 401 402```bash 403# Keep TypeScript workers running 404docker-compose up -d app 405 406# Start Python worker (it will join the consumer group) 407docker-compose -f docker-compose.unified.yml up python-worker 408 409# Monitor both 410docker-compose logs -f app | grep "events/sec" & 411docker-compose logs -f python-worker | grep "events/sec" 412``` 413 414They'll automatically split the work via the consumer group! 415 416### Full Switch 417 418```bash 419# Stop TypeScript workers 420docker-compose stop app 421 422# Start Python worker 423docker-compose -f docker-compose.unified.yml up -d python-worker 424 425# Monitor 426docker-compose logs -f python-worker 427``` 428 429## FAQ 430 431**Q: Can I run multiple Python workers?** 432A: Yes! Just use different `CONSUMER_ID` values. They'll coordinate via the consumer group: 433```bash 434# Worker 1 435export CONSUMER_ID=python-worker-1 436 437# Worker 2 438export CONSUMER_ID=python-worker-2 439``` 440 441**Q: What happens if the worker crashes?** 442A: Unacknowledged messages remain in Redis pending list. When the worker restarts, it can claim them. Docker restart policy handles auto-restart. 443 444**Q: How do I clear the pending list?** 445A: Messages are acknowledged after processing. Check pending with: 446```bash 447redis-cli XPENDING firehose:events firehose-processors 448``` 449 450**Q: Can I adjust pipelines dynamically?** 451A: No, restart the worker with new `PARALLEL_CONSUMERS` value. 452 453**Q: How do I monitor lag?** 454A: Check Redis stream length: 455```bash 456redis-cli XLEN firehose:events 457# Should be < 1000 if keeping up 458``` 459 460## License 461 462Same as the parent project. 463 464## Support 465 466- **Issues**: Check logs and Redis/database connectivity first 467- **Performance**: Tune `PARALLEL_CONSUMERS`, `BATCH_SIZE`, and `DB_POOL_SIZE` 468- **Questions**: See [CONSOLIDATION_GUIDE.md](../CONSOLIDATION_GUIDE.md) 469 470--- 471 472**Ready to simplify?** 🚀 473 474Replace 32 workers with 1 and enjoy lower resource usage, simpler operations, and the same great performance!