this repo has no description
Go 97.0%
Dockerfile 3.0%
32 2 3

Clone this repository

https://tangled.org/hailey.at/at-kafka https://tangled.org/did:plc:oisofpd7lj26yvgiivf3lxsi/at-kafka
git@knot.hailey.at:hailey.at/at-kafka git@knot.hailey.at:did:plc:oisofpd7lj26yvgiivf3lxsi/at-kafka

For self-hosted knots, clone URLs may differ based on your setup.

Download tar.gz
README.md

at-kafka#

A small service that receives events from ATProto and produces them to Kafka. Supports:

  • Firehose events from relay or Tap (for network backfill)
  • Ozone moderation events
  • Standard JSON and Osprey-compatible event formats

Usage#

Docker Compose#

Three compose files are provided:

# Firehose relay mode (default)
docker compose up -d

# Firehose tap mode (for backfill)
docker compose -f docker-compose.tap.yml up -d

# Ozone moderation events
docker compose -f docker-compose.ozone.yml up -d

Firehose Configuration#

environment:
  # Relay/Tap connection
  ATKAFKA_RELAY_HOST: "wss://bsky.network"
  ATKAFKA_TAP_HOST: "ws://localhost:2480"
  ATKAFKA_DISABLE_ACKS: false

  # Kafka
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
  ATKAFKA_OUTPUT_TOPIC: "atproto-events"
  ATKAFKA_OSPREY_COMPATIBLE: false

  # Filtering
  ATKAFKA_WATCHED_SERVICES: "*.bsky.network"
  ATKAFKA_IGNORED_SERVICES: "blacksky.app"
  ATKAFKA_WATCHED_COLLECTIONS: "app.bsky.*"
  ATKAFKA_IGNORED_COLLECTIONS: "fm.teal.*"

Ozone Configuration#

environment:
  # Ozone connection
  ATKAFKA_OZONE_PDS_HOST: "https://pds.example.com"
  ATKAFKA_OZONE_IDENTIFIER: "your.handle"
  ATKAFKA_OZONE_PASSWORD: "password"
  ATKAFKA_OZONE_LABELER_DID: "did:plc:..."

  # Kafka
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
  ATKAFKA_OUTPUT_TOPIC: "ozone-events"

CLI#

# Firehose modes
atkafka firehose relay --bootstrap-servers localhost:9092 --output-topic events
atkafka firehose tap --tap-host ws://localhost:2480 --bootstrap-servers localhost:9092 --output-topic events

# Ozone mode
atkafka ozone-events \
  --pds-host https://pds.example.com \
  --identifier admin@example.com \
  --password password \
  --labeler-did did:plc:... \
  --bootstrap-servers localhost:9092 \
  --output-topic ozone-events

Event Structure#

Firehose Events#

Events are structured similarly to the raw AT Protocol firehose, with one key difference: commit events are split into individual operation events.

Operation Event#

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "operation": {
    "action": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "some-rkey",
    "uri": "at://did:plc:123/app.bsky.feed.post/some-rkey",
    "cid": "bafyrei...",
    "path": "app.bsky.feed.post/...",
    "record": {
      "text": "Hello world!",
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-01-01T12:00:00.000Z"
    }
  }
}

Account Event#

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "account": {
    "active": true,
    "seq": 12345,
    "status": "active"
  }
}

Identity Event#

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "identity": {
    "seq": 12345,
    "handle": "user.bsky.social"
  }
}

Osprey-Compatible Mode#

When --osprey-compatible is enabled, events are wrapped in the Osprey event format:

{
  "data": {
    "action_name": "operation#create",
    "action_id": 1234567890,
    "data": {
      "did": "did:plc:...",
      "timestamp": "2024-01-01T12:00:00.000Z",
      "operation": { ... }
    },
    "timestamp": "2024-01-01T12:00:00.000Z",
    "secret_data": {},
    "encoding": "UTF8"
  },
  "send_time": "2024-01-01T12:00:00Z"
}

Action names in Osprey mode:

  • operation#create - Record creation
  • operation#update - Record update
  • operation#delete - Record deletion
  • account - Account status changes
  • identity - Identity/handle changes
  • info - Informational messages

Ozone Events#

Ozone events are produced as-is from the tools.ozone.moderation.queryEvents API response. Events include moderation actions, reports, and other moderation activity. The cursor is persisted to disk and automatically resumed on restart.

Monitoring#

The service exposes Prometheus metrics on the default metrics port.

  • atkafka_handled_events - Total events that are received on the firehose and handled
  • atkafka_produced_events - Total messages that are output on the bus
  • atkafka_plc_requests - Total number of PLC requests that were made, if applicable, and whether they were cached
  • atkafka_api_requests - Total number of API requests that were made, if applicable, and whether they were cached
  • atkafka_cache_size - The size of the PLC and API caches
  • atkafka_acks_sent - Total acks that were sent to Tap, if applicable