···1818- Support for the
1919 [Tap](https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md)
2020 sync and backfill utility service, via `Drinkup.Tap`.
2121+- Support for [Jetstream](https://github.com/bluesky-social/jetstream), a
2222+ simplified JSON event stream for ATProto, via `Drinkup.Jetstream`.
21232224### Changed
2325
···5353 HTTP/HTTPS URL of the ATProto Firehose relay.
54545555 Defaults to `"https://bsky.network"` which is the public Bluesky relay.
5656+5757+ You can find a list of third-party relays at https://compare.hose.cam/.
5658 """
5759 @type host() :: String.t()
5860
+207
lib/jetstream.ex
···11+defmodule Drinkup.Jetstream do
22+ @moduledoc """
33+ Supervisor for Jetstream event stream connections.
44+55+ Jetstream is a simplified JSON event stream that converts the CBOR-encoded
66+ ATProto Firehose into lightweight, friendly JSON events. It provides zstd
77+ compression and filtering capabilities for collections and DIDs.
88+99+ ## Usage
1010+1111+ Add Jetstream to your supervision tree:
1212+1313+ children = [
1414+ {Drinkup.Jetstream, %{
1515+ consumer: MyJetstreamConsumer,
1616+ name: MyJetstream,
1717+ wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"]
1818+ }}
1919+ ]
2020+2121+ ## Configuration
2222+2323+ See `Drinkup.Jetstream.Options` for all available configuration options.
2424+2525+ ## Dynamic Filter Updates
2626+2727+ You can update filters after the connection is established:
2828+2929+ Drinkup.Jetstream.update_options(MyJetstream, %{
3030+ wanted_collections: ["app.bsky.graph.follow"],
3131+ wanted_dids: ["did:plc:abc123"]
3232+ })
3333+3434+ ## Public Instances
3535+3636+ By default Drinkup connects to `jetstream2.us-east.bsky.network`.
3737+3838+ Bluesky operates a few different Jetstream instances:
3939+ - `jetstream1.us-east.bsky.network`
4040+ - `jetstream2.us-east.bsky.network`
4141+ - `jetstream1.us-west.bsky.network`
4242+ - `jetstream2.us-west.bsky.network`
4343+4444+ There are also some third-party instances not run by Bluesky PBC:
4545+ - `jetstream.fire.hose.cam`
4646+ - `jetstream2.fr.hose.cam`
4747+ - `jetstream1.us-east.fire.hose.cam`
4848+ """
4949+5050+ use Supervisor
5151+ require Logger
5252+ alias Drinkup.Jetstream.Options
5353+5454+ @dialyzer nowarn_function: {:init, 1}
# Starts the supervisor for one named Jetstream instance. This function is
# the `start` entry of the child spec built by `child_spec/1`, so it launches
# the supervisor itself rather than returning an init tuple.
#
# Children:
#   * a `Task.Supervisor` registered as `{name, JetstreamTasks}`
#   * the `Drinkup.Jetstream.Socket`, given the parsed options
@impl true
def init({%Options{} = drinkup_options, supervisor_options}) do
  instance = drinkup_options.name
  via = fn role -> {:via, Registry, {Drinkup.Registry, {instance, role}}} end

  task_supervisor = {Task.Supervisor, name: via.(JetstreamTasks)}
  socket = {Drinkup.Jetstream.Socket, drinkup_options}

  Supervisor.start_link(
    [task_supervisor, socket],
    supervisor_options ++ [name: via.(JetstreamSupervisor)]
  )
end
# Child specification for mounting a Jetstream instance under a supervisor.
#
# Accepts either a bare options map (supervised with `strategy: :one_for_one`)
# or an `{options, supervisor_options}` tuple. The spec id is the instance
# `:name`, falling back to this module when no name is given.
@spec child_spec(Options.options()) :: Supervisor.child_spec()
def child_spec(%{} = options) do
  child_spec({options, [strategy: :one_for_one]})
end

@spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
def child_spec({drinkup_options, supervisor_options}) do
  spec_id = Map.get(drinkup_options, :name, __MODULE__)
  start_args = [{Options.from(drinkup_options), supervisor_options}]

  %{
    id: spec_id,
    start: {__MODULE__, :init, start_args},
    type: :supervisor,
    restart: :permanent,
    shutdown: 500
  }
end
8383+8484+ # Options Update API
8585+8686+ @typedoc """
8787+ Options that can be updated dynamically via `update_options/2`.
8888+8989+ - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
9090+ - `:wanted_dids` - List of DIDs to filter (max 10,000)
9191+ - `:max_message_size_bytes` - Maximum message size to receive
9292+9393+ Empty arrays will disable the corresponding filter (i.e., receive all).
9494+ """
9595+ @type update_opts :: %{
9696+ optional(:wanted_collections) => [String.t()],
9797+ optional(:wanted_dids) => [String.t()],
9898+ optional(:max_message_size_bytes) => integer()
9999+ }
@doc """
Pushes new filter settings to a running Jetstream connection.

The update is encoded as an `options_update` JSON message and written as a
text frame to the websocket connection registered for `name`, so the set of
collections and DIDs can change without reconnecting.

## Parameters

  - `name` - The name of the Jetstream instance (default: `Drinkup.Jetstream`)
  - `opts` - Map with any of the following optional keys:
    - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
    - `:wanted_dids` - List of DIDs to filter (max 10,000)
    - `:max_message_size_bytes` - Maximum message size to receive

Keys that are missing or `nil` are left out of the message.

## Examples

    # Only receive posts
    Drinkup.Jetstream.update_options(MyJetstream, %{
      wanted_collections: ["app.bsky.feed.post"]
    })

    # Only receive events for specific DIDs
    Drinkup.Jetstream.update_options(MyJetstream, %{
      wanted_dids: ["did:plc:abc123", "did:plc:def456"]
    })

    # Clear both filters (receive all events)
    Drinkup.Jetstream.update_options(MyJetstream, %{
      wanted_collections: [],
      wanted_dids: []
    })

## Return Value

Returns `:ok` once the frame has been handed to the connection, or
`{:error, :not_connected}` when no connection is registered for `name`.

Note: the limits above are enforced by the server, not by this function. The
server may reject an invalid update (e.g. too many collections/DIDs), in
which case it closes the connection.
"""
@spec update_options(atom(), update_opts()) :: :ok | {:error, term()}
def update_options(name \\ Drinkup.Jetstream, opts) when is_map(opts) do
  # A failed lookup falls straight through `with` and is returned unchanged.
  with {:ok, {conn, stream}} <- find_connection(name) do
    frame = {:text, build_options_update_message(opts)}
    :ok = :gun.ws_send(conn, stream, frame)

    Logger.debug("[Drinkup.Jetstream] Sent options update")
    :ok
  end
end
156156+157157+ # Private functions
# Resolves the connection pid and stream reference stored in `Drinkup.Registry`
# under `{name, JetstreamConnection}`. Returns `{:error, :not_connected}` when
# nothing is registered under that key.
@spec find_connection(atom()) :: {:ok, {pid(), :gun.stream_ref()}} | {:error, :not_connected}
defp find_connection(name) do
  Drinkup.Registry
  |> Registry.lookup({name, JetstreamConnection})
  |> case do
    [] -> {:error, :not_connected}
    [{_socket_pid, {conn, stream}}] -> {:ok, {conn, stream}}
  end
end
# Serialises an options update into the JSON text sent over the websocket:
#
#     {"type": "options_update", "payload": {...}}
#
# Only keys present in `opts` with a non-nil value end up in the payload.
@spec build_options_update_message(update_opts()) :: String.t()
defp build_options_update_message(opts) do
  collections = Map.get(opts, :wanted_collections)
  dids = Map.get(opts, :wanted_dids)
  max_size = Map.get(opts, :max_message_size_bytes)

  payload =
    %{}
    |> maybe_add_wanted_collections(collections)
    |> maybe_add_wanted_dids(dids)
    |> maybe_add_max_message_size(max_size)

  Jason.encode!(%{"type" => "options_update", "payload" => payload})
end

# Adds "wantedCollections" unless no value was supplied.
@spec maybe_add_wanted_collections(map(), [String.t()] | nil) :: map()
defp maybe_add_wanted_collections(payload, collections)
     when is_nil(collections) or is_list(collections) do
  case collections do
    nil -> payload
    list -> Map.put(payload, "wantedCollections", list)
  end
end

# Adds "wantedDids" unless no value was supplied.
@spec maybe_add_wanted_dids(map(), [String.t()] | nil) :: map()
defp maybe_add_wanted_dids(payload, dids) when is_nil(dids) or is_list(dids) do
  case dids do
    nil -> payload
    list -> Map.put(payload, "wantedDids", list)
  end
end

# Adds "maxMessageSizeBytes" unless no value was supplied.
@spec maybe_add_max_message_size(map(), integer() | nil) :: map()
defp maybe_add_max_message_size(payload, max_size)
     when is_nil(max_size) or is_integer(max_size) do
  case max_size do
    nil -> payload
    size -> Map.put(payload, "maxMessageSizeBytes", size)
  end
end
207207+end
+61
lib/jetstream/consumer.ex
···11+defmodule Drinkup.Jetstream.Consumer do
22+ @moduledoc """
33+ Consumer behaviour for handling Jetstream events.
44+55+ Implement this behaviour to process events from a Jetstream instance.
66+ Events are dispatched asynchronously via `Task.Supervisor`.
77+88+ Unlike Tap, Jetstream does not require event acknowledgments. Events are
99+ processed in a fire-and-forget manner.
1010+1111+ ## Example
1212+1313+ defmodule MyJetstreamConsumer do
1414+ @behaviour Drinkup.Jetstream.Consumer
1515+1616+ def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :create} = event) do
1717+ # Handle new record creation
1818+ IO.inspect(event, label: "New record")
1919+ :ok
2020+ end
2121+2222+ def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :delete} = event) do
2323+ # Handle record deletion
2424+ IO.inspect(event, label: "Deleted record")
2525+ :ok
2626+ end
2727+2828+ def handle_event(%Drinkup.Jetstream.Event.Identity{} = event) do
2929+ # Handle identity changes
3030+ IO.inspect(event, label: "Identity update")
3131+ :ok
3232+ end
3333+3434+ def handle_event(%Drinkup.Jetstream.Event.Account{active: false} = event) do
3535+ # Handle account deactivation
3636+ IO.inspect(event, label: "Account inactive")
3737+ :ok
3838+ end
3939+4040+ def handle_event(_event), do: :ok
4141+ end
4242+4343+ ## Event Types
4444+4545+ The consumer will receive one of three event types:
4646+4747+ - `Drinkup.Jetstream.Event.Commit` - Repository commits (create, update, delete)
4848+ - `Drinkup.Jetstream.Event.Identity` - Identity updates (handle changes, etc.)
4949+ - `Drinkup.Jetstream.Event.Account` - Account status changes (active, taken down, etc.)
5050+5151+ ## Error Handling
5252+5353+ If your `handle_event/1` implementation raises an exception, it will be logged
5454+ but will not affect the stream. The error is caught and logged by the event
5555+ dispatcher.
5656+ """
5757+5858+ alias Drinkup.Jetstream.Event
5959+6060+ @callback handle_event(Event.t()) :: any()
6161+end
+100
lib/jetstream/event.ex
···11+defmodule Drinkup.Jetstream.Event do
22+ @moduledoc """
33+ Event handling and dispatch for Jetstream events.
44+55+ Parses incoming JSON events from Jetstream and dispatches them to the
66+ configured consumer via Task.Supervisor.
77+ """
88+99+ require Logger
1010+ alias Drinkup.Jetstream.{Event, Options}
1111+1212+ @type t() :: Event.Commit.t() | Event.Identity.t() | Event.Account.t()
@doc """
Turns a decoded Jetstream JSON map into an event struct.

