A third-party ATProto AppView
1#!/usr/bin/env python3
2"""
3High-performance AT Protocol Firehose Consumer (Python)
4
5This script connects to the AT Protocol firehose and pushes events to Redis streams.
6Based on official atproto examples - uses synchronous approach for simplicity.
7"""
8
9import json
10import logging
11import os
12import signal
13import sys
14import time
15from typing import Optional, Any
16
17import redis
18from atproto import (
19 CAR,
20 FirehoseSubscribeReposClient,
21 firehose_models,
22 models,
23 parse_subscribe_repos_message,
24)
25
26
class SafeJSONEncoder(json.JSONEncoder):
    """JSON encoder that handles bytes, CIDs, and other non-serializable objects.

    ``default()`` is only invoked for objects ``json`` cannot serialize
    natively, so dict/list/tuple never reach it.  Bytes are rendered as a
    hex string; every other object (CIDs included, via their ``__str__``)
    is stringified.  The previous ``hasattr(obj, '__str__')`` guard was
    always true (every object inherits ``__str__``), which made the
    ``super().default()`` fallback unreachable dead code.
    """
    def default(self, obj: Any) -> Any:
        # Bytes are not JSON-serializable; convert to a hex string.
        if isinstance(obj, bytes):
            return obj.hex()
        # Stringify everything else; fall back to repr() if an exotic
        # object's __str__ itself raises.
        try:
            return str(obj)
        except Exception:
            return repr(obj)
