···3A small service that receives events from the AT firehose and produces them to Kafka. Supports standard JSON outputs as well as [Osprey](https://github.com/roostorg/osprey)
4formatted events.
5006## Usage
78### Docker Compose
···1112```yaml
13environment:
14- ATKAFKA_RELAY_HOST: "wss://bsky.network"
15- ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
16- ATKAFKA_OUTPUT_TOPIC: "atproto-events"
17- ATKAFKA_OSPREY_COMPATIBLE: "false"
18-```
1920-Then start:
002122-```bash
23-docker compose up -d # Start services
24-```
25-26-### Docker Run
2728-For standard mode:
0002930-```bash
31-docker run -d \
32- -e ATKAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
33- -e ATKAFKA_OUTPUT_TOPIC=atproto-events \
34- -p 2112:2112 \
35- ghcr.io/haileyok/at-kafka:latest
36```
3738-For Osprey-compatible mode:
3940```bash
41-docker run -d \
42- -e ATKAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
43- -e ATKAFKA_OUTPUT_TOPIC=atproto-events \
44- -e ATKAFKA_OSPREY_COMPATIBLE=true \
45- -p 2112:2112 \
46- ghcr.io/haileyok/at-kafka:latest
47-```
4849-## Configuration
05051-| Flag | Environment Variable | Default | Description |
52-|------|---------------------|---------|-------------|
53-| `--relay-host` | `ATKAFKA_RELAY_HOST` | `wss://bsky.network` | AT Protocol relay host to connect to |
54-| `--bootstrap-servers` | `ATKAFKA_BOOTSTRAP_SERVERS` | (required) | Comma-separated list of Kafka bootstrap servers |
55-| `--output-topic` | `ATKAFKA_OUTPUT_TOPIC` | (required) | Kafka topic to publish events to |
56-| `--osprey-compatible` | `ATKAFKA_OSPREY_COMPATIBLE` | `false` | Enable Osprey-compatible event format |
5758## Event Structure
59···143144- `atkafka_handled_events` - Total events that are received on the firehose and handled
145- `atkafka_produced_events` - Total messages that are output on the bus
0000
···3A small service that receives events from the AT firehose and produces them to Kafka. Supports standard JSON outputs as well as [Osprey](https://github.com/roostorg/osprey)
4formatted events.
56+Additionally, at-kafka supports subscribing to [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) if youare attempting to perform a network backfill.
7+8## Usage
910### Docker Compose
···1314```yaml
15environment:
16+ # For relay mode
17+ ATKAFKA_RELAY_HOST: "wss://bsky.network" # ATProto relay to subscribe to for events
0001819+ # For tap mode
20+ ATKAFKA_TAP_HOST: "ws://localhost:2480" # Tap websocket host to subscribe to for events
21+ ATKAFKA_DISABLE_ACKS: false # Whether to disable sending of acks to Tap
2223+ # Kafka configuration
24+ ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" # Kafka bootstrap servers, comma separated
25+ ATKAFKA_OUTPUT_TOPIC: "atproto-events" # The output topic for events
26+ ATKAFKA_OSPREY_COMPATIBLE: false # Whether to produce Osprey-compatible events
02728+ # Match only Blacksky PDS users
29+ ATKAFKA_MATCHED_SERVICES: "blacksky.app" # A comma-separated list of PDSes to emit events for
30+ # OR ignore anyone on Bluesky PBC PDSes
31+ ATKAFKA_IGNORED_SERVICES: "*.bsky.network" # OR a comma-separated list of PDSes to _not_ emit events for
3233+ # Match only Teal.fm records
34+ ATKAFKA_MATCHED_COLLECTIONS: "fm.teal.*" # A comma-separated list of collections to emit events for
35+ # OR ignore all Bluesky records
36+ ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" # OR a comma-separated list of collections to ignore events for
0037```
3839+Then start:
4041```bash
42+# For normal mode
43+docker compose up -d
000004445+# For tap mode
46+docker compose -f docker-compose.tap.yml up -d
4748+```
000004950## Event Structure
51···135136- `atkafka_handled_events` - Total events that are received on the firehose and handled
137- `atkafka_produced_events` - Total messages that are output on the bus
138+- `atkafka_plc_requests` - Total number of PLC requests that were made, if applicable, and whether they were cached
139+- `atkafka_api_requests` - Total number of API requests that were made, if applicable, and whether they were cached
140+- `atkafka_cache_size` - The size of the PLC and API caches
141+- `atkafka_acks_sent` - Total acks that were sent to Tap, if applicable
+10-3
atkafka/atkafka.go
···2829type Server struct {
30 relayHost string
0031 bootstrapServers []string
32 outputTopic string
33 ospreyCompat bool
···42 plcClient *PlcClient
43 apiClient *ApiClient
44 logger *slog.Logger
045}
4647type ServerArgs struct {
48 // network params
49- RelayHost string
50- PlcHost string
51- ApiHost string
005253 // for watched and ignoed services or collections, only one list may be supplied
54 // for both services and collections, wildcards are acceptable. for example:
···113114 s := &Server{
115 relayHost: args.RelayHost,
00116 plcClient: plcClient,
117 apiClient: apiClient,
118 bootstrapServers: args.BootstrapServers,
···2829type Server struct {
30 relayHost string
31+ tapHost string
32+ disableAcks bool
33 bootstrapServers []string
34 outputTopic string
35 ospreyCompat bool
···44 plcClient *PlcClient
45 apiClient *ApiClient
46 logger *slog.Logger
47+ ws *websocket.Conn
48}
4950type ServerArgs struct {
51 // network params
52+ RelayHost string
53+ TapHost string
54+ DisableAcks bool
55+ PlcHost string
56+ ApiHost string
5758 // for watched and ignoed services or collections, only one list may be supplied
59 // for both services and collections, wildcards are acceptable. for example:
···118119 s := &Server{
120 relayHost: args.RelayHost,
121+ tapHost: args.TapHost,
122+ disableAcks: args.DisableAcks,
123 plcClient: plcClient,
124 apiClient: apiClient,
125 bootstrapServers: args.BootstrapServers,