Async Python Jetstream Client pypi.org/project/atproto_jetstream/
atproto jetstream python zstd

add endpoint to JetstreamOptions

+13 -10
+3 -2
README.md
··· 13 14 ```python 15 from asyncio import run 16 - from atproto_jetstream import Jetstream 17 18 19 async def main(): 20 - async with Jetstream("jetstream1.us-east.bsky.network") as stream: 21 async for event in stream: 22 match event.kind: 23 case "account":
··· 13 14 ```python 15 from asyncio import run 16 + from atproto_jetstream import Jetstream, JetstreamOptions 17 18 19 async def main(): 20 + options = JetstreamOptions("wss://jetstream1.us-east.bsky.network/subscribe") 21 + async with Jetstream(options) as stream: 22 async for event in stream: 23 match event.kind: 24 case "account":
+5 -6
atproto_jetstream/__init__.py
··· 10 11 12 class JetstreamOptions(NamedTuple): 13 wanted_collections: list[str] = [] 14 wanted_dids: list[str] = [] 15 max_message_size_bytes: int | None = None ··· 85 86 87 class Jetstream: 88 - _url: str 89 _options: JetstreamOptions 90 _client: ClientSession 91 _session: ClientWebSocketResponse | None 92 _zstd_dict: ZstdDict 93 94 - def __init__(self, host: str, options: JetstreamOptions | None = None) -> None: 95 - self._url = host 96 self._options = options or JetstreamOptions() 97 self._client = ClientSession(auto_decompress=True) 98 self._session = None ··· 100 with open(Path(__file__).with_name("zstd_dictionary"), "rb") as file: 101 self._zstd_dict = ZstdDict(file.read()) 102 103 - async def __aenter__(self) -> "Jetstream": 104 _ = await self._client.__aenter__() 105 - url = f"wss://{self._url}/subscribe?{self._options.to_query()}" 106 self._session = await self._client.ws_connect(url) 107 _ = await self._session.__aenter__() 108 return self ··· 117 await self._session.__aexit__(exc_type, exc_val, exc_tb) 118 await self._client.__aexit__(exc_type, exc_val, exc_tb) 119 120 - def __aiter__(self): 121 if self._session: 122 _ = self._session.__aiter__() 123 return self
··· 10 11 12 class JetstreamOptions(NamedTuple): 13 + endpoint: str = "wss://jetstream1.us-east.bsky.network/subscribe" 14 wanted_collections: list[str] = [] 15 wanted_dids: list[str] = [] 16 max_message_size_bytes: int | None = None ··· 86 87 88 class Jetstream: 89 _options: JetstreamOptions 90 _client: ClientSession 91 _session: ClientWebSocketResponse | None 92 _zstd_dict: ZstdDict 93 94 + def __init__(self, options: JetstreamOptions | None = None) -> None: 95 self._options = options or JetstreamOptions() 96 self._client = ClientSession(auto_decompress=True) 97 self._session = None ··· 99 with open(Path(__file__).with_name("zstd_dictionary"), "rb") as file: 100 self._zstd_dict = ZstdDict(file.read()) 101 102 + async def __aenter__(self) -> Jetstream: 103 _ = await self._client.__aenter__() 104 + url = f"{self._options.endpoint}?{self._options.to_query()}" 105 self._session = await self._client.ws_connect(url) 106 _ = await self._session.__aenter__() 107 return self ··· 116 await self._session.__aexit__(exc_type, exc_val, exc_tb) 117 await self._client.__aexit__(exc_type, exc_val, exc_tb) 118 119 + def __aiter__(self) -> Jetstream: 120 if self._session: 121 _ = self._session.__aiter__() 122 return self
+5 -2
example.py
··· 4 5 6 async def main(): 7 - options = JetstreamOptions(compress=True) 8 - async with Jetstream("jetstream1.us-east.bsky.network", options=options) as stream: 9 async for event in stream: 10 match event.kind: 11 case "account":
··· 4 5 6 async def main(): 7 + options = JetstreamOptions( 8 + endpoint="wss://jetstream1.us-east.bsky.network/subscribe", 9 + compress=True, 10 + ) 11 + async with Jetstream(options) as stream: 12 async for event in stream: 13 match event.kind: 14 case "account":