atproto_jetstream#
Small, typed, and async package to receive Jetstream events from the AT Protocol.
install#
Using your package manager, install the atproto_jetstream dependency.
pip install atproto_jetstreamuv add atproto_jetstream
usage#
from asyncio import run
from atproto_jetstream import Jetstream, JetstreamOptions
async def main():
options = JetstreamOptions("wss://jetstream1.us-east.bsky.network/subscribe")
async with Jetstream(options) as stream:
async for event in stream:
match event.kind:
case "account":
print(event.account)
case "identity":
print(event.identity)
case "commit":
print(event.commit)
if __name__ == "__main__":
run(main())
development#
You need to download the zstd_dictionary file from the Jetstream repository.
A Make recipe is provided to do so:
make atproto_jetstream/zstd_dictionary