# A third-party ATProto AppView
1#!/usr/bin/env python3
2"""
3Backfill Service for AT Protocol
4Python implementation based on TypeScript backfill.ts
5
6This service provides historical data backfilling capabilities for the AT Protocol worker.
7It can backfill:
8- A specific number of days of historical data
9- Total backfill (entire available history with -1)
10- Resume from a saved cursor position
11"""
12
13import asyncio
14import logging
15import os
16import signal
17import sys
18import time
19import psutil
20import threading
21from datetime import datetime, timedelta, timezone
22from typing import Optional, Dict, Any
23from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, firehose_models, models
24
25# Import event processor and database components
26from unified_worker import EventProcessor, DatabasePool
27from did_resolver import did_resolver
28from pds_data_fetcher import PDSDataFetcher
29from label_service import LabelService
30
31# Set up logging
32logger = logging.getLogger(__name__)
33
34
class BackfillProgress:
    """Mutable snapshot of a backfill run's position and statistics."""

    def __init__(self):
        # Cursor bookkeeping (firehose sequence numbers)
        self.start_cursor: Optional[int] = None
        self.current_cursor: Optional[int] = None

        # Event counters
        self.events_processed: int = 0
        self.events_skipped: int = 0
        self.events_received: int = 0

        # Timing
        self.start_time: datetime = datetime.now(timezone.utc)
        self.last_update_time: datetime = datetime.now(timezone.utc)
        self.estimated_completion: Optional[datetime] = None

        # Run state
        self.is_running: bool = False

        # Memory and performance tracking
        self.queue_depth: int = 0
        self.active_processing: int = 0
