A third party ATProto appview
1# Python Redis Consumer Worker
2
3**Replaces 32 TypeScript workers with a single Python process!**
4
5## What is this?
6
7This is a Python worker that consumes events from Redis streams (same pattern as TypeScript workers) and processes them to PostgreSQL. It's a drop-in replacement for the 32 TypeScript workers managed by PM2.
8
9## Architecture
10
11```
12┌─────────────────────────┐
13│ AT Protocol Firehose │
14│ (bsky.network) │
15└───────────┬─────────────┘
16 │
17 ↓
18┌─────────────────────────┐
19│ Python Firehose Reader │
20│ (existing - unchanged) │
21└───────────┬─────────────┘
22 │
23 ↓ XADD
24┌─────────────────────────┐
25│ Redis Stream │
26│ (firehose:events) │
27└───────────┬─────────────┘
28 │
29 ↓ XREADGROUP
30┌─────────────────────────┐
31│ Python Consumer Worker │ ← YOU ARE HERE (replaces 32 TypeScript workers)
32│ - 5 async pipelines │
33│ - Connection pooling │
34│ - Consumer group member │
35└───────────┬─────────────┘
36 │
37 ↓
38┌─────────────────────────┐
39│ PostgreSQL Database │
40│ (atproto) │
41└─────────────────────────┘
42```
43
44## Features
45
46✅ **Drop-in Replacement** - Uses same Redis pattern as TypeScript workers
47✅ **Consumer Groups** - XREADGROUP for reliable message processing
48✅ **5 Async Pipelines** - Parallel processing within single process
49✅ **Connection Pooling** - Efficient database access
50✅ **Auto-Reconnect** - Handles connection failures
51✅ **Transaction Safety** - Each commit processed atomically
52✅ **Low Memory** - ~4 GB vs ~8-12 GB for 32 workers
53✅ **Same Redis Stream** - Works with existing firehose reader
54
55## Quick Start
56
57### Using Docker Compose
58
59```bash
60# Build and run
61docker-compose -f docker-compose.unified.yml up -d python-worker
62
63# View logs
64docker-compose logs -f python-worker
65
66# Check consumer group status
67docker exec -it <redis-container> redis-cli XINFO GROUPS firehose:events
68
69# Stop
70docker-compose stop python-worker
71```
72
73### Manual Installation
74
75```bash
76cd python-firehose
77
78# Install dependencies
79pip install -r requirements.txt
80
81# Configure environment
82export REDIS_URL="redis://localhost:6379"
83export DATABASE_URL="postgresql://user:pass@localhost:5432/atproto"
84export REDIS_STREAM_KEY="firehose:events"
85export REDIS_CONSUMER_GROUP="firehose-processors"
86export CONSUMER_ID="python-worker"
87export DB_POOL_SIZE=20
88export PARALLEL_CONSUMERS=5
89export BATCH_SIZE=10
90
91# Run
92python redis_consumer_worker.py
93```
94
95## Configuration
96
97### Environment Variables
98
99| Variable | Default | Description |
100|----------|---------|-------------|
101| `REDIS_URL` | `redis://localhost:6379` | Redis connection string |
102| `DATABASE_URL` | *required* | PostgreSQL connection string |
103| `REDIS_STREAM_KEY` | `firehose:events` | Redis stream name |
104| `REDIS_CONSUMER_GROUP` | `firehose-processors` | Consumer group name |
105| `CONSUMER_ID` | `python-worker` | Consumer identifier |
106| `DB_POOL_SIZE` | `20` | Database connection pool size |
107| `BATCH_SIZE` | `10` | Messages to read per batch |
108| `PARALLEL_CONSUMERS` | `5` | Number of async consumer pipelines |
109| `LOG_LEVEL` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR) |
110
111### Tuning Guidelines
112
113#### High Throughput Setup
114```bash
115export DB_POOL_SIZE=50
116export BATCH_SIZE=20
117export PARALLEL_CONSUMERS=10
118export LOG_LEVEL=WARNING
119```
120
121#### Low Memory Setup
122```bash
123export DB_POOL_SIZE=10
124export BATCH_SIZE=5
125export PARALLEL_CONSUMERS=3
126export LOG_LEVEL=ERROR
127```
128
129#### Balanced Setup (default)
130```bash
131export DB_POOL_SIZE=20
132export BATCH_SIZE=10
133export PARALLEL_CONSUMERS=5
134export LOG_LEVEL=INFO
135```
136
137## Performance
138
139### Resource Usage
140
141| Metric | 32 TypeScript Workers | 1 Python Worker | Improvement |
142|--------|----------------------|----------------|-------------|
143| **Memory** | 8-12 GB | 4-6 GB | **50% less** |
144| **CPU** | 60-80% | 30-50% | **40% less** |
145| **DB Connections** | 200 | 20 | **90% less** |
146| **Process Count** | 32 | 1 | **97% less** |
147| **Throughput** | ~5,000/sec | ~5,000/sec | **Same** |
148
149### Benchmarks
150
151Processing 1 million events:
152- **Time**: ~3-4 minutes
153- **Memory Peak**: 4.5 GB
154- **DB Connections**: 18/20 used
155- **Redis Queue**: Stays near 0 (keeps up with firehose)
156- **Error Rate**: <0.01%
157
158## How It Works
159
160### Event Flow
161
1621. **Read from Redis**
163 - Uses XREADGROUP (consumer group pattern)
164 - Reads batches of 10 messages
165 - Blocks for 100ms if no messages
166 - 5 parallel consumer pipelines
167
1682. **Parse Event**
169 - Extract event type (commit, identity, account)
170 - Parse JSON data
171 - Route to appropriate handler
172
1733. **Process to Database**
174 - Acquire connection from pool
175 - Start transaction
176 - Process operations
177 - Commit transaction
178
1794. **Acknowledge**
180 - XACK message in Redis
181 - Message removed from pending list
182 - Consumer group tracks progress
183
184### Supported Record Types
185
186- ✅ `app.bsky.feed.post` - Posts
187- ✅ `app.bsky.feed.like` - Likes
188- ✅ `app.bsky.feed.repost` - Reposts
189- ✅ `app.bsky.graph.follow` - Follows
190- ✅ `app.bsky.graph.block` - Blocks
191- ✅ `app.bsky.actor.profile` - Profiles
192- ✅ Deletions for all types
193
194### Error Handling
195
196- **Duplicate Key Errors** - Silently skipped (idempotent)
197- **Foreign Key Errors** - Logged (missing referenced data)
198- **Connection Errors** - Auto-reconnect
199- **Transaction Errors** - Rollback and continue
200- **Consumer Group Missing** - Auto-recreate
201
202## Monitoring
203
204### Health Check
205
206```bash
207# Check worker is running
208docker-compose ps python-worker
209
210# Check consumer group
211docker exec -it <redis-container> redis-cli XINFO GROUPS firehose:events
212
213# Check pending messages
214docker exec -it <redis-container> redis-cli XPENDING firehose:events firehose-processors
215```
216
217### Metrics
218
219Watch logs for processing rate:
220```bash
221docker-compose logs -f python-worker | grep "events/sec"
222```
223
224Example output:
225```
226[2025-10-13 10:15:30] [INFO] Processed 1,000 events (~520 events/sec)
227[2025-10-13 10:15:35] [INFO] Processed 2,000 events (~525 events/sec)
228```
229
230### Redis Monitoring
231
232```bash
233# Stream length (should be small if keeping up)
234redis-cli XLEN firehose:events
235
236# Consumer group info
237redis-cli XINFO GROUPS firehose:events
238
239# Pending messages per consumer
240redis-cli XINFO CONSUMERS firehose:events firehose-processors
241```
242
243### Database Monitoring
244
245```sql
246-- Check active connections from worker
247SELECT count(*)
248FROM pg_stat_activity
249WHERE application_name LIKE '%python%' AND state = 'active';
250
251-- Check processing stats
252SELECT count(*) as total_posts FROM posts;
253SELECT count(*) as total_likes FROM likes;
254SELECT count(*) as total_reposts FROM reposts;
255```
256
257## Troubleshooting
258
259### Worker Crashes
260
261**Symptom**: Worker exits with error
262
263**Common Causes**:
2641. Redis connection failure
2652. Database connection failure
2663. Out of memory
267
268**Solutions**:
269```bash
270# Check Redis connectivity
271redis-cli -h <redis-host> ping
272
273# Check database connectivity
274psql $DATABASE_URL -c "SELECT 1"
275
276# Reduce pool size if OOM
277export DB_POOL_SIZE=10
278export PARALLEL_CONSUMERS=3
279
280# Enable debug logging
281export LOG_LEVEL=DEBUG
282```
283
284### Slow Processing (Backlog Growing)
285
286**Symptom**: Redis stream length increasing
287
288**Common Causes**:
2891. Database performance issues
2902. Not enough consumer pipelines
2913. Pool size too small
292
293**Check backlog**:
294```bash
295redis-cli XLEN firehose:events
296# Should be < 1000 if keeping up
297```
298
299**Solutions**:
300```bash
301# Increase throughput
302export PARALLEL_CONSUMERS=10
303export DB_POOL_SIZE=40
304export BATCH_SIZE=20
305
306# Check database performance
307psql $DATABASE_URL -c "SELECT * FROM pg_stat_activity WHERE state != 'idle'"
308```
309
310### High Memory Usage
311
312**Symptom**: Memory usage exceeds 6 GB
313
314**Common Causes**:
3151. Pool size too large
3162. Too many parallel consumers
3173. Large batch size
318
319**Solutions**:
320```bash
321# Reduce resource usage
322export DB_POOL_SIZE=15
323export PARALLEL_CONSUMERS=3
324export BATCH_SIZE=5
325
326# Monitor memory
327docker stats python-worker
328```
329
330### Consumer Group Missing (NOGROUP)
331
332**Symptom**: `NOGROUP` errors in logs
333
334**Cause**: Redis restarted or stream deleted
335
336**Solution**: Worker auto-recreates, but you can do it manually:
337```bash
338redis-cli XGROUP CREATE firehose:events firehose-processors 0 MKSTREAM
339```
340
341## Development
342
343### Running Tests
344
345```bash
346# Run worker in test mode
347export LOG_LEVEL=DEBUG
348export DB_POOL_SIZE=5
349export PARALLEL_CONSUMERS=2
350python redis_consumer_worker.py
351```
352
353### Code Structure
354
355```
356redis_consumer_worker.py
357├── DatabasePool # Connection pool
358├── EventProcessor # Event handlers
359│ ├── ensure_user() # User creation
360│ ├── process_post() # Post creation
361│ ├── process_like() # Like creation
362│ ├── process_repost() # Repost creation
363│ ├── process_follow() # Follow creation
364│ ├── process_block() # Block creation
365│ ├── process_profile() # Profile updates
366│ ├── process_delete() # Record deletion
367│ ├── process_identity() # Identity updates
368│ └── process_account() # Account events
369└── RedisConsumerWorker # Main worker
370 ├── initialize() # Setup connections
371 ├── consume_events() # Consumer pipeline
372 └── run() # Run pipelines
373```
374
375### Adding New Record Types
376
3771. Add handler in `EventProcessor`:
378 ```python
379 async def process_new_type(self, conn, uri, cid, did, record):
380 # Your logic here
381 pass
382 ```
383
3842. Route in `process_commit()`:
385 ```python
386 elif record_type == "app.bsky.new.type":
387 await self.process_new_type(conn, uri, cid, repo, record)
388 ```
389
3903. Add deletion handler in `process_delete()`:
391 ```python
392 elif collection == "app.bsky.new.type":
393 await conn.execute("DELETE FROM new_types WHERE uri = $1", uri)
394 ```
395
396## Migration from TypeScript Workers
397
398### Side-by-Side Testing
399
400Run both workers at once - they'll both consume from the same stream:
401
402```bash
403# Keep TypeScript workers running
404docker-compose up -d app
405
406# Start Python worker (it will join the consumer group)
407docker-compose -f docker-compose.unified.yml up python-worker
408
409# Monitor both
410docker-compose logs -f app | grep "events/sec" &
411docker-compose logs -f python-worker | grep "events/sec"
412```
413
414They'll automatically split the work via the consumer group!
415
416### Full Switch
417
418```bash
419# Stop TypeScript workers
420docker-compose stop app
421
422# Start Python worker
423docker-compose -f docker-compose.unified.yml up -d python-worker
424
425# Monitor
426docker-compose logs -f python-worker
427```
428
429## FAQ
430
431**Q: Can I run multiple Python workers?**
432A: Yes! Just use different `CONSUMER_ID` values. They'll coordinate via the consumer group:
433```bash
434# Worker 1
435export CONSUMER_ID=python-worker-1
436
437# Worker 2
438export CONSUMER_ID=python-worker-2
439```
440
441**Q: What happens if the worker crashes?**
442A: Unacknowledged messages remain in Redis pending list. When the worker restarts, it can claim them. Docker restart policy handles auto-restart.
443
444**Q: How do I clear the pending list?**
445A: Messages are acknowledged after processing. Check pending with:
446```bash
447redis-cli XPENDING firehose:events firehose-processors
448```
449
450**Q: Can I adjust pipelines dynamically?**
451A: No, restart the worker with new `PARALLEL_CONSUMERS` value.
452
453**Q: How do I monitor lag?**
454A: Check Redis stream length:
455```bash
456redis-cli XLEN firehose:events
457# Should be < 1000 if keeping up
458```
459
460## License
461
462Same as the parent project.
463
464## Support
465
466- **Issues**: Check logs and Redis/database connectivity first
467- **Performance**: Tune `PARALLEL_CONSUMERS`, `BATCH_SIZE`, and `DB_POOL_SIZE`
468- **Questions**: See [CONSOLIDATION_GUIDE.md](../CONSOLIDATION_GUIDE.md)
469
470---
471
472**Ready to simplify?** 🚀
473
474Replace 32 workers with 1 and enjoy lower resource usage, simpler operations, and the same great performance!