tangled
alpha
login
or
join now
edavis.dev
/
bsky-tools
0
fork
atom
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
feeds: cutover to jetstream
Eric Davis
2 years ago
d45b4a9e
e6024ace
+27
-58
5 changed files
expand all
collapse all
unified
split
feedgen.py
feeds
battle.py
homeruns.py
popular.py
rapidfire.py
+4
-27
feedgen.py
···
2
2
3
3
import asyncio
4
4
from io import BytesIO
5
5
+
import json
5
6
import logging
6
7
7
8
from atproto import CAR
···
20
21
logging.getLogger('firehose').setLevel(logging.DEBUG)
21
22
22
23
async def firehose_events(firehose_manager):
23
23
-
relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
24
24
-
seq = firehose_manager.get_sequence_number()
25
25
-
if seq is not None:
26
26
-
relay_url += f'?cursor={seq}'
24
24
+
relay_url = 'ws://localhost:6008/subscribe'
27
25
28
26
logger = logging.getLogger('feeds.events')
29
27
logger.info(f'opening websocket connection to {relay_url}')
30
28
31
29
async with websockets.connect(relay_url, ping_timeout=60) as firehose:
32
30
while True:
33
33
-
frame = BytesIO(await firehose.recv())
34
34
-
header = dag_cbor.decode(frame, allow_concat=True)
35
35
-
if header['op'] != 1 or header['t'] != '#commit':
36
36
-
continue
37
37
-
38
38
-
payload = dag_cbor.decode(frame)
39
39
-
if payload['tooBig']:
40
40
-
continue
41
41
-
42
42
-
blocks = payload.pop('blocks')
43
43
-
car_parsed = CAR.from_bytes(blocks)
44
44
-
message = payload.copy()
45
45
-
del message['ops']
46
46
-
message['commit'] = message['commit'].encode('base32')
47
47
-
48
48
-
for op in payload['ops']:
49
49
-
repo_op = op.copy()
50
50
-
if op['cid'] is not None:
51
51
-
repo_op['cid'] = repo_op['cid'].encode('base32')
52
52
-
repo_op['record'] = car_parsed.blocks.get(repo_op['cid'])
53
53
-
54
54
-
message['op'] = repo_op
55
55
-
yield message
31
31
+
payload = BytesIO(await firehose.recv())
32
32
+
yield json.load(payload)
56
33
57
34
async def main():
58
35
firehose_manager = FirehoseManager()
+6
-8
feeds/battle.py
···
28
28
self.logger = logging.getLogger('feeds.battle')
29
29
30
30
def process_commit(self, commit):
31
31
-
op = commit['op']
32
32
-
if op['action'] != 'create':
31
31
+
if commit['opType'] != 'c':
33
32
return
34
33
35
35
-
collection, _ = op['path'].split('/')
36
36
-
if collection != 'app.bsky.feed.post':
34
34
+
if commit['collection'] != 'app.bsky.feed.post':
37
35
return
38
36
39
39
-
record = op.get('record')
37
37
+
record = commit.get('record')
40
38
if record is None:
41
39
return
42
40
43
43
-
repo = commit['repo']
44
44
-
path = op['path']
45
45
-
post_uri = f'at://{repo}/{path}'
41
41
+
repo = commit['did']
42
42
+
rkey = commit['rkey']
43
43
+
post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
46
44
length = grapheme.length(record.get('text', ''))
47
45
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
48
46
+7
-9
feeds/homeruns.py
···
57
57
self.logger = logging.getLogger('feeds.homeruns')
58
58
59
59
def process_commit(self, commit):
60
60
-
if commit['repo'] != MLBHRS_DID:
60
60
+
if commit['did'] != MLBHRS_DID:
61
61
return
62
62
63
63
-
op = commit['op']
64
64
-
if op['action'] != 'create':
63
63
+
if commit['opType'] != 'c':
65
64
return
66
65
67
67
-
collection, _ = op['path'].split('/')
68
68
-
if collection != 'app.bsky.feed.post':
66
66
+
if commit['collection'] != 'app.bsky.feed.post':
69
67
return
70
68
71
71
-
record = op.get('record')
69
69
+
record = commit.get('record')
72
70
if record is None:
73
71
return
74
72
75
75
-
uri = 'at://{repo}/{path}'.format(
76
76
-
repo = commit['repo'],
77
77
-
path = op['path']
73
73
+
uri = 'at://{repo}/app.bsky.feed.post/{rkey}'.format(
74
74
+
repo = commit['did'],
75
75
+
rkey = commit['rkey']
78
76
)
79
77
tags = record.get('tags', [])
80
78
+4
-6
feeds/popular.py
···
22
22
self.logger = logging.getLogger('feeds.popular')
23
23
24
24
def process_commit(self, commit):
25
25
-
op = commit['op']
26
26
-
if op['action'] != 'create':
25
25
+
if commit['opType'] != 'c':
27
26
return
28
27
29
29
-
collection, _ = op['path'].split('/')
30
30
-
if collection != 'app.bsky.feed.like':
28
28
+
if commit['collection'] != 'app.bsky.feed.like':
31
29
return
32
30
33
33
-
record = op.get('record')
31
31
+
record = commit.get('record')
34
32
if record is None:
35
33
return
36
34
37
35
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
38
38
-
like_subject_uri = op['record']['subject']['uri']
36
36
+
like_subject_uri = record['subject']['uri']
39
37
40
38
self.transaction_begin(self.db_cnx)
41
39
+6
-8
feeds/rapidfire.py
···
25
25
self.logger = logging.getLogger('feeds.rapidfire')
26
26
27
27
def process_commit(self, commit):
28
28
-
op = commit['op']
29
29
-
if op['action'] != 'create':
28
28
+
if commit['opType'] != 'c':
30
29
return
31
30
32
32
-
collection, _ = op['path'].split('/')
33
33
-
if collection != 'app.bsky.feed.post':
31
31
+
if commit['collection'] != 'app.bsky.feed.post':
34
32
return
35
33
36
36
-
record = op.get('record')
34
34
+
record = commit.get('record')
37
35
if record is None:
38
36
return
39
37
···
43
41
record.get('embed') is None,
44
42
record.get('facets') is None
45
43
]):
46
46
-
repo = commit['repo']
47
47
-
path = op['path']
48
48
-
post_uri = f'at://{repo}/{path}'
44
44
+
repo = commit['did']
45
45
+
rkey = commit['rkey']
46
46
+
post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
49
47
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
50
48
51
49
self.transaction_begin(self.db_cnx)