···24242525## Project Structure
26262727-- **Namespace**: All firehose functionality under `Drinkup.Firehose.*`
2828- - `Drinkup.Firehose` - Main supervisor
2929- - `Drinkup.Firehose.Consumer` - Behaviour for handling all events
3030- - `Drinkup.Firehose.RecordConsumer` - Macro for handling commit record events with filtering
3131- - `Drinkup.Firehose.Event` - Event types (`Commit`, `Sync`, `Identity`, `Account`, `Info`)
3232- - `Drinkup.Firehose.Socket` - `:gen_statem` WebSocket connection manager
3333-- **Consumer Pattern**: Implement `@behaviour Drinkup.Firehose.Consumer` with `handle_event/1`
3434-- **RecordConsumer Pattern**: `use Drinkup.Firehose.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]` with `handle_create/1`, `handle_update/1`, `handle_delete/1` overrides
2727+Each namespace (`Drinkup.Firehose`, `Drinkup.Jetstream`, `Drinkup.Tap`) follows a common architecture:
2828+2929+- **Core Modules**:
3030+ - `Consumer` - Behaviour/macro for handling events; `use Namespace` with `handle_event/1` implementation
3131+ - `Event` - Typed event structs specific to the protocol
3232+ - `Socket` - `:gen_statem` WebSocket connection manager
3333+ - `Options` (or top-level utility module) - Configuration and runtime utilities
3434+3535+- **Consumer Pattern**: `use Namespace, opts...` with `handle_event/1` callback; consumer module becomes a supervisor
3636+3737+### Namespace-Specific Details
3838+3939+- **Firehose** (`Drinkup.Firehose.*`): Full AT Protocol firehose
4040+ - Events: `Commit`, `Sync`, `Identity`, `Account`, `Info`
4141+ - Additional: `RecordConsumer` macro for filtered commit records with `handle_create/1`, `handle_update/1`, `handle_delete/1` callbacks
4242+ - Pattern: `use Drinkup.Firehose.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]`
4343+4444+- **Jetstream** (`Drinkup.Jetstream.*`): Simplified JSON event stream
4545+ - Events: `Commit`, `Identity`, `Account`
4646+ - Config: `:wanted_collections`, `:wanted_dids`, `:compress` (zstd)
4747+ - Utility: `Drinkup.Jetstream.update_options/2` for dynamic filtering
4848+ - Semantics: Fire-and-forget (no acks)
4949+5050+- **Tap** (`Drinkup.Tap.*`): HTTP API + WebSocket indexer/backfill service
5151+ - Events: `Record`, `Identity`
5252+ - Config: `:host`, `:admin_password`, `:disable_acks`
5353+ - Utility: `Drinkup.Tap` HTTP API functions (`add_repos/2`, `remove_repos/2`, `get_repo_info/2`)
5454+ - Semantics: Ack/nack - return `:ok`/`{:ok, _}`/`nil` to ack, `{:error, _}` to nack (Tap retries)
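The Tap ack/nack rule above can be pictured as a small mapping from a handler's return value to a decision. This is a minimal sketch rather than Drinkup's implementation: `AckSketch` and `decision/1` are hypothetical names, and the catch-all clause is an assumption, since the notes above do not say how other return values are treated.

```elixir
defmodule AckSketch do
  # Hypothetical helper: maps a `handle_event/1` return value to the
  # ack/nack decision described in the notes above.
  def decision(:ok), do: :ack
  def decision({:ok, _value}), do: :ack
  def decision(nil), do: :ack
  def decision({:error, _reason}), do: :nack

  # Assumption: the notes do not specify any other return value,
  # so the sketch flags it instead of guessing.
  def decision(_other), do: :unspecified
end
```

Under this reading, a handler that wants Tap to retry an event returns `{:error, reason}` and otherwise ends with `:ok`.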
35553656## Important Notes
3757
+2
CHANGELOG.md
···10101111### Breaking Changes
12121313+- Simplify usage by removing the concept of a separate "consumer", integrating
1414+ it directly into the socket's behaviour.
1315- Existing behaviour moved to `Drinkup.Firehose` namespace, to make way for
1416 alternate sync systems.
1517
+33-18
README.md
···31313232```elixir
3333defmodule MyApp.FirehoseConsumer do
3434- @behaviour Drinkup.Firehose.Consumer
3434+ use Drinkup.Firehose
35353636+ @impl true
3637 def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
3738 IO.inspect(event, label: "Commit")
3839 end
···4142end
42434344# In your supervision tree:
4444-children = [{Drinkup.Firehose, %{consumer: MyApp.FirehoseConsumer}}]
4545+children = [MyApp.FirehoseConsumer]
4646+```
4747+4848+To handle only commit events for specific collections:
4949+5050+```elixir
5151+defmodule MyApp.PostConsumer do
5252+ use Drinkup.Firehose.RecordConsumer,
5353+ collections: ["app.bsky.feed.post"]
5454+5555+ @impl true
5656+ def handle_create(record) do
5757+ IO.inspect(record, label: "New post")
5858+ end
5959+end
4560```
46614762### Jetstream
48634964```elixir
5065defmodule MyApp.JetstreamConsumer do
5151- @behaviour Drinkup.Jetstream.Consumer
6666+ use Drinkup.Jetstream,
6767+ wanted_collections: ["app.bsky.feed.post"]
52686969+ @impl true
5370 def handle_event(%Drinkup.Jetstream.Event.Commit{} = event) do
5471 IO.inspect(event, label: "Commit")
5572 end
···5875end
59766077# In your supervision tree:
6161-children = [
6262- {Drinkup.Jetstream, %{
6363- consumer: MyApp.JetstreamConsumer,
6464- wanted_collections: ["app.bsky.feed.post"]
6565- }}
6666-]
7878+children = [MyApp.JetstreamConsumer]
7979+8080+# Update filters dynamically:
8181+Drinkup.Jetstream.update_options(MyApp.JetstreamConsumer, %{
8282+ wanted_collections: ["app.bsky.graph.follow"]
8383+})
6784```
68856986### Tap
70877188```elixir
7289defmodule MyApp.TapConsumer do
7373- @behaviour Drinkup.Tap.Consumer
9090+ use Drinkup.Tap,
9191+ host: "http://localhost:2480"
74929393+ @impl true
7594 def handle_event(%Drinkup.Tap.Event.Record{} = event) do
7695 IO.inspect(event, label: "Record")
9696+ :ok
7797 end
78987979- def handle_event(_), do: :noop
9999+ def handle_event(_), do: :ok
80100end
8110182102# In your supervision tree:
8383-children = [
8484- {Drinkup.Tap, %{
8585- consumer: MyApp.TapConsumer,
8686- host: "http://localhost:2480"
8787- }}
8888-]
103103+children = [MyApp.TapConsumer]
8910490105# Track specific repos:
9191-Drinkup.Tap.add_repos(Drinkup.Tap, ["did:plc:abc123"])
106106+Drinkup.Tap.add_repos(MyApp.TapConsumer, ["did:plc:abc123"])
92107```
9310894109See [the examples](./examples) for some more complete samples.
+3-4
examples/firehose/basic_consumer.ex
···11defmodule BasicConsumer do
22- @behaviour Drinkup.Firehose.Consumer
22+ use Drinkup.Firehose
3344+ @impl true
45 def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
56 IO.inspect(event, label: "Got commit event")
67 end
···17181819 @impl true
1920 def init(_) do
2020- children = [
2121- {Drinkup.Firehose, %{consumer: BasicConsumer}}
2222- ]
2121+ children = [BasicConsumer]
23222423 Supervisor.init(children, strategy: :one_for_one)
2524 end
+4-3
examples/firehose/multiple_consumers.ex
···77end
8899defmodule IdentityConsumer do
1010- @behaviour Drinkup.Firehose.Consumer
1010+ use Drinkup.Firehose, name: :identities
11111212+ @impl true
1213 def handle_event(%Drinkup.Firehose.Event.Identity{} = event) do
1314 IO.inspect(event, label: "identity event")
1415 end
···2627 @impl true
2728 def init(_) do
2829 children = [
2929- {Drinkup.Firehose, %{consumer: PostDeleteConsumer}},
3030- {Drinkup.Firehose, %{consumer: IdentityConsumer, name: :identities}}
3030+ PostDeleteConsumer,
3131+ IdentityConsumer
3132 ]
32333334 Supervisor.init(children, strategy: :one_for_one)
+4-3
examples/firehose/record_consumer.ex
···22 use Drinkup.Firehose.RecordConsumer,
33 collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]
4455+ @impl true
56 def handle_create(record) do
67 IO.inspect(record, label: "create")
78 end
891010+ @impl true
911 def handle_update(record) do
1012 IO.inspect(record, label: "update")
1113 end
12141515+ @impl true
1316 def handle_delete(record) do
1417 IO.inspect(record, label: "delete")
1518 end
···24272528 @impl true
2629 def init(_) do
2727- children = [
2828- {Drinkup.Firehose, %{consumer: ExampleRecordConsumer}}
2929- ]
3030+ children = [ExampleRecordConsumer]
30313132 Supervisor.init(children, strategy: :one_for_one)
3233 end
+13-12
examples/jetstream/jetstream_consumer.ex
···88 - Account events (status changes)
99 """
10101111- @behaviour Drinkup.Jetstream.Consumer
1111+ use Drinkup.Jetstream,
1212+ name: MyJetstream,
1313+ wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"]
12141515+ @impl true
1316 def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :create} = event) do
1417 IO.inspect(event, label: "New record created")
1518 :ok
···72757376 @impl true
7477 def init(_) do
7575- children = [
7676- # Connect to public Jetstream instance and filter for posts and likes
7777- {Drinkup.Jetstream,
7878- %{
7979- consumer: JetstreamConsumer,
8080- name: MyJetstream,
8181- wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"]
8282- }}
8383- ]
7878+ children = [JetstreamConsumer]
84798580 Supervisor.init(children, strategy: :one_for_one)
8681 end
···88838984# Example: Filter for all graph operations (follows, blocks, etc.)
9085defmodule GraphEventsConsumer do
9191- @behaviour Drinkup.Jetstream.Consumer
8686+ use Drinkup.Jetstream,
8787+ name: :graph_events,
8888+ wanted_collections: ["app.bsky.graph.*"]
92899090+ @impl true
9391 def handle_event(%Drinkup.Jetstream.Event.Commit{collection: "app.bsky.graph." <> _} = event) do
9492 IO.puts("Graph event: #{event.collection} - #{event.operation}")
9593 :ok
···1009810199# Example: Filter for specific DIDs
102100defmodule SpecificDIDConsumer do
103103- @behaviour Drinkup.Jetstream.Consumer
101101+ use Drinkup.Jetstream,
102102+ name: :specific_dids,
103103+ wanted_dids: ["did:plc:abc123", "did:plc:def456"]
104104105105 @watched_dids [
106106 "did:plc:abc123",
107107 "did:plc:def456"
108108 ]
109109110110+ @impl true
110111 def handle_event(%Drinkup.Jetstream.Event.Commit{did: did} = event)
111112 when did in @watched_dids do
112113 IO.puts("Activity from watched DID: #{did}")
+8-10
examples/tap/tap_consumer.ex
···11defmodule TapConsumer do
22- @behaviour Drinkup.Tap.Consumer
22+ use Drinkup.Tap,
33+ name: MyTap,
44+ host: "http://localhost:2480"
3566+ @impl true
47 def handle_event(%Drinkup.Tap.Event.Record{} = record) do
58 IO.inspect(record, label: "Tap record event")
99+ :ok
610 end
711812 def handle_event(%Drinkup.Tap.Event.Identity{} = identity) do
913 IO.inspect(identity, label: "Tap identity event")
1414+ :ok
1015 end
1116end
12171313-defmodule TapExampleSupervisor do
1818+defmodule ExampleTapConsumer do
1419 use Supervisor
15201621 def start_link(arg \\ []) do
···19242025 @impl true
2126 def init(_) do
2222- children = [
2323- {Drinkup.Tap,
2424- %{
2525- consumer: TapConsumer,
2626- name: MyTap,
2727- host: "http://localhost:2480"
2828- }}
2929- ]
2727+ children = [TapConsumer]
30283129 Supervisor.init(children, strategy: :one_for_one)
3230 end
+133-25
lib/firehose.ex
···11defmodule Drinkup.Firehose do
22- use Supervisor
33- alias Drinkup.Firehose.Options
22+ @moduledoc """
33+ Module for handling events from the AT Protocol [firehose](https://docs.bsky.app/docs/advanced-guides/firehose).
44+55+ Due to the nature of the firehose, this will result in a lot of incoming
66+ traffic as it receives every repo and identity event within the network. If
77+ you're concerened about bandwidth constaints or just don't need a
88+ whole-network sync, you may be better off using `Drinkup.Jetstream` or
99+ `Drinkup.Tap`.
1010+1111+ ## Usage
1212+1313+ defmodule MyFirehoseConsumer do
1414+ use Drinkup.Firehose,
1515+ name: :my_firehose,
1616+ host: "https://bsky.network",
1717+ cursor: nil
1818+1919+ @impl true
2020+ def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
2121+ IO.inspect(event, label: "Commit")
2222+ :ok
2323+ end
2424+2525+ def handle_event(_event), do: :ok
2626+ end
2727+2828+ # In your application supervision tree:
2929+ children = [MyFirehoseConsumer]
3030+3131+ Exceptions raised by `handle_event/1` will be logged instead of killing and
3232+ restarting the socket process.
3333+3434+ ## Options
3535+3636+ - `:name` - Unique name for this Firehose instance (default: the module name)
3737+ - `:host` - Firehose relay URL (default: `"https://bsky.network"`)
3838+ - `:cursor` - Optional sequence number to resume streaming from
3939+4040+ ## Runtime Configuration
4141+4242+ You can override options at runtime by providing them to `child_spec/1`:
44355- @dialyzer nowarn_function: {:init, 1}
66- @impl true
77- def init({%Options{name: name} = drinkup_options, supervisor_options}) do
88- children = [
99- {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, Tasks}}}},
1010- {Drinkup.Firehose.Socket, drinkup_options}
1111- ]
4444+ children = [
4545+ {MyFirehoseConsumer, name: :runtime_name, cursor: 12345}
4646+ ]
12471313- Supervisor.start_link(
1414- children,
1515- supervisor_options ++ [name: {:via, Registry, {Drinkup.Registry, {name, Supervisor}}}]
1616- )
1717- end
4848+ ## Event Types
4949+5050+ `handle_event/1` will receive the following event structs:
18511919- @spec child_spec(Options.options()) :: Supervisor.child_spec()
2020- def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
5252+ - `Drinkup.Firehose.Event.Commit` - Repository commits
5353+ - `Drinkup.Firehose.Event.Sync` - Sync events
5454+ - `Drinkup.Firehose.Event.Identity` - Identity updates
5555+ - `Drinkup.Firehose.Event.Account` - Account status changes
5656+ - `Drinkup.Firehose.Event.Info` - Info messages
5757+ """
21582222- @spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
2323- def child_spec({drinkup_options, supervisor_options}) do
2424- %{
2525- id: Map.get(drinkup_options, :name, __MODULE__),
2626- start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
2727- type: :supervisor,
2828- restart: :permanent,
2929- shutdown: 500
3030- }
5959+ defmacro __using__(opts) do
6060+ quote location: :keep, bind_quoted: [opts: opts] do
6161+ use Supervisor
6262+ @behaviour Drinkup.Firehose.Consumer
6363+6464+ alias Drinkup.Firehose.Options
6565+6666+ # Store compile-time options as module attributes
6767+ @name Keyword.get(opts, :name)
6868+ @host Keyword.get(opts, :host, "https://bsky.network")
6969+ @cursor Keyword.get(opts, :cursor)
7070+7171+ @doc """
7272+ Starts the Firehose consumer supervisor.
7373+7474+ Accepts optional runtime configuration that overrides compile-time options.
7575+ """
7676+ def start_link(runtime_opts \\ []) do
7777+ # Merge compile-time and runtime options
7878+ opts = build_options(runtime_opts)
7979+ Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
8080+ end
8181+8282+ @impl true
8383+ def init(%Options{name: name} = options) do
8484+ children = [
8585+ {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, Tasks}}}},
8686+ {Drinkup.Firehose.Socket, options}
8787+ ]
8888+8989+ Supervisor.init(children, strategy: :one_for_one)
9090+ end
9191+9292+ @doc """
9393+ Returns a child spec for adding this consumer to a supervision tree.
9494+9595+ Runtime options override compile-time options.
9696+ """
9797+ def child_spec(runtime_opts) when is_list(runtime_opts) do
9898+ opts = build_options(runtime_opts)
9999+100100+ %{
101101+ id: opts.name,
102102+ start: {__MODULE__, :start_link, [runtime_opts]},
103103+ type: :supervisor,
104104+ restart: :permanent,
105105+ shutdown: 500
106106+ }
107107+ end
108108+109109+ def child_spec(_opts) do
110110+ raise ArgumentError, "child_spec expects a keyword list of options"
111111+ end
112112+113113+ defoverridable child_spec: 1
114114+115115+ # Build Options struct from compile-time and runtime options
116116+ defp build_options(runtime_opts) do
117117+ # Compile-time defaults
118118+ compile_opts = [
119119+ name: @name || __MODULE__,
120120+ host: @host,
121121+ cursor: @cursor
122122+ ]
123123+124124+ # Merge with runtime opts (runtime takes precedence)
125125+ merged =
126126+ compile_opts
127127+ |> Keyword.merge(runtime_opts)
128128+ |> Enum.reject(fn {_k, v} -> is_nil(v) end)
129129+ |> Map.new()
130130+ |> Map.put(:consumer, __MODULE__)
131131+132132+ Options.from(merged)
133133+ end
134134+135135+ defp via_tuple(name) do
136136+ {:via, Registry, {Drinkup.Registry, {name, Supervisor}}}
137137+ end
138138+ end
31139 end
32140end
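The option merging done by the private `build_options/1` in the hunk above can be tried in isolation. The sketch below reproduces only the merge-and-drop-`nil` step; `OptionMergeSketch` and `merge/2` are hypothetical names, and the `Options.from/1` call and the `:consumer` key are left out on purpose.

```elixir
defmodule OptionMergeSketch do
  # Hypothetical stand-in for the merge step of `build_options/1`.
  def merge(compile_opts, runtime_opts) do
    compile_opts
    # Runtime values win over compile-time values for the same key.
    |> Keyword.merge(runtime_opts)
    # Keys whose value is nil are dropped entirely.
    |> Enum.reject(fn {_key, value} -> is_nil(value) end)
    |> Map.new()
  end
end

compile_opts = [name: MyFirehoseConsumer, host: "https://bsky.network", cursor: nil]

merged = OptionMergeSketch.merge(compile_opts, name: :runtime_name, cursor: 12345)
```

Because `nil` values are rejected after the merge, passing `cursor: nil` at runtime removes the key instead of storing `nil`, so whatever default `Options.from/1` applies would presumably take effect.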
+4-4
lib/firehose/consumer.ex
···11defmodule Drinkup.Firehose.Consumer do
22 @moduledoc """
33- An unopinionated consumer of the Firehose. Will receive all events, not just commits.
44- """
33+ Behaviour for handling Firehose events.
5466- alias Drinkup.Firehose.Event
55+ Implemented by `Drinkup.Firehose`; you'll likely want to use that instead.
66+ """
7788- @callback handle_event(Event.t()) :: any()
88+ @callback handle_event(Drinkup.Firehose.Event.t()) :: any()
99end
+48-3
lib/firehose/record_consumer.ex
···11defmodule Drinkup.Firehose.RecordConsumer do
22 @moduledoc """
33- An opinionated consumer of the Firehose that eats consumers
33+ Opinionated consumer of the Firehose focused on record operations.
44+55+ This is an abstraction over the core `Drinkup.Firehose` implementation
66+ designed for easily handling `commit` events, with the ability to filter by
77+ collection. It's similar to `Drinkup.Jetstream`, but using the Firehose
88+ directly (and currently more naive).
99+1010+ ## Example
1111+1212+ defmodule MyRecordConsumer do
1313+ use Drinkup.Firehose.RecordConsumer,
1414+ collections: ["app.bsky.feed.post", ~r/app\\.bsky\\.graph\\..+/],
1515+ name: :my_records,
1616+ host: "https://bsky.network"
1717+1818+ @impl true
1919+ def handle_create(record) do
2020+ IO.inspect(record, label: "New record")
2121+ end
2222+2323+ @impl true
2424+ def handle_delete(record) do
2525+ IO.inspect(record, label: "Deleted record")
2626+ end
2727+ end
2828+2929+ # In your application supervision tree:
3030+ children = [MyRecordConsumer]
3131+3232+ ## Options
3333+3434+ All options from `Drinkup.Firehose` are supported, plus:
3535+3636+ - `:collections` - List of collection NSIDs (strings or regexes) to filter. If
3737+ empty or not provided, all collections are processed.
3838+3939+ ## Callbacks
4040+4141+ Implement these callbacks to handle different record actions:
4242+4343+ - `handle_create/1` - Called when a record is created
4444+ - `handle_update/1` - Called when a record is updated
4545+ - `handle_delete/1` - Called when a record is deleted
4646+4747+ All callbacks receive a `Drinkup.Firehose.RecordConsumer.Record` struct.
448 """
549650 @callback handle_create(any()) :: any()
···852 @callback handle_delete(any()) :: any()
9531054 defmacro __using__(opts) do
1111- {collections, _opts} = Keyword.pop(opts, :collections, [])
5555+ {collections, firehose_opts} = Keyword.pop(opts, :collections, [])
12561357 quote location: :keep do
1414- @behaviour Drinkup.Firehose.Consumer
5858+ use Drinkup.Firehose, unquote(firehose_opts)
1559 @behaviour Drinkup.Firehose.RecordConsumer
16606161+ @impl true
1762 def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
1863 event.ops
1964 |> Enum.filter(fn %{path: path} ->
+121-55
lib/jetstream.ex
···11defmodule Drinkup.Jetstream do
22 @moduledoc """
33- Supervisor for Jetstream event stream connections.
33+ Module for handling events from an AT Protocol
44+ [Jetstream](https://github.com/bluesky-social/jetstream) instance.
4555- Jetstream is a simplified JSON event stream that converts the CBOR-encoded
66- ATProto Firehose into lightweight, friendly JSON events. It provides zstd
77- compression and filtering capabilities for collections and DIDs.
66+ Jetstream is an abstraction over the raw AT Protocol firehose that converts
77+ the CBOR-encoded events into easier to handle JSON objects, and also provides
88+ the ability to filter the events received by repository DID or collection
99+ NSID. This is useful when you know specifically which repos or collections you
1010+ want events from, and thus reduces the amount of bandwidth consumed vs
1111+ consuming the raw firehose directly.
1212+1313+ If you need a solution for easy backfilling from repositories and not just a
1414+ firehose translation layer, check out `Drinkup.Tap`.
815916 ## Usage
10171111- Add Jetstream to your supervision tree:
1818+ defmodule MyJetstreamConsumer do
1919+ use Drinkup.Jetstream,
2020+ name: :my_jetstream,
2121+ wanted_collections: ["app.bsky.feed.post"]
12221313- children = [
1414- {Drinkup.Jetstream, %{
1515- consumer: MyJetstreamConsumer,
1616- name: MyJetstream,
1717- wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"]
1818- }}
1919- ]
2323+ @impl true
2424+ def handle_event(event) do
2525+ IO.inspect(event)
2626+ end
2727+ end
2828+2929+ # In your application supervision tree:
3030+ children = [MyJetstreamConsumer]
20312132 ## Configuration
22332323- See `Drinkup.Jetstream.Options` for all available configuration options.
3434+ See `Drinkup.Jetstream.Consumer` for all available configuration options.
24352536 ## Dynamic Filter Updates
26372738 You can update filters after the connection is established:
28392929- Drinkup.Jetstream.update_options(MyJetstream, %{
4040+ Drinkup.Jetstream.update_options(:my_jetstream, %{
3041 wanted_collections: ["app.bsky.graph.follow"],
3142 wanted_dids: ["did:plc:abc123"]
3243 })
···3647 By default Drinkup connects to `jetstream2.us-east.bsky.network`.
37483849 Bluesky operates a few different Jetstream instances:
3939- - `jetstream1.us-east.bsky.network`
4040- - `jetstream2.us-east.bsky.network`
4141- - `jetstream1.us-west.bsky.network`
4242- - `jetstream2.us-west.bsky.network`
5050+ - `wss://jetstream1.us-east.bsky.network`
5151+ - `wss://jetstream2.us-east.bsky.network`
5252+ - `wss://jetstream1.us-west.bsky.network`
5353+ - `wss://jetstream2.us-west.bsky.network`
43544444- There also some third-party instances not run by Bluesky PBC:
4545- - `jetstream.fire.hose.cam`
4646- - `jetstream2.fr.hose.cam`
4747- - `jetstream1.us-east.fire.hose.cam`
5555+ There are also some third-party instances not run by Bluesky PBC, including but not limited to:
5656+ - `wss://jetstream.fire.hose.cam`
5757+ - `wss://jetstream2.fr.hose.cam`
5858+ - `wss://jetstream1.us-east.fire.hose.cam`
5959+6060+ https://firehose.stream/ also hosts several instances around the world.
4861 """
49625050- use Supervisor
5163 require Logger
5252- alias Drinkup.Jetstream.Options
53645454- @dialyzer nowarn_function: {:init, 1}
6565+ defmacro __using__(opts) do
6666+ quote location: :keep, bind_quoted: [opts: opts] do
6767+ use Supervisor
6868+ @behaviour Drinkup.Jetstream.Consumer
55695656- @impl true
5757- def init({%Options{name: name} = drinkup_options, supervisor_options}) do
5858- children = [
5959- {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}},
6060- {Drinkup.Jetstream.Socket, drinkup_options}
6161- ]
7070+ alias Drinkup.Jetstream.Options
62716363- Supervisor.start_link(
6464- children,
6565- supervisor_options ++
6666- [name: {:via, Registry, {Drinkup.Registry, {name, JetstreamSupervisor}}}]
6767- )
6868- end
7272+ # Store compile-time options as module attributes
7373+ @name Keyword.get(opts, :name)
7474+ @host Keyword.get(opts, :host, "wss://jetstream2.us-east.bsky.network")
7575+ @wanted_collections Keyword.get(opts, :wanted_collections, [])
7676+ @wanted_dids Keyword.get(opts, :wanted_dids, [])
7777+ @cursor Keyword.get(opts, :cursor)
7878+ @require_hello Keyword.get(opts, :require_hello, false)
7979+ @max_message_size_bytes Keyword.get(opts, :max_message_size_bytes)
69807070- @spec child_spec(Options.options()) :: Supervisor.child_spec()
7171- def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
8181+ @doc """
8282+ Starts the Jetstream consumer supervisor.
72837373- @spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
7474- def child_spec({drinkup_options, supervisor_options}) do
7575- %{
7676- id: Map.get(drinkup_options, :name, __MODULE__),
7777- start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
7878- type: :supervisor,
7979- restart: :permanent,
8080- shutdown: 500
8181- }
8484+ Accepts optional runtime configuration that overrides compile-time options.
8585+ """
8686+ def start_link(runtime_opts \\ []) do
8787+ opts = build_options(runtime_opts)
8888+ Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
8989+ end
9090+9191+ @impl true
9292+ def init(%Options{name: name} = options) do
9393+ children = [
9494+ {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}},
9595+ {Drinkup.Jetstream.Socket, options}
9696+ ]
9797+9898+ Supervisor.init(children, strategy: :one_for_one)
9999+ end
100100+101101+ @doc """
102102+ Returns a child spec for adding this consumer to a supervision tree.
103103+104104+ Runtime options override compile-time options.
105105+ """
106106+ def child_spec(runtime_opts) when is_list(runtime_opts) do
107107+ opts = build_options(runtime_opts)
108108+109109+ %{
110110+ id: opts.name,
111111+ start: {__MODULE__, :start_link, [runtime_opts]},
112112+ type: :supervisor,
113113+ restart: :permanent,
114114+ shutdown: 500
115115+ }
116116+ end
117117+118118+ def child_spec(_opts) do
119119+ raise ArgumentError, "child_spec expects a keyword list of options"
120120+ end
121121+122122+ defoverridable child_spec: 1
123123+124124+ # Build Options struct from compile-time and runtime options
125125+ defp build_options(runtime_opts) do
126126+ compile_opts = [
127127+ name: @name || __MODULE__,
128128+ host: @host,
129129+ wanted_collections: @wanted_collections,
130130+ wanted_dids: @wanted_dids,
131131+ cursor: @cursor,
132132+ require_hello: @require_hello,
133133+ max_message_size_bytes: @max_message_size_bytes
134134+ ]
135135+136136+ merged =
137137+ compile_opts
138138+ |> Keyword.merge(runtime_opts)
139139+ |> Enum.reject(fn {_k, v} -> is_nil(v) end)
140140+ |> Map.new()
141141+ |> Map.put(:consumer, __MODULE__)
142142+143143+ Options.from(merged)
144144+ end
145145+146146+ defp via_tuple(name) do
147147+ {:via, Registry, {Drinkup.Registry, {name, JetstreamSupervisor}}}
148148+ end
149149+ end
82150 end
8315184152 # Options Update API
···107175108176 ## Parameters
109177110110- - `name` - The name of the Jetstream instance (default: `Drinkup.Jetstream`)
178178+ - `name` - The name of the Jetstream consumer (the `:name` option passed to `use Drinkup.Jetstream`)
111179 - `opts` - Map with optional fields:
112180 - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
113181 - `:wanted_dids` - List of DIDs to filter (max 10,000)
···116184 ## Examples
117185118186 # Filter to only posts
119119- Drinkup.Jetstream.update_options(MyJetstream, %{
187187+ Drinkup.Jetstream.update_options(:my_jetstream, %{
120188 wanted_collections: ["app.bsky.feed.post"]
121189 })
122190123191 # Filter to specific DIDs
124124- Drinkup.Jetstream.update_options(MyJetstream, %{
192192+ Drinkup.Jetstream.update_options(:my_jetstream, %{
125193 wanted_dids: ["did:plc:abc123", "did:plc:def456"]
126194 })
127195128196 # Disable all filters (receive all events)
129129- Drinkup.Jetstream.update_options(MyJetstream, %{
197197+ Drinkup.Jetstream.update_options(:my_jetstream, %{
130198 wanted_collections: [],
131199 wanted_dids: []
132200 })
···140208 Invalid updates will result in the connection being closed by the server.
141209 """
142210 @spec update_options(atom(), update_opts()) :: :ok | {:error, term()}
143143- def update_options(name \\ Drinkup.Jetstream, opts) when is_map(opts) do
211211+ def update_options(name, opts) when is_atom(name) and is_map(opts) do
144212 case find_connection(name) do
145213 {:ok, {conn, stream}} ->
146214 message = build_options_update_message(opts)
···153221 {:error, reason}
154222 end
155223 end
156156-157157- # Private functions
158224159225 @spec find_connection(atom()) :: {:ok, {pid(), :gun.stream_ref()}} | {:error, :not_connected}
160226 defp find_connection(name) do
+2-52
lib/jetstream/consumer.ex
···11defmodule Drinkup.Jetstream.Consumer do
22 @moduledoc """
33- Consumer behaviour for handling Jetstream events.
44-55- Implement this behaviour to process events from a Jetstream instance.
66- Events are dispatched asynchronously via `Task.Supervisor`.
77-88- Unlike Tap, Jetstream does not require event acknowledgments. Events are
99- processed in a fire-and-forget manner.
1010-1111- ## Example
1212-1313- defmodule MyJetstreamConsumer do
1414- @behaviour Drinkup.Jetstream.Consumer
1515-1616- def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :create} = event) do
1717- # Handle new record creation
1818- IO.inspect(event, label: "New record")
1919- :ok
2020- end
2121-2222- def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :delete} = event) do
2323- # Handle record deletion
2424- IO.inspect(event, label: "Deleted record")
2525- :ok
2626- end
2727-2828- def handle_event(%Drinkup.Jetstream.Event.Identity{} = event) do
2929- # Handle identity changes
3030- IO.inspect(event, label: "Identity update")
3131- :ok
3232- end
3333-3434- def handle_event(%Drinkup.Jetstream.Event.Account{active: false} = event) do
3535- # Handle account deactivation
3636- IO.inspect(event, label: "Account inactive")
3737- :ok
3838- end
33+ Behaviour for handling Jetstream events.
3944040- def handle_event(_event), do: :ok
4141- end
4242-4343- ## Event Types
4444-4545- The consumer will receive one of three event types:
4646-4747- - `Drinkup.Jetstream.Event.Commit` - Repository commits (create, update, delete)
4848- - `Drinkup.Jetstream.Event.Identity` - Identity updates (handle changes, etc.)
4949- - `Drinkup.Jetstream.Event.Account` - Account status changes (active, taken down, etc.)
5050-5151- ## Error Handling
5252-5353- If your `handle_event/1` implementation raises an exception, it will be logged
5454- but will not affect the stream. The error is caught and logged by the event
5555- dispatcher.
55+ Implemented by `Drinkup.Jetstream`; you'll likely want to use that instead.
566 """
577588 alias Drinkup.Jetstream.Event
+159-61
lib/tap.ex
···11defmodule Drinkup.Tap do
22 @moduledoc """
33- Supervisor and HTTP API for Tap indexer/backfill service.
33+ Module for handling events from a
44+ [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) instance.
4555- Tap simplifies AT sync by handling the firehose connection, verification,
66- backfill, and filtering. Your application connects to a Tap service and
77- receives simple JSON events for only the repos and collections you care about.
66+ Tap is a complete sync and backfill solution which handles the firehose
77+ connection itself, and automatically searches for repositories to backfill
88+ from based on the options given to it. It's great for building an app that
99+ wants all of a certain set of records within the AT Protocol network.
1111+ This module requires you to be running a properly configured Tap instance; it
1212+ doesn't spawn one itself.
813914 ## Usage
10151111- Add Tap to your supervision tree:
1616+ defmodule MyTapConsumer do
1717+ use Drinkup.Tap,
1818+ name: :my_tap,
1919+ host: "http://localhost:2480",
2020+ admin_password: System.get_env("TAP_PASSWORD")
12211313- children = [
1414- {Drinkup.Tap, %{
1515- consumer: MyTapConsumer,
1616- name: MyTap,
1717- host: "http://localhost:2480",
1818- admin_password: "secret" # optional
1919- }}
2020- ]
2222+ @impl true
2323+ def handle_event(event) do
2424+ # Process event
2525+ :ok
2626+ end
2727+ end
2828+2929+ # In your application supervision tree:
3030+ children = [MyTapConsumer]
21312222- Then interact with the Tap HTTP API:
3232+ You can also interact with the Tap HTTP API to manually start tracking
3333+ specific repositories or get information about what's going on.
23342435 # Add repos to track (triggers backfill)
2525- Drinkup.Tap.add_repos(MyTap, ["did:plc:abc123"])
3636+ Drinkup.Tap.add_repos(:my_tap, ["did:plc:abc123"])
26372738 # Get stats
2828- {:ok, count} = Drinkup.Tap.get_repo_count(MyTap)
2929-3030- ## Configuration
3131-3232- Tap itself is configured via environment variables. See the Tap documentation
3333- for details on configuring collection filters, signal collections, and other
3434- operational settings:
3535- https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md
3939+ {:ok, count} = Drinkup.Tap.get_repo_count(:my_tap)
3640 """
37413838- use Supervisor
3942 alias Drinkup.Tap.Options
40434141- @dialyzer nowarn_function: {:init, 1}
4242- @impl true
4343- def init({%Options{name: name} = drinkup_options, supervisor_options}) do
4444- # Register options in Registry for HTTP API access
4545- Registry.register(Drinkup.Registry, {name, TapOptions}, drinkup_options)
4444+ defmacro __using__(opts) do
4545+ quote location: :keep, bind_quoted: [opts: opts] do
4646+ use Supervisor
4747+ @behaviour Drinkup.Tap.Consumer
46484747- children = [
4848- {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}},
4949- {Drinkup.Tap.Socket, drinkup_options}
5050- ]
4949+ alias Drinkup.Tap.Options
5050+5151+ # Store compile-time options as module attributes
5252+ @name Keyword.get(opts, :name)
5353+ @host Keyword.get(opts, :host, "http://localhost:2480")
5454+ @admin_password Keyword.get(opts, :admin_password)
5555+ @disable_acks Keyword.get(opts, :disable_acks, false)
5656+5757+ @doc """
5858+ Starts the Tap consumer supervisor.
5959+6060+ Accepts optional runtime configuration that overrides compile-time options.
6161+ """
6262+ def start_link(runtime_opts \\ []) do
6363+ opts = build_options(runtime_opts)
6464+ Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
6565+ end
6666+6767+ @impl true
6868+ def init(%Options{name: name} = options) do
6969+ # Register options in Registry for HTTP API access
7070+ Registry.register(Drinkup.Registry, {name, TapOptions}, options)
51715252- Supervisor.start_link(
5353- children,
5454- supervisor_options ++ [name: {:via, Registry, {Drinkup.Registry, {name, TapSupervisor}}}]
5555- )
5656- end
7272+ children = [
7373+ {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}},
7474+ {Drinkup.Tap.Socket, options}
7575+ ]
57765858- @spec child_spec(Options.options()) :: Supervisor.child_spec()
5959- def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
7777+ Supervisor.init(children, strategy: :one_for_one)
7878+ end
60796161- @spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
6262- def child_spec({drinkup_options, supervisor_options}) do
6363- %{
6464- id: Map.get(drinkup_options, :name, __MODULE__),
6565- start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
6666- type: :supervisor,
6767- restart: :permanent,
6868- shutdown: 500
6969- }
8080+ @doc """
8181+ Returns a child spec for adding this consumer to a supervision tree.
8282+8383+ Runtime options override compile-time options.
8484+ """
8585+ def child_spec(runtime_opts) when is_list(runtime_opts) do
8686+ opts = build_options(runtime_opts)
8787+8888+ %{
8989+ id: opts.name,
9090+ start: {__MODULE__, :start_link, [runtime_opts]},
9191+ type: :supervisor,
9292+ restart: :permanent,
9393+ shutdown: 500
9494+ }
9595+ end
9696+9797+ def child_spec(_opts) do
9898+ raise ArgumentError, "child_spec expects a keyword list of options"
9999+ end
100100+101101+ defoverridable child_spec: 1
102102+103103+ # Build Options struct from compile-time and runtime options
104104+ defp build_options(runtime_opts) do
105105+ compile_opts = [
106106+ name: @name || __MODULE__,
107107+ host: @host,
108108+ admin_password: @admin_password,
109109+ disable_acks: @disable_acks
110110+ ]
111111+112112+ merged =
113113+ compile_opts
114114+ |> Keyword.merge(runtime_opts)
115115+ |> Enum.reject(fn {_k, v} -> is_nil(v) end)
116116+ |> Map.new()
117117+ |> Map.put(:consumer, __MODULE__)
118118+119119+ Options.from(merged)
120120+ end
121121+122122+ defp via_tuple(name) do
123123+ {:via, Registry, {Drinkup.Registry, {name, TapSupervisor}}}
124124+ end
125125+ end
70126 end
7112772128 # HTTP API Functions
···7613277133 Triggers backfill for the specified DIDs. Historical events will be fetched
78134 from each repo's PDS, followed by live events from the firehose.
135135+136136+ ## Parameters
137137+138138+ - `name` - The name of the Tap consumer (the `:name` option passed to `use Drinkup.Tap`)
139139+ - `dids` - List of DID strings to add
79140 """
80141 @spec add_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
8181- def add_repos(name \\ Drinkup.Tap, dids) when is_list(dids) do
142142+ def add_repos(name, dids) when is_atom(name) and is_list(dids) do
82143 with {:ok, options} <- get_options(name),
83144 {:ok, response} <- make_request(options, :post, "/repos/add", %{dids: dids}) do
84145 {:ok, response}
···9015191152 Stops syncing the specified repos and deletes tracked repo metadata. Does not
92153 delete buffered events in the outbox.
154154+155155+ ## Parameters
156156+157157+ - `name` - The name of the Tap consumer (the `:name` option passed to `use Drinkup.Tap`)
158158+ - `dids` - List of DID strings to remove
93159 """
94160 @spec remove_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
9595- def remove_repos(name \\ Drinkup.Tap, dids) when is_list(dids) do
161161+ def remove_repos(name, dids) when is_atom(name) and is_list(dids) do
96162 with {:ok, options} <- get_options(name),
97163 {:ok, response} <- make_request(options, :post, "/repos/remove", %{dids: dids}) do
98164 {:ok, response}
···101167102168 @doc """
103169 Resolve a DID to its DID document.
170170+171171+ ## Parameters
172172+173173+ - `name` - The name of the Tap consumer
174174+ - `did` - DID string to resolve
104175 """
105176 @spec resolve_did(atom(), String.t()) :: {:ok, term()} | {:error, term()}
106106- def resolve_did(name \\ Drinkup.Tap, did) when is_binary(did) do
177177+ def resolve_did(name, did) when is_atom(name) and is_binary(did) do
107178 with {:ok, options} <- get_options(name),
108179 {:ok, response} <- make_request(options, :get, "/resolve/#{did}") do
109180 {:ok, response}
···114185 Get info about a tracked repo.
115186116187 Returns repo state, repo rev, record count, error info, and retry count.
188188+189189+ ## Parameters
190190+191191+ - `name` - The name of the Tap consumer
192192+ - `did` - DID string to get info for
117193 """
118194 @spec get_repo_info(atom(), String.t()) :: {:ok, term()} | {:error, term()}
119119- def get_repo_info(name \\ Drinkup.Tap, did) when is_binary(did) do
195195+ def get_repo_info(name, did) when is_atom(name) and is_binary(did) do
120196 with {:ok, options} <- get_options(name),
121197 {:ok, response} <- make_request(options, :get, "/info/#{did}") do
122198 {:ok, response}
···125201126202 @doc """
127203 Get the total number of tracked repos.
204204+205205+ ## Parameters
206206+207207+ - `name` - The name of the Tap consumer
128208 """
129209 @spec get_repo_count(atom()) :: {:ok, integer()} | {:error, term()}
130130- def get_repo_count(name \\ Drinkup.Tap) do
210210+ def get_repo_count(name) when is_atom(name) do
131211 with {:ok, options} <- get_options(name),
132212 {:ok, response} <- make_request(options, :get, "/stats/repo-count") do
133213 {:ok, response}
···136216137217 @doc """
138218 Get the total number of tracked records.
219219+220220+ ## Parameters
221221+222222+ - `name` - The name of the Tap consumer
139223 """
140224 @spec get_record_count(atom()) :: {:ok, integer()} | {:error, term()}
141141- def get_record_count(name \\ Drinkup.Tap) do
225225+ def get_record_count(name) when is_atom(name) do
142226 with {:ok, options} <- get_options(name),
143227 {:ok, response} <- make_request(options, :get, "/stats/record-count") do
144228 {:ok, response}
···147231148232 @doc """
149233 Get the number of events in the outbox buffer.
234234+235235+ ## Parameters
236236+237237+ - `name` - The name of the Tap consumer
150238 """
151239 @spec get_outbox_buffer(atom()) :: {:ok, integer()} | {:error, term()}
152152- def get_outbox_buffer(name \\ Drinkup.Tap) do
240240+ def get_outbox_buffer(name) when is_atom(name) do
153241 with {:ok, options} <- get_options(name),
154242 {:ok, response} <- make_request(options, :get, "/stats/outbox-buffer") do
155243 {:ok, response}
···158246159247 @doc """
160248 Get the number of events in the resync buffer.
249249+250250+ ## Parameters
251251+252252+ - `name` - The name of the Tap consumer
161253 """
162254 @spec get_resync_buffer(atom()) :: {:ok, integer()} | {:error, term()}
163163- def get_resync_buffer(name \\ Drinkup.Tap) do
255255+ def get_resync_buffer(name) when is_atom(name) do
164256 with {:ok, options} <- get_options(name),
165257 {:ok, response} <- make_request(options, :get, "/stats/resync-buffer") do
166258 {:ok, response}
···169261170262 @doc """
171263 Get current firehose and list repos cursors.
264264+265265+ ## Parameters
266266+267267+ - `name` - The name of the Tap consumer
172268 """
173269 @spec get_cursors(atom()) :: {:ok, map()} | {:error, term()}
174174- def get_cursors(name \\ Drinkup.Tap) do
270270+ def get_cursors(name) when is_atom(name) do
175271 with {:ok, options} <- get_options(name),
176272 {:ok, response} <- make_request(options, :get, "/stats/cursors") do
177273 {:ok, response}
···182278 Check Tap health status.
183279184280 Returns `{:ok, %{"status" => "ok"}}` if healthy.
281281+282282+ ## Parameters
283283+284284+ - `name` - The name of the Tap consumer
185285 """
186286 @spec health(atom()) :: {:ok, map()} | {:error, term()}
187187- def health(name \\ Drinkup.Tap) do
287287+ def health(name) when is_atom(name) do
188288 with {:ok, options} <- get_options(name),
189289 {:ok, response} <- make_request(options, :get, "/health") do
190290 {:ok, response}
191291 end
192292 end
193193-194194- # Private Functions
195293196294 @spec get_options(atom()) :: {:ok, Options.t()} | {:error, :not_found}
197295 defp get_options(name) do
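
The `build_options/1` helper added in this diff merges compile-time and runtime options and then drops `nil` values. A minimal sketch of that merge order, assuming a hypothetical `OptionMergeSketch` module that returns a plain map instead of a `Drinkup.Tap.Options` struct (the host `http://tap.internal:2480` is also illustrative):

```elixir
defmodule OptionMergeSketch do
  # Hypothetical stand-in for the private build_options/1 in the diff.
  # It mirrors only the merge/reject steps, not Options.from/1.
  def merge(compile_opts, runtime_opts) do
    compile_opts
    # Keys in runtime_opts replace the same keys in compile_opts.
    |> Keyword.merge(runtime_opts)
    # Entries whose value is nil are dropped after the merge.
    |> Enum.reject(fn {_key, value} -> is_nil(value) end)
    |> Map.new()
  end
end

compile_opts = [name: :my_tap, host: "http://localhost:2480", admin_password: nil]

# The runtime host wins; the nil admin_password never reaches the map.
merged = OptionMergeSketch.merge(compile_opts, host: "http://tap.internal:2480")
IO.inspect(merged)
```

Because the `nil` filter runs after the merge, a runtime `host: nil` would remove the compile-time host entirely rather than fall back to it. Whether that matters depends on how `Options.from/1` handles a missing key, which this diff does not show.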
+2-37
lib/tap/consumer.ex
···11defmodule Drinkup.Tap.Consumer do
22 @moduledoc """
33- Consumer behaviour for handling Tap events.
44-55- Implement this behaviour to process events from a Tap indexer/backfill service.
66- Events are dispatched asynchronously via `Task.Supervisor` and acknowledged
77- to Tap based on the return value of `handle_event/1`.
88-99- ## Event Acknowledgment
1010-1111- By default, events are acknowledged to Tap based on your return value:
1212-1313- - `:ok`, `{:ok, any()}`, or `nil` → Success, event is acked to Tap
1414- - `{:error, reason}` → Failure, event is NOT acked (Tap will retry after timeout)
1515- - Exception raised → Failure, event is NOT acked (Tap will retry after timeout)
1616-1717- Any other value will log a warning and acknowledge the event anyway.
33+ Behaviour for handling Tap events.
1841919- If you set `disable_acks: true` in your Tap options, no acks are sent regardless
2020- of the return value. This matches Tap's `TAP_DISABLE_ACKS` environment variable.
2121-2222- ## Example
2323-2424- defmodule MyTapConsumer do
2525- @behaviour Drinkup.Tap.Consumer
2626-2727- def handle_event(%Drinkup.Tap.Event.Record{action: :create} = record) do
2828- # Handle new record creation
2929- case save_to_database(record) do
3030- :ok -> :ok # Success - event will be acked
3131- {:error, reason} -> {:error, reason} # Failure - Tap will retry
3232- end
3333- end
3434-3535- def handle_event(%Drinkup.Tap.Event.Identity{} = identity) do
3636- # Handle identity changes
3737- update_identity(identity)
3838- :ok # Success - event will be acked
3939- end
4040- end
55+ Implemented by `Drinkup.Tap`; you will likely want to `use Drinkup.Tap` rather than implement this behaviour directly.
416 """
427438 alias Drinkup.Tap.Event