Every Jetstream event carries `"did"`, `"time_us"` and a `"kind"` field. The
kind names both the event type and the key under which the event body is
nested:

    %{
      "did" => "did:plc:...",
      "time_us" => 1726880765818347,
      "kind" => "commit",
      "commit" => %{...}
    }

Recognised kinds are `"commit"`, `"identity"` and `"account"`; the body is
handed to the matching `Event.*` module. A warning is logged and `nil` is
returned when the kind is unknown, the body is missing, or the top-level
structure is not as described above.
"""
@spec from(map()) :: t() | nil
def from(%{"did" => did, "time_us" => time_us, "kind" => kind} = payload) do
  case event_module(kind) do
    nil ->
      Logger.warning("Received unrecognized event kind from Jetstream: #{inspect(kind)}")
      nil

    {label, module} ->
      case Map.get(payload, kind) do
        nil ->
          Logger.warning("#{label} event missing '#{kind}' field: #{inspect(payload)}")
          nil

        body ->
          module.from(did, time_us, body)
      end
  end
end

def from(payload) do
  Logger.warning("Received invalid event structure from Jetstream: #{inspect(payload)}")
  nil
end

# Maps a "kind" value to the label used in log messages and the module that
# parses its body; `nil` for anything unrecognised.
defp event_module("commit"), do: {"Commit", Event.Commit}
defp event_module("identity"), do: {"Identity", Event.Identity}
defp event_module("account"), do: {"Account", Event.Account}
defp event_module(_other), do: nil
@doc """
Hands an event to the configured consumer in a supervised task.

