Async Python Jetstream Client pypi.org/project/atproto_jetstream/

atproto_jetstream

Small, typed, and async package to receive Jetstream events from the AT Protocol.

install

Install the atproto_jetstream package with your package manager of choice:

  • pip install atproto_jetstream
  • uv add atproto_jetstream

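usage

# Note: the match statement below requires Python 3.10 or newer.
from asyncio import run
from atproto_jetstream import Jetstream, JetstreamOptions


async def main():
    # Point the client at a Jetstream subscribe endpoint.
    options = JetstreamOptions("wss://jetstream1.us-east.bsky.network/subscribe")
    # The client is an async context manager and an async iterator of events.
    async with Jetstream(options) as stream:
        async for event in stream:
            # Each event carries a kind and a payload attribute of the same name.
            match event.kind:
                case "account":
                    print(event.account)
                case "identity":
                    print(event.identity)
                case "commit":
                    print(event.commit)


if __name__ == "__main__":
    run(main())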
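
The example above receives every event on the stream. Jetstream's subscribe endpoint also accepts query parameters such as wantedCollections and wantedDids to filter events server-side. The sketch below builds such a URL with the standard library only. The helper name build_subscribe_url is hypothetical and not part of atproto_jetstream, and it assumes the URL given to JetstreamOptions is passed to the server unchanged; the package may offer its own filtering options, so check its source before relying on this.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def build_subscribe_url(base_url, wanted_collections=(), wanted_dids=()):
    """Return base_url with Jetstream filter query parameters appended.

    Hypothetical helper for illustration; not part of atproto_jetstream.
    """
    parts = urlsplit(base_url)
    # Keep any query parameters already present on the base URL.
    query = parse_qsl(parts.query)
    # Jetstream expects one repeated parameter per collection or DID.
    query += [("wantedCollections", c) for c in wanted_collections]
    query += [("wantedDids", d) for d in wanted_dids]
    return urlunsplit(parts._replace(query=urlencode(query)))


url = build_subscribe_url(
    "wss://jetstream1.us-east.bsky.network/subscribe",
    wanted_collections=["app.bsky.feed.post"],
)
```

The resulting url could then be used in place of the literal string in the usage example, under the assumption stated above.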

development

You need to download the zstd_dictionary file from the Jetstream repository. A Make recipe is provided to do so:

make atproto_jetstream/zstd_dictionary
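
If Make is not available, the same step can be approximated in Python. This is a rough sketch of what a download recipe like this typically does, not a copy of the project's Makefile: the function name fetch_dictionary is hypothetical, and the source URL is left as a parameter because the README does not state it.

```python
from pathlib import Path
from urllib.request import urlopen


def fetch_dictionary(url: str, dest: Path) -> Path:
    """Download url to dest unless dest already exists.

    Hypothetical helper; mirrors how a Make file target is skipped
    when the target file is already present.
    """
    if dest.exists():
        return dest
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Read the whole file; the dictionary is assumed to be small.
    with urlopen(url) as response:
        data = response.read()
    dest.write_bytes(data)
    return dest


# Example call (fill in the dictionary's raw URL from the Jetstream repository):
# fetch_dictionary(dictionary_url, Path("atproto_jetstream/zstd_dictionary"))
```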