this repo has no description

bsky-activity.py: switch to jetstream

+10 -35
+10 -35
bsky-activity.py
··· 3 import asyncio 4 from datetime import datetime, timezone 5 from io import BytesIO 6 import os 7 import sqlite3 8 import sys ··· 29 ]) 30 31 async def bsky_activity(): 32 - redis_cnx = redis.Redis() 33 - relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 34 - firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq') 35 - if firehose_seq: 36 - relay_url += f'?cursor={firehose_seq.decode()}' 37 38 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 39 sys.stdout.flush() 40 41 async with websockets.connect(relay_url, ping_timeout=60) as firehose: 42 while True: 43 - frame = BytesIO(await firehose.recv()) 44 - header = dag_cbor.decode(frame, allow_concat=True) 45 - if header['op'] != 1 or header['t'] != '#commit': 46 - continue 47 48 - payload = dag_cbor.decode(frame) 49 - if payload['tooBig']: 50 - # TODO(ejd): figure out how to get blocks out-of-band 51 - continue 52 - 53 - # TODO(ejd): figure out how to validate blocks 54 - blocks = payload.pop('blocks') 55 - car_parsed = CAR.from_bytes(blocks) 56 - 57 - message = payload.copy() 58 - del message['ops'] 59 - message['commit'] = message['commit'].encode('base32') 60 - 61 - for commit_op in payload['ops']: 62 - op = commit_op.copy() 63 - if op['cid'] is not None: 64 - op['cid'] = op['cid'].encode('base32') 65 - op['record'] = car_parsed.blocks.get(op['cid']) 66 - 67 - yield message, op 68 69 async def main(): 70 redis_cnx = redis.Redis() ··· 89 sys.stdout.flush() 90 91 op_count = 0 92 - async for commit, op in bsky_activity(): 93 - if op['action'] != 'create': 94 continue 95 96 - collection, _ = op['path'].split('/') 97 if collection not in app_bsky_allowlist: 98 continue 99 100 - repo_did = commit['repo'] 101 - repo_update_time = datetime.strptime(commit['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) 102 db_cnx.execute( 103 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts', 104 {'did': repo_did, 'ts': repo_update_time.timestamp()} ··· 111 op_count += 1 112 if op_count % 500 == 0: 113 now = datetime.now(timezone.utc) 114 - payload_seq = commit['seq'] 115 payload_lag = now - repo_update_time 116 117 sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
··· 3 import asyncio 4 from datetime import datetime, timezone 5 from io import BytesIO 6 + import json 7 import os 8 import sqlite3 9 import sys ··· 30 ]) 31 32 async def bsky_activity(): 33 + relay_url = 'ws://localhost:6008/subscribe' 34 35 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 36 sys.stdout.flush() 37 38 async with websockets.connect(relay_url, ping_timeout=60) as firehose: 39 while True: 40 + payload = BytesIO(await firehose.recv()) 41 42 + yield json.load(payload) 43 44 async def main(): 45 redis_cnx = redis.Redis() ··· 64 sys.stdout.flush() 65 66 op_count = 0 67 + async for payload in bsky_activity(): 68 + if payload['opType'] != 'c': 69 continue 70 71 + collection = payload['collection'] 72 if collection not in app_bsky_allowlist: 73 continue 74 75 + repo_did = payload['did'] 76 + repo_update_time = datetime.now(timezone.utc) 77 db_cnx.execute( 78 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts', 79 {'did': repo_did, 'ts': repo_update_time.timestamp()} ··· 86 op_count += 1 87 if op_count % 500 == 0: 88 now = datetime.now(timezone.utc) 89 + payload_seq = payload['seq'] 90 payload_lag = now - repo_update_time 91 92 sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')