A third-party AT Protocol appview — backfill service module (backfill_service.py, 551 lines).
#!/usr/bin/env python3
"""
Backfill Service for AT Protocol
Python implementation based on TypeScript backfill.ts

This service provides historical data backfilling capabilities for the AT Protocol worker.
It can backfill:
- A specific number of days of historical data
- Total backfill (entire available history with -1)
- Resume from a saved cursor position
"""

import asyncio
import logging
import os
import signal
import sys
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

import psutil
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, firehose_models, models

# Import event processor and database components
from unified_worker import EventProcessor, DatabasePool
from did_resolver import did_resolver
from pds_data_fetcher import PDSDataFetcher
from label_service import LabelService

# Set up logging
logger = logging.getLogger(__name__)


class BackfillProgress:
    """Track backfill progress and statistics.

    Plain mutable state bag; updated by BackfillService as events flow in
    and read back out via BackfillService.get_progress().
    """

    def __init__(self):
        # Firehose sequence numbers: where we started and where we are now.
        self.start_cursor: Optional[int] = None
        self.current_cursor: Optional[int] = None
        # Event counters. "received" counts every frame off the wire;
        # "processed" counts frames that made it through the handlers;
        # "skipped" counts frames dropped by the cutoff-date filter.
        self.events_processed: int = 0
        self.events_skipped: int = 0
        self.events_received: int = 0
        self.start_time: datetime = datetime.now(timezone.utc)
        self.last_update_time: datetime = datetime.now(timezone.utc)
        self.estimated_completion: Optional[datetime] = None
        self.is_running: bool = False

        # Memory and performance tracking (mirrored from the service in
        # get_progress(); not updated live here).
        self.queue_depth: int = 0
        self.active_processing: int = 0