52
53
class BackfillService:
    """
    Historical data backfill service for AT Protocol.

    Features:
    - Configurable backfill duration (days, or -1 for total history)
    - Resume from saved cursor position
    - Resource throttling for background processing
    - Memory management and monitoring
    - Progress tracking and persistence
    """

    # Default configuration
    PROGRESS_SAVE_INTERVAL = 1000  # Save/log progress every 1000 received events
    MAX_EVENTS_PER_RUN = 1000000  # Safety limit for total backfill

    def __init__(
        self,
        database_url: str,
        relay_url: str = None,
        db_pool_size: int = 10  # Smaller pool for backfill
    ):
        """
        Args:
            database_url: PostgreSQL connection string.
            relay_url: Firehose relay websocket URL; falls back to the
                RELAY_URL env var, then the public Bluesky relay.
                NOTE(review): currently only stored — the
                FirehoseSubscribeReposClient below is constructed with its
                default endpoint; confirm whether it should honor this URL.
            db_pool_size: Size of the dedicated backfill DB pool.
        """
        self.database_url = database_url
        self.relay_url = relay_url or os.getenv("RELAY_URL", "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos")
        self.db_pool_size = db_pool_size

        # Components are created lazily in initialize()
        self.db_pool: Optional[DatabasePool] = None
        self.event_processor: Optional[EventProcessor] = None
        self.client: Optional[FirehoseSubscribeReposClient] = None
        self.is_running = False
        self.progress = BackfillProgress()

        # Backfill configuration from environment:
        #   0 = disabled, -1 = total history, N > 0 = last N days.
        backfill_days_raw = os.getenv("BACKFILL_DAYS", "0")
        try:
            self.backfill_days = int(backfill_days_raw)
            if self.backfill_days < -1:
                self.backfill_days = 0
        except ValueError:
            logger.warning(f"Invalid BACKFILL_DAYS value '{backfill_days_raw}' - using default (0)")
            self.backfill_days = 0

        # Computed in start(): None means "no cutoff" (total backfill)
        self.cutoff_date: Optional[datetime] = None

        # Resource throttling configuration
        self.BATCH_SIZE = int(os.getenv("BACKFILL_BATCH_SIZE", "5"))
        self.BATCH_DELAY_MS = int(os.getenv("BACKFILL_BATCH_DELAY_MS", "2000"))
        self.MAX_CONCURRENT_PROCESSING = int(os.getenv("BACKFILL_MAX_CONCURRENT", "2"))
        self.MAX_MEMORY_MB = int(os.getenv("BACKFILL_MAX_MEMORY_MB", "512"))
        self.USE_IDLE_PROCESSING = os.getenv("BACKFILL_USE_IDLE", "true").lower() != "false"

        # Performance tracking
        self.batch_counter = 0
        self.last_memory_check = 0
        self.memory_paused = False
        self.active_processing = 0
        self.processing_queue = []

        # Log configuration
        logger.info("[BACKFILL] Resource throttling config:")
        logger.info(f"  - Batch size: {self.BATCH_SIZE} events")
        logger.info(f"  - Batch delay: {self.BATCH_DELAY_MS}ms")
        logger.info(f"  - Max concurrent: {self.MAX_CONCURRENT_PROCESSING}")
        logger.info(f"  - Memory limit: {self.MAX_MEMORY_MB}MB")
        logger.info(f"  - Idle processing: {self.USE_IDLE_PROCESSING}")

    async def initialize(self):
        """Initialize the database pool and wire up the processing services."""
        logger.info("[BACKFILL] Initializing backfill service...")

        # Create database pool
        self.db_pool = DatabasePool(self.database_url, self.db_pool_size)
        await self.db_pool.connect()

        # Initialize services
        await did_resolver.initialize()

        pds_data_fetcher = PDSDataFetcher(self.db_pool)
        await pds_data_fetcher.initialize()

        label_service = LabelService(self.db_pool)

        # Create event processor with backfill-specific configuration
        self.event_processor = EventProcessor(self.db_pool)

        # Wire up services (fetcher and processor reference each other)
        self.event_processor.pds_data_fetcher = pds_data_fetcher
        self.event_processor.label_service = label_service
        pds_data_fetcher.event_processor = self.event_processor

        # Disable signature verification for faster backfill
        # Note: This is handled in the message processing

        logger.info("[BACKFILL] Backfill service initialized")

    async def start(self, start_cursor: Optional[int] = None):
        """
        Start the backfill process.

        Args:
            start_cursor: Optional firehose sequence number to resume from.
                NOTE(review): this value is recorded in progress but then
                cleared below ("fresh backfill"), so run_backfill() always
                starts from cursor 0 — confirm whether resume via this
                parameter is intended to work.

        Raises:
            RuntimeError: If a backfill is already running.
        """
        if self.is_running:
            raise RuntimeError("Backfill is already running")

        # BACKFILL_DAYS=0 means the feature is off entirely
        if self.backfill_days == 0:
            logger.info("[BACKFILL] Backfill is disabled (BACKFILL_DAYS=0)")
            return

        # Configure backfill mode: -1 = no cutoff, otherwise N days back
        if self.backfill_days == -1:
            backfill_mode = "TOTAL (entire available history)"
            self.cutoff_date = None
        else:
            backfill_mode = f"{self.backfill_days} days"
            self.cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.backfill_days)

        logger.info(f"[BACKFILL] Starting {backfill_mode} historical backfill...")
        if self.cutoff_date:
            logger.info(f"[BACKFILL] Cutoff date: {self.cutoff_date.isoformat()}")

        self.is_running = True
        self.batch_counter = 0

        # Initialize progress
        self.progress = BackfillProgress()
        self.progress.start_cursor = start_cursor
        self.progress.current_cursor = start_cursor
        self.progress.is_running = True

        try:
            # Clear any saved progress when starting a new backfill
            logger.info("[BACKFILL] Starting fresh backfill from cursor 0 to fetch full historical window")
            self.progress.current_cursor = None
            self.progress.events_processed = 0

            await self.run_backfill()
            logger.info("[BACKFILL] Backfill completed successfully")
        except Exception as e:
            logger.error(f"[BACKFILL] Error during backfill: {e}", exc_info=True)
            self.is_running = False
            self.progress.is_running = False
            raise

    async def run_backfill(self):
        """
        Main backfill loop: subscribe to the firehose in a background thread
        and keep the event loop alive to process scheduled coroutines.
        """
        # Configure start cursor
        if self.backfill_days == -1:
            # Total backfill: start from oldest available (seq 0)
            start_cursor = 0
            logger.info("[BACKFILL] Using startCursor=0 for total backfill (entire rollback window)")
        elif self.progress.current_cursor is not None:
            # Resume from saved position
            start_cursor = self.progress.current_cursor
            logger.info(f"[BACKFILL] Resuming from saved cursor: {start_cursor}")
        else:
            # Start from oldest available
            start_cursor = 0
            logger.info("[BACKFILL] Using startCursor=0 to fetch available history")

        # Create firehose client with cursor
        logger.info("[BACKFILL] Creating Firehose client...")

        # Track expected sequence for filtering replayed events
        self.current_expected_seq = start_cursor if start_cursor > 0 else None

        # Create client with cursor parameter if resuming
        params = None
        if start_cursor and start_cursor > 0:
            params = models.ComAtprotoSyncSubscribeRepos.Params(cursor=start_cursor)
            logger.info(f"[BACKFILL] Resuming from cursor: {start_cursor}")

        self.client = FirehoseSubscribeReposClient(params)

        # Capture the running loop so the client thread can schedule work on it.
        # (get_event_loop() is deprecated inside coroutines; get_running_loop()
        # is the correct call here.)
        main_loop = asyncio.get_running_loop()

        # Set up synchronous message handler that schedules work on the main loop
        def on_message_handler(message: firehose_models.MessageFrame):
            """Handle incoming firehose message (synchronous, runs in client thread)."""
            try:
                # Schedule the async processing on the main event loop.
                # run_coroutine_threadsafe is required because client.start()
                # runs in a separate thread.
                asyncio.run_coroutine_threadsafe(
                    self.process_message(message),
                    main_loop
                )
            except Exception as e:
                logger.error(f"[BACKFILL] Error scheduling message processing: {e}")

        logger.info("[BACKFILL] Starting Firehose client in background thread...")

        # Run the blocking client.start() in a separate thread
        # This allows the main event loop to continue processing async tasks
        client_thread = threading.Thread(
            target=lambda: self.client.start(on_message_handler),
            daemon=True,
            name="FirehoseClientThread"
        )
        client_thread.start()
        logger.info("[BACKFILL] Firehose client thread started")

        # Keep the async function running while backfill is active
        # This allows the event loop to process the scheduled async tasks
        try:
            while self.is_running and client_thread.is_alive():
                await asyncio.sleep(1)
                # Periodic status check
                if self.progress.events_received > 0 and self.progress.events_received % 10000 == 0:
                    logger.debug(f"[BACKFILL] Thread alive, processed {self.progress.events_processed} events")
        except Exception as e:
            logger.error(f"[BACKFILL] Error in main loop: {e}")
            raise
        finally:
            # Best-effort client shutdown; never mask the original exception
            if self.client:
                try:
                    self.client.stop()
                except Exception:
                    pass

    async def process_message(self, message: firehose_models.MessageFrame):
        """
        Process a single firehose message: parse it, apply cursor/cutoff
        filtering, dispatch to the event processor, and update progress.
        """
        try:
            # Track all received events
            self.progress.events_received += 1

            # Parse message
            commit = parse_subscribe_repos_message(message)

            # Track sequence number if available.  Some frame types carry no
            # seq; getattr avoids an AttributeError on the comparison below.
            seq = getattr(commit, 'seq', None)
            if seq is not None:
                self.progress.current_cursor = seq

                # Skip if we haven't reached our start cursor yet
                if self.current_expected_seq is not None and seq < self.current_expected_seq:
                    return

            # Handle different event types
            if isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
                # Check cutoff date for create/update events
                skip_old_events = False
                if self.cutoff_date:
                    # Extract records from commit to check dates
                    for op in commit.ops:
                        if hasattr(op, 'record') and op.record:
                            record_data = op.record
                            if isinstance(record_data, dict) and 'createdAt' in record_data:
                                try:
                                    record_date = datetime.fromisoformat(record_data['createdAt'].replace('Z', '+00:00'))
                                    if record_date < self.cutoff_date:
                                        skip_old_events = True
                                        break
                                except (ValueError, AttributeError, TypeError):
                                    # Malformed createdAt — treat as not-old
                                    pass

                if skip_old_events:
                    self.progress.events_skipped += 1
                    return

                # Process commit
                await self.event_processor.process_commit(commit)

            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Identity):
                # Process identity event
                event_data = {
                    'did': commit.did,
                    'handle': commit.handle,
                }
                await self.event_processor.process_identity(event_data)

            elif isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Account):
                # Process account event
                event_data = {
                    'did': commit.did,
                    'active': commit.active,
                }
                await self.event_processor.process_account(event_data)

            # Update progress
            self.progress.events_processed += 1
            self.progress.last_update_time = datetime.now(timezone.utc)
            self.batch_counter += 1

            # Memory check and throttling
            if self.progress.events_processed % 100 == 0:
                await self.check_memory_and_throttle()

            # Batch delay to prevent resource overload
            if self.batch_counter >= self.BATCH_SIZE:
                if self.USE_IDLE_PROCESSING:
                    # Yield to other tasks
                    await asyncio.sleep(0)
                # Always add the configured delay
                await asyncio.sleep(self.BATCH_DELAY_MS / 1000)
                self.batch_counter = 0

            # Log progress periodically
            if self.progress.events_received % self.PROGRESS_SAVE_INTERVAL == 0:
                elapsed = (datetime.now(timezone.utc) - self.progress.start_time).total_seconds()
                rate = self.progress.events_received / elapsed if elapsed > 0 else 0
                logger.info(
                    f"[BACKFILL] Progress: {self.progress.events_received} received, "
                    f"{self.progress.events_processed} processed, "
                    f"{self.progress.events_skipped} skipped ({rate:.0f} evt/s)"
                )
                await self.save_progress()

            # Check safety limit
            if self.progress.events_processed >= self.MAX_EVENTS_PER_RUN:
                logger.info(f"[BACKFILL] Reached safety limit of {self.MAX_EVENTS_PER_RUN} events")
                await self.stop()

        except Exception as e:
            # Check for duplicate key errors (common during backfill)
            if "duplicate key value" in str(e):
                # Skip silently
                pass
            else:
                logger.error(f"[BACKFILL] Error processing event: {e}")

    async def check_memory_and_throttle(self):
        """
        Check process RSS against MAX_MEMORY_MB and pause processing until
        memory pressure subsides.
        """
        try:
            # Get current process memory usage
            process = psutil.Process()
            memory_info = process.memory_info()
            heap_used_mb = memory_info.rss / 1024 / 1024  # RSS in MB

            # Check if we're exceeding memory limit
            if heap_used_mb > self.MAX_MEMORY_MB:
                if not self.memory_paused:
                    logger.warning(
                        f"[BACKFILL] Memory usage high ({heap_used_mb:.0f}MB > {self.MAX_MEMORY_MB}MB), "
                        "pausing for GC..."
                    )
                    self.memory_paused = True

                # Wait for memory to be freed
                await asyncio.sleep(5)

                # Check again
                memory_info = process.memory_info()
                new_heap_used_mb = memory_info.rss / 1024 / 1024

                if new_heap_used_mb < self.MAX_MEMORY_MB:
                    logger.info(f"[BACKFILL] Memory recovered ({new_heap_used_mb:.0f}MB), resuming...")
                    self.memory_paused = False
                else:
                    # Still high, wait longer
                    logger.warning(f"[BACKFILL] Memory still high ({new_heap_used_mb:.0f}MB), waiting longer...")
                    await asyncio.sleep(10)
                    self.memory_paused = False
            elif self.memory_paused:
                # Memory back to normal
                logger.info(f"[BACKFILL] Memory usage normal ({heap_used_mb:.0f}MB), resuming...")
                self.memory_paused = False

            # Log memory usage periodically
            if self.progress.events_processed % 10000 == 0:
                logger.info(f"[BACKFILL] Memory: {heap_used_mb:.0f}MB / {self.MAX_MEMORY_MB}MB limit")

        except Exception as e:
            logger.error(f"[BACKFILL] Error checking memory: {e}")

    async def save_progress(self):
        """Persist the current cursor to the firehose_cursor table (upsert)."""
        try:
            async with self.db_pool.acquire() as conn:
                # Convert timezone-aware datetime to naive for PostgreSQL compatibility
                last_update_naive = self.progress.last_update_time.replace(tzinfo=None) if self.progress.last_update_time.tzinfo else self.progress.last_update_time

                # Upsert progress to firehose_cursor table.
                # `is not None` (not truthiness) so that cursor 0 — a valid
                # sequence number — is persisted rather than stored as NULL.
                await conn.execute("""
                    INSERT INTO firehose_cursor (service, cursor, last_event_time, updated_at)
                    VALUES ($1, $2, $3, NOW())
                    ON CONFLICT (service)
                    DO UPDATE SET
                        cursor = EXCLUDED.cursor,
                        last_event_time = EXCLUDED.last_event_time,
                        updated_at = NOW()
                """,
                    "backfill",
                    str(self.progress.current_cursor) if self.progress.current_cursor is not None else None,
                    last_update_naive
                )
        except Exception as e:
            logger.error(f"[BACKFILL] Error saving progress: {e}")

    async def get_saved_progress(self) -> Optional[Dict[str, Any]]:
        """
        Load saved backfill progress from the database.

        Returns:
            Dict with 'cursor' (int or None) and 'last_event_time', or None
            when no row exists or the lookup fails.
        """
        try:
            async with self.db_pool.acquire() as conn:
                row = await conn.fetchrow("""
                    SELECT cursor, last_event_time
                    FROM firehose_cursor
                    WHERE service = $1
                """, "backfill")

                if row:
                    # Truthiness check intentionally maps NULL and '' to None;
                    # a stored "0" is truthy and parses fine.
                    return {
                        'cursor': int(row['cursor']) if row['cursor'] else None,
                        'last_event_time': row['last_event_time']
                    }
        except Exception as e:
            logger.error(f"[BACKFILL] Error loading progress: {e}")

        return None

    async def stop(self):
        """Stop the firehose client, save progress, and clear run flags."""
        logger.info("[BACKFILL] Stopping backfill...")

        if self.client:
            try:
                self.client.stop()
            except Exception:
                # Best-effort stop; the client may already be closed
                pass
            self.client = None

        await self.save_progress()
        self.is_running = False
        self.progress.is_running = False

        logger.info("[BACKFILL] Backfill stopped")
        logger.info(f"[BACKFILL] Final stats: {self.progress.events_processed} events processed")

    def get_progress(self) -> BackfillProgress:
        """Return the live progress object with queue metrics refreshed."""
        # Update queue depth and active processing count
        self.progress.queue_depth = len(self.processing_queue)
        self.progress.active_processing = self.active_processing
        return self.progress

    async def cleanup(self):
        """Stop the run (if active) and close all owned resources."""
        if self.is_running:
            await self.stop()

        # Close services
        if self.event_processor and self.event_processor.pds_data_fetcher:
            await self.event_processor.pds_data_fetcher.close()

        await did_resolver.close()

        if self.db_pool:
            await self.db_pool.close()
