

hydrant#

hydrant is an AT Protocol indexer built on the fjall database that handles sync for you. it supports both full-network indexing and filtered indexing (e.g., by DID), lets you query records over XRPC, and provides an ordered event stream with cursor support.

see random.wisp.place for an example of how to use hydrant.

vs tap#

tap is designed as a firehose consumer: it handles sync and simply propagates events. hydrant is more flexible: it lets you query the database directly for records, and it provides an ordered view of events, so you can use a cursor to fetch events from a specific point in time.

stream behavior#

the WS /stream (hydrant) and WS /channel (tap) endpoints have different designs:

| aspect | tap (/channel) | hydrant (/stream) |
| --- | --- | --- |
| distribution | sharded work queue: events are load-balanced across connected clients. if 5 clients connect, each receives ~20% of events. | broadcast: every connected client receives a full copy of the event stream. if 5 clients connect, all 5 receive 100% of events. |
| cursors | server-managed: clients ACK messages. the server tracks progress and redelivers unacked messages. | client-managed: the client provides ?cursor=123 and the server streams from that point. |
| persistence | events are stored in an outbox and removed once they have been sent to the consumer and acked, so they can't be replayed afterwards. | record events are replayable. identity/account events are ephemeral. use GET /repos/{did} to query identity / account info (handle, pds, signing key, etc.). |
| backfill | backfill events are mixed into the live queue and prioritized by the server (per-repo, acting as a synchronization barrier). | backfill simply inserts historical events (live: false) into the global event log. streaming is just reading this log sequentially. synchronization works the same as in tap: live: true vs live: false. |
| event types | record, identity (includes status) | record, identity (handle), account (status) |
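
the difference in the distribution row can be illustrated with a small simulation. this is only a sketch: the round-robin assignment stands in for whatever load balancing tap actually uses, and the `shard` and `broadcast` helpers are hypothetical names, not part of either project.

```python
from itertools import cycle


def shard(events, clients):
    """tap-style work queue: each event goes to exactly one client.

    Round-robin is an assumption standing in for the real load balancing.
    """
    inboxes = {client: [] for client in clients}
    for event, client in zip(events, cycle(clients)):
        inboxes[client].append(event)
    return inboxes


def broadcast(events, clients):
    """hydrant-style stream: every client receives its own full copy."""
    return {client: list(events) for client in clients}


events = list(range(100))
clients = [f"client-{i}" for i in range(5)]

sharded = shard(events, clients)
copied = broadcast(events, clients)

print({c: len(v) for c, v in sharded.items()})  # 20 events per client
print({c: len(v) for c, v in copied.items()})   # 100 events per client
```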

configuration#

hydrant is configured via environment variables. all variables are prefixed with HYDRANT_.

| variable | default | description |
| --- | --- | --- |
| DATABASE_PATH | ./hydrant.db | path to the database folder. |
| LOG_LEVEL | info | log level (e.g., debug, info, warn, error). |
| RELAY_HOST | wss://relay.fire.hose.cam | websocket URL of the upstream firehose relay. |
| PLC_URL | https://plc.wtf | base URL(s) of the PLC directory (comma-separated for multiple). |
| FULL_NETWORK | false | if true, discovers and indexes all repositories in the network. |
| FILTER_SIGNALS | | comma-separated list of NSID patterns to use for the filter on startup (e.g. app.bsky.feed.post,app.bsky.graph.*). |
| FILTER_COLLECTIONS | | comma-separated list of NSID patterns to use for the collections filter on startup. |
| FILTER_EXCLUDES | | comma-separated list of DIDs to exclude from indexing on startup. |
| FIREHOSE_WORKERS | 8 (32 if full network) | number of concurrent workers for firehose events. |
| BACKFILL_CONCURRENCY_LIMIT | 128 | maximum number of concurrent backfill tasks. |
| VERIFY_SIGNATURES | full | signature verification level: full, backfill-only, or none. |
| CURSOR_SAVE_INTERVAL | 5 | interval (in seconds) to save the firehose cursor. |
| REPO_FETCH_TIMEOUT | 300 | timeout (in seconds) for fetching repositories. |
| CACHE_SIZE | 256 | size of the database cache in MB. |
| IDENTITY_CACHE_SIZE | 1000000 | number of identity entries to cache. |
| API_PORT | 3000 | port for the API server. |
| ENABLE_DEBUG | false | enable debug endpoints. |
| DEBUG_PORT | 3001 | port for debug endpoints (if enabled). |
| NO_LZ4_COMPRESSION | false | disable lz4 compression for storage. |
| EPHEMERAL | false | if enabled, no records are stored (XRPCs won't be reliable). events are only stored up to an hour for playback. |
| ENABLE_FIREHOSE | true | whether to ingest relay subscriptions. |
| ENABLE_BACKFILL | true | whether to backfill from PDS instances. |
| ENABLE_CRAWLER | false (if Filter), true (if Full) | whether to actively query the network for unknown repositories. |
| CRAWLER_MAX_PENDING_REPOS | 2000 | max pending repos for crawler. |
| CRAWLER_RESUME_PENDING_REPOS | 1000 | resume threshold for crawler pending repos. |
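
as a rough illustration of the HYDRANT_ prefix, the sketch below builds an environment mapping from unprefixed variable names. the `hydrant_env` helper is hypothetical, and writing booleans as "true"/"false" and lists as comma-separated strings is an assumption based on the defaults and descriptions in the table.

```python
import os

PREFIX = "HYDRANT_"


def hydrant_env(settings, base=None):
    """Return a copy of `base` (default: os.environ) with prefixed settings added."""
    env = dict(os.environ if base is None else base)
    for name, value in settings.items():
        if isinstance(value, bool):
            # assumption: booleans are spelled like the defaults in the table
            value = "true" if value else "false"
        elif isinstance(value, (list, tuple)):
            # list-valued variables are described as comma-separated
            value = ",".join(value)
        env[PREFIX + name] = str(value)
    return env


env = hydrant_env(
    {
        "DATABASE_PATH": "./hydrant.db",
        "FILTER_SIGNALS": ["app.bsky.feed.post", "app.bsky.graph.*"],
        "ENABLE_DEBUG": True,
        "API_PORT": 3000,
    },
    base={},  # start from an empty environment so the output is predictable
)
for key in sorted(env):
    print(f"{key}={env[key]}")
```

the resulting mapping could then be passed as the `env` argument of `subprocess.run` when launching the hydrant binary, whose name and location depend on how you built it.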

api#

management#

  • GET /filter: get the current filter configuration.
  • PATCH /filter: update the filter configuration.

filter mode#

the mode field controls what gets indexed:

| mode | behaviour |
| --- | --- |
| filter | auto-discovers and backfills any account whose firehose commit touches a collection matching one of the signals patterns. you can also explicitly track individual repositories via the /repos endpoint regardless of matching signals. |
| full | index the entire network. signals are ignored for discovery, but excludes and collections still apply. |

fields#

| field | type | description |
| --- | --- | --- |
| mode | "filter" \| "full" | indexing mode (see above). |
| signals | set update | NSID patterns (e.g. app.bsky.feed.post or app.bsky.*) that trigger auto-discovery in filter mode. |
| collections | set update | NSID patterns used to filter which records are stored. if empty, all collections are stored. applies in all modes. |
| excludes | set update | set of DIDs to always skip, regardless of mode. checked before any other filter logic. |
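
a PATCH /filter request using these fields might be built as follows. this is a minimal sketch: the base URL assumes the default API_PORT on localhost, the JSON content type is an assumption, and `filter_patch_request` is a hypothetical helper. the request is only constructed here, not sent.

```python
import json
import urllib.request

BASE_URL = "http://localhost:3000"  # assumption: default API_PORT on localhost


def filter_patch_request(**fields):
    """Build (but do not send) a PATCH /filter request with the given fields."""
    body = json.dumps(fields).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/filter",
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},  # assumed content type
    )


req = filter_patch_request(
    mode="filter",
    signals=["app.bsky.feed.post"],
    collections=["app.bsky.feed.*"],
)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))

# To send it against a running instance:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```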

set updates#

each set field accepts one of two forms:

  • replace: an array replaces the entire set — ["did:plc:abc", "did:web:example.org"]
  • patch: an object maps items to true (add) or false (remove) — {"did:plc:abc": true, "did:web:example.org": false}
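
the two forms can be modelled locally like this. it is a sketch of the semantics described above, not hydrant's implementation, and `apply_set_update` is a hypothetical name.

```python
def apply_set_update(current, update):
    """Return the set that results from applying `update` to `current`."""
    if isinstance(update, list):
        # replace form: the array becomes the entire set
        return set(update)
    if isinstance(update, dict):
        # patch form: true adds the item, false removes it
        result = set(current)
        for item, keep in update.items():
            if keep:
                result.add(item)
            else:
                result.discard(item)
        return result
    raise TypeError("a set update must be an array or an object")


excludes = {"did:plc:abc", "did:plc:old"}

patched = apply_set_update(
    excludes, {"did:web:example.org": True, "did:plc:old": False}
)
replaced = apply_set_update(excludes, ["did:plc:xyz"])

print(sorted(patched))   # ['did:plc:abc', 'did:web:example.org']
print(sorted(replaced))  # ['did:plc:xyz']
```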

NSID patterns#

signals and collections support an optional .* suffix to match an entire namespace:

  • app.bsky.feed.post — exact match only
  • app.bsky.feed.* — matches any collection under app.bsky.feed
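
a matcher for these patterns could look like the sketch below. two details are assumptions rather than documented behaviour: a wildcard pattern does not match the bare namespace itself (app.bsky.feed), and it does match more deeply nested names. the function names are hypothetical.

```python
def nsid_matches(pattern, nsid):
    """Check one NSID against one pattern (exact, or namespace with `.*`)."""
    if pattern.endswith(".*"):
        # keep the trailing dot so "app.bsky.feed.*" does not
        # accidentally match "app.bsky.feedback.item"
        prefix = pattern[:-1]
        return nsid.startswith(prefix) and len(nsid) > len(prefix)
    return nsid == pattern


def should_store(collections, nsid):
    """An empty collections filter stores everything, as described for the filter fields."""
    return not collections or any(nsid_matches(p, nsid) for p in collections)


print(nsid_matches("app.bsky.feed.post", "app.bsky.feed.post"))  # True
print(nsid_matches("app.bsky.feed.*", "app.bsky.feed.like"))     # True
print(nsid_matches("app.bsky.feed.*", "app.bsky.feedback.item")) # False
print(should_store([], "com.example.anything"))                  # True
```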

repository management#

  • GET /repos: get an NDJSON stream of repositories and their sync status. supports pagination and filtering:
    • limit: max results (default 100, max 1000)
    • cursor: opaque key for paginating.
    • partition: all (default), pending (backfill queue), or resync (retries)
  • GET /repos/{did}: get the sync status and metadata of a specific repository. also returns the handle, PDS URL, and atproto signing key (these are not available until the repo has been backfilled at least once).
  • PUT /repos: explicitly track repositories. accepts an NDJSON body of {"did": "..."} (or JSON array of the same).
  • DELETE /repos: untrack repositories. accepts an NDJSON body of {"did": "..."} (or JSON array of the same). optionally include "deleteData": true to also purge the repository from the database.
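
the NDJSON bodies for PUT and DELETE /repos can be produced with the standard library. in this sketch the base URL, the application/x-ndjson content type, and the placement of "deleteData" inside each object are assumptions, and the helper names are hypothetical. requests are built but not sent.

```python
import json
import urllib.request

BASE_URL = "http://localhost:3000"  # assumption: default API_PORT on localhost


def to_ndjson(items):
    """Encode items as newline-delimited JSON, one object per line."""
    return "".join(json.dumps(item) + "\n" for item in items).encode("utf-8")


def parse_ndjson(payload):
    """Decode an NDJSON payload, such as a GET /repos response body."""
    lines = payload.decode("utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()]


def repos_request(method, dids, delete_data=False):
    """Build a PUT or DELETE /repos request for the given DIDs."""
    items = []
    for did in dids:
        item = {"did": did}
        if delete_data:
            # assumption: the flag is set on each object in the body
            item["deleteData"] = True
        items.append(item)
    return urllib.request.Request(
        f"{BASE_URL}/repos",
        data=to_ndjson(items),
        method=method,
        headers={"Content-Type": "application/x-ndjson"},  # assumed content type
    )


track = repos_request("PUT", ["did:plc:abc", "did:web:example.org"])
purge = repos_request("DELETE", ["did:plc:abc"], delete_data=True)
print(track.data.decode("utf-8"), end="")
print(purge.data.decode("utf-8"), end="")
```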

data access (xrpc)#

hydrant implements the following XRPC endpoints under /xrpc/:

com.atproto.*#

the following are implemented currently:

  • com.atproto.repo.getRecord
  • com.atproto.repo.listRecords

systems.gaze.hydrant.*#

these are some non-standard XRPCs that might be useful.

systems.gaze.hydrant.countRecords#

returns the total number of stored records in a collection.

| param | required | description |
| --- | --- | --- |
| identifier | yes | DID or handle of the repository. |
| collection | yes | NSID of the collection. |

returns { count }.
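
a request URL for this endpoint can be assembled as shown below. the sketch assumes the parameters are passed as query-string parameters on a GET request, as is usual for XRPC queries, and that the server listens on the default API_PORT. `count_records_url` is a hypothetical helper.

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:3000"  # assumption: default API_PORT on localhost


def count_records_url(identifier, collection, base_url=BASE_URL):
    """Build the URL for systems.gaze.hydrant.countRecords."""
    query = urlencode({"identifier": identifier, "collection": collection})
    return f"{base_url}/xrpc/systems.gaze.hydrant.countRecords?{query}"


url = count_records_url("did:plc:abc", "app.bsky.feed.post")
print(url)

# Against a running instance, the response could be read like this:
# import json, urllib.request
# with urllib.request.urlopen(url) as resp:
#     print(json.load(resp)["count"])
```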

event stream#

  • GET /stream: subscribe to the event stream.
    • query parameters:
      • cursor (optional): start streaming from a specific event ID.
  • POST /stream/ack: ack events.
    • body:
      • ids: list of event IDs to acknowledge.
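
on the client side, resuming from a cursor comes down to remembering the last event ID seen and passing it back on reconnect. the sketch below makes several assumptions: event IDs are increasing integers, each event carries its ID in an "id" field, and it is not specified whether the event at the cursor is sent again, so duplicates are filtered. `stream_url` and `CursorTracker` are hypothetical names.

```python
from urllib.parse import urlencode


def stream_url(cursor=None, base_url="ws://localhost:3000"):
    """Build the websocket URL for /stream, optionally resuming from a cursor."""
    url = f"{base_url}/stream"
    if cursor is not None:
        url += "?" + urlencode({"cursor": cursor})
    return url


class CursorTracker:
    """Remember the last event ID seen so a reconnect can resume from it."""

    def __init__(self, cursor=None):
        self.cursor = cursor

    def accept(self, event):
        """Return True for a new event and advance the cursor; False for a repeat."""
        event_id = event["id"]  # assumption: the field name is "id"
        if self.cursor is not None and event_id <= self.cursor:
            return False
        self.cursor = event_id
        return True


tracker = CursorTracker(cursor=123)
received = [{"id": 123}, {"id": 124}, {"id": 125}]
fresh = [event for event in received if tracker.accept(event)]

print([event["id"] for event in fresh])  # [124, 125]
print(stream_url(tracker.cursor))        # ws://localhost:3000/stream?cursor=125
```

persisting `tracker.cursor` somewhere durable after processing each event lets a restarted client pick up where it left off.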

stats#

  • GET /stats: get aggregate counts of repos, records, events, and errors.