tangled
alpha
login
or
join now
ligo.at
/
core
6
fork
atom
decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
6
fork
atom
overview
issues
2
pulls
pipelines
use atproto_jetstream dependency for ingestor
nauta.one
5 months ago
69aef4a9
66630461
+33
-28
3 changed files
expand all
collapse all
unified
split
ingestor
__main__.py
pyproject.toml
uv.lock
+18
-28
ingestor/__main__.py
···
1
1
import sqlite3
2
2
-
from typing import Any
3
3
-
import aiohttp
4
2
import asyncio
5
3
import dotenv
6
4
import json
7
5
import logging
6
6
+
from atproto_jetstream import Jetstream, JetstreamCommitEvent, JetstreamOptions
8
7
9
8
logger = logging.getLogger(__name__)
10
9
11
10
12
11
async def ingest_jetstream(config: dict[str, str | None]):
13
13
-
base = config.get("JETSTREAM_URL", "jetstream1.us-east.bsky.network")
14
14
-
socket = f"wss://{base}/subscribe"
15
15
-
socket += "?wantedCollections=at.ligo.*"
16
16
-
logger.info(f"connecting to {socket}")
17
17
-
async with aiohttp.ClientSession() as session:
18
18
-
async with session.ws_connect(socket) as ws:
19
19
-
async for message in ws:
20
20
-
if message.type == aiohttp.WSMsgType.TEXT:
21
21
-
json = message.json()
22
22
-
did = json["did"]
23
23
-
if json["kind"] == "commit":
24
24
-
handle_commit(did, json["commit"], config)
25
25
-
else:
26
26
-
continue
12
12
+
url = config.get("JETSTREAM_URL") or "jetstream1.us-east.bsky.network"
13
13
+
options = JetstreamOptions(wanted_collections=["at.ligo.*"])
14
14
+
async with Jetstream(url, options) as stream:
15
15
+
async for event in stream:
16
16
+
if event.kind == "commit":
17
17
+
handle_commit(event.did, event.commit, config)
27
18
28
19
29
29
-
def handle_commit(did: str, commit: dict[str, Any], config: dict[str, str | None]):
30
30
-
is_delete: bool = commit["operation"] == "delete"
31
31
-
collection: str = commit["collection"]
32
32
-
rkey: str = commit["rkey"]
20
20
+
def handle_commit(
21
21
+
did: str,
22
22
+
commit: JetstreamCommitEvent.Commit,
23
23
+
config: dict[str, str | None],
24
24
+
):
25
25
+
is_delete: bool = commit.operation == "delete"
33
26
34
34
-
if rkey != "self":
27
27
+
if commit.rkey != "self":
35
28
return
36
29
37
30
db = get_database(config)
···
41
34
42
35
prefix: str | None = None
43
36
type: str | None = None
44
44
-
match collection:
37
37
+
match commit.collection:
45
38
case "at.ligo.actor.profile":
46
39
prefix = "profile_from_did"
47
40
type = "at.ligo.actor.profile"
···
61
54
)
62
55
else:
63
56
logger.debug(f"creating or updating {prefix} for {did}")
64
64
-
record: dict[str, str] = commit["record"]
65
65
-
if record["$type"] != type:
57
57
+
if commit.record["$type"] != type:
66
58
return
67
67
-
content = json.dumps(record)
59
59
+
content = json.dumps(commit.record)
68
60
_ = cursor.execute(
69
61
"insert or replace into keyval values (?, ?, ?)",
70
62
(prefix, did, content),
···
76
68
77
69
78
70
def get_database(config: dict[str, str | None]) -> sqlite3.Connection | None:
79
79
-
database_name = config.get("FLASK_DATABASE_URL", "ligoat.db")
80
80
-
if not database_name:
81
81
-
return None
71
71
+
database_name = config.get("FLASK_DATABASE_URL") or "ligoat.db"
82
72
return sqlite3.connect(database_name)
83
73
84
74
+1
pyproject.toml
···
7
7
dependencies = [
8
8
"aiodns>=3.5.0",
9
9
"aiohttp>=3.13.0",
10
10
+
"atproto-jetstream>=0.1.1",
10
11
"authlib>=1.3",
11
12
"flask[async,dotenv]>=3.1.2",
12
13
"gunicorn>=23.0.0",
+14
uv.lock
···
117
117
]
118
118
119
119
[[package]]
120
120
+
name = "atproto-jetstream"
121
121
+
version = "0.1.1"
122
122
+
source = { registry = "https://pypi.org/simple" }
123
123
+
dependencies = [
124
124
+
{ name = "aiohttp" },
125
125
+
]
126
126
+
sdist = { url = "https://files.pythonhosted.org/packages/80/56/f6735dcb9a6f24677efe008597a9360f275ca1894de0a28283c94ad6a0ca/atproto_jetstream-0.1.1.tar.gz", hash = "sha256:3d37d3bda76e6eafbc5c76af3d626b2f6902cac56625d641588375bed1b6c883", size = 3455, upload-time = "2025-10-15T16:34:36.946Z" }
127
127
+
wheels = [
128
128
+
{ url = "https://files.pythonhosted.org/packages/a9/8d/4f387a87bf0e4aa1883c06de9f54b0d706185d9c701740ed56924ee1f5d1/atproto_jetstream-0.1.1-py3-none-any.whl", hash = "sha256:fa34444de12075e7d02e0768c3dca4cb84338ebea97723e90a6d5a587e8791fd", size = 3993, upload-time = "2025-10-15T16:34:35.826Z" },
129
129
+
]
130
130
+
131
131
+
[[package]]
120
132
name = "attrs"
121
133
version = "25.4.0"
122
134
source = { registry = "https://pypi.org/simple" }
···
394
406
dependencies = [
395
407
{ name = "aiodns" },
396
408
{ name = "aiohttp" },
409
409
+
{ name = "atproto-jetstream" },
397
410
{ name = "authlib" },
398
411
{ name = "flask", extra = ["async", "dotenv"] },
399
412
{ name = "gunicorn" },
···
403
416
requires-dist = [
404
417
{ name = "aiodns", specifier = ">=3.5.0" },
405
418
{ name = "aiohttp", specifier = ">=3.13.0" },
419
419
+
{ name = "atproto-jetstream", specifier = ">=0.1.1" },
406
420
{ name = "authlib", specifier = ">=1.3" },
407
421
{ name = "flask", extras = ["async", "dotenv"], specifier = ">=3.1.2" },
408
422
{ name = "gunicorn", specifier = ">=23.0.0" },