A third party ATProto appview
at main 498 lines 19 kB view raw
#!/usr/bin/env python3
"""
High-performance AT Protocol Firehose Consumer (Python)

This script connects to the AT Protocol firehose and pushes events to Redis streams.
Based on official atproto examples - uses synchronous approach for simplicity.
"""

import json
import logging
import os
import signal
import sys
import time
from collections import deque
from typing import Any, Optional

import redis
from atproto import (
    CAR,
    FirehoseSubscribeReposClient,
    firehose_models,
    models,
    parse_subscribe_repos_message,
)


class SafeJSONEncoder(json.JSONEncoder):
    """JSON encoder that handles bytes, CIDs, and other non-serializable objects."""

    def default(self, obj: Any) -> Any:
        # Bytes become hex strings so they survive the round trip through JSON.
        if isinstance(obj, bytes):
            return obj.hex()
        # CID objects (and most other opaque objects) serialize via str().
        # NOTE: the original tested `hasattr(obj, '__str__')`, which is True for
        # every Python object, so this branch intentionally stringifies anything
        # that is not a plain container.
        if not isinstance(obj, (dict, list, tuple)):
            return str(obj)
        # Fallback to default behavior; repr() as a last resort so encoding
        # never raises for exotic objects.
        try:
            return super().default(obj)
        except TypeError:
            return repr(obj)


