Elixir ATProtocol ingestion and sync library.
1defmodule Drinkup.Socket do
2 # TODO: talk about how to implment, but that it's for internal use
3 @moduledoc false
4
5 require Logger
6
7 @behaviour :gen_statem
8
9 @type frame ::
10 {:binary, binary()}
11 | {:text, String.t()}
12 | :close
13 | {:close, errno :: integer(), reason :: binary()}
14
15 @type user_data :: term()
16
17 @type reconnect_strategy ::
18 :exponential
19 | {:exponential, max_backoff :: pos_integer()}
20 | {:custom, (attempt :: pos_integer() -> delay_ms :: pos_integer())}
21
22 @type option ::
23 {:host, String.t()}
24 | {:flow, pos_integer()}
25 | {:timeout, pos_integer()}
26 | {:tls_opts, keyword()}
27 | {:gun_opts, map()}
28 | {:reconnect_strategy, reconnect_strategy()}
29 | {atom(), term()}
30
31 @callback init(opts :: keyword()) :: {:ok, user_data()} | {:error, reason :: term()}
32
33 @callback build_path(data :: user_data()) :: String.t()
34
35 @callback handle_frame(
36 frame :: frame(),
37 data :: {user_data(), conn :: pid() | nil, stream :: :gun.stream_ref() | nil}
38 ) ::
39 {:ok, new_data :: user_data()} | :noop | nil | {:error, reason :: term()}
40
41 @callback handle_connected(data :: {user_data(), conn :: pid(), stream :: :gun.stream_ref()}) ::
42 {:ok, new_data :: user_data()}
43
44 @callback handle_disconnected(
45 reason :: term(),
46 data :: {user_data(), conn :: pid() | nil, stream :: :gun.stream_ref() | nil}
47 ) ::
48 {:ok, new_data :: user_data()}
49
50 @optional_callbacks handle_connected: 1, handle_disconnected: 2
51
52 defstruct [
53 :module,
54 :user_data,
55 :options,
56 :conn,
57 :stream,
58 reconnect_attempts: 0
59 ]
60
61 defmacro __using__(_opts) do
62 quote do
63 @behaviour Drinkup.Socket
64
65 def start_link(opts, statem_opts \\ [])
66
67 def start_link(opts, statem_opts) do
68 Drinkup.Socket.start_link(__MODULE__, opts, statem_opts)
69 end
70
71 defoverridable start_link: 2
72
73 def child_spec(opts) do
74 %{
75 id: __MODULE__,
76 start: {__MODULE__, :start_link, [opts, []]},
77 type: :worker,
78 restart: :permanent,
79 shutdown: 500
80 }
81 end
82
83 defoverridable child_spec: 1
84
85 @impl true
86 def handle_connected({user_data, _conn, _stream}), do: {:ok, user_data}
87
88 @impl true
89 def handle_disconnected(_reason, {user_data, _conn, _stream}), do: {:ok, user_data}
90
91 defoverridable handle_connected: 1, handle_disconnected: 2
92 end
93 end
94
95 @impl true
96 def callback_mode, do: [:state_functions, :state_enter]
97
98 @doc """
99 Start a WebSocket connection process.
100
101 ## Parameters
102
103 * `module` - The module implementing the Drinkup.Socket behaviour
104 * `opts` - Keyword list of options (see module documentation)
105 * `statem_opts` - Options passed to `:gen_statem.start_link/3`
106 """
107 def start_link(module, opts, statem_opts) do
108 :gen_statem.start_link(__MODULE__, {module, opts}, statem_opts)
109 end
110
111 @impl true
112 def init({module, opts}) do
113 case module.init(opts) do
114 {:ok, user_data} ->
115 options = parse_options(opts)
116
117 data = %__MODULE__{
118 module: module,
119 user_data: user_data,
120 options: options,
121 reconnect_attempts: 0
122 }
123
124 {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
125
126 {:error, reason} ->
127 {:stop, {:init_failed, reason}}
128 end
129 end
130
131 # :disconnected state - waiting to connect or reconnect
132
133 def disconnected(:enter, _from, _data) do
134 Logger.debug("[Drinkup.Socket] Entering disconnected state")
135 :keep_state_and_data
136 end
137
138 def disconnected(:internal, :connect, data) do
139 {:next_state, :connecting_http, data}
140 end
141
142 def disconnected(:timeout, :reconnect, data) do
143 {:next_state, :connecting_http, data}
144 end
145
146 # :connecting_http state - establishing HTTP connection with TLS
147
148 def connecting_http(:enter, _from, %{options: options} = data) do
149 Logger.debug("[Drinkup.Socket] Connecting to HTTP")
150
151 %{host: host, port: port} = URI.new!(options.host)
152
153 gun_opts =
154 Map.merge(
155 %{
156 retry: 0,
157 protocols: [:http],
158 connect_timeout: options.timeout,
159 domain_lookup_timeout: options.timeout,
160 tls_handshake_timeout: options.timeout,
161 tls_opts: options.tls_opts
162 },
163 options.gun_opts
164 )
165
166 case :gun.open(:binary.bin_to_list(host), port, gun_opts) do
167 {:ok, conn} ->
168 {:keep_state, %{data | conn: conn}, [{:state_timeout, options.timeout, :connect_timeout}]}
169
170 {:error, reason} ->
171 Logger.error("[Drinkup.Socket] Failed to open connection: #{inspect(reason)}")
172 {:stop, {:connect_failed, reason}}
173 end
174 end
175
176 def connecting_http(:info, {:gun_up, _conn, :http}, data) do
177 {:next_state, :connecting_ws, data}
178 end
179
180 def connecting_http(:state_timeout, :connect_timeout, data) do
181 Logger.error("[Drinkup.Socket] HTTP connection timeout")
182 trigger_reconnect(data)
183 end
184
185 # :connecting_ws state - upgrading to WebSocket
186
187 def connecting_ws(
188 :enter,
189 _from,
190 %{module: module, user_data: user_data, options: options} = data
191 ) do
192 Logger.debug("[Drinkup.Socket] Upgrading connection to WebSocket")
193
194 path = module.build_path(user_data)
195 stream = :gun.ws_upgrade(data.conn, path, [], %{flow: options.flow})
196
197 {:keep_state, %{data | stream: stream}, [{:state_timeout, options.timeout, :upgrade_timeout}]}
198 end
199
200 def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
201 {:next_state, :connected, data}
202 end
203
204 def connecting_ws(:info, {:gun_response, _conn, _stream, _fin, status, _headers}, data) do
205 Logger.error("[Drinkup.Socket] WebSocket upgrade failed with status: #{status}")
206 trigger_reconnect(data)
207 end
208
209 def connecting_ws(:info, {:gun_error, _conn, _stream, reason}, data) do
210 Logger.error("[Drinkup.Socket] WebSocket upgrade error: #{inspect(reason)}")
211 trigger_reconnect(data)
212 end
213
214 def connecting_ws(:state_timeout, :upgrade_timeout, data) do
215 Logger.error("[Drinkup.Socket] WebSocket upgrade timeout")
216 trigger_reconnect(data)
217 end
218
219 # :connected state - active WebSocket connection
220
221 def connected(
222 :enter,
223 _from,
224 %{module: module, user_data: user_data, conn: conn, stream: stream} = data
225 ) do
226 Logger.debug("[Drinkup.Socket] WebSocket connected")
227
228 case module.handle_connected({user_data, conn, stream}) do
229 {:ok, new_user_data} ->
230 {:keep_state, %{data | user_data: new_user_data, reconnect_attempts: 0}}
231
232 _ ->
233 {:keep_state, %{data | reconnect_attempts: 0}}
234 end
235 end
236
237 def connected(
238 :info,
239 {:gun_ws, conn, _stream, frame},
240 %{module: module, user_data: user_data, options: options, conn: conn, stream: stream} =
241 data
242 ) do
243 result = module.handle_frame(frame, {user_data, conn, stream})
244
245 :ok = :gun.update_flow(conn, frame, options.flow)
246
247 case result do
248 {:ok, new_user_data} ->
249 {:keep_state, %{data | user_data: new_user_data}}
250
251 result when result in [:noop, nil] ->
252 :keep_state_and_data
253
254 {:error, reason} ->
255 Logger.error("[Drinkup.Socket] Frame handler error: #{inspect(reason)}")
256 :keep_state_and_data
257 end
258 end
259
260 def connected(:info, {:gun_ws, _conn, _stream, :close}, data) do
261 Logger.info("[Drinkup.Socket] WebSocket closed by remote")
262 trigger_reconnect(data, :remote_close)
263 end
264
265 def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, data) do
266 Logger.info("[Drinkup.Socket] WebSocket closed: #{errno} - #{inspect(reason)}")
267 trigger_reconnect(data, {:remote_close, errno, reason})
268 end
269
270 def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
271 when old_conn != new_conn do
272 Logger.debug("[Drinkup.Socket] Ignoring :gun_down for old connection")
273 :keep_state_and_data
274 end
275
276 def connected(:info, {:gun_down, _conn, _proto, reason, _killed_streams}, data) do
277 Logger.info("[Drinkup.Socket] Connection down: #{inspect(reason)}")
278 trigger_reconnect(data, {:connection_down, reason})
279 end
280
281 def connected(
282 :internal,
283 :reconnect,
284 %{conn: conn, options: options, reconnect_attempts: attempts} = data
285 ) do
286 :ok = :gun.close(conn)
287 :ok = :gun.flush(conn)
288
289 backoff = calculate_backoff(attempts, options.reconnect_strategy)
290
291 Logger.info("[Drinkup.Socket] Reconnecting in #{backoff}ms (attempt #{attempts + 1})")
292
293 {:next_state, :disconnected,
294 %{data | conn: nil, stream: nil, reconnect_attempts: attempts + 1},
295 [{{:timeout, :reconnect}, backoff, :reconnect}]}
296 end
297
298 # Helper functions
299
300 defp trigger_reconnect(data, reason \\ :unknown) do
301 %{module: module, user_data: user_data, conn: conn, stream: stream} = data
302
303 case module.handle_disconnected(reason, {user_data, conn, stream}) do
304 {:ok, new_user_data} ->
305 {:keep_state, %{data | user_data: new_user_data}, [{:next_event, :internal, :reconnect}]}
306
307 _ ->
308 {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
309 end
310 end
311
312 defp parse_options(opts) do
313 %{
314 host: Keyword.fetch!(opts, :host),
315 flow: Keyword.get(opts, :flow, 10),
316 timeout: Keyword.get(opts, :timeout, :timer.seconds(5)),
317 tls_opts: Keyword.get(opts, :tls_opts, default_tls_opts()),
318 gun_opts: Keyword.get(opts, :gun_opts, %{}),
319 reconnect_strategy: Keyword.get(opts, :reconnect_strategy, :exponential)
320 }
321 end
322
323 defp default_tls_opts do
324 [
325 verify: :verify_peer,
326 cacerts: :certifi.cacerts(),
327 depth: 3,
328 customize_hostname_check: [
329 match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
330 ]
331 ]
332 end
333
334 defp calculate_backoff(attempt, strategy) do
335 case strategy do
336 :exponential ->
337 exponential_backoff(attempt, :timer.seconds(60))
338
339 {:exponential, max_backoff} ->
340 exponential_backoff(attempt, max_backoff)
341
342 {:custom, func} when is_function(func, 1) ->
343 func.(attempt)
344 end
345 end
346
347 defp exponential_backoff(attempt, max_backoff) do
348 base = :timer.seconds(1)
349 delay = min(base * :math.pow(2, attempt), max_backoff)
350 jitter = :rand.uniform(trunc(delay * 0.1))
351 trunc(delay) + jitter
352 end
353end