import asyncio
import logging
import os
import sys
from collections import Counter

import pandas as pd

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# NOTE(review): presumably the per-second request budget handed to RateLimit;
# nothing in this module reads it -- confirm against the callers.
BSKY_API_LIMIT = 10


class RateLimit:
    """Async limiter allowing at most ``per_second`` acquisitions per window.

    A one-second window opens on the first ``acquire()`` after the previous
    window has closed. Once the window's budget is used up, further callers
    wait until the window ends and the counter is reset.
    """

    def __init__(self, per_second: int):
        """Create a limiter.

        Args:
            per_second: Maximum number of acquisitions per one-second window.

        Raises:
            ValueError: If ``per_second`` is less than 1. Such a limiter could
                never grant an acquisition and ``acquire()`` would wait
                forever.
        """
        if per_second < 1:
            raise ValueError(f"per_second must be >= 1, got {per_second}")
        self.per_second = per_second
        # Acquisitions granted in the current window.
        self.cur_count = 0
        # Set by the refresh task when the window ends, to wake waiters.
        self.refresh_event = asyncio.Event()
        # True while a refresh task for the current window is pending.
        self.refresh_running = False
        # Strong reference to the pending refresh task. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._refresh_task = None

    async def sleep_then_refresh(self):
        """Wait one second, reset the window counter and wake all waiters."""
        await asyncio.sleep(1)
        self.cur_count = 0
        self.refresh_event.set()
        self.refresh_running = False

    async def acquire(self):
        """Take one slot from the current window, waiting if it is exhausted."""
        # Loop rather than recurse: a caller that loses the race for many
        # consecutive windows would otherwise keep nesting coroutine frames.
        while self.cur_count >= self.per_second:
            # No await separates clear() from wait(), so the refresh task
            # cannot set the event in between and the wake-up is not lost.
            self.refresh_event.clear()
            await self.refresh_event.wait()

        # Capacity is available. Start the timer that ends this window
        # unless one is already pending.
        if not self.refresh_running:
            self.refresh_running = True
            self._refresh_task = asyncio.create_task(self.sleep_then_refresh())
        self.cur_count += 1


def load_checkpoint(ckpt_dir: str) -> set[str]:
    """Return the set of account names recorded in ``ckpt_dir``.

    The directory is created (including missing parents) when it does not
    exist. Every entry in the directory contributes one name: its file name
    with a trailing ``.gz`` removed when present.

    Args:
        ckpt_dir: Path of the checkpoint directory.

    Returns:
        Names derived from the entries found in ``ckpt_dir``.

    Exits the process with status 1 if the directory cannot be created or
    listed.
    """
    # If checkpoint dir doesn't exist, try to create it
    if not os.path.isdir(ckpt_dir):
        logger.info("Checkpoint dir doesn't exist, creating...")
        try:
            os.makedirs(ckpt_dir, exist_ok=True)
        except Exception as e:
            logger.error(f"Failed to created checkpoint dir, {ckpt_dir}\n{e}")
            sys.exit(1)

    # Checkpoint folders contain one file per user
    try:
        # Strip only a real ".gz" suffix; blindly slicing off three
        # characters would mangle names that lack the extension.
        completed_accounts = {
            entry.removesuffix(".gz") for entry in os.listdir(ckpt_dir)
        }
    except Exception as e:
        logger.error(
            f"Failed to recover from checkpoint dir, {ckpt_dir}\n{e}",
            exc_info=1,
        )
        sys.exit(1)

    return completed_accounts


def get_accounts(graph_path: str, completed_accts: set[str]) -> list[tuple[str, int]]:
    """Rank not-yet-completed accounts by how often they appear in the graph.

    Reads the parquet file at ``graph_path``, keeps the rows whose
    ``"follows"`` value has between 100 and 1000 entries (inclusive), and
    counts how many times each account occurs across the kept ``"follows"``
    values, ignoring accounts in ``completed_accts``.

    Args:
        graph_path: Path to the follow-graph parquet file. It must have a
            ``"follows"`` column holding a collection of account names per row.
        completed_accts: Accounts to leave out of the result.

    Returns:
        ``(account, count)`` pairs sorted by descending count; ties keep
        first-seen order.

    Exits the process with status 1 if the file cannot be read or filtered.
    """
    # Load follow graph parquet file
    try:
        logger.info("Parsing follower graph file...")
        follow_df = pd.read_parquet(graph_path)
        # Keep only rows whose "follows" collection has 100..1000 entries
        follow_df = follow_df.loc[follow_df["follows"].str.len().between(100, 1000)]
    except Exception as e:
        logger.error(f"Failed to open follow graph file, {graph_path}\n{e}")
        sys.exit(1)

    # Counter keeps first-seen insertion order, and sorted() is stable, so
    # ties come out in the order the accounts were first encountered.
    to_explore = Counter(
        acct
        for follows in follow_df["follows"]
        for acct in follows
        if acct not in completed_accts
    )
    return sorted(to_explore.items(), key=lambda item: -item[1])


def get_logger(name: str):
    """Return the INFO-level logger ``name`` with a stdout handler attached.

    The console handler is added only when the logger has no handlers yet, so
    calling this repeatedly for the same name does not duplicate output.

    Args:
        name: Name passed to ``logging.getLogger``.

    Returns:
        The configured ``logging.Logger``.
    """
    named_logger = logging.getLogger(name)
    named_logger.setLevel(logging.INFO)

    # logging.getLogger returns the same object for the same name, so an
    # unconditional addHandler would stack one more handler on every call.
    if not named_logger.handlers:
        # Create formatter
        formatter = logging.Formatter(
            "%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        named_logger.addHandler(console_handler)

    return named_logger