# Configure logging (level is taken from LOG_LEVEL, default INFO)
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=getattr(logging, log_level),
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class FirehoseConsumer:
    """
    Synchronous AT Protocol firehose consumer that pushes to Redis streams.
    Uses sync Redis client for simplicity (no async/sync bridge issues).
    """

    def __init__(
        self,
        relay_url: str = "wss://bsky.network",
        redis_url: str = "redis://localhost:6379",
        stream_key: str = "firehose:events",
        cursor_key: str = "firehose:cursor",
        max_stream_len: int = 500000,
    ):
        """Store configuration; no connections are opened until run()/connect_redis()."""
        self.relay_url = relay_url
        self.redis_url = redis_url
        self.stream_key = stream_key
        self.cursor_key = cursor_key
        self.max_stream_len = max_stream_len

        self.redis_client: Optional[redis.Redis] = None
        self.client: Optional[FirehoseSubscribeReposClient] = None
        self.running = False
        self.current_cursor: Optional[int] = None
        self.is_connected = False

        # Metrics
        self.event_count = 0
        self.last_event_time = time.time()
        self.start_time = time.time()

        # Cursor persistence (throttled to avoid a Redis write per event)
        self.last_cursor_save = 0.0
        self.cursor_save_interval = 5  # seconds

        # Recent events buffer for dashboard; deque(maxlen=50) keeps the
        # newest 50 entries with O(1) inserts (newest first via appendleft).
        self.recent_events: deque = deque(maxlen=50)

        # Status heartbeat (throttled like the cursor save)
        self.last_status_update = 0.0
        self.status_update_interval = 5  # seconds

    def connect_redis(self) -> None:
        """Connect to Redis, verify with PING, and load any saved cursor."""
        logger.info(f"Connecting to Redis at {self.redis_url}...")

        self.redis_client = redis.from_url(
            self.redis_url,
            decode_responses=True,
            socket_keepalive=True,
        )

        # Verify connection early so a bad REDIS_URL fails fast.
        self.redis_client.ping()
        logger.info("Connected to Redis successfully")

        # Load saved cursor for restart recovery.
        saved_cursor = self.redis_client.get(self.cursor_key)
        if saved_cursor:
            self.current_cursor = int(saved_cursor)
            logger.info(f"Loaded saved cursor: {self.current_cursor}")
        else:
            logger.info("No saved cursor found, starting from current position")

    def save_cursor(self, cursor: int) -> None:
        """Remember the cursor in memory and periodically persist it to Redis."""
        self.current_cursor = cursor

        # Save at most once per cursor_save_interval to avoid excessive writes.
        now = time.time()
        if now - self.last_cursor_save > self.cursor_save_interval:
            self.last_cursor_save = now
            try:
                self.redis_client.set(self.cursor_key, str(cursor))
            except Exception as e:
                # Best-effort: losing one cursor save only means a slightly
                # older resume point after a restart.
                logger.error(f"Error saving cursor: {e}")

    def update_status(self) -> None:
        """Publish a heartbeat status blob to Redis for dashboard visibility."""
        now = time.time()
        if now - self.last_status_update > self.status_update_interval:
            self.last_status_update = now
            try:
                status = {
                    "connected": self.is_connected,
                    "url": self.relay_url,
                    # `is not None` so a legitimate cursor of 0 is reported.
                    "currentCursor": str(self.current_cursor) if self.current_cursor is not None else None,
                }
                # Store with 10 second TTL (will be refreshed by heartbeat)
                self.redis_client.setex(
                    "firehose:status",
                    10,
                    json.dumps(status)
                )
            except Exception as e:
                logger.error(f"Error updating status: {e}")

    def broadcast_event(self, event: dict) -> None:
        """Broadcast event to Redis pub/sub for real-time dashboard updates."""
        try:
            # Newest-first ring buffer; deque(maxlen=50) discards the oldest.
            self.recent_events.appendleft(event)

            # Store recent events in Redis (with 10 second TTL)
            self.redis_client.setex(
                "firehose:recent_events",
                10,
                json.dumps(list(self.recent_events))
            )

            # Publish to Redis pub/sub for real-time streaming
            self.redis_client.publish(
                "firehose:events:broadcast",
                json.dumps(event)
            )
        except Exception as e:
            # Best-effort: the dashboard is cosmetic, never crash the consumer.
            logger.error(f"Error broadcasting event: {e}")

    def push_to_redis(self, event_type: str, data: dict, seq: Optional[int] = None) -> None:
        """Push one event onto the Redis stream; re-raises on failure.

        Args:
            event_type: "commit" / "identity" / "account".
            data: JSON-serializable payload (SafeJSONEncoder handles bytes/CIDs).
            seq: firehose sequence number, or None when the message had none.
        """
        try:
            # XADD with MAXLEN~ prevents unbounded stream growth.
            self.redis_client.xadd(
                self.stream_key,
                {
                    "type": event_type,
                    "data": json.dumps(data, cls=SafeJSONEncoder),
                    # `is not None` so seq == 0 is not silently dropped.
                    "seq": str(seq) if seq is not None else "",
                },
                maxlen=self.max_stream_len,
                approximate=True,
            )

            self.event_count += 1

            # Log throughput every 1000 events.
            if self.event_count % 1000 == 0:
                elapsed = time.time() - self.start_time
                rate = self.event_count / elapsed if elapsed > 0 else 0
                logger.info(
                    f"Processed {self.event_count:,} events "
                    f"(~{rate:.0f} events/sec, cursor: {self.current_cursor})"
                )

        except Exception as e:
            logger.error(f"Error pushing to Redis: {e}")
            raise

    def _op_to_dict(self, op: Any, car: Optional[CAR]) -> Optional[dict]:
        """Convert one repo operation to a plain dict, or None if it must be skipped.

        For create/update ops the record is extracted from the CAR blocks and a
        "$type" field is attached (the downstream TypeScript event processor
        requires record.$type to be present). Delete ops carry only action/path.
        """
        op_data = {
            "action": op.action,
            "path": op.path,
        }

        if op.action in ("create", "update"):
            # CID is required for create/update; ops without one are malformed.
            if not (hasattr(op, 'cid') and op.cid):
                logger.debug("Skipping %s op without CID: %s", op.action, op.path)
                return None
            op_data["cid"] = str(op.cid)

            if not car:
                logger.debug("Skipping %s op - no CAR blocks: %s", op.action, op.path)
                return None

            try:
                record_bytes = car.blocks.get(op.cid)
                if not record_bytes:
                    logger.debug("Skipping %s op - CID not in CAR blocks: %s", op.action, op.path)
                    return None

                record = models.get_or_create(record_bytes, strict=False)
                if not record:
                    logger.debug("Skipping %s op - could not parse record: %s", op.action, op.path)
                    return None

                # Serialize the record (pydantic v2 model_dump, v1 dict fallback).
                if hasattr(record, 'model_dump'):
                    record_data = record.model_dump()
                elif hasattr(record, 'dict'):
                    record_data = record.dict()
                else:
                    record_data = {}

                # CRITICAL: propagate $type from py_type; the TypeScript event
                # processor uses it to determine the record type.
                if hasattr(record, 'py_type') and record.py_type:
                    record_data["$type"] = record.py_type
                else:
                    logger.debug("Skipping %s op - no $type available: %s", op.action, op.path)
                    return None

                op_data["record"] = record_data
            except Exception as e:
                logger.debug("Skipping %s op - extraction error: %s - %s", op.action, op.path, e)
                return None

            return op_data

        if op.action == "delete":
            # Delete actions only need action and path (no CID or record).
            return op_data

        logger.debug("Skipping unknown action: %s - %s", op.action, op.path)
        return None

    def _handle_commit(self, commit: Any) -> None:
        """Process a #commit message: extract ops, push to stream, broadcast."""
        seq = commit.seq
        self.save_cursor(seq)

        data = {
            "repo": commit.repo,
            "ops": [],
        }

        # Parse CAR blocks if available; records live inside them.
        car: Optional[CAR] = None
        if commit.blocks:
            try:
                car = CAR.from_bytes(commit.blocks)
            except Exception as e:
                logger.debug("Could not parse CAR: %s", e)

        for op in commit.ops:
            op_data = self._op_to_dict(op, car)
            if op_data is not None:
                data["ops"].append(op_data)

        logger.debug("Pushing commit with %d ops", len(data["ops"]))
        self.push_to_redis("commit", data, seq)
        self.last_event_time = time.time()

        # Broadcast event to dashboard (only first op for UI)
        if data["ops"]:
            first_op = data["ops"][0]
            lexicon = first_op["path"].split('/')[0]
            self.broadcast_event({
                "type": "#commit",
                "lexicon": lexicon,
                "did": commit.repo,
                "action": first_op["action"],
                "timestamp": time.strftime("%H:%M:%S"),
            })

        self.update_status()

    def _handle_identity(self, commit: Any) -> None:
        """Process a #identity message (handle changes)."""
        data = {
            "did": commit.did,
            "handle": getattr(commit, 'handle', commit.did),
        }
        seq = getattr(commit, 'seq', None)
        self.push_to_redis("identity", data, seq)
        if seq is not None:  # `is not None` so seq == 0 still advances the cursor
            self.save_cursor(seq)
        self.last_event_time = time.time()

        self.broadcast_event({
            "type": "#identity",
            "lexicon": "com.atproto.identity",
            "did": commit.did,
            "action": f"{data['handle']}" if data['handle'] != commit.did else "update",
            "timestamp": time.strftime("%H:%M:%S"),
        })

        self.update_status()

    def _handle_account(self, commit: Any) -> None:
        """Process a #account message (activation state changes)."""
        data = {
            "did": commit.did,
            "active": getattr(commit, 'active', True),
        }
        seq = getattr(commit, 'seq', None)
        self.push_to_redis("account", data, seq)
        if seq is not None:
            self.save_cursor(seq)
        self.last_event_time = time.time()

        self.broadcast_event({
            "type": "#account",
            "lexicon": "com.atproto.account",
            "did": commit.did,
            "action": "active" if data['active'] else "inactive",
            "timestamp": time.strftime("%H:%M:%S"),
        })

        self.update_status()

    def on_message_handler(self, message: firehose_models.MessageFrame) -> None:
        """Handle one incoming firehose frame: parse it and dispatch by type.

        Never raises: all errors are logged so a single bad frame cannot kill
        the subscription.
        """
        try:
            commit = parse_subscribe_repos_message(message)
            logger.debug("Parsed: %s", type(commit).__name__)

            if isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
                # Commit messages carry posts, likes, follows, etc.
                self._handle_commit(commit)
            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Identity):
                self._handle_identity(commit)
            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Account):
                self._handle_account(commit)
            # Other frame types (e.g. #info) are intentionally ignored.

        except Exception as e:
            logger.error(f"Error handling message: {e}", exc_info=True)

    def run(self) -> None:
        """Main run loop: connect to Redis, then block on the firehose client."""
        self.running = True

        self.connect_redis()

        # Resume from the saved cursor when we have one (`is not None` so a
        # cursor of 0 is honored).
        params = None
        if self.current_cursor is not None:
            params = models.ComAtprotoSyncSubscribeRepos.Params(cursor=self.current_cursor)
            logger.info(f"Resuming from cursor: {self.current_cursor}")

        logger.info("Connecting to firehose...")
        # NOTE(review): relay_url is not passed to the client here, so the
        # library's default endpoint is used; relay_url is display-only.
        self.client = FirehoseSubscribeReposClient(params)

        logger.info("Firehose client created")
        logger.info("Starting to listen for events...")

        # Mark as connected and update status
        self.is_connected = True
        self.update_status()

        # Start the client (this blocks until stopped)
        try:
            self.client.start(self.on_message_handler)
        except Exception as e:
            logger.error(f"Error in client.start(): {e}", exc_info=True)
            self.is_connected = False
            self.update_status()
            raise

    def stop(self) -> None:
        """Gracefully stop the consumer: flush status/cursor and close Redis."""
        logger.info("Stopping firehose consumer...")
        self.running = False
        self.is_connected = False

        # Update status to disconnected (best-effort).
        if self.redis_client:
            try:
                self.update_status()
            except Exception:
                pass

        # Stop the firehose client (best-effort).
        if self.client:
            try:
                self.client.stop()
            except Exception:
                pass

        # Save final cursor so a restart resumes where we left off.
        if self.current_cursor is not None and self.redis_client:
            self.redis_client.set(self.cursor_key, str(self.current_cursor))
            logger.info(f"Saved final cursor: {self.current_cursor}")

        # Close Redis
        if self.redis_client:
            self.redis_client.close()

        # Log final stats
        elapsed = time.time() - self.start_time
        rate = self.event_count / elapsed if elapsed > 0 else 0
        logger.info(
            f"Stopped. Total events: {self.event_count:,} "
            f"(~{rate:.0f} events/sec over {elapsed:.0f}s)"
        )