A child is started under the instance's `Task.Supervisor` (registered as
`{name, JetstreamTasks}`) which calls the consumer's `handle_event/1`.
Exceptions raised by the consumer are rescued and logged inside that task.
Nothing is acknowledged back to Jetstream; the function returns `:ok` as soon
as the task has been started.
"""
@spec dispatch(t(), Options.t()) :: :ok
def dispatch(event, %Options{} = options) do
  task_supervisor = {:via, Registry, {Drinkup.Registry, {options.name, JetstreamTasks}}}
  handler = options.consumer

  {:ok, _pid} =
    Task.Supervisor.start_child(task_supervisor, fn -> run_handler(handler, event) end)

  :ok
end

# Runs the consumer callback, logging (rather than propagating) any exception.
defp run_handler(consumer, event) do
  consumer.handle_event(event)
rescue
  error ->
    Logger.error(
      "Error in Jetstream event handler: #{Exception.format(:error, error, __STACKTRACE__)}"
    )
end
100100+end
+106
lib/jetstream/event/account.ex
···11+defmodule Drinkup.Jetstream.Event.Account do
22+ @moduledoc """
33+ Struct for account events from Jetstream.
44+55+ Represents a change to an account's status on a host (e.g., PDS or Relay).
66+ The semantics of this event are that the status is at the host which emitted
77+ the event, not necessarily that at the currently active PDS.
88+99+ For example, a Relay takedown would emit a takedown with `active: false`,
1010+ even if the PDS is still active.
1111+ """
1212+1313+ use TypedStruct
1414+1515+ typedstruct enforce: true do
1616+ @typedoc """
1717+ The status of an inactive account.
1818+1919+ Known values from the ATProto lexicon:
2020+ - `:takendown` - Account has been taken down
2121+ - `:suspended` - Account is suspended
2222+ - `:deleted` - Account has been deleted
2323+ - `:deactivated` - Account has been deactivated by the user
2424+ - `:desynchronized` - Account is out of sync
2525+ - `:throttled` - Account is throttled
2626+2727+ The status can also be any other string value for future compatibility.
2828+ """
2929+ @type status() ::
3030+ :takendown
3131+ | :suspended
3232+ | :deleted
3333+ | :deactivated
3434+ | :desynchronized
3535+ | :throttled
3636+ | String.t()
3737+3838+ field :did, String.t()
3939+ field :time_us, integer()
4040+ field :kind, :account, default: :account
4141+ field :active, boolean()
4242+ field :seq, integer()
4343+ field :time, NaiveDateTime.t()
4444+ field :status, status() | nil
4545+ end
@doc """
Builds an `Account` struct from the `"account"` body of a Jetstream event.

`did` and `time_us` come from the enclosing event. The body must contain
`"active"`, `"seq"` and `"time"`; `"status"` is optional and becomes `nil`
when absent.

## Example Payload (Active)

    %{
      "active" => true,
      "did" => "did:plc:ufbl4k27gp6kzas5glhz7fim",
      "seq" => 1409753013,
      "time" => "2024-09-05T06:11:04.870Z"
    }

## Example Payload (Inactive)

    %{
      "active" => false,
      "did" => "did:plc:abc123",
      "seq" => 1409753014,
      "time" => "2024-09-05T06:12:00.000Z",
      "status" => "takendown"
    }
"""
@spec from(String.t(), integer(), map()) :: t()
def from(did, time_us, %{"active" => active, "seq" => seq, "time" => time} = account) do
  # The timestamp is parsed first so an invalid one raises before the status
  # is looked at.
  parsed_time = parse_datetime(time)
  status = account |> Map.get("status") |> parse_status()

  %__MODULE__{
    did: did,
    time_us: time_us,
    active: active,
    seq: seq,
    time: parsed_time,
    status: status
  }
end
# Parses an ISO 8601 timestamp into a `NaiveDateTime`, raising a
# `RuntimeError` when the string cannot be parsed.
@spec parse_datetime(String.t()) :: NaiveDateTime.t()
defp parse_datetime(time_str) do
  time_str
  |> NaiveDateTime.from_iso8601()
  |> case do
    {:ok, parsed} -> parsed
    {:error, _reason} -> raise "Invalid datetime format: #{time_str}"
  end
end
# Status strings with a dedicated atom. Any other string is passed through
# unchanged, so no atoms are ever created from server input.
@status_atoms %{
  "takendown" => :takendown,
  "suspended" => :suspended,
  "deleted" => :deleted,
  "deactivated" => :deactivated,
  "desynchronized" => :desynchronized,
  "throttled" => :throttled
}

# Converts the optional "status" string into its atom form when it is one of
# the known values; `nil` stays `nil`.
@spec parse_status(String.t() | nil) :: status() | nil
defp parse_status(nil), do: nil

defp parse_status(status) when is_binary(status) do
  Map.get(@status_atoms, status, status)
end
106106+end
+78
lib/jetstream/event/commit.ex
···11+defmodule Drinkup.Jetstream.Event.Commit do
22+ @moduledoc """
33+ Struct for commit events from Jetstream.
44+55+ Represents a repository commit containing either a create, update, or delete
66+ operation on a record. Unlike the Firehose commit events, Jetstream provides
77+ simplified JSON structures without CAR/CBOR encoding.
88+ """
99+1010+ use TypedStruct
1111+1212+ typedstruct enforce: true do
1313+ @typedoc """
1414+ The operation type for this commit.
1515+1616+ - `:create` - A new record was created
1717+ - `:update` - An existing record was updated
1818+ - `:delete` - An existing record was deleted
1919+ """
2020+ @type operation() :: :create | :update | :delete
2121+2222+ field :did, String.t()
2323+ field :time_us, integer()
2424+ field :kind, :commit, default: :commit
2525+ field :operation, operation()
2626+ field :collection, String.t()
2727+ field :rkey, String.t()
2828+ field :rev, String.t()
2929+ field :record, map() | nil
3030+ field :cid, String.t() | nil
3131+ end
@doc """
Builds a `Commit` struct from the `"commit"` body of a Jetstream event.

`did` and `time_us` come from the enclosing event. The body must contain
`"rev"`, `"operation"`, `"collection"` and `"rkey"`; `"record"` and `"cid"`
are optional and default to `nil`.

## Example Payload

    %{
      "rev" => "3l3qo2vutsw2b",
      "operation" => "create",
      "collection" => "app.bsky.feed.like",
      "rkey" => "3l3qo2vuowo2b",
      "record" => %{
        "$type" => "app.bsky.feed.like",
        "createdAt" => "2024-09-09T19:46:02.102Z",
        "subject" => %{...}
      },
      "cid" => "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
    }
"""
@spec from(String.t(), integer(), map()) :: t()
def from(
      did,
      time_us,
      %{"rev" => rev, "operation" => operation, "collection" => collection, "rkey" => rkey} =
        commit
    ) do
  parsed_operation = parse_operation(operation)
  record = Map.get(commit, "record")
  cid = Map.get(commit, "cid")

  %__MODULE__{
    did: did,
    time_us: time_us,
    operation: parsed_operation,
    collection: collection,
    rkey: rkey,
    rev: rev,
    record: record,
    cid: cid
  }
end
# Wire value -> atom for the three commit operations.
@operation_atoms %{"create" => :create, "update" => :update, "delete" => :delete}

# Converts the "operation" string into its atom. Any other value matches no
# clause and raises `FunctionClauseError`.
@spec parse_operation(String.t()) :: operation()
defp parse_operation(operation) when is_map_key(@operation_atoms, operation) do
  Map.fetch!(@operation_atoms, operation)
end
7878+end
+58
lib/jetstream/event/identity.ex
···11+defmodule Drinkup.Jetstream.Event.Identity do
22+ @moduledoc """
33+ Struct for identity events from Jetstream.
44+55+ Represents a change to an account's identity, such as an updated handle,
66+ signing key, or PDS hosting endpoint. This serves as a signal to downstream
77+ services to refresh their identity cache.
88+ """
99+1010+ use TypedStruct
1111+1212+ typedstruct enforce: true do
1313+ field :did, String.t()
1414+ field :time_us, integer()
1515+ field :kind, :identity, default: :identity
1616+ field :handle, String.t() | nil
1717+ field :seq, integer()
1818+ field :time, NaiveDateTime.t()
1919+ end
@doc """
Builds an `Identity` struct from the `"identity"` body of a Jetstream event.

`did` and `time_us` come from the enclosing event. The body must contain
`"seq"` and `"time"`; `"handle"` is optional and becomes `nil` when absent.

## Example Payload

    %{
      "did" => "did:plc:ufbl4k27gp6kzas5glhz7fim",
      "handle" => "yohenrique.bsky.social",
      "seq" => 1409752997,
      "time" => "2024-09-05T06:11:04.870Z"
    }
