Elixir ATProtocol ingestion and sync library.
1defmodule Drinkup.Firehose.Socket do
2 @moduledoc """
3 WebSocket connection handler for ATProto relay subscriptions.
4
5 Implements the Drinkup.Socket behaviour to manage connections to an ATProto
6 Firehose relay, handling CAR/CBOR-encoded frames and dispatching events to
7 the configured consumer.
8 """
9
10 use Drinkup.Socket
11
12 require Logger
13 alias Drinkup.Firehose.{Event, Options}
14
15 @op_regular 1
16 @op_error -1
17
18 @impl true
19 def init(opts) do
20 options = Keyword.fetch!(opts, :options)
21 {:ok, %{seq: options.cursor, options: options, host: options.host}}
22 end
23
24 def start_link(%Options{} = options, statem_opts) do
25 # Build opts for Drinkup.Socket from Options struct
26 socket_opts = [
27 host: options.host,
28 cursor: options.cursor,
29 options: options
30 ]
31
32 Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
33 end
34
35 @impl true
36 def build_path(%{seq: seq}) do
37 cursor_param = if seq, do: %{cursor: seq}, else: %{}
38 "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(cursor_param)
39 end
40
41 @impl true
42 def handle_frame({:binary, frame}, {%{seq: seq, options: options} = data, _conn, _stream}) do
43 with {:ok, header, next} <- CAR.DagCbor.decode(frame),
44 {:ok, payload, _} <- CAR.DagCbor.decode(next),
45 {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
46 true <- Event.valid_seq?(seq, payload["seq"]) do
47 new_seq = payload["seq"] || seq
48
49 case Event.from(type, payload) do
50 nil ->
51 Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
52
53 message ->
54 Event.dispatch(message, options)
55 end
56
57 {:ok, %{data | seq: new_seq}}
58 else
59 false ->
60 Logger.error("Got out of sequence or invalid `seq` from Firehose")
61 :noop
62
63 {%{"op" => @op_error, "t" => type}, payload} ->
64 Logger.error("Got error from Firehose: #{inspect({type, payload})}")
65 :noop
66
67 {:error, reason} ->
68 Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
69 :noop
70 end
71 end
72
73 @impl true
74 def handle_frame(:close, _data) do
75 Logger.info("Websocket closed, reason unknown")
76 nil
77 end
78
79 @impl true
80 def handle_frame({:close, errno, reason}, _data) do
81 Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
82 nil
83 end
84
85 @impl true
86 def handle_frame({:text, _text}, _data) do
87 Logger.warning("Received unexpected text frame from Firehose")
88 :noop
89 end
90end