class BackfillService:
    """
    Historical data backfill service for AT Protocol

    Features:
    - Configurable backfill duration (days or total history)
    - Resume from saved cursor position
    - Resource throttling for background processing
    - Memory management and monitoring
    - Progress tracking and persistence
    """

    # Default configuration
    PROGRESS_SAVE_INTERVAL = 1000  # Save progress every 1000 events
    MAX_EVENTS_PER_RUN = 1000000   # Safety limit for total backfill

    def __init__(
        self,
        database_url: str,
        relay_url: str = None,
        db_pool_size: int = 10  # Smaller pool for backfill
    ):
        """Configure the service from arguments and environment variables.

        Args:
            database_url: PostgreSQL connection URL for the worker database.
            relay_url: Firehose relay WebSocket URL; falls back to RELAY_URL
                env var, then the public bsky.network relay.
            db_pool_size: Connection-pool size (kept small for backfill).
        """
        self.database_url = database_url
        self.relay_url = relay_url or os.getenv("RELAY_URL", "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos")
        self.db_pool_size = db_pool_size

        # Initialize components (created lazily in initialize())
        self.db_pool: Optional[DatabasePool] = None
        self.event_processor: Optional[EventProcessor] = None
        self.client: Optional[FirehoseSubscribeReposClient] = None
        self.is_running = False
        self.progress = BackfillProgress()

        # Backfill configuration from environment.
        # BACKFILL_DAYS semantics: 0 = disabled, -1 = entire history,
        # N > 0 = last N days. Anything below -1 is coerced to disabled.
        backfill_days_raw = os.getenv("BACKFILL_DAYS", "0")
        try:
            self.backfill_days = int(backfill_days_raw)
            if self.backfill_days < -1:
                self.backfill_days = 0
        except ValueError:
            logger.warning(f"Invalid BACKFILL_DAYS value '{backfill_days_raw}' - using default (0)")
            self.backfill_days = 0

        self.cutoff_date: Optional[datetime] = None

        # Resource throttling configuration
        self.BATCH_SIZE = int(os.getenv("BACKFILL_BATCH_SIZE", "5"))
        self.BATCH_DELAY_MS = int(os.getenv("BACKFILL_BATCH_DELAY_MS", "2000"))
        self.MAX_CONCURRENT_PROCESSING = int(os.getenv("BACKFILL_MAX_CONCURRENT", "2"))
        self.MAX_MEMORY_MB = int(os.getenv("BACKFILL_MAX_MEMORY_MB", "512"))
        self.USE_IDLE_PROCESSING = os.getenv("BACKFILL_USE_IDLE", "true").lower() != "false"

        # Performance tracking
        self.batch_counter = 0
        self.last_memory_check = 0
        self.memory_paused = False
        self.active_processing = 0
        self.processing_queue = []

        # Log configuration
        logger.info("[BACKFILL] Resource throttling config:")
        logger.info(f"  - Batch size: {self.BATCH_SIZE} events")
        logger.info(f"  - Batch delay: {self.BATCH_DELAY_MS}ms")
        logger.info(f"  - Max concurrent: {self.MAX_CONCURRENT_PROCESSING}")
        logger.info(f"  - Memory limit: {self.MAX_MEMORY_MB}MB")
        logger.info(f"  - Idle processing: {self.USE_IDLE_PROCESSING}")

    async def initialize(self):
        """Initialize database connection and services.

        Must be awaited once before start(). Wires the event processor,
        PDS data fetcher and label service together.
        """
        logger.info("[BACKFILL] Initializing backfill service...")

        # Create database pool
        self.db_pool = DatabasePool(self.database_url, self.db_pool_size)
        await self.db_pool.connect()

        # Initialize services
        await did_resolver.initialize()

        pds_data_fetcher = PDSDataFetcher(self.db_pool)
        await pds_data_fetcher.initialize()

        label_service = LabelService(self.db_pool)

        # Create event processor with backfill-specific configuration
        self.event_processor = EventProcessor(self.db_pool)

        # Wire up services (fetcher and processor reference each other)
        self.event_processor.pds_data_fetcher = pds_data_fetcher
        self.event_processor.label_service = label_service
        pds_data_fetcher.event_processor = self.event_processor

        # Disable signature verification for faster backfill
        # Note: This is handled in the message processing

        logger.info("[BACKFILL] Backfill service initialized")

    async def start(self, start_cursor: Optional[int] = None):
        """Start the backfill process.

        Args:
            start_cursor: Optional firehose sequence number to resume from.
                When None, the backfill starts fresh from cursor 0.

        Raises:
            RuntimeError: if a backfill is already running.
        """
        if self.is_running:
            raise RuntimeError("Backfill is already running")

        if self.backfill_days == 0:
            logger.info("[BACKFILL] Backfill is disabled (BACKFILL_DAYS=0)")
            return

        # Configure backfill mode
        if self.backfill_days == -1:
            backfill_mode = "TOTAL (entire available history)"
            self.cutoff_date = None
        else:
            backfill_mode = f"{self.backfill_days} days"
            self.cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.backfill_days)

        logger.info(f"[BACKFILL] Starting {backfill_mode} historical backfill...")
        if self.cutoff_date:
            logger.info(f"[BACKFILL] Cutoff date: {self.cutoff_date.isoformat()}")

        self.is_running = True
        self.batch_counter = 0

        # Initialize progress
        self.progress = BackfillProgress()
        self.progress.start_cursor = start_cursor
        self.progress.current_cursor = start_cursor
        self.progress.is_running = True

        try:
            # FIX: only discard saved progress when no explicit resume cursor
            # was supplied; previously the cursor was cleared unconditionally,
            # which made the start_cursor parameter dead code.
            if start_cursor is None:
                logger.info("[BACKFILL] Starting fresh backfill from cursor 0 to fetch full historical window")
                self.progress.current_cursor = None
                self.progress.events_processed = 0

            await self.run_backfill()
            logger.info("[BACKFILL] Backfill completed successfully")
        except Exception as e:
            logger.error(f"[BACKFILL] Error during backfill: {e}", exc_info=True)
            raise
        finally:
            # Always clear the running flags, on success and on failure.
            self.is_running = False
            self.progress.is_running = False

    async def run_backfill(self):
        """Main backfill loop.

        Starts the blocking firehose client in a daemon thread and keeps the
        event loop alive so the per-message coroutines scheduled from that
        thread can run.
        """
        # Configure start cursor
        if self.backfill_days == -1:
            # Total backfill: start from oldest available (seq 0)
            start_cursor = 0
            logger.info("[BACKFILL] Using startCursor=0 for total backfill (entire rollback window)")
        elif self.progress.current_cursor is not None:
            # Resume from saved position
            start_cursor = self.progress.current_cursor
            logger.info(f"[BACKFILL] Resuming from saved cursor: {start_cursor}")
        else:
            # Start from oldest available
            start_cursor = 0
            logger.info("[BACKFILL] Using startCursor=0 to fetch available history")

        # Create firehose client with cursor
        logger.info("[BACKFILL] Creating Firehose client...")

        # Track expected sequence for filtering out frames that predate the
        # requested cursor.
        self.current_expected_seq = start_cursor if start_cursor > 0 else None

        # Create client with cursor parameter if resuming
        params = None
        if start_cursor and start_cursor > 0:
            params = models.ComAtprotoSyncSubscribeRepos.Params(cursor=start_cursor)
            logger.info(f"[BACKFILL] Resuming from cursor: {start_cursor}")

        self.client = FirehoseSubscribeReposClient(params)

        # Get the current event loop for scheduling tasks.
        # (get_running_loop is the non-deprecated form inside a coroutine.)
        main_loop = asyncio.get_running_loop()

        # Set up synchronous message handler that schedules work on the main loop
        def on_message_handler(message: firehose_models.MessageFrame):
            """Handle incoming firehose message (synchronous)."""
            try:
                # Schedule the async processing on the main event loop.
                # run_coroutine_threadsafe is required because client.start()
                # runs in a separate thread.
                asyncio.run_coroutine_threadsafe(
                    self.process_message(message),
                    main_loop
                )
            except Exception as e:
                logger.error(f"[BACKFILL] Error scheduling message processing: {e}")

        logger.info("[BACKFILL] Starting Firehose client in background thread...")

        # Run the blocking client.start() in a separate thread
        # This allows the main event loop to continue processing async tasks
        client_thread = threading.Thread(
            target=lambda: self.client.start(on_message_handler),
            daemon=True,
            name="FirehoseClientThread"
        )
        client_thread.start()
        logger.info("[BACKFILL] Firehose client thread started")

        # Keep the async function running while backfill is active
        # This allows the event loop to process the scheduled async tasks
        try:
            while self.is_running and client_thread.is_alive():
                await asyncio.sleep(1)
                # Periodic status check
                if self.progress.events_received > 0 and self.progress.events_received % 10000 == 0:
                    logger.debug(f"[BACKFILL] Thread alive, processed {self.progress.events_processed} events")
        except Exception as e:
            logger.error(f"[BACKFILL] Error in main loop: {e}")
            raise
        finally:
            # Clean up; the client may already be stopped or half-started,
            # so a failure here is best-effort only.
            if self.client:
                try:
                    self.client.stop()
                except Exception:
                    pass

    async def process_message(self, message: firehose_models.MessageFrame):
        """Process a single firehose message.

        Parses the frame, applies cursor/cutoff filtering, dispatches commit,
        identity and account events to the event processor, and applies the
        batching/memory throttles.
        """
        try:
            # Track all received events
            self.progress.events_received += 1

            # Parse message
            commit = parse_subscribe_repos_message(message)

            # Track sequence number if available
            if hasattr(commit, 'seq') and commit.seq is not None:
                self.progress.current_cursor = commit.seq

                # Skip if we haven't reached our start cursor yet
                if self.current_expected_seq is not None and commit.seq < self.current_expected_seq:
                    return

            # Handle different event types
            if isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
                # Check cutoff date for create/update events
                skip_old_events = False
                if self.cutoff_date:
                    # Extract records from commit to check dates
                    for op in commit.ops:
                        if hasattr(op, 'record') and op.record:
                            record_data = op.record
                            if isinstance(record_data, dict) and 'createdAt' in record_data:
                                try:
                                    record_date = datetime.fromisoformat(record_data['createdAt'].replace('Z', '+00:00'))
                                    if record_date < self.cutoff_date:
                                        skip_old_events = True
                                        break
                                except (ValueError, TypeError, AttributeError):
                                    # Unparseable createdAt: treat the record
                                    # as inside the window rather than drop it.
                                    pass

                if skip_old_events:
                    self.progress.events_skipped += 1
                    return

                # Process commit
                await self.event_processor.process_commit(commit)

            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Identity):
                # Process identity event
                event_data = {
                    'did': commit.did,
                    'handle': commit.handle,
                }
                await self.event_processor.process_identity(event_data)

            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Account):
                # Process account event
                event_data = {
                    'did': commit.did,
                    'active': commit.active,
                }
                await self.event_processor.process_account(event_data)

            # Update progress
            self.progress.events_processed += 1
            self.progress.last_update_time = datetime.now(timezone.utc)
            self.batch_counter += 1

            # Memory check and throttling
            if self.progress.events_processed % 100 == 0:
                await self.check_memory_and_throttle()

            # Batch delay to prevent resource overload
            if self.batch_counter >= self.BATCH_SIZE:
                if self.USE_IDLE_PROCESSING:
                    # Yield to other tasks
                    await asyncio.sleep(0)
                # Always add the configured delay
                await asyncio.sleep(self.BATCH_DELAY_MS / 1000)
                self.batch_counter = 0

            # Log progress periodically
            if self.progress.events_received % self.PROGRESS_SAVE_INTERVAL == 0:
                elapsed = (datetime.now(timezone.utc) - self.progress.start_time).total_seconds()
                rate = self.progress.events_received / elapsed if elapsed > 0 else 0
                logger.info(
                    f"[BACKFILL] Progress: {self.progress.events_received} received, "
                    f"{self.progress.events_processed} processed, "
                    f"{self.progress.events_skipped} skipped ({rate:.0f} evt/s)"
                )
                await self.save_progress()

            # Check safety limit
            if self.progress.events_processed >= self.MAX_EVENTS_PER_RUN:
                logger.info(f"[BACKFILL] Reached safety limit of {self.MAX_EVENTS_PER_RUN} events")
                await self.stop()

        except Exception as e:
            # Check for duplicate key errors (common during backfill)
            if "duplicate key value" in str(e):
                # Skip silently
                pass
            else:
                logger.error(f"[BACKFILL] Error processing event: {e}")

    async def check_memory_and_throttle(self):
        """Check memory usage and throttle if necessary.

        Uses the process RSS as the memory measure; when it exceeds
        MAX_MEMORY_MB the loop sleeps to give the GC a chance to run.
        """
        try:
            # Get current process memory usage
            process = psutil.Process()
            memory_info = process.memory_info()
            heap_used_mb = memory_info.rss / 1024 / 1024  # RSS in MB

            # Check if we're exceeding memory limit
            if heap_used_mb > self.MAX_MEMORY_MB:
                if not self.memory_paused:
                    logger.warning(
                        f"[BACKFILL] Memory usage high ({heap_used_mb:.0f}MB > {self.MAX_MEMORY_MB}MB), "
                        "pausing for GC..."
                    )
                    self.memory_paused = True

                # Wait for memory to be freed
                await asyncio.sleep(5)

                # Check again
                memory_info = process.memory_info()
                new_heap_used_mb = memory_info.rss / 1024 / 1024

                if new_heap_used_mb < self.MAX_MEMORY_MB:
                    logger.info(f"[BACKFILL] Memory recovered ({new_heap_used_mb:.0f}MB), resuming...")
                    self.memory_paused = False
                else:
                    # Still high, wait longer
                    logger.warning(f"[BACKFILL] Memory still high ({new_heap_used_mb:.0f}MB), waiting longer...")
                    await asyncio.sleep(10)
                    self.memory_paused = False
            elif self.memory_paused:
                # Memory back to normal
                logger.info(f"[BACKFILL] Memory usage normal ({heap_used_mb:.0f}MB), resuming...")
                self.memory_paused = False

            # Log memory usage periodically
            if self.progress.events_processed % 10000 == 0:
                logger.info(f"[BACKFILL] Memory: {heap_used_mb:.0f}MB / {self.MAX_MEMORY_MB}MB limit")

        except Exception as e:
            logger.error(f"[BACKFILL] Error checking memory: {e}")

    async def save_progress(self):
        """Save backfill progress to database (upsert into firehose_cursor)."""
        try:
            async with self.db_pool.acquire() as conn:
                # Convert timezone-aware datetime to naive for PostgreSQL compatibility
                last_update_naive = self.progress.last_update_time.replace(tzinfo=None) if self.progress.last_update_time.tzinfo else self.progress.last_update_time

                # Upsert progress to firehose_cursor table.
                # FIX: use an explicit None check so a legitimate cursor of 0
                # is persisted instead of being coerced to NULL.
                await conn.execute("""
                    INSERT INTO firehose_cursor (service, cursor, last_event_time, updated_at)
                    VALUES ($1, $2, $3, NOW())
                    ON CONFLICT (service)
                    DO UPDATE SET
                        cursor = EXCLUDED.cursor,
                        last_event_time = EXCLUDED.last_event_time,
                        updated_at = NOW()
                """,
                    "backfill",
                    str(self.progress.current_cursor) if self.progress.current_cursor is not None else None,
                    last_update_naive
                )
        except Exception as e:
            logger.error(f"[BACKFILL] Error saving progress: {e}")

    async def get_saved_progress(self) -> Optional[Dict[str, Any]]:
        """Load saved backfill progress from database.

        Returns:
            Dict with 'cursor' (int or None) and 'last_event_time', or None
            when no row exists or the lookup fails.
        """
        try:
            async with self.db_pool.acquire() as conn:
                row = await conn.fetchrow("""
                    SELECT cursor, last_event_time
                    FROM firehose_cursor
                    WHERE service = $1
                """, "backfill")

                if row:
                    return {
                        'cursor': int(row['cursor']) if row['cursor'] is not None else None,
                        'last_event_time': row['last_event_time']
                    }
        except Exception as e:
            logger.error(f"[BACKFILL] Error loading progress: {e}")

        return None

    async def stop(self):
        """Stop the backfill service and persist the final cursor."""
        logger.info("[BACKFILL] Stopping backfill...")

        if self.client:
            try:
                self.client.stop()
            except Exception:
                # Best-effort: the client may already be stopped.
                pass
            self.client = None

        await self.save_progress()
        self.is_running = False
        self.progress.is_running = False

        logger.info("[BACKFILL] Backfill stopped")
        logger.info(f"[BACKFILL] Final stats: {self.progress.events_processed} events processed")

    def get_progress(self) -> BackfillProgress:
        """Get current progress information."""
        # Update queue depth and active processing count
        self.progress.queue_depth = len(self.processing_queue)
        self.progress.active_processing = self.active_processing
        return self.progress

    async def cleanup(self):
        """Clean up resources (stops the backfill first if still running)."""
        if self.is_running:
            await self.stop()

        # Close services
        if self.event_processor and self.event_processor.pds_data_fetcher:
            await self.event_processor.pds_data_fetcher.close()

        await did_resolver.close()

        if self.db_pool:
            await self.db_pool.close()