41
# Configure logging; the LOG_LEVEL env var selects the level (default INFO).
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    # Fall back to INFO when LOG_LEVEL names an unknown level instead of
    # crashing at import time with AttributeError.
    level=getattr(logging, log_level, logging.INFO),
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
50
51
class FirehoseConsumer:
    """
    Synchronous AT Protocol firehose consumer that pushes to Redis streams.

    Uses a sync Redis client for simplicity (no async/sync bridge issues).
    Commit/identity/account events are appended to a capped Redis stream,
    the firehose cursor is persisted periodically for restart recovery, and
    a small status/recent-events snapshot is kept in Redis for a dashboard.
    """

    def __init__(
        self,
        relay_url: str = "wss://bsky.network",
        redis_url: str = "redis://localhost:6379",
        stream_key: str = "firehose:events",
        cursor_key: str = "firehose:cursor",
        max_stream_len: int = 500000,
    ):
        """
        Args:
            relay_url: Relay URL shown in logs and status reporting.
                NOTE(review): run() never passes this to the firehose
                client, which connects to its own default endpoint —
                confirm this is intentional.
            redis_url: Redis connection URL.
            stream_key: Redis stream that receives serialized events.
            cursor_key: Redis key used to persist the firehose cursor.
            max_stream_len: Approximate cap for the stream (XADD MAXLEN ~).
        """
        self.relay_url = relay_url
        self.redis_url = redis_url
        self.stream_key = stream_key
        self.cursor_key = cursor_key
        self.max_stream_len = max_stream_len

        self.redis_client: Optional[redis.Redis] = None
        self.client: Optional[FirehoseSubscribeReposClient] = None
        self.running = False
        self.current_cursor: Optional[int] = None
        self.is_connected = False

        # Metrics
        self.event_count = 0
        self.last_event_time = time.time()
        self.start_time = time.time()

        # Cursor persistence (writes throttled to one per interval)
        self.last_cursor_save = 0
        self.cursor_save_interval = 5  # seconds

        # Recent events buffer for dashboard (keep last 50, newest first)
        self.recent_events = []

        # Status heartbeat
        self.last_status_update = 0
        self.status_update_interval = 5  # seconds

    def connect_redis(self) -> None:
        """Connect to Redis, verify the connection, and load any saved cursor."""
        logger.info(f"Connecting to Redis at {self.redis_url}...")

        self.redis_client = redis.from_url(
            self.redis_url,
            decode_responses=True,
            socket_keepalive=True,
        )

        # Verify connection (raises if Redis is unreachable)
        self.redis_client.ping()
        logger.info("Connected to Redis successfully")

        # Load saved cursor so a restart resumes where it left off
        saved_cursor = self.redis_client.get(self.cursor_key)
        if saved_cursor:
            self.current_cursor = int(saved_cursor)
            logger.info(f"Loaded saved cursor: {self.current_cursor}")
        else:
            logger.info("No saved cursor found, starting from current position")

    def save_cursor(self, cursor: int) -> None:
        """Record the latest cursor and persist it to Redis for restart recovery.

        Writes are throttled to one per ``cursor_save_interval`` seconds to
        avoid hammering Redis on every event.
        """
        self.current_cursor = cursor

        # Save periodically to avoid excessive writes
        now = time.time()
        if now - self.last_cursor_save > self.cursor_save_interval:
            self.last_cursor_save = now
            try:
                self.redis_client.set(self.cursor_key, str(cursor))
            except Exception as e:
                logger.error(f"Error saving cursor: {e}")

    def update_status(self) -> None:
        """Update firehose status in Redis for dashboard visibility (throttled)."""
        now = time.time()
        if now - self.last_status_update > self.status_update_interval:
            self.last_status_update = now
            try:
                status = {
                    "connected": self.is_connected,
                    "url": self.relay_url,
                    # `is not None` so a legitimate cursor of 0 is reported
                    # instead of being collapsed to null.
                    "currentCursor": str(self.current_cursor) if self.current_cursor is not None else None,
                }
                # Store with 10 second TTL (will be refreshed by heartbeat)
                self.redis_client.setex(
                    "firehose:status",
                    10,
                    json.dumps(status)
                )
            except Exception as e:
                logger.error(f"Error updating status: {e}")

    def broadcast_event(self, event: dict) -> None:
        """Broadcast event to Redis pub/sub for real-time dashboard updates."""
        try:
            # Add to recent events buffer (keep last 50, newest first)
            self.recent_events.insert(0, event)
            if len(self.recent_events) > 50:
                self.recent_events.pop()

            # Store recent events in Redis (with 10 second TTL)
            self.redis_client.setex(
                "firehose:recent_events",
                10,
                json.dumps(self.recent_events)
            )

            # Publish to Redis pub/sub for real-time streaming
            self.redis_client.publish(
                "firehose:events:broadcast",
                json.dumps(event)
            )
        except Exception as e:
            logger.error(f"Error broadcasting event: {e}")

    def push_to_redis(self, event_type: str, data: dict, seq: Optional[int] = None) -> None:
        """Push one serialized event onto the Redis stream.

        Args:
            event_type: Event kind ("commit", "identity", "account").
            data: JSON-serializable payload (bytes/CIDs handled by encoder).
            seq: Firehose sequence number, or None if unavailable.

        Raises:
            Exception: Redis failures are logged and re-raised to the caller.
        """
        try:
            # Use XADD with MAXLEN to prevent infinite stream growth.
            # SafeJSONEncoder handles bytes and CID objects in the payload.
            self.redis_client.xadd(
                self.stream_key,
                {
                    "type": event_type,
                    "data": json.dumps(data, cls=SafeJSONEncoder),
                    # `is not None` so a seq of 0 is not serialized as "".
                    "seq": str(seq) if seq is not None else "",
                },
                maxlen=self.max_stream_len,
                approximate=True,
            )

            self.event_count += 1

            # Log progress every 1000 events
            if self.event_count % 1000 == 0:
                elapsed = time.time() - self.start_time
                rate = self.event_count / elapsed if elapsed > 0 else 0
                logger.info(
                    f"Processed {self.event_count:,} events "
                    f"(~{rate:.0f} events/sec, cursor: {self.current_cursor})"
                )

        except Exception as e:
            logger.error(f"Error pushing to Redis: {e}")
            raise

    def on_message_handler(self, message: firehose_models.MessageFrame) -> None:
        """Handle one incoming firehose frame: parse, serialize, push, broadcast.

        Errors are logged and swallowed so a single bad frame does not kill
        the subscription.
        """
        # Lazy %-style log args avoid formatting cost when DEBUG is off —
        # this handler runs for every firehose frame.
        logger.debug("Handler called")

        try:
            logger.debug("Received message")

            commit = parse_subscribe_repos_message(message)
            logger.debug("Parsed: %s", type(commit).__name__)

            # Handle Commit messages (posts, likes, follows, etc.)
            if isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
                seq = commit.seq
                self.save_cursor(seq)

                # Parse the commit operations
                data = {
                    "repo": commit.repo,
                    "ops": [],
                }

                # Parse CAR blocks if available
                car = None
                if commit.blocks:
                    try:
                        car = CAR.from_bytes(commit.blocks)
                    except Exception as e:
                        logger.debug("Could not parse CAR: %s", e)

                # Process operations
                for op in commit.ops:
                    op_data = {
                        "action": op.action,
                        "path": op.path,
                    }

                    # For create/update actions, we MUST extract the record
                    # The TypeScript event processor requires record.$type to be present
                    if op.action in ["create", "update"]:
                        # Include CID (required for create/update)
                        if hasattr(op, 'cid') and op.cid:
                            op_data["cid"] = str(op.cid)
                        else:
                            # Skip ops without CID for create/update - malformed data
                            logger.debug("Skipping %s op without CID: %s", op.action, op.path)
                            continue

                        # Extract record data from CAR blocks
                        if not car:
                            # No CAR blocks available - skip this op
                            logger.debug("Skipping %s op - no CAR blocks: %s", op.action, op.path)
                            continue

                        try:
                            record_bytes = car.blocks.get(op.cid)
                            if not record_bytes:
                                logger.debug("Skipping %s op - CID not in CAR blocks: %s", op.action, op.path)
                                continue

                            record = models.get_or_create(record_bytes, strict=False)
                            if not record:
                                logger.debug("Skipping %s op - could not parse record: %s", op.action, op.path)
                                continue

                            # Serialize the record and ensure $type field is included
                            if hasattr(record, 'model_dump'):
                                record_data = record.model_dump()
                            elif hasattr(record, 'dict'):
                                record_data = record.dict()
                            else:
                                record_data = {}

                            # CRITICAL: Add $type field from py_type attribute
                            # The TypeScript event processor uses this to determine record type
                            if hasattr(record, 'py_type') and record.py_type:
                                record_data["$type"] = record.py_type
                            else:
                                # No $type available - skip this op
                                logger.debug("Skipping %s op - no $type available: %s", op.action, op.path)
                                continue

                            op_data["record"] = record_data

                        except Exception as e:
                            logger.debug("Skipping %s op - extraction error: %s - %s", op.action, op.path, e)
                            continue

                    elif op.action == "delete":
                        # Delete actions only need action and path (no CID or record)
                        pass

                    else:
                        # Unknown action - skip
                        logger.debug("Skipping unknown action: %s - %s", op.action, op.path)
                        continue

                    data["ops"].append(op_data)

                logger.debug("Pushing commit with %d ops", len(data['ops']))
                self.push_to_redis("commit", data, seq)
                self.last_event_time = time.time()

                # Broadcast event to dashboard (only first op for UI)
                if data["ops"]:
                    first_op = data["ops"][0]
                    lexicon = first_op["path"].split('/')[0]
                    self.broadcast_event({
                        "type": "#commit",
                        "lexicon": lexicon,
                        "did": commit.repo,
                        "action": first_op["action"],
                        "timestamp": time.strftime("%H:%M:%S"),
                    })

                # Update status heartbeat
                self.update_status()

            # Handle Identity messages
            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Identity):
                data = {
                    "did": commit.did,
                    "handle": getattr(commit, 'handle', commit.did),
                }
                seq = getattr(commit, 'seq', None)
                self.push_to_redis("identity", data, seq)
                # `is not None` so a seq of 0 still advances the cursor.
                if seq is not None:
                    self.save_cursor(seq)
                self.last_event_time = time.time()

                # Broadcast event to dashboard
                self.broadcast_event({
                    "type": "#identity",
                    "lexicon": "com.atproto.identity",
                    "did": commit.did,
                    "action": f"→ {data['handle']}" if data['handle'] != commit.did else "update",
                    "timestamp": time.strftime("%H:%M:%S"),
                })

                # Update status heartbeat
                self.update_status()

            # Handle Account messages
            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Account):
                data = {
                    "did": commit.did,
                    "active": getattr(commit, 'active', True),
                }
                seq = getattr(commit, 'seq', None)
                self.push_to_redis("account", data, seq)
                if seq is not None:
                    self.save_cursor(seq)
                self.last_event_time = time.time()

                # Broadcast event to dashboard
                self.broadcast_event({
                    "type": "#account",
                    "lexicon": "com.atproto.account",
                    "did": commit.did,
                    "action": "active" if data['active'] else "inactive",
                    "timestamp": time.strftime("%H:%M:%S"),
                })

                # Update status heartbeat
                self.update_status()

        except Exception as e:
            logger.error(f"Error handling message: {e}", exc_info=True)

    def run(self) -> None:
        """Main run loop: connect to Redis, then block on the firehose client."""
        self.running = True

        # Connect to Redis
        self.connect_redis()

        # Create firehose client, resuming from a saved cursor when present.
        # `is not None` so a saved cursor of 0 is still honored.
        params = None
        if self.current_cursor is not None:
            params = models.ComAtprotoSyncSubscribeRepos.Params(cursor=self.current_cursor)
            logger.info(f"Resuming from cursor: {self.current_cursor}")

        logger.info("Connecting to firehose...")
        self.client = FirehoseSubscribeReposClient(params)

        logger.info("Firehose client created")
        logger.info("Starting to listen for events...")

        # Mark as connected and update status
        self.is_connected = True
        self.update_status()

        # Start the client (this blocks until stopped)
        try:
            self.client.start(self.on_message_handler)
        except Exception as e:
            logger.error(f"Error in client.start(): {e}", exc_info=True)
            self.is_connected = False
            self.update_status()
            raise

    def stop(self) -> None:
        """Gracefully stop the consumer, persist the cursor, and close Redis."""
        logger.info("Stopping firehose consumer...")
        self.running = False
        self.is_connected = False

        # Update status to disconnected (best-effort during shutdown)
        if self.redis_client:
            try:
                self.update_status()
            except Exception:
                pass

        # Stop client (best-effort; narrow bare except to Exception so
        # KeyboardInterrupt/SystemExit still propagate during shutdown)
        if self.client:
            try:
                self.client.stop()
            except Exception:
                pass

        # Save final cursor immediately (bypasses save_cursor's throttle).
        # `is not None` so a final cursor of 0 is still persisted.
        if self.current_cursor is not None and self.redis_client:
            self.redis_client.set(self.cursor_key, str(self.current_cursor))
            logger.info(f"Saved final cursor: {self.current_cursor}")

        # Close Redis
        if self.redis_client:
            self.redis_client.close()

        # Log final stats
        elapsed = time.time() - self.start_time
        rate = self.event_count / elapsed if elapsed > 0 else 0
        logger.info(
            f"Stopped. Total events: {self.event_count:,} "
            f"(~{rate:.0f} events/sec over {elapsed:.0f}s)"
        )
