at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[docs] update readme, change default backfill workers limit to 128

ptr.pet 8e6111ba 042fabda

verified
+68 -7
+67 -6
README.md
```diff
@@ -1,10 +1,71 @@
 # hydrant
 
+`hydrant` is a flexible AT Protocol indexer built on the `fjall` database. it supports both full-network indexing and filtered indexing (e.g., by DID), and serves data through XRPC endpoints like `com.atproto.sync.getRepo` and `com.atproto.repo.listRecords`, which enables many more use cases than an event stream alone.
+
 ## configuration
 
-environment variables:
-- `HYDRANT_DATABASE_PATH`: path to database folder (default: `./hydrant.db`)
-- `HYDRANT_RELAY_HOST`: relay WebSocket URL (default: `wss://relay.fire.hose.cam`)
-- `HYDRANT_PLC_URL`: base URL of the PLC directory (default: `https://plc.wtf`).
-- `HYDRANT_FULL_NETWORK`: if set to `true`, the indexer will discover and index all repos it sees.
-- `HYDRANT_CURSOR_SAVE_INTERVAL`: how often to save the Firehose cursor (default: `10s`).
+`hydrant` is configured via environment variables. all variables are prefixed with `HYDRANT_`.
+
+| variable | default | description |
+| :--- | :--- | :--- |
+| `DATABASE_PATH` | `./hydrant.db` | path to the database folder. |
+| `LOG_LEVEL` | `info` | log level (e.g., `debug`, `info`, `warn`, `error`). |
+| `RELAY_HOST` | `wss://relay.fire.hose.cam` | websocket URL of the upstream firehose relay. |
+| `PLC_URL` | `https://plc.wtf` | base URL(s) of the PLC directory (comma-separated for multiple). |
+| `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. |
+| `FIREHOSE_WORKERS` | `64` | number of concurrent workers for processing firehose events. |
+| `BACKFILL_CONCURRENCY_LIMIT` | `128` | maximum number of concurrent backfill tasks. |
+| `VERIFY_SIGNATURES` | `full` | signature verification level: `full`, `backfill-only`, or `none`. |
+| `CURSOR_SAVE_INTERVAL` | `10` | interval (in seconds) to save the firehose cursor. |
+| `REPO_FETCH_TIMEOUT` | `300` | timeout (in seconds) for fetching repositories. |
+| `CACHE_SIZE` | `256` | size of the database cache in MB. |
+| `IDENTITY_CACHE_SIZE` | `100000` | number of identity entries to cache. |
+| `API_PORT` | `3000` | port for the API server. |
+| `ENABLE_DEBUG` | `false` | enable debug endpoints. |
+| `DEBUG_PORT` | `3001` | port for debug endpoints (if enabled). |
+| `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. |
+| `DISABLE_FIREHOSE` | `false` | disable firehose ingestion. |
+| `DISABLE_BACKFILL` | `false` | disable backfill processing. |
+
+## api
+
+### management
+
+- `POST /repo/add`: register a DID, start backfilling, and subscribe to updates.
+  - body: `{ "dids": ["did:plc:..."] }`
+- `POST /repo/remove`: unregister a DID and delete all associated data.
+  - body: `{ "dids": ["did:plc:..."] }`
+
+### data access (xrpc)
+
+`hydrant` implements some AT Protocol XRPC endpoints for reading data:
+
+- `com.atproto.repo.getRecord`: retrieve a single record by collection and rkey.
+- `com.atproto.repo.listRecords`: list records in a collection, with pagination.
+- `systems.gaze.hydrant.countRecords`: count records in a collection.
+
+### event stream
+
+- `GET /stream`: subscribe to the event stream.
+  - query parameters:
+    - `cursor` (optional): start streaming from a specific event ID.
+
+### stats
+
+- `GET /stats`: get aggregate counts of repos, records, events, and errors.
+
+## vs `tap`
+
+while [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is designed primarily as a firehose consumer and relay, `hydrant` is more flexible: it lets you query the database directly for records, and it provides an ordered view of events, so a cursor can be used to fetch events from a specific point in time.
+
+### stream behavior
+
+the `WS /stream` (hydrant) and `WS /channel` (tap) endpoints have different designs:
+
+| aspect | `tap` (`/channel`) | `hydrant` (`/stream`) |
+| :--- | :--- | :--- |
+| distribution | sharded work queue: events are load-balanced across connected clients. if 5 clients connect, each receives ~20% of events. | broadcast: every connected client receives a full copy of the event stream. if 5 clients connect, all 5 receive 100% of events. |
+| cursors | server-managed: clients ACK messages. the server tracks progress and redelivers unacked messages. | client-managed: the client provides `?cursor=123`. the server streams from that point. |
+| backfill | integrated queue: backfill events are mixed into the live queue and prioritized by the server. | unified log: backfill simply inserts "historical" events (`live: false`) into the global event log. streaming is just reading this log sequentially. |
+| event types | `record`, `identity` (includes status) | `record`, `identity` (handle), `account` (status) |
+| persistence | **full**: all events are stored and replayable. | **hybrid**: `record` events are persisted/replayable. `identity`/`account` events are ephemeral/live-only. |
```
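to make the API section above concrete, here is a minimal sketch of how a client might construct request URLs for the documented endpoints. the host, DID, and collection values are hypothetical placeholders, and the helper functions are illustrative, not part of hydrant itself.

```rust
/// build the XRPC URL for listing records in a collection, with an
/// optional pagination cursor (per `com.atproto.repo.listRecords` above).
fn list_records_url(host: &str, did: &str, collection: &str, cursor: Option<&str>) -> String {
    let mut url = format!(
        "{host}/xrpc/com.atproto.repo.listRecords?repo={did}&collection={collection}"
    );
    if let Some(c) = cursor {
        url.push_str("&cursor=");
        url.push_str(c);
    }
    url
}

/// build the event-stream URL, optionally resuming from a saved event ID
/// (hydrant's cursors are client-managed, so the client keeps the last
/// event ID it saw and passes it back on reconnect).
fn stream_url(host: &str, cursor: Option<u64>) -> String {
    match cursor {
        Some(c) => format!("{host}/stream?cursor={c}"),
        None => format!("{host}/stream"),
    }
}

fn main() {
    let host = "http://localhost:3000"; // default API_PORT
    // hypothetical DID and collection, for illustration only
    println!("{}", list_records_url(host, "did:plc:example", "app.bsky.feed.post", None));
    println!("{}", stream_url(host, Some(42)));
}
```

the cursor-resume pattern in `stream_url` is what distinguishes hydrant's broadcast stream from tap's server-managed ACK model described in the comparison table.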
+1 -1
src/config.rs
```diff
@@ -95,7 +95,7 @@
         .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
 
     let full_network = cfg!("FULL_NETWORK", false);
-    let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 32usize);
+    let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
    let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 10, sec);
     let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
 
```
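the `cfg!` calls in the diff above read `HYDRANT_`-prefixed environment variables with a fallback default. as a rough sketch of that behavior (the `env_usize` helper below is an assumed stand-in, not hydrant's actual `cfg!` macro):

```rust
use std::env;

/// read a numeric setting from the environment, falling back to `default`
/// when the variable is unset or fails to parse.
fn env_usize(name: &str, default: usize) -> usize {
    env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

fn main() {
    // mirrors the new default introduced by this commit
    let limit = env_usize("HYDRANT_BACKFILL_CONCURRENCY_LIMIT", 128);
    println!("backfill concurrency limit: {limit}");
}
```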