tangled
alpha
login
or
join now
edavis.dev
/
bsky-tools
0
fork
atom
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
Add feedgen/feedweb
Eric Davis
2 years ago
140c5eec
a43e3268
+239
-4
9 changed files
expand all
collapse all
unified
split
.gitignore
Pipfile
Pipfile.lock
feedgen.py
feeds
__init__.py
rapidfire.py
feedweb.py
service
feedgen.service
feedweb.service
+1
.gitignore
···
1
1
+
db/
+1
Pipfile
···
10
10
atproto = "*"
11
11
flask = "*"
12
12
requests = "*"
13
13
+
gunicorn = "*"
13
14
14
15
[dev-packages]
15
16
+22
-4
Pipfile.lock
···
1
1
{
2
2
"_meta": {
3
3
"hash": {
4
4
-
"sha256": "4c979af70167ffd0e10feab94039bc4cd6c633eafafc0ffc8fe610de279023a7"
4
4
+
"sha256": "35b6fce04f6f842ebca9cbcdd66f681bb94b7a913f3de9f67082df4986393af1"
5
5
},
6
6
"pipfile-spec": 6,
7
7
"requires": {
···
308
308
"markers": "python_version >= '3.8'",
309
309
"version": "==3.0.2"
310
310
},
311
311
+
"gunicorn": {
312
312
+
"hashes": [
313
313
+
"sha256:3213aa5e8c24949e792bcacfc176fef362e7aac80b76c56f6b5122bf350722f0",
314
314
+
"sha256:88ec8bff1d634f98e61b9f65bc4bf3cd918a90806c6f5c48bc5603849ec81033"
315
315
+
],
316
316
+
"index": "pypi",
317
317
+
"markers": "python_version >= '3.5'",
318
318
+
"version": "==21.2.0"
319
319
+
},
311
320
"h11": {
312
321
"hashes": [
313
322
"sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d",
···
545
554
"markers": "python_version >= '3.7'",
546
555
"version": "==0.3.1"
547
556
},
557
557
+
"packaging": {
558
558
+
"hashes": [
559
559
+
"sha256:2ddfb553fdf02fb784c234c7ba6ccc288296ceabec964ad2eae3777778130bc5",
560
560
+
"sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"
561
561
+
],
562
562
+
"markers": "python_version >= '3.7'",
563
563
+
"version": "==24.0"
564
564
+
},
548
565
"pycparser": {
549
566
"hashes": [
550
550
-
"sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9",
551
551
-
"sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"
567
567
+
"sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6",
568
568
+
"sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"
552
569
],
553
553
-
"version": "==2.21"
570
570
+
"markers": "python_version >= '3.8'",
571
571
+
"version": "==2.22"
554
572
},
555
573
"pydantic": {
556
574
"hashes": [
+63
feedgen.py
···
1
1
+
#!/usr/bin/env python3
2
2
+
3
3
+
import asyncio
4
4
+
import dag_cbor
5
5
+
import redis
6
6
+
import sys
7
7
+
import websockets
8
8
+
from atproto import CAR
9
9
+
from io import BytesIO
10
10
+
11
11
+
from feeds import Manager
12
12
+
from feeds.rapidfire import RapidFireFeed
13
13
+
14
14
+
async def firehose_events():
15
15
+
redis_cnx = redis.Redis()
16
16
+
relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
17
17
+
firehose_seq = redis_cnx.get('dev.edavis.feedgen.seq')
18
18
+
if firehose_seq:
19
19
+
relay_url += f'?cursor={firehose_seq.decode()}'
20
20
+
21
21
+
sys.stdout.write(f'opening websocket connection to {relay_url}\n')
22
22
+
sys.stdout.flush()
23
23
+
24
24
+
async with websockets.connect(relay_url, ping_timeout=None) as firehose:
25
25
+
op_count = 0
26
26
+
while True:
27
27
+
frame = BytesIO(await firehose.recv())
28
28
+
header = dag_cbor.decode(frame, allow_concat=True)
29
29
+
if header['op'] != 1 or header['t'] != '#commit':
30
30
+
continue
31
31
+
32
32
+
payload = dag_cbor.decode(frame)
33
33
+
if payload['tooBig']:
34
34
+
continue
35
35
+
36
36
+
blocks = payload.pop('blocks')
37
37
+
car_parsed = CAR.from_bytes(blocks)
38
38
+
message = payload.copy()
39
39
+
del message['ops']
40
40
+
message['commit'] = message['commit'].encode('base32')
41
41
+
42
42
+
for op in payload['ops']:
43
43
+
repo_op = op.copy()
44
44
+
if op['cid'] is not None:
45
45
+
repo_op['cid'] = repo_op['cid'].encode('base32')
46
46
+
repo_op['record'] = car_parsed.blocks[repo_op['cid']]
47
47
+
message['op'] = repo_op
48
48
+
yield message
49
49
+
50
50
+
op_count += 1
51
51
+
if op_count % 500 == 0:
52
52
+
redis_cnx.set('dev.edavis.feedgen.seq', payload['seq'])
53
53
+
54
54
+
async def main():
55
55
+
manager = Manager()
56
56
+
manager.register(RapidFireFeed)
57
57
+
58
58
+
async for commit in firehose_events():
59
59
+
manager.process(commit)
60
60
+
61
61
+
62
62
+
if __name__ == '__main__':
63
63
+
asyncio.run(main())
+18
feeds/__init__.py
···
1
1
+
class Manager:
2
2
+
def __init__(self):
3
3
+
self.feeds = []
4
4
+
self.webs = {}
5
5
+
6
6
+
def register(self, feed):
7
7
+
f = feed()
8
8
+
self.webs[feed.FEED_URI] = f
9
9
+
self.feeds.append(f)
10
10
+
11
11
+
def process(self, commit):
12
12
+
for feed in self.feeds:
13
13
+
feed.process(commit)
14
14
+
15
15
+
def serve(self, feed_uri, limit, offset):
16
16
+
feed = self.webs.get(feed_uri)
17
17
+
if feed is not None:
18
18
+
return feed.serve(limit, offset)
+60
feeds/rapidfire.py
···
1
1
+
import sqlite3
2
2
+
3
3
+
MAX_TEXT_LENGTH = 140
4
4
+
5
5
+
class RapidFireFeed:
6
6
+
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'
7
7
+
8
8
+
def __init__(self):
9
9
+
self.checkpoint = 0
10
10
+
self.db_cnx = sqlite3.connect('db/rapidfire.db')
11
11
+
with self.db_cnx:
12
12
+
self.db_cnx.executescript("""
13
13
+
pragma journal_mode = WAL;
14
14
+
pragma synchronous = off;
15
15
+
create table if not exists posts (uri text, create_ts timestamp);
16
16
+
create index if not exists create_ts_idx on posts(create_ts);
17
17
+
""")
18
18
+
19
19
+
def process(self, commit):
20
20
+
op = commit['op']
21
21
+
if op['action'] != 'create':
22
22
+
return
23
23
+
24
24
+
collection, _ = op['path'].split('/')
25
25
+
if collection != 'app.bsky.feed.post':
26
26
+
return
27
27
+
28
28
+
ts = commit['time']
29
29
+
record = op['record']
30
30
+
31
31
+
if all([
32
32
+
len(record['text']) <= MAX_TEXT_LENGTH,
33
33
+
all(0x20 <= ord(c) <= 0x7e for c in record['text']),
34
34
+
record.get('reply') is None,
35
35
+
record.get('embed') is None,
36
36
+
record.get('facets') is None
37
37
+
]):
38
38
+
repo = commit['repo']
39
39
+
path = op['path']
40
40
+
post_uri = f'at://{repo}/{path}'
41
41
+
self.db_cnx.execute(
42
42
+
'insert into posts (uri, create_ts) values (:uri, :ts)',
43
43
+
dict(uri=post_uri, ts=ts)
44
44
+
)
45
45
+
46
46
+
self.checkpoint += 1
47
47
+
if self.checkpoint % 10 == 0:
48
48
+
self.db_cnx.execute("delete from posts where strftime('%s', create_ts) < strftime('%s', 'now', '-1 hour')")
49
49
+
self.db_cnx.commit()
50
50
+
51
51
+
def serve(self, limit, offset):
52
52
+
cur = self.db_cnx.execute(
53
53
+
"select uri from posts order by create_ts desc limit :limit offset :offset",
54
54
+
dict(limit=limit, offset=offset)
55
55
+
)
56
56
+
57
57
+
feed = [dict(post=uri) for (uri,) in cur]
58
58
+
offset += len(feed)
59
59
+
60
60
+
return dict(offset=str(offset), feed=feed)
+42
feedweb.py
···
1
1
+
#!/usr/bin/env python3
2
2
+
3
3
+
from feeds import Manager
4
4
+
from feeds.rapidfire import RapidFireFeed
5
5
+
6
6
+
from flask import Flask, request
7
7
+
app = Flask(__name__)
8
8
+
9
9
+
@app.route('/.well-known/did.json')
10
10
+
def well_known_did():
11
11
+
service = {
12
12
+
'id': '#bsky_fg',
13
13
+
'type': 'BskyFeedGenerator',
14
14
+
'serviceEndpoint': 'https://feedgen.edavis.dev',
15
15
+
}
16
16
+
return {
17
17
+
'@context': ['https://www.w3.org/ns/did/v1'],
18
18
+
'id': 'did:web:feedgen.edavis.dev',
19
19
+
'service': [service],
20
20
+
}
21
21
+
22
22
+
@app.route('/xrpc/app.bsky.feed.getFeedSkeleton')
23
23
+
def get_feed_skeleton():
24
24
+
manager = Manager()
25
25
+
manager.register(RapidFireFeed)
26
26
+
27
27
+
try:
28
28
+
limit = int(request.args.get('limit', 50))
29
29
+
except ValueError:
30
30
+
limit = 50
31
31
+
32
32
+
try:
33
33
+
offset = int(request.args.get('cursor', 0))
34
34
+
except ValueError:
35
35
+
offset = 0
36
36
+
37
37
+
feed_uri = request.args['feed']
38
38
+
return manager.serve(feed_uri, limit, offset)
39
39
+
40
40
+
41
41
+
if __name__ == '__main__':
42
42
+
app.run(debug=True)
+16
service/feedgen.service
···
1
1
+
[Unit]
2
2
+
Description=Bsky Feedgen
3
3
+
After=network.target syslog.target
4
4
+
5
5
+
[Service]
6
6
+
Type=simple
7
7
+
User=eric
8
8
+
WorkingDirectory=/home/eric/bsky-tools
9
9
+
ExecStart=/home/eric/.local/bin/pipenv run ./feedgen.py
10
10
+
TimeoutSec=15
11
11
+
Restart=on-failure
12
12
+
RestartSec=1
13
13
+
StandardOutput=journal
14
14
+
15
15
+
[Install]
16
16
+
WantedBy=multi-user.target
+16
service/feedweb.service
···
1
1
+
[Unit]
2
2
+
Description=Bsky Feedweb
3
3
+
After=network.target syslog.target
4
4
+
5
5
+
[Service]
6
6
+
Type=simple
7
7
+
User=eric
8
8
+
WorkingDirectory=/home/eric/bsky-tools
9
9
+
ExecStart=/home/eric/.local/bin/pipenv run gunicorn -w 4 -b 127.0.0.1:9060 feedweb:app
10
10
+
TimeoutSec=15
11
11
+
Restart=on-failure
12
12
+
RestartSec=1
13
13
+
StandardOutput=journal
14
14
+
15
15
+
[Install]
16
16
+
WantedBy=multi-user.target