def main() -> None:
    """Main entry point: read env config, wire signals, and run the consumer."""
    # Configuration from environment
    relay_url = os.getenv("RELAY_URL", "wss://bsky.network")

    # Normalize the relay URL for display. NOTE(review): the original comment
    # claimed to append the subscribeRepos path, but no path was ever appended;
    # the URL is only used for logging/status (the firehose client connects to
    # the library default endpoint), so we merely strip a trailing slash.
    if relay_url and not relay_url.endswith("/xrpc/com.atproto.sync.subscribeRepos"):
        relay_url = relay_url.rstrip('/')

    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
    stream_key = os.getenv("REDIS_STREAM_KEY", "firehose:events")
    cursor_key = os.getenv("REDIS_CURSOR_KEY", "firehose:python_cursor")
    max_stream_len = int(os.getenv("REDIS_MAX_STREAM_LEN", "500000"))

    logger.info("=" * 60)
    logger.info("AT Protocol Firehose Consumer (Python)")
    logger.info("=" * 60)
    logger.info(f"Relay URL: {relay_url}")
    logger.info(f"Redis URL: {redis_url}")
    logger.info(f"Stream Key: {stream_key}")
    logger.info(f"Cursor Key: {cursor_key}")
    logger.info(f"Max Stream Length: {max_stream_len:,}")
    logger.info("=" * 60)

    # Create consumer
    consumer = FirehoseConsumer(
        relay_url=relay_url,
        redis_url=redis_url,
        stream_key=stream_key,
        cursor_key=cursor_key,
        max_stream_len=max_stream_len,
    )

    # Handle signals for graceful shutdown
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        consumer.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Run consumer
    try:
        consumer.run()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        consumer.stop()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        consumer.stop()
        sys.exit(1)


if __name__ == "__main__":
    main()