import argparse
import asyncio
import decimal
import logging
import os
import sys
import time
from typing import Tuple, List, Dict

from atproto import AsyncClient
from atproto import exceptions as at_exceptions
import pandas as pd

from utils import RateLimit, BSKY_API_LIMIT

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Create formatter
formatter = logging.Formatter(
    "%(asctime)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)

# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Number of DIDs crawled concurrently per batch.
BATCH_SIZE = 10
# Save a checkpoint every this many batches
# (i.e. roughly every BATCH_SIZE * CHECKPOINT_THRESHOLD explored accounts).
CHECKPOINT_THRESHOLD = 100
REQUIRED_ENV = ("BSKY_USER", "BSKY_APP_PW")


async def get_all_follows(
    client: AsyncClient, rate_limit: RateLimit, did: str
) -> Tuple[str, List[str]]:
    """Fetch the complete (public) follow list for one account.

    Pages through the follows endpoint 100 entries at a time, acquiring a
    rate-limit token before every request.

    Args:
        client: Authenticated atproto client.
        rate_limit: Shared limiter used to pace API calls.
        did: DID of the account whose follows are fetched.

    Returns:
        Tuple of (did, list of DIDs that account follows).
    """
    follows: List[str] = []
    cursor = None
    while True:
        await rate_limit.acquire()
        data = await client.get_follows(actor=did, cursor=cursor, limit=100)
        follows.extend(follow.did for follow in data.follows)
        cursor = data.cursor
        if not cursor:
            break
    return (did, follows)


def save_checkpoint(output_dir: str, job_start: int, f_map: Dict[str, List[str]]):
    """Persist the crawled follow map as a parquet checkpoint file.

    The filename embeds the job start timestamp and the map size (in
    scientific notation) so successive checkpoints sort naturally.
    Exits the process on a write failure.

    Args:
        output_dir: Directory the checkpoint file is written into.
        job_start: Unix timestamp identifying this crawl job.
        f_map: Mapping of follower DID -> list of followed DIDs.
    """
    # Create dataframe for follow map
    follow_dict = {"follower": [], "follows": []}
    for follower, follows in f_map.items():
        follow_dict["follower"].append(follower)
        follow_dict["follows"].append(follows)
    follow_df = pd.DataFrame(data=follow_dict)

    # Convert to scientific notation for more pleasant unix sort
    size_string = f"{decimal.Decimal(len(f_map)):.2e}".replace("+", "").replace(
        ".", "_"
    )
    save_path = os.path.join(
        output_dir, f"{job_start}_checkpoint_{size_string}.parquet"
    )

    # Save follow map as parquet file
    logger.info(f"Saving follow map to: {save_path}")
    try:
        follow_df.to_parquet(save_path)
    except Exception as e:
        logger.error(f"Failed to save follow map: {e}")
        sys.exit(1)


async def explore(username: str, pw: str, start_did: str, num_hops: int, save_dir: str):
    """Breadth-first crawl of the follow graph starting from one DID.

    Explores accounts in batches of BATCH_SIZE, recording each account's
    follow list in ``follow_map`` and its hop distance from the start in
    ``distance_map``. Accounts at distance ``num_hops`` are recorded but
    their follows are not queued. Checkpoints are written every
    CHECKPOINT_THRESHOLD batches and once more at the end.

    Args:
        username: Bluesky handle used to log in.
        pw: App password for the account.
        start_did: DID the crawl starts from.
        num_hops: Maximum hop distance to expand from the start.
        save_dir: Directory checkpoint parquet files are written to.
    """
    client = AsyncClient()
    await client.login(username, pw)

    logger.info(f"Starting did: {start_did}")

    follow_map: Dict[str, List[str]] = dict()
    distance_map: Dict[str, int] = {start_did: 0}
    to_explore = [start_did]
    logger.info(
        f"Starting crawl with:\nStart DID: {start_did}\nNum hops: {num_hops}\nSaving Output to: {save_dir}"
    )
    job_start = int(time.time())

    # Try to only send 10 requests a second
    batch_count = 1
    fail_count = 0
    rate_limiter = RateLimit(BSKY_API_LIMIT)
    while len(to_explore):
        batch = to_explore[:BATCH_SIZE]
        to_explore = to_explore[BATCH_SIZE:]
        logger.info(
            f"Starting batch with size: {len(batch)} remaining to_explore: {len(to_explore)}"
        )
        for result in asyncio.as_completed(
            [get_all_follows(client, rate_limiter, did) for did in batch]
        ):
            try:
                follower, follows = await result
                follow_map[follower] = follows
                logger.info(f"{follower} follows {len(follows)} (public) accounts")
            except at_exceptions.BadRequestError as e:
                # Bad request is probably a profile that's private or deleted
                logger.info(f"Bad Request: {e.response.content.error}")
                continue
            except Exception as e:
                logger.error(f"Failed to get followers: {e}", exc_info=True)
                fail_count += 1
                if fail_count >= 3:
                    sys.exit(1)
                continue

            # If too far from start, dont add follows to exploration queue
            if distance_map[follower] != num_hops:
                for follow_did in follows:
                    # distance_map holds every DID ever seen (explored OR
                    # queued), so this also prevents queueing duplicates
                    # that haven't been explored yet.
                    if follow_did not in distance_map:
                        to_explore.append(follow_did)
                        distance_map[follow_did] = distance_map[follower] + 1

        # Save a checkpoint every CHECKPOINT_THRESHOLD batches (done once
        # per batch, outside the per-result loop, to avoid redundant writes)
        if batch_count % CHECKPOINT_THRESHOLD == 0:
            save_checkpoint(save_dir, job_start, follow_map)

        logger.info(
            f"Finished batch {batch_count} | to_explore size: {len(to_explore)}"
        )
        batch_count += 1

    save_checkpoint(save_dir, job_start, follow_map)
    logger.info("Crawl complete")


def main():
    """Entry point: validate env vars, parse CLI args, and run the crawl."""
    for key in REQUIRED_ENV:
        if key not in os.environ:
            raise ValueError(f"Must set '{key}' env var")
    user_name = os.environ["BSKY_USER"]
    app_pw = os.environ["BSKY_APP_PW"]

    parser = argparse.ArgumentParser(
        prog="CrawlFollows",
        description="Crawl social graph using follows starting from provided DID",
    )
    parser.add_argument(
        "--start-did",
        dest="start_did",
        required=True,
        help="DID for account to start crawl at",
    )
    parser.add_argument(
        "--num-hops",
        dest="num_hops",
        type=int,
        default=2,
        help="How many network hops to explore out from the start",
    )
    parser.add_argument(
        "--save-dir",
        dest="save_dir",
        default="data/crawl/",
        help="Where to store crawl data",
    )
    args = parser.parse_args()

    asyncio.run(
        explore(
            user_name,
            app_pw,
            start_did=args.start_did,
            num_hops=args.num_hops,
            save_dir=args.save_dir,
        )
    )


if __name__ == "__main__":
    main()