Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Firehose do
  @moduledoc """
  Module for handling events from the AT Protocol [firehose](https://docs.bsky.app/docs/advanced-guides/firehose).

  Due to the nature of the firehose, this will result in a lot of incoming
  traffic as it receives every repo and identity event within the network. If
  you're concerned about bandwidth constraints or just don't need a
  whole-network sync, you may be better off using `Drinkup.Jetstream` or
  `Drinkup.Tap`.

  ## Usage

      defmodule MyFirehoseConsumer do
        use Drinkup.Firehose,
          name: :my_firehose,
          host: "https://bsky.network",
          cursor: nil

        @impl true
        def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
          IO.inspect(event, label: "Commit")
          :ok
        end

        def handle_event(_event), do: :ok
      end

      # In your application supervision tree:
      children = [MyFirehoseConsumer]

  Exceptions raised by `handle_event/1` will be logged instead of killing and
  restarting the socket process.

  ## Options

  - `:name` - Unique name for this Firehose instance (default: the module name)
  - `:host` - Firehose relay URL (default: `"https://bsky.network"`)
  - `:cursor` - Optional sequence number to resume streaming from

  ## Runtime Configuration

  You can override options at runtime by providing them to `child_spec/1`:

      children = [
        {MyFirehoseConsumer, name: :runtime_name, cursor: 12345}
      ]

  ## Event Types

  `handle_event/1` will receive the following event structs:

  - `Drinkup.Firehose.Event.Commit` - Repository commits
  - `Drinkup.Firehose.Event.Sync` - Sync events
  - `Drinkup.Firehose.Event.Identity` - Identity updates
  - `Drinkup.Firehose.Event.Account` - Account status changes
  - `Drinkup.Firehose.Event.Info` - Info messages
  """

  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      use Supervisor
      @behaviour Drinkup.Firehose.Consumer

      alias Drinkup.Firehose.Options

      # Store compile-time options as module attributes
      @name Keyword.get(opts, :name)
      @host Keyword.get(opts, :host, "https://bsky.network")
      @cursor Keyword.get(opts, :cursor)

      @doc """
      Starts the Firehose consumer supervisor.

      Accepts optional runtime configuration that overrides compile-time options.
      """
      def start_link(runtime_opts \\ []) do
        # Merge compile-time and runtime options
        opts = build_options(runtime_opts)
        Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
      end

      @impl true
      def init(%Options{name: name} = options) do
        children = [
          {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, Tasks}}}},
          {Drinkup.Firehose.Socket, options}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      @doc """
      Returns a child spec for adding this consumer to a supervision tree.

      Runtime options override compile-time options.
      """
      def child_spec(runtime_opts) when is_list(runtime_opts) do
        opts = build_options(runtime_opts)

        %{
          id: opts.name,
          start: {__MODULE__, :start_link, [runtime_opts]},
          type: :supervisor,
          restart: :permanent,
          shutdown: 500
        }
      end

      def child_spec(_opts) do
        raise ArgumentError, "child_spec expects a keyword list of options"
      end

      defoverridable child_spec: 1

      # Build Options struct from compile-time and runtime options
      defp build_options(runtime_opts) do
        # Compile-time defaults
        compile_opts = [
          name: @name || __MODULE__,
          host: @host,
          cursor: @cursor
        ]

        # Merge with runtime opts (runtime takes precedence)
        merged =
          compile_opts
          |> Keyword.merge(runtime_opts)
          |> Enum.reject(fn {_k, v} -> is_nil(v) end)
          |> Map.new()
          |> Map.put(:consumer, __MODULE__)

        Options.from(merged)
      end

      defp via_tuple(name) do
        {:via, Registry, {Drinkup.Registry, {name, Supervisor}}}
      end
    end
  end
end