tangled
alpha
login
or
join now
edavis.dev
/
bsky-tools
0
fork
atom
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
bsky-activity: move firehose handler into bsky-activity.py
Eric Davis
2 years ago
681b4453
0e600c94
+45
-48
2 changed files
expand all
collapse all
unified
split
bsky-activity.py
firehose_utils.py
+45
-3
bsky-activity.py
···
1
1
#!/usr/bin/env python3
2
2
3
3
import asyncio
4
4
+
from datetime import datetime, timezone
5
5
+
from io import BytesIO
4
6
import os
5
5
-
import redis
6
7
import sqlite3
7
8
import sys
8
8
-
from datetime import datetime, timezone
9
9
-
from firehose_utils import bsky_activity
9
9
+
10
10
+
from atproto import CAR
11
11
+
import redis
12
12
+
import dag_cbor
13
13
+
import websockets
10
14
11
15
app_bsky_allowlist = set([
12
16
'app.bsky.actor.profile',
···
22
26
'app.bsky.graph.listitem',
23
27
'app.bsky.labeler.service',
24
28
])
29
29
+
30
30
+
async def bsky_activity():
31
31
+
redis_cnx = redis.Redis()
32
32
+
relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
33
33
+
firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq')
34
34
+
if firehose_seq:
35
35
+
relay_url += f'?cursor={firehose_seq.decode()}'
36
36
+
37
37
+
sys.stdout.write(f'opening websocket connection to {relay_url}\n')
38
38
+
sys.stdout.flush()
39
39
+
40
40
+
async with websockets.connect(relay_url, ping_timeout=None) as firehose:
41
41
+
while True:
42
42
+
frame = BytesIO(await firehose.recv())
43
43
+
header = dag_cbor.decode(frame, allow_concat=True)
44
44
+
if header['op'] != 1 or header['t'] != '#commit':
45
45
+
continue
46
46
+
47
47
+
payload = dag_cbor.decode(frame)
48
48
+
if payload['tooBig']:
49
49
+
# TODO(ejd): figure out how to get blocks out-of-band
50
50
+
continue
51
51
+
52
52
+
# TODO(ejd): figure out how to validate blocks
53
53
+
blocks = payload.pop('blocks')
54
54
+
car_parsed = CAR.from_bytes(blocks)
55
55
+
56
56
+
message = payload.copy()
57
57
+
del message['ops']
58
58
+
message['commit'] = message['commit'].encode('base32')
59
59
+
60
60
+
for commit_op in payload['ops']:
61
61
+
op = commit_op.copy()
62
62
+
if op['cid'] is not None:
63
63
+
op['cid'] = op['cid'].encode('base32')
64
64
+
op['record'] = car_parsed.blocks.get(op['cid'])
65
65
+
66
66
+
yield message, op
25
67
26
68
async def main():
27
69
redis_cnx = redis.Redis()
-45
firehose_utils.py
···
1
1
-
import apsw
2
2
-
import dag_cbor
3
3
-
import redis
4
4
-
import sys
5
5
-
import websockets
6
6
-
from atproto import CAR
7
7
-
from io import BytesIO
8
8
-
9
9
-
async def bsky_activity():
10
10
-
redis_cnx = redis.Redis()
11
11
-
relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
12
12
-
firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq')
13
13
-
if firehose_seq:
14
14
-
relay_url += f'?cursor={firehose_seq.decode()}'
15
15
-
16
16
-
sys.stdout.write(f'opening websocket connection to {relay_url}\n')
17
17
-
sys.stdout.flush()
18
18
-
19
19
-
async with websockets.connect(relay_url, ping_timeout=None) as firehose:
20
20
-
while True:
21
21
-
frame = BytesIO(await firehose.recv())
22
22
-
header = dag_cbor.decode(frame, allow_concat=True)
23
23
-
if header['op'] != 1 or header['t'] != '#commit':
24
24
-
continue
25
25
-
26
26
-
payload = dag_cbor.decode(frame)
27
27
-
if payload['tooBig']:
28
28
-
# TODO(ejd): figure out how to get blocks out-of-band
29
29
-
continue
30
30
-
31
31
-
# TODO(ejd): figure out how to validate blocks
32
32
-
blocks = payload.pop('blocks')
33
33
-
car_parsed = CAR.from_bytes(blocks)
34
34
-
35
35
-
message = payload.copy()
36
36
-
del message['ops']
37
37
-
message['commit'] = message['commit'].encode('base32')
38
38
-
39
39
-
for commit_op in payload['ops']:
40
40
-
op = commit_op.copy()
41
41
-
if op['cid'] is not None:
42
42
-
op['cid'] = op['cid'].encode('base32')
43
43
-
op['record'] = car_parsed.blocks.get(op['cid'])
44
44
-
45
45
-
yield message, op