tangled
alpha
login
or
join now
nauta.one
/
atproto_jetstream
2
fork
atom
Async Python Jetstream Client
pypi.org/project/atproto_jetstream/
atproto
jetstream
python
zstd
2
fork
atom
overview
issues
pulls
pipelines
ignore all non-text messages
Mario Nachbaur
4 months ago
cb408100
d10f1686
+5
1 changed file
expand all
collapse all
unified
split
atproto_jetstream
__init__.py
+5
atproto_jetstream/__init__.py
···
1
1
+
from aiohttp import WSMsgType
1
2
from aiohttp.client import ClientSession, ClientWebSocketResponse
2
3
from types import TracebackType
3
4
from typing import Any, Literal, NamedTuple
···
115
116
async def __anext__(self) -> JetstreamEvent:
116
117
if not self._session:
117
118
raise Exception("there's no _session")
119
119
+
118
120
wsm = await self._session.__anext__()
121
121
+
while wsm.type != WSMsgType.TEXT:
122
122
+
wsm = await self._session.__anext__()
123
123
+
119
124
json: dict[str, Any] = wsm.json()
120
125
match json["kind"]:
121
126
case "account":