async def main():
    """Main entry point for standalone backfill."""
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout)
        ]
    )

    # Get configuration
    database_url = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/atproto")
    relay_url = os.getenv("RELAY_URL", "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos")
    backfill_days = int(os.getenv("BACKFILL_DAYS", "0"))

    if backfill_days == 0:
        logger.info("[BACKFILL] Backfill is disabled (BACKFILL_DAYS=0)")
        return

    logger.info("=" * 60)
    logger.info("AT Protocol Backfill Service (Python)")
    logger.info("=" * 60)
    logger.info(f"Relay URL: {relay_url}")
    logger.info(f"Backfill days: {backfill_days} {'(total history)' if backfill_days == -1 else ''}")
    logger.info("=" * 60)

    # Create and initialize service
    service = BackfillService(database_url, relay_url)
    await service.initialize()

    # Set up signal handlers.
    # FIX: a signal handler does not run inside the event-loop callback
    # context, so asyncio.create_task() there is unreliable; schedule the
    # shutdown coroutine onto the loop thread-safely instead.
    loop = asyncio.get_running_loop()

    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        loop.call_soon_threadsafe(lambda: loop.create_task(service.stop()))

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Start backfill
        await service.start()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        await service.cleanup()


if __name__ == "__main__":
    asyncio.run(main())