tangled
alpha
login
or
join now
ligo.at
/
core
6
fork
atom
decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
6
fork
atom
overview
issues
2
pulls
pipelines
very basic jetstream ingestor
nauta.one
4 months ago
6eefc5bc
ff4d900e
+98
2 changed files
expand all
collapse all
unified
split
Makefile
src
ingest.py
+4
Makefile
···
8
8
.PHONY: run
9
9
run:
10
10
uv run -- dotenv run -- gunicorn
11
11
+
12
12
+
.PHONY: ingest
13
13
+
ingest:
14
14
+
uv run -- src/ingest.py
+94
src/ingest.py
···
1
1
+
import sqlite3
2
2
+
from typing import Any
3
3
+
import aiohttp
4
4
+
import asyncio
5
5
+
import dotenv
6
6
+
import json
7
7
+
import logging
8
8
+
9
9
+
logger = logging.getLogger(__name__)
10
10
+
11
11
+
12
12
+
async def ingest_jetstream(config: dict[str, str | None]):
13
13
+
socket = f"wss://{config['JETSTREAM_URL']}/subscribe"
14
14
+
socket += "?wantedCollections=at.ligo.*"
15
15
+
logger.info(f"connecting to {socket}")
16
16
+
async with aiohttp.ClientSession() as session:
17
17
+
async with session.ws_connect(socket) as ws:
18
18
+
async for message in ws:
19
19
+
if message.type == aiohttp.WSMsgType.TEXT:
20
20
+
json = message.json()
21
21
+
did = json["did"]
22
22
+
if json["kind"] == "commit":
23
23
+
handle_commit(did, json["commit"], config)
24
24
+
else:
25
25
+
continue
26
26
+
27
27
+
28
28
+
def handle_commit(did: str, commit: dict[str, Any], config: dict[str, str | None]):
29
29
+
is_delete: bool = commit["operation"] == "delete"
30
30
+
collection: str = commit["collection"]
31
31
+
rkey: str = commit["rkey"]
32
32
+
33
33
+
if rkey != "self":
34
34
+
return
35
35
+
36
36
+
db = get_database(config)
37
37
+
if not db:
38
38
+
return
39
39
+
cursor = db.cursor()
40
40
+
41
41
+
prefix: str | None = None
42
42
+
type: str | None = None
43
43
+
match collection:
44
44
+
case "at.ligo.actor.profile":
45
45
+
prefix = "profile_from_did"
46
46
+
type = "at.ligo.actor.profile"
47
47
+
case "at.ligo.actor.links":
48
48
+
prefix = "links_from_did"
49
49
+
type = "at.ligo.actor.links"
50
50
+
case _:
51
51
+
pass
52
52
+
if prefix is None:
53
53
+
return
54
54
+
55
55
+
if is_delete:
56
56
+
logger.debug(f"deleting {prefix} for {did}")
57
57
+
_ = cursor.execute(
58
58
+
"delete from keyval where prefix = ? and key = ?",
59
59
+
(prefix, did),
60
60
+
)
61
61
+
else:
62
62
+
logger.debug(f"creating or updating {prefix} for {did}")
63
63
+
record: dict[str, str] = commit["record"]
64
64
+
if record["$type"] != type:
65
65
+
return
66
66
+
content = json.dumps(record)
67
67
+
_ = cursor.execute(
68
68
+
"insert or replace into keyval values (?, ?, ?)",
69
69
+
(prefix, did, content),
70
70
+
)
71
71
+
72
72
+
db.commit()
73
73
+
cursor.close()
74
74
+
db.close()
75
75
+
76
76
+
77
77
+
def get_database(config: dict[str, str | None]) -> sqlite3.Connection | None:
78
78
+
database_name = config.get("FLASK_DATABASE_URL", "ligoat.db")
79
79
+
if not database_name:
80
80
+
return None
81
81
+
return sqlite3.connect(database_name)
82
82
+
83
83
+
84
84
+
async def main(config: dict[str, str | None]):
85
85
+
try:
86
86
+
await ingest_jetstream(config)
87
87
+
except asyncio.CancelledError:
88
88
+
pass
89
89
+
90
90
+
91
91
+
if __name__ == "__main__":
92
92
+
config = dotenv.dotenv_values()
93
93
+
asyncio.run(main(config))
94
94
+
print("see you next time!")