440
441
def main():
    """Main entry point: read configuration from the environment and run the consumer."""
    # Configuration from environment
    relay_url = os.getenv("RELAY_URL", "wss://bsky.network")

    # Normalize the relay URL for logging/status: strip a trailing slash
    # unless the full subscribeRepos path was given explicitly.
    # NOTE(review): an earlier comment claimed the path would be appended
    # here, but it never was — and FirehoseConsumer.run() does not pass
    # relay_url to the firehose client at all, so this value is purely
    # informational. Confirm whether the client should receive it.
    if relay_url and not relay_url.endswith("/xrpc/com.atproto.sync.subscribeRepos"):
        relay_url = relay_url.rstrip('/')

    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
    stream_key = os.getenv("REDIS_STREAM_KEY", "firehose:events")
    cursor_key = os.getenv("REDIS_CURSOR_KEY", "firehose:python_cursor")
    max_stream_len = int(os.getenv("REDIS_MAX_STREAM_LEN", "500000"))

    logger.info("=" * 60)
    logger.info("AT Protocol Firehose Consumer (Python)")
    logger.info("=" * 60)
    logger.info(f"Relay URL: {relay_url}")
    logger.info(f"Redis URL: {redis_url}")
    logger.info(f"Stream Key: {stream_key}")
    logger.info(f"Cursor Key: {cursor_key}")
    logger.info(f"Max Stream Length: {max_stream_len:,}")
    logger.info("=" * 60)

    # Create consumer
    consumer = FirehoseConsumer(
        relay_url=relay_url,
        redis_url=redis_url,
        stream_key=stream_key,
        cursor_key=cursor_key,
        max_stream_len=max_stream_len,
    )

    # Handle SIGINT/SIGTERM for graceful shutdown (saves the final cursor)
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        consumer.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Run consumer; run() blocks until the client stops or fails
    try:
        consumer.run()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        consumer.stop()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        consumer.stop()
        sys.exit(1)
495
496
497if __name__ == "__main__":
498 main()