this repo has no description

add graph builder

+108 -1
+84
build_graph.py
··· 1 + from collections import UserString 2 + import logging 3 + from typing import Dict, Optional, Set 4 + 5 + import click 6 + 7 + from config import CONFIG 8 + from indexer import FollowIndexer 9 + import indexer 10 + 11 + 12 + logging.basicConfig( 13 + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 14 + ) 15 + 16 + logger = logging.getLogger(__name__) 17 + 18 + 19 + @click.command 20 + @click.option( 21 + "--ch-host", 22 + ) 23 + @click.option( 24 + "--ch-port", 25 + type=int, 26 + ) 27 + @click.option( 28 + "--ch-user", 29 + ) 30 + @click.option( 31 + "--ch-pass", 32 + ) 33 + def main( 34 + ch_host: Optional[str], 35 + ch_port: Optional[int], 36 + ch_user: Optional[str], 37 + ch_pass: Optional[str], 38 + ): 39 + logger.info("Building follow graph...") 40 + 41 + indexer = FollowIndexer( 42 + clickhouse_host=ch_host or CONFIG.clickhouse_host, 43 + clickhouse_port=ch_port or CONFIG.clickhouse_port, 44 + clickhouse_user=ch_user or CONFIG.clickhouse_user, 45 + clickhouse_pass=ch_pass or CONFIG.clickhouse_pass, 46 + batch_size=1000, 47 + ) 48 + 49 + graph: Dict[str, Set[str]] = {} 50 + 51 + def build_graph(did: str, subject: str): 52 + if did not in graph: 53 + graph[did] = set() 54 + 55 + graph[did].add(subject) 56 + 57 + indexer.stream_follows(build_graph) 58 + 59 + prox_map = {} 60 + 61 + for did in graph: 62 + first = graph.get(did, set()) 63 + 64 + second: Set[str] = set() 65 + for subject in first: 66 + second.update(graph.get(subject, set())) 67 + 68 + prox_map[did] = { 69 + "hop1": first, 70 + "hop2": second - first - {did}, 71 + } 72 + 73 + import pickle 74 + 75 + with open("prox_map.pkl", "wb") as f: 76 + pickle.dump(prox_map, f) 77 + 78 + logger.info( 79 + f"Finished building proximity map, saved to prox_map.pkl. {len(prox_map):,} users in map." 80 + ) 81 + 82 + 83 + if __name__ == "__main__": 84 + main()
+24 -1
indexer.py
··· 4 4 from datetime import datetime 5 5 from threading import Lock 6 6 from time import time 7 - from typing import Any, List, Optional 7 + from typing import Any, Callable, List, Optional 8 8 9 9 import click 10 10 from aiokafka import AIOKafkaConsumer, ConsumerRecord ··· 184 184 batch_to_flush = self._unfollow_batch.copy() 185 185 self._unfollow_batch = [] 186 186 self._flush_unfollows(batch_to_flush) 187 + 188 + def stream_follows(self, cb: Callable[[str, str], None], batch_size: int = 100_000): 189 + query = """ 190 + SELECT f.did, f.subject 191 + FROM follows f 192 + LEFT ANTI JOIN unfollows u ON f.uri = u.uri 193 + """ 194 + 195 + try: 196 + with self.client.query_row_block_stream( 197 + query, settings={"max_block_size": batch_size} 198 + ) as stream: 199 + total_handled = 0 200 + for block in stream: 201 + for row in block: 202 + cb(row[0], row[1]) 203 + total_handled += 1 204 + 205 + if total_handled % 1_000_000 == 0: 206 + logger.info(f"Handled {total_handled:,} follows so far") 207 + logger.info(f"Finished streaming {total_handled:,} follows") 208 + except Exception as e: 209 + logger.error(f"Error streaming follows: {e}") 187 210 188 211 189 212 class Consumer: