Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Firehose do
  @moduledoc """
  Module for handling events from the AT Protocol [firehose](https://docs.bsky.app/docs/advanced-guides/firehose).

  Due to the nature of the firehose, this will result in a lot of incoming
  traffic as it receives every repo and identity event within the network. If
  you're concerned about bandwidth constraints or just don't need a
  whole-network sync, you may be better off using `Drinkup.Jetstream` or
  `Drinkup.Tap`.

  ## Usage

      defmodule MyFirehoseConsumer do
        use Drinkup.Firehose,
          name: :my_firehose,
          host: "https://bsky.network",
          cursor: nil

        @impl true
        def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
          IO.inspect(event, label: "Commit")
          :ok
        end

        def handle_event(_event), do: :ok
      end

      # In your application supervision tree:
      children = [MyFirehoseConsumer]

  Exceptions raised by `handle_event/1` will be logged instead of killing and
  restarting the socket process.

  ## Options

  - `:name` - Unique name for this Firehose instance (default: the module name)
  - `:host` - Firehose relay URL (default: `"https://bsky.network"`)
  - `:cursor` - Optional sequence number to resume streaming from

  ## Runtime Configuration

  You can override options at runtime by providing them to `child_spec/1`:

      children = [
        {MyFirehoseConsumer, name: :runtime_name, cursor: 12345}
      ]

  ## Event Types

  `handle_event/1` will receive the following event structs:

  - `Drinkup.Firehose.Event.Commit` - Repository commits
  - `Drinkup.Firehose.Event.Sync` - Sync events
  - `Drinkup.Firehose.Event.Identity` - Identity updates
  - `Drinkup.Firehose.Event.Account` - Account status changes
  - `Drinkup.Firehose.Event.Info` - Info messages
  """

  # Turns the calling module into a supervisor for one Firehose instance.
  #
  # `opts` are the compile-time options given to `use Drinkup.Firehose` (see the
  # "Options" section of the moduledoc). The injected code defines
  # `start_link/1`, `init/1` and an overridable `child_spec/1`, and declares the
  # `Drinkup.Firehose.Consumer` behaviour on the calling module.
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      use Supervisor
      @behaviour Drinkup.Firehose.Consumer

      alias Drinkup.Firehose.Options

      # Store compile-time options as module attributes
      @name Keyword.get(opts, :name)
      @host Keyword.get(opts, :host, "https://bsky.network")
      @cursor Keyword.get(opts, :cursor)

      @doc """
      Starts the Firehose consumer supervisor.

      Accepts optional runtime configuration that overrides compile-time options.
      The supervisor is registered in `Drinkup.Registry` under the resolved
      instance name.
      """
      def start_link(runtime_opts \\ []) do
        # Merge compile-time and runtime options
        opts = build_options(runtime_opts)
        Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
      end

      @impl true
      def init(%Options{name: name} = options) do
        # The task supervisor is registered under `{name, Tasks}` in
        # `Drinkup.Registry`; the socket receives the full options struct.
        children = [
          {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, Tasks}}}},
          {Drinkup.Firehose.Socket, options}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      @doc """
      Returns a child spec for adding this consumer to a supervision tree.

      Runtime options override compile-time options. The spec's `:id` is the
      resolved instance name.

      Raises `ArgumentError` when the argument is not a list.
      """
      def child_spec(runtime_opts) when is_list(runtime_opts) do
        opts = build_options(runtime_opts)

        %{
          id: opts.name,
          start: {__MODULE__, :start_link, [runtime_opts]},
          type: :supervisor,
          restart: :permanent,
          # This child is itself a supervisor, so it gets an unbounded shutdown
          # window (the OTP default for `type: :supervisor`). A short timeout
          # could see it killed before it has finished stopping its own
          # children (the task supervisor and the socket).
          shutdown: :infinity
        }
      end

      def child_spec(_opts) do
        raise ArgumentError, "child_spec expects a keyword list of options"
      end

      # `use Supervisor` already defined child_spec/1; re-mark the version above
      # as overridable so the consuming module can still replace it.
      defoverridable child_spec: 1

      # Build Options struct from compile-time and runtime options
      defp build_options(runtime_opts) do
        # Compile-time defaults
        compile_opts = [
          name: @name || __MODULE__,
          host: @host,
          cursor: @cursor
        ]

        # Merge with runtime opts (runtime takes precedence). Entries whose
        # value is nil are dropped before the map is handed to Options.from/1.
        merged =
          compile_opts
          |> Keyword.merge(runtime_opts)
          |> Enum.reject(fn {_k, v} -> is_nil(v) end)
          |> Map.new()
          |> Map.put(:consumer, __MODULE__)

        Options.from(merged)
      end

      # Registry name under which this instance's supervisor is registered.
      defp via_tuple(name) do
        {:via, Registry, {Drinkup.Registry, {name, Supervisor}}}
      end
    end
  end
end