···11-# The directory Mix will write compiled artifacts to.
21/_build/
33-44-# If you run "mix test --cover", coverage assets end up here.
52/cover/
66-77-# The directory Mix downloads your dependencies sources to.
83/deps/
99-1010-# Where third-party dependencies like ExDoc output generated docs.
114/doc/
1212-1313-# If the VM crashes, it generates a dump, let's ignore it too.
145erl_crash.dump
1515-1616-# Also ignore archive artifacts (built via "mix archive.build").
176*.ez
1818-1919-# Ignore package tarball (built via "mix hex.build").
207drinkup-*.tar
2121-2222-# Temporary files, for example, from tests.
238/tmp/
2424-2525-# Nix
269.envrc
2710.direnv
2828-result1111+result
1212+priv/dets/
+6
CHANGELOG.md
···1313- Existing behaviour moved to `Drinkup.Firehose` namespace, to make way for
1414 alternate sync systems.
15151616+### Added
1717+1818+- Support for the
1919+ [Tap](https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md)
2020+ sync and backfill utility service, via `Drinkup.Tap`.
2121+1622### Changed
17231824- Refactor core connection logic for websockets into `Drinkup.Socket` to make it
···11+defmodule Drinkup.Tap do
22+ @moduledoc """
33+ Supervisor and HTTP API for Tap indexer/backfill service.
44+55+ Tap simplifies AT sync by handling the firehose connection, verification,
66+ backfill, and filtering. Your application connects to a Tap service and
77+ receives simple JSON events for only the repos and collections you care about.
88+99+ ## Usage
1010+1111+ Add Tap to your supervision tree:
1212+1313+ children = [
1414+ {Drinkup.Tap, %{
1515+ consumer: MyTapConsumer,
1616+ name: MyTap,
1717+ host: "http://localhost:2480",
1818+ admin_password: "secret" # optional
1919+ }}
2020+ ]
2121+2222+ Then interact with the Tap HTTP API:
2323+2424+ # Add repos to track (triggers backfill)
2525+ Drinkup.Tap.add_repos(MyTap, ["did:plc:abc123"])
2626+2727+ # Get stats
2828+ {:ok, count} = Drinkup.Tap.get_repo_count(MyTap)
2929+3030+ ## Configuration
3131+3232+ Tap itself is configured via environment variables. See the Tap documentation
3333+ for details on configuring collection filters, signal collections, and other
3434+ operational settings:
3535+ https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md
3636+ """
3737+3838+ use Supervisor
3939+ alias Drinkup.Tap.Options
4040+4141+ @dialyzer nowarn_function: {:init, 1}
4242+ @impl true
4343+ def init({%Options{name: name} = drinkup_options, supervisor_options}) do
4444+ # Register options in Registry for HTTP API access
4545+ Registry.register(Drinkup.Registry, {name, TapOptions}, drinkup_options)
4646+4747+ children = [
4848+ {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}},
4949+ {Drinkup.Tap.Socket, drinkup_options}
5050+ ]
5151+5252+ Supervisor.start_link(
5353+ children,
5454+ supervisor_options ++ [name: {:via, Registry, {Drinkup.Registry, {name, TapSupervisor}}}]
5555+ )
5656+ end
5757+5858+ @spec child_spec(Options.options()) :: Supervisor.child_spec()
5959+ def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
6060+6161+ @spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
6262+ def child_spec({drinkup_options, supervisor_options}) do
6363+ %{
6464+ id: Map.get(drinkup_options, :name, __MODULE__),
6565+ start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
6666+ type: :supervisor,
6767+ restart: :permanent,
6868+ shutdown: 500
6969+ }
7070+ end
7171+7272+ # HTTP API Functions
7373+7474+ @doc """
7575+ Add DIDs to track.
7676+7777+ Triggers backfill for the specified DIDs. Historical events will be fetched
7878+ from each repo's PDS, followed by live events from the firehose.
7979+ """
8080+ @spec add_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
8181+ def add_repos(name \\ Drinkup.Tap, dids) when is_list(dids) do
8282+ with {:ok, options} <- get_options(name),
8383+ {:ok, response} <- make_request(options, :post, "/repos/add", %{dids: dids}) do
8484+ {:ok, response}
8585+ end
8686+ end
8787+8888+ @doc """
8989+ Remove DIDs from tracking.
9090+9191+ Stops syncing the specified repos and deletes tracked repo metadata. Does not
9292+ delete buffered events in the outbox.
9393+ """
9494+ @spec remove_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
9595+ def remove_repos(name \\ Drinkup.Tap, dids) when is_list(dids) do
9696+ with {:ok, options} <- get_options(name),
9797+ {:ok, response} <- make_request(options, :post, "/repos/remove", %{dids: dids}) do
9898+ {:ok, response}
9999+ end
100100+ end
101101+102102+ @doc """
103103+ Resolve a DID to its DID document.
104104+ """
105105+ @spec resolve_did(atom(), String.t()) :: {:ok, term()} | {:error, term()}
106106+ def resolve_did(name \\ Drinkup.Tap, did) when is_binary(did) do
107107+ with {:ok, options} <- get_options(name),
108108+ {:ok, response} <- make_request(options, :get, "/resolve/#{did}") do
109109+ {:ok, response}
110110+ end
111111+ end
112112+113113+ @doc """
114114+ Get info about a tracked repo.
115115+116116+ Returns repo state, repo rev, record count, error info, and retry count.
117117+ """
118118+ @spec get_repo_info(atom(), String.t()) :: {:ok, term()} | {:error, term()}
119119+ def get_repo_info(name \\ Drinkup.Tap, did) when is_binary(did) do
120120+ with {:ok, options} <- get_options(name),
121121+ {:ok, response} <- make_request(options, :get, "/info/#{did}") do
122122+ {:ok, response}
123123+ end
124124+ end
125125+126126+ @doc """
127127+ Get the total number of tracked repos.
128128+ """
129129+ @spec get_repo_count(atom()) :: {:ok, integer()} | {:error, term()}
130130+ def get_repo_count(name \\ Drinkup.Tap) do
131131+ with {:ok, options} <- get_options(name),
132132+ {:ok, response} <- make_request(options, :get, "/stats/repo-count") do
133133+ {:ok, response}
134134+ end
135135+ end
136136+137137+ @doc """
138138+ Get the total number of tracked records.
139139+ """
140140+ @spec get_record_count(atom()) :: {:ok, integer()} | {:error, term()}
141141+ def get_record_count(name \\ Drinkup.Tap) do
142142+ with {:ok, options} <- get_options(name),
143143+ {:ok, response} <- make_request(options, :get, "/stats/record-count") do
144144+ {:ok, response}
145145+ end
146146+ end
147147+148148+ @doc """
149149+ Get the number of events in the outbox buffer.
150150+ """
151151+ @spec get_outbox_buffer(atom()) :: {:ok, integer()} | {:error, term()}
152152+ def get_outbox_buffer(name \\ Drinkup.Tap) do
153153+ with {:ok, options} <- get_options(name),
154154+ {:ok, response} <- make_request(options, :get, "/stats/outbox-buffer") do
155155+ {:ok, response}
156156+ end
157157+ end
158158+159159+ @doc """
160160+ Get the number of events in the resync buffer.
161161+ """
162162+ @spec get_resync_buffer(atom()) :: {:ok, integer()} | {:error, term()}
163163+ def get_resync_buffer(name \\ Drinkup.Tap) do
164164+ with {:ok, options} <- get_options(name),
165165+ {:ok, response} <- make_request(options, :get, "/stats/resync-buffer") do
166166+ {:ok, response}
167167+ end
168168+ end
169169+170170+ @doc """
171171+ Get current firehose and list repos cursors.
172172+ """
173173+ @spec get_cursors(atom()) :: {:ok, map()} | {:error, term()}
174174+ def get_cursors(name \\ Drinkup.Tap) do
175175+ with {:ok, options} <- get_options(name),
176176+ {:ok, response} <- make_request(options, :get, "/stats/cursors") do
177177+ {:ok, response}
178178+ end
179179+ end
180180+181181+ @doc """
182182+ Check Tap health status.
183183+184184+ Returns `{:ok, %{"status" => "ok"}}` if healthy.
185185+ """
186186+ @spec health(atom()) :: {:ok, map()} | {:error, term()}
187187+ def health(name \\ Drinkup.Tap) do
188188+ with {:ok, options} <- get_options(name),
189189+ {:ok, response} <- make_request(options, :get, "/health") do
190190+ {:ok, response}
191191+ end
192192+ end
193193+194194+ # Private Functions
195195+196196+ @spec get_options(atom()) :: {:ok, Options.t()} | {:error, :not_found}
197197+ defp get_options(name) do
198198+ case Registry.lookup(Drinkup.Registry, {name, TapOptions}) do
199199+ [{_pid, options}] -> {:ok, options}
200200+ [] -> {:error, :not_found}
201201+ end
202202+ end
203203+204204+ @spec make_request(Options.t(), atom(), String.t(), map() | nil) ::
205205+ {:ok, term()} | {:error, term()}
206206+ defp make_request(options, method, path, body \\ nil) do
207207+ url = build_url(options.host, path)
208208+ headers = build_headers(options.admin_password)
209209+210210+ request_opts = [
211211+ method: method,
212212+ url: url,
213213+ headers: headers
214214+ ]
215215+216216+ request_opts =
217217+ if body do
218218+ Keyword.merge(request_opts, json: body)
219219+ else
220220+ request_opts
221221+ end
222222+223223+ case Req.request(request_opts) do
224224+ {:ok, %{status: status, body: body}} when status in 200..299 ->
225225+ {:ok, body}
226226+227227+ {:ok, %{status: status, body: body}} ->
228228+ {:error, {:http_error, status, body}}
229229+230230+ {:error, reason} ->
231231+ {:error, reason}
232232+ end
233233+ end
234234+235235+ @spec build_url(String.t(), String.t()) :: String.t()
236236+ defp build_url(host, path) do
237237+ host = String.trim_trailing(host, "/")
238238+ "#{host}#{path}"
239239+ end
240240+241241+ @spec build_headers(String.t() | nil) :: list()
242242+ defp build_headers(nil), do: []
243243+244244+ defp build_headers(admin_password) do
245245+ credentials = "admin:#{admin_password}"
246246+ auth_header = "Basic #{Base.encode64(credentials)}"
247247+ [{"authorization", auth_header}]
248248+ end
249249+end
+46
lib/tap/consumer.ex
···11+defmodule Drinkup.Tap.Consumer do
22+ @moduledoc """
33+ Consumer behaviour for handling Tap events.
44+55+ Implement this behaviour to process events from a Tap indexer/backfill service.
66+ Events are dispatched asynchronously via `Task.Supervisor` and acknowledged
77+ to Tap based on the return value of `handle_event/1`.
88+99+ ## Event Acknowledgment
1010+1111+ By default, events are acknowledged to Tap based on your return value:
1212+1313+ - `:ok`, `{:ok, any()}`, or `nil` → Success, event is acked to Tap
1414+ - `{:error, reason}` → Failure, event is NOT acked (Tap will retry after timeout)
1515+ - Exception raised → Failure, event is NOT acked (Tap will retry after timeout)
1616+1717+ Any other value will log a warning and acknowledge the event anyway.
1818+1919+ If you set `disable_acks: true` in your Tap options, no acks are sent regardless
2020+ of the return value. This matches Tap's `TAP_DISABLE_ACKS` environment variable.
2121+2222+ ## Example
2323+2424+ defmodule MyTapConsumer do
2525+ @behaviour Drinkup.Tap.Consumer
2626+2727+ def handle_event(%Drinkup.Tap.Event.Record{action: :create} = record) do
2828+ # Handle new record creation
2929+ case save_to_database(record) do
3030+ :ok -> :ok # Success - event will be acked
3131+ {:error, reason} -> {:error, reason} # Failure - Tap will retry
3232+ end
3333+ end
3434+3535+ def handle_event(%Drinkup.Tap.Event.Identity{} = identity) do
3636+ # Handle identity changes
3737+ update_identity(identity)
3838+ :ok # Success - event will be acked
3939+ end
4040+ end
4141+ """
4242+4343+ alias Drinkup.Tap.Event
4444+4545+ @callback handle_event(Event.Record.t() | Event.Identity.t()) :: any()
4646+end
+105
lib/tap/event.ex
···11+defmodule Drinkup.Tap.Event do
22+ @moduledoc """
33+ Event handling and dispatch for Tap events.
44+55+ Parses incoming JSON events from Tap and dispatches them to the configured
66+ consumer via Task.Supervisor. After successful processing, sends an ack
77+ message back to the socket.
88+ """
99+1010+ require Logger
1111+ alias Drinkup.Tap.{Event, Options}
1212+1313+ @type t() :: Event.Record.t() | Event.Identity.t()
1414+1515+ @doc """
1616+ Parse a JSON map into an event struct.
1717+1818+ Returns the appropriate event struct based on the "type" field.
1919+ """
2020+ @spec from(map()) :: t() | nil
2121+ def from(%{"type" => "record"} = payload), do: Event.Record.from(payload)
2222+ def from(%{"type" => "identity"} = payload), do: Event.Identity.from(payload)
2323+ def from(_payload), do: nil
2424+2525+ @doc """
2626+ Dispatch an event to the consumer via Task.Supervisor.
2727+2828+ Spawns a task that:
2929+ 1. Processes the event via the consumer's handle_event/1 callback
3030+ 2. Sends an ack to Tap if acks are enabled and the consumer returns :ok, {:ok, _}, or nil
3131+ 3. Does not ack if the consumer returns an error-like value or raises an exception
3232+3333+ Consumer return value semantics (when acks are enabled):
3434+ - `:ok` or `{:ok, any()}` or `nil` -> Success, send ack
3535+ - `{:error, _}` or any error-like tuple -> Failure, don't ack (Tap will retry)
3636+ - Exception raised -> Failure, don't ack (Tap will retry)
3737+3838+ If `disable_acks: true` is set in options, no acks are sent regardless of
3939+ consumer return value.
4040+ """
4141+ @spec dispatch(t(), Options.t(), pid(), :gun.stream_ref()) :: :ok
4242+ def dispatch(
4343+ event,
4444+ %Options{consumer: consumer, name: name, disable_acks: disable_acks},
4545+ conn,
4646+ stream
4747+ ) do
4848+ supervisor_name = {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}
4949+ event_id = get_event_id(event)
5050+5151+ {:ok, _pid} =
5252+ Task.Supervisor.start_child(supervisor_name, fn ->
5353+ try do
5454+ result = consumer.handle_event(event)
5555+5656+ unless disable_acks do
5757+ case result do
5858+ :ok ->
5959+ send_ack(conn, stream, event_id)
6060+6161+ {:ok, _} ->
6262+ send_ack(conn, stream, event_id)
6363+6464+ nil ->
6565+ send_ack(conn, stream, event_id)
6666+6767+ :error ->
6868+ Logger.error("Consumer returned error for event #{event_id}, not acking.")
6969+7070+ {:error, reason} ->
7171+ Logger.error(
7272+ "Consumer returned error for event #{event_id}, not acking: #{inspect(reason)}"
7373+ )
7474+7575+ _ ->
7676+ Logger.warning(
7777+ "Consumer returned unexpected value for event #{event_id}, acking anyway: #{inspect(result)}"
7878+ )
7979+8080+ send_ack(conn, stream, event_id)
8181+ end
8282+ end
8383+ rescue
8484+ e ->
8585+ Logger.error(
8686+ "Error in Tap event handler (event #{event_id}), not acking: #{Exception.format(:error, e, __STACKTRACE__)}"
8787+ )
8888+ end
8989+ end)
9090+9191+ :ok
9292+ end
9393+9494+ @spec send_ack(pid(), :gun.stream_ref(), integer()) :: :ok
9595+ defp send_ack(conn, stream, event_id) do
9696+ ack_message = Jason.encode!(%{type: "ack", id: event_id})
9797+9898+ :ok = :gun.ws_send(conn, stream, {:text, ack_message})
9999+ Logger.debug("[Drinkup.Tap] Acked event #{event_id}")
100100+ end
101101+102102+ @spec get_event_id(t()) :: integer()
103103+ defp get_event_id(%Event.Record{id: id}), do: id
104104+ defp get_event_id(%Event.Identity{id: id}), do: id
105105+end
+39
lib/tap/event/identity.ex
···11+defmodule Drinkup.Tap.Event.Identity do
22+ @moduledoc """
33+ Struct for identity events from Tap.
44+55+ Represents handle or status changes for a DID.
66+ """
77+88+ use TypedStruct
99+1010+ typedstruct enforce: true do
1111+ field :id, integer()
1212+ field :did, String.t()
1313+ field :handle, String.t() | nil
1414+ field :is_active, boolean()
1515+ field :status, String.t()
1616+ end
1717+1818+ @spec from(map()) :: t()
1919+ def from(%{
2020+ "id" => id,
2121+ "type" => "identity",
2222+ "identity" =>
2323+ %{
2424+ "did" => did,
2525+ "is_active" => is_active,
2626+ "status" => status
2727+ } = identity_data
2828+ }) do
2929+ handle = Map.get(identity_data, "handle")
3030+3131+ %__MODULE__{
3232+ id: id,
3333+ did: did,
3434+ handle: handle,
3535+ is_active: is_active,
3636+ status: status
3737+ }
3838+ end
3939+end
+58
lib/tap/event/record.ex
···11+defmodule Drinkup.Tap.Event.Record do
22+ @moduledoc """
33+ Struct for record events from Tap.
44+55+ Represents create, update, or delete operations on records in the repository.
66+ """
77+88+ use TypedStruct
99+1010+ typedstruct enforce: true do
1111+ @type action() :: :create | :update | :delete
1212+1313+ field :id, integer()
1414+ field :live, boolean()
1515+ field :rev, String.t()
1616+ field :did, String.t()
1717+ field :collection, String.t()
1818+ field :rkey, String.t()
1919+ field :action, action()
2020+ field :cid, String.t() | nil
2121+ field :record, map() | nil
2222+ end
2323+2424+ @spec from(map()) :: t()
2525+ def from(%{
2626+ "id" => id,
2727+ "type" => "record",
2828+ "record" =>
2929+ %{
3030+ "live" => live,
3131+ "rev" => rev,
3232+ "did" => did,
3333+ "collection" => collection,
3434+ "rkey" => rkey,
3535+ "action" => action
3636+ } = record_data
3737+ }) do
3838+ cid = Map.get(record_data, "cid")
3939+ record = Map.get(record_data, "record")
4040+4141+ %__MODULE__{
4242+ id: id,
4343+ live: live,
4444+ rev: rev,
4545+ did: did,
4646+ collection: collection,
4747+ rkey: rkey,
4848+ action: parse_action(action),
4949+ cid: cid,
5050+ record: record
5151+ }
5252+ end
5353+5454+ @spec parse_action(String.t()) :: action()
5555+ defp parse_action("create"), do: :create
5656+ defp parse_action("update"), do: :update
5757+ defp parse_action("delete"), do: :delete
5858+end
+90
lib/tap/options.ex
···11+defmodule Drinkup.Tap.Options do
22+ @moduledoc """
33+ Configuration options for Tap indexer/backfill service connection.
44+55+ This module defines the configuration structure for connecting to and
66+ interacting with a Tap service. Tap simplifies AT Protocol sync by handling
77+ firehose connections, verification, backfill, and filtering server-side.
88+99+ ## Options
1010+1111+ - `:consumer` (required) - Module implementing `Drinkup.Tap.Consumer` behaviour
1212+ - `:name` - Unique name for this Tap instance in the supervision tree (default: `Drinkup.Tap`)
1313+ - `:host` - Tap service URL (default: `"http://localhost:2480"`)
1414+ - `:admin_password` - Optional password for authenticated Tap instances
1515+ - `:disable_acks` - Disable event acknowledgments (default: `false`)
1616+1717+ ## Example
1818+1919+ %{
2020+ consumer: MyTapConsumer,
2121+ name: MyTap,
2222+ host: "http://localhost:2480",
2323+ admin_password: "secret",
2424+ disable_acks: false
2525+ }
2626+ """
2727+2828+ use TypedStruct
2929+3030+ @default_host "http://localhost:2480"
3131+3232+ @typedoc """
3333+ Map of configuration options accepted by `Drinkup.Tap.child_spec/1`.
3434+ """
3535+ @type options() :: %{
3636+ required(:consumer) => consumer(),
3737+ optional(:name) => name(),
3838+ optional(:host) => host(),
3939+ optional(:admin_password) => admin_password(),
4040+ optional(:disable_acks) => disable_acks()
4141+ }
4242+4343+ @typedoc """
4444+ Module implementing the `Drinkup.Tap.Consumer` behaviour.
4545+ """
4646+ @type consumer() :: module()
4747+4848+ @typedoc """
4949+ Unique identifier for this Tap instance in the supervision tree.
5050+5151+ Used for Registry lookups and naming child processes.
5252+ """
5353+ @type name() :: atom()
5454+5555+ @typedoc """
5656+ HTTP/HTTPS URL of the Tap service.
5757+5858+ Defaults to `"http://localhost:2480"` which is Tap's default bind address.
5959+ """
6060+ @type host() :: String.t()
6161+6262+ @typedoc """
6363+ Optional password for HTTP Basic authentication.
6464+6565+ Required when connecting to a Tap service configured with `TAP_ADMIN_PASSWORD`.
6666+ The password is sent as `Basic admin:<password>` in the Authorization header.
6767+ """
6868+ @type admin_password() :: String.t() | nil
6969+7070+ @typedoc """
7171+ Whether to disable event acknowledgments.
7272+7373+ When `true`, events are not acknowledged to Tap regardless of consumer
7474+ return values. This matches Tap's `TAP_DISABLE_ACKS` environment variable.
7575+7676+ Defaults to `false` (acknowledgments enabled).
7777+ """
7878+ @type disable_acks() :: boolean()
7979+8080+ typedstruct do
8181+ field :consumer, consumer(), enforce: true
8282+ field :name, name(), default: Drinkup.Tap
8383+ field :host, host(), default: @default_host
8484+ field :admin_password, admin_password()
8585+ field :disable_acks, disable_acks(), default: false
8686+ end
8787+8888+ @spec from(options()) :: t()
8989+ def from(%{consumer: _} = options), do: struct(__MODULE__, options)
9090+end
+100
lib/tap/socket.ex
···11+defmodule Drinkup.Tap.Socket do
22+ @moduledoc """
33+ WebSocket connection handler for Tap indexer/backfill service.
44+55+ Implements the Drinkup.Socket behaviour to manage connections to a Tap service,
66+ handling JSON-encoded events and dispatching them to the configured consumer.
77+88+ Events are acknowledged after successful processing based on the consumer's
99+ return value:
1010+ - `:ok`, `{:ok, any()}`, or `nil` → Success, ack sent to Tap
1111+ - `{:error, reason}` → Failure, no ack (Tap will retry after timeout)
1212+ - Exception raised → Failure, no ack (Tap will retry after timeout)
1313+ """
1414+1515+ use Drinkup.Socket
1616+1717+ require Logger
1818+ alias Drinkup.Tap.{Event, Options}
1919+2020+ @impl true
2121+ def init(opts) do
2222+ options = Keyword.fetch!(opts, :options)
2323+ {:ok, %{options: options, host: options.host}}
2424+ end
2525+2626+ def start_link(%Options{} = options, statem_opts) do
2727+ socket_opts = build_socket_opts(options)
2828+ Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
2929+ end
3030+3131+ @impl true
3232+ def build_path(_data) do
3333+ "/channel"
3434+ end
3535+3636+ @impl true
3737+ def handle_frame({:text, json}, {%{options: options} = data, conn, stream}) do
3838+ case Jason.decode(json) do
3939+ {:ok, payload} ->
4040+ case Event.from(payload) do
4141+ nil ->
4242+ Logger.warning("Received unrecognized event from Tap: #{inspect(payload)}")
4343+ :noop
4444+4545+ event ->
4646+ Event.dispatch(event, options, conn, stream)
4747+ {:ok, data}
4848+ end
4949+5050+ {:error, reason} ->
5151+ Logger.error("Failed to decode JSON from Tap: #{inspect(reason)}")
5252+ :noop
5353+ end
5454+ end
5555+5656+ @impl true
5757+ def handle_frame({:binary, _binary}, _data) do
5858+ Logger.warning("Received unexpected binary frame from Tap")
5959+ :noop
6060+ end
6161+6262+ @impl true
6363+ def handle_frame(:close, _data) do
6464+ Logger.info("Websocket closed, reason unknown")
6565+ nil
6666+ end
6767+6868+ @impl true
6969+ def handle_frame({:close, errno, reason}, _data) do
7070+ Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
7171+ nil
7272+ end
7373+7474+ defp build_socket_opts(%Options{host: host, admin_password: admin_password} = options) do
7575+ base_opts = [
7676+ host: host,
7777+ options: options
7878+ ]
7979+8080+ if admin_password do
8181+ auth_header = build_auth_header(admin_password)
8282+8383+ gun_opts = %{
8484+ ws_opts: %{
8585+ headers: [{"authorization", auth_header}]
8686+ }
8787+ }
8888+8989+ Keyword.put(base_opts, :gun_opts, gun_opts)
9090+ else
9191+ base_opts
9292+ end
9393+ end
9494+9595+ @spec build_auth_header(String.t()) :: String.t()
9696+ defp build_auth_header(password) do
9797+ credentials = "admin:#{password}"
9898+ "Basic #{Base.encode64(credentials)}"
9999+ end
100100+end