497
498
async def main():
    """Main entry point for standalone backfill."""
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout)
        ]
    )

    # Get configuration
    database_url = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/atproto")
    relay_url = os.getenv("RELAY_URL", "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos")

    # Parse BACKFILL_DAYS defensively: a malformed value should disable
    # backfill (matching BackfillService's own fallback) instead of crashing
    # the process with an unhandled ValueError.
    backfill_days_raw = os.getenv("BACKFILL_DAYS", "0")
    try:
        backfill_days = int(backfill_days_raw)
    except ValueError:
        logger.warning(f"Invalid BACKFILL_DAYS value '{backfill_days_raw}' - using default (0)")
        backfill_days = 0

    if backfill_days == 0:
        logger.info("[BACKFILL] Backfill is disabled (BACKFILL_DAYS=0)")
        return

    logger.info("=" * 60)
    logger.info("AT Protocol Backfill Service (Python)")
    logger.info("=" * 60)
    logger.info(f"Relay URL: {relay_url}")
    logger.info(f"Backfill days: {backfill_days} {'(total history)' if backfill_days == -1 else ''}")
    logger.info("=" * 60)

    # Create and initialize service
    service = BackfillService(database_url, relay_url)
    await service.initialize()

    # Keep references to shutdown tasks: asyncio holds only weak refs to
    # tasks, so a bare create_task() result could be garbage-collected
    # before the stop coroutine finishes.
    shutdown_tasks = []

    # Set up signal handlers
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        shutdown_tasks.append(asyncio.create_task(service.stop()))

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Start backfill
        await service.start()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        await service.cleanup()
548
549
# Run the standalone backfill entry point only when executed as a script,
# not when imported by the worker.
if __name__ == "__main__":
    asyncio.run(main())