"""
@spec from(String.t(), integer(), map()) :: t()
def from(did, time_us, %{"seq" => seq, "time" => time} = identity) do
  handle = Map.get(identity, "handle")
  parsed_time = parse_datetime(time)

  %__MODULE__{
    did: did,
    time_us: time_us,
    handle: handle,
    seq: seq,
    time: parsed_time
  }
end
# Parses an ISO 8601 timestamp into a `NaiveDateTime`; a string that cannot be
# parsed raises a `RuntimeError`.
@spec parse_datetime(String.t()) :: NaiveDateTime.t()
defp parse_datetime(time_str) do
  case NaiveDateTime.from_iso8601(time_str) do
    {:error, _reason} -> raise RuntimeError, "Invalid datetime format: #{time_str}"
    {:ok, parsed} -> parsed
  end
end
5858+end
+151
lib/jetstream/options.ex
···11+defmodule Drinkup.Jetstream.Options do
22+ @moduledoc """
33+ Configuration options for Jetstream event stream connection.
44+55+ Jetstream is a simplified JSON event stream that converts the CBOR-encoded
66+ ATProto Firehose into lightweight, friendly JSON. It provides zstd compression
77+ and filtering capabilities for collections and DIDs.
88+99+ ## Options
1010+1111+ - `:consumer` (required) - Module implementing `Drinkup.Jetstream.Consumer` behaviour
1212+ - `:name` - Unique name for this Jetstream instance in the supervision tree (default: `Drinkup.Jetstream`)
1313+ - `:host` - Jetstream service URL (default: `"wss://jetstream2.us-east.bsky.network"`)
1414+ - `:wanted_collections` - List of collection NSIDs or prefixes to filter (default: `[]` = all collections)
1515+ - `:wanted_dids` - List of DIDs to filter (default: `[]` = all repos)
1616+ - `:cursor` - Unix microseconds timestamp to resume from (default: `nil` = live-tail)
1717+ - `:require_hello` - Pause replay until first options update is sent (default: `false`)
1818+ - `:max_message_size_bytes` - Maximum message size to receive (default: `nil` = no limit)
1919+2020+ ## Example
2121+2222+ %{
2323+ consumer: MyJetstreamConsumer,
2424+ name: MyJetstream,
2525+ host: "wss://jetstream2.us-east.bsky.network",
2626+ wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
2727+ wanted_dids: ["did:plc:abc123"],
2828+ cursor: 1725519626134432
2929+ }
3030+3131+ ## Collection Filters
3232+3333+ The `wanted_collections` option supports:
3434+ - Full NSIDs: `"app.bsky.feed.post"`
3535+ - NSID prefixes: `"app.bsky.graph.*"`, `"app.bsky.*"`
3636+3737+ You can specify up to 100 collection filters.
3838+3939+ ## DID Filters
4040+4141+ The `wanted_dids` option accepts a list of DID strings.
4242+ You can specify up to 10,000 DIDs.
4343+4444+ ## Compression
4545+4646+ Jetstream always uses zstd compression with a custom dictionary.
4747+ This is handled automatically by the socket implementation.
4848+ """
4949+5050+ use TypedStruct
5151+5252+ @default_host "wss://jetstream2.us-east.bsky.network"
5353+5454+ @typedoc """
5555+ Map of configuration options accepted by `Drinkup.Jetstream.child_spec/1`.
5656+ """
5757+ @type options() :: %{
5858+ required(:consumer) => consumer(),
5959+ optional(:name) => name(),
6060+ optional(:host) => host(),
6161+ optional(:wanted_collections) => wanted_collections(),
6262+ optional(:wanted_dids) => wanted_dids(),
6363+ optional(:cursor) => cursor(),
6464+ optional(:require_hello) => require_hello(),
6565+ optional(:max_message_size_bytes) => max_message_size_bytes()
6666+ }
6767+6868+ @typedoc """
6969+ Module implementing the `Drinkup.Jetstream.Consumer` behaviour.
7070+ """
7171+ @type consumer() :: module()
7272+7373+ @typedoc """
7474+ Unique identifier for this Jetstream instance in the supervision tree.
7575+7676+ Used for Registry lookups and naming child processes.
7777+ """
7878+ @type name() :: atom()
7979+8080+ @typedoc """
8181+ WebSocket URL of the Jetstream service.
8282+8383+ Defaults to `"wss://jetstream2.us-east.bsky.network"` which is a public Bluesky instance.
8484+ """
8585+ @type host() :: String.t()
8686+8787+ @typedoc """
8888+ List of collection NSIDs or NSID prefixes to filter.
8989+9090+ Examples:
9191+ - `["app.bsky.feed.post"]` - Only posts
9292+ - `["app.bsky.graph.*"]` - All graph collections
9393+ - `["app.bsky.*"]` - All Bluesky app collections
9494+9595+ You can specify up to 100 collection filters.
9696+ Defaults to `[]` (all collections).
9797+ """
9898+ @type wanted_collections() :: [String.t()]
9999+100100+ @typedoc """
101101+ List of DIDs to filter events by.
102102+103103+ You can specify up to 10,000 DIDs.
104104+ Defaults to `[]` (all repos).
105105+ """
106106+ @type wanted_dids() :: [String.t()]
107107+108108+ @typedoc """
109109+ Unix microseconds timestamp to resume streaming from.
110110+111111+ When provided, Jetstream will replay events starting from this timestamp.
112112+ Useful for resuming after a restart without missing events. The cursor is
113113+ automatically tracked and updated as events are received.
114114+115115+ Defaults to `nil` (live-tail from current time).
116116+ """
117117+ @type cursor() :: pos_integer() | nil
118118+119119+ @typedoc """
120120+ Whether to pause replay/live-tail until the first options update is sent.
121121+122122+ When `true`, the connection will wait for a `Drinkup.Jetstream.update_options/2`
123123+ call before starting to receive events.
124124+125125+ Defaults to `false`.
126126+ """
127127+ @type require_hello() :: boolean()
128128+129129+ @typedoc """
130130+ Maximum message size in bytes that the client would like to receive.
131131+132132+ Zero or `nil` means no limit. Negative values are treated as zero.
133133+ Defaults to `nil` (no maximum size).
134134+ """
135135+ @type max_message_size_bytes() :: integer() | nil
136136+137137+ typedstruct do
138138+ field :consumer, consumer(), enforce: true
139139+ field :name, name(), default: Drinkup.Jetstream
140140+ field :host, host(), default: @default_host
141141+ # TODO: Add NSID prefix validation once available in atex
142142+ field :wanted_collections, wanted_collections(), default: []
143143+ field :wanted_dids, wanted_dids(), default: []
144144+ field :cursor, cursor()
145145+ field :require_hello, require_hello(), default: false
146146+ field :max_message_size_bytes, max_message_size_bytes()
147147+ end
# Builds the options struct from a plain map. The map must contain
# `:consumer`; every other field falls back to the default declared on the
# struct when it is not given.
@spec from(options()) :: t()
def from(%{consumer: _consumer} = options) do
  struct(__MODULE__, options)
end
151151+end
+201
lib/jetstream/socket.ex
···11+defmodule Drinkup.Jetstream.Socket do
22+ @moduledoc """
33+ WebSocket connection handler for Jetstream event streams.
44+55+ Implements the Drinkup.Socket behaviour to manage connections to a Jetstream
66+ service, handling zstd-compressed JSON events and dispatching them to the
77+ configured consumer.
88+ """
99+1010+ use Drinkup.Socket
1111+1212+ require Logger
1313+ alias Drinkup.Jetstream.{Event, Options}
1414+1515+ @dict_path "priv/jetstream/zstd_dictionary"
1616+ @external_resource @dict_path
1717+ @zstd_dict File.read!(@dict_path)
# Builds the initial socket state from the `:options` entry of `opts`
# (required). The host and starting cursor are copied out of the options so
# they sit at the top level of the state map.
@impl true
def init(opts) do
  options = Keyword.fetch!(opts, :options)
  state = %{options: options, host: options.host, cursor: options.cursor}

  {:ok, state}
end
# Starts the socket process for one Jetstream instance via `Drinkup.Socket`.
# The process is registered in `Drinkup.Registry` as `{name, JetstreamSocket}`;
# a `:name` already present in `statem_opts` is overwritten.
def start_link(%Options{} = options, statem_opts) do
  registered_name = {:via, Registry, {Drinkup.Registry, {options.name, JetstreamSocket}}}

  Drinkup.Socket.start_link(
    __MODULE__,
    [host: options.host, options: options],
    Keyword.put(statem_opts, :name, registered_name)
  )
end
# Builds the `/subscribe` request path, including the query string, from the
# current socket state.
#
# Compression is always requested. The collection/DID filters, maximum message
# size and `requireHello` flag come from the configured options and are only
# added when set.
#
# The cursor comes from the socket state rather than from the options. The
# state starts out with the configured cursor and is advanced to the `time_us`
# of each event, so a path built after events were received asks the server to
# resume from the last event seen instead of the original starting point. If
# the state has no `:cursor` key, the configured cursor is used.
@impl true
def build_path(%{options: options} = data) do
  cursor = Map.get(data, :cursor, options.cursor)

  query_params =
    [compress: "true"]
    |> put_collections(options.wanted_collections)
    |> put_dids(options.wanted_dids)
    |> put_cursor(cursor)
    |> put_max_size(options.max_message_size_bytes)
    |> put_require_hello(options.require_hello)

  "/subscribe?" <> URI.encode_query(query_params)
end
# Handles a websocket frame.
#
#   * binary frames hold zstd-compressed JSON and are decompressed first
#   * text frames hold plain JSON
#   * close frames are logged
#
# A successfully parsed event is dispatched to the consumer and the state's
# cursor is moved to the event's "time_us". Frames that cannot be decoded, or
# that do not yield an event, leave the state untouched (`:noop`).
@impl true
def handle_frame({:binary, compressed_data}, {%{options: _options} = data, _conn, _stream}) do
  case decompress_and_parse(compressed_data) do
    {:ok, payload} ->
      dispatch_payload(payload, data)

    # TODO: sometimes getting ZSTD_CONTENTSIZE_UNKNOWN
    {:error, reason} ->
      Logger.error(
        "[Drinkup.Jetstream.Socket] Failed to decompress/parse frame: #{inspect(reason)}"
      )

      :noop
  end
end

@impl true
def handle_frame({:text, json}, {%{options: _options} = data, _conn, _stream}) do
  # Text frames shouldn't happen since we force compression, but handle them anyway
  case Jason.decode(json) do
    {:ok, payload} ->
      dispatch_payload(payload, data)

    {:error, reason} ->
      Logger.error("[Drinkup.Jetstream.Socket] Failed to decode JSON: #{inspect(reason)}")
      :noop
  end
end

@impl true
def handle_frame(:close, _data) do
  Logger.info("[Drinkup.Jetstream.Socket] WebSocket closed, reason unknown")
  nil
end

@impl true
def handle_frame({:close, errno, reason}, _data) do
  Logger.info(
    "[Drinkup.Jetstream.Socket] WebSocket closed, errno: #{errno}, reason: #{inspect(reason)}"
  )

  nil
end

# Converts a decoded payload into an event, dispatches it, and returns the
# state with its cursor set to the payload's "time_us". `Event.from/1` logs
# its own warnings, so a `nil` result is simply a no-op here.
defp dispatch_payload(payload, %{options: options} = data) do
  case Event.from(payload) do
    nil ->
      :noop

    event ->
      Event.dispatch(event, options)
      {:ok, %{data | cursor: Map.get(payload, "time_us")}}
  end
end
# Called once the websocket is up. Stores `{conn, stream}` in
# `Drinkup.Registry` under `{name, JetstreamConnection}` so that
# `Drinkup.Jetstream.update_options/2` can find the live connection. The
# result of the registration is not checked.
@impl true
def handle_connected({user_data, conn, stream}) do
  registry_key = {user_data.options.name, JetstreamConnection}
  Registry.register(Drinkup.Registry, registry_key, {conn, stream})

  {:ok, user_data}
end
# Called when the websocket goes away. Removes this process's
# `{name, JetstreamConnection}` entry from `Drinkup.Registry` so lookups stop
# returning the dead connection.
@impl true
def handle_disconnected(_reason, {user_data, _conn, _stream}) do
  registry_key = {user_data.options.name, JetstreamConnection}
  Registry.unregister(Drinkup.Registry, registry_key)

  {:ok, user_data}
end
# Returns the zstd decompression dictionary for Jetstream frames.
#
# `create_ddict` returns a reference, so the result cannot be stored in a
# module attribute at compile time. It is instead built from the embedded
# dictionary bytes on first use and cached in `:persistent_term`, so the
# dictionary is not rebuilt for every frame.
#
# If two processes race on the first call, both build a dictionary and the
# later `put` wins; either reference is equally usable.
#
# Raises `ArgumentError` when the dictionary cannot be created.
@spec get_dictionary() :: reference()
defp get_dictionary() do
  cache_key = {__MODULE__, :zstd_ddict}

  case :persistent_term.get(cache_key, nil) do
    nil ->
      dict = create_dictionary()
      :persistent_term.put(cache_key, dict)
      dict

    dict ->
      dict
  end
end

# Builds a fresh decompression dictionary from the embedded dictionary bytes.
@spec create_dictionary() :: reference()
defp create_dictionary() do
  case :ezstd.create_ddict(@zstd_dict) do
    {:error, reason} ->
      raise ArgumentError,
            "somehow failed to created Jetstream's ZSTD dictionary: #{inspect(reason)}"

    dict ->
      dict
  end
end
# Decompresses one zstd frame with the Jetstream dictionary and decodes the
# resulting JSON. Steps, each of which may short-circuit with
# `{:error, reason}`:
#
#   1. create a decompression context sized to the compressed input
#   2. attach the dictionary to the context
#   3. stream-decompress the frame into an iolist
#   4. flatten the iolist and decode it as JSON
@spec decompress_and_parse(binary()) :: {:ok, map()} | {:error, term()}
defp decompress_and_parse(compressed_data) do
  with dctx when is_reference(dctx) <-
         :ezstd.create_decompression_context(byte_size(compressed_data)),
       :ok <- :ezstd.select_ddict(dctx, get_dictionary()),
       chunks when is_list(chunks) <- :ezstd.decompress_streaming(dctx, compressed_data),
       json = IO.iodata_to_binary(chunks),
       {:ok, _payload} = decoded <- JSON.decode(json) do
    decoded
  else
    {:error, reason} -> {:error, reason}
  end
end
# Query-string builders used when assembling the subscribe path. Each one
# prepends its parameter(s) to `params` when a value is set and returns
# `params` untouched otherwise.

# One `wantedCollections` entry per collection filter.
@spec put_collections(keyword(), [String.t()]) :: keyword()
defp put_collections(params, collections) when is_list(collections) do
  prepend_each(params, :wantedCollections, collections)
end

# One `wantedDids` entry per DID filter.
@spec put_dids(keyword(), [String.t()]) :: keyword()
defp put_dids(params, dids) when is_list(dids) do
  prepend_each(params, :wantedDids, dids)
end

# `cursor`, only when a starting point is known.
@spec put_cursor(keyword(), integer() | nil) :: keyword()
defp put_cursor(params, cursor) when is_nil(cursor) or is_integer(cursor) do
  if is_nil(cursor), do: params, else: [{:cursor, cursor} | params]
end

# `maxMessageSizeBytes`, only when a limit is configured.
@spec put_max_size(keyword(), integer() | nil) :: keyword()
defp put_max_size(params, max_size) when is_nil(max_size) or is_integer(max_size) do
  if is_nil(max_size), do: params, else: [{:maxMessageSizeBytes, max_size} | params]
end

# `requireHello=true`, only when explicitly enabled.
@spec put_require_hello(keyword(), boolean()) :: keyword()
defp put_require_hello(params, require_hello) when is_boolean(require_hello) do
  if require_hello, do: [{:requireHello, "true"} | params], else: params
end

# Prepends `{key, value}` for every value, in order, so the last value ends up
# at the head of the list.
defp prepend_each(params, key, values) do
  for value <- values, reduce: params do
    acc -> [{key, value} | acc]
  end
end
201201+end