Elixir ATProtocol ingestion and sync library.
at main 353 lines 10 kB view raw
defmodule Drinkup.Socket do
  # TODO: talk about how to implement, but that it's for internal use
  @moduledoc false

  # A `:gen_statem` that owns one gun connection and walks it through
  #
  #     :disconnected -> :connecting_http -> :connecting_ws -> :connected
  #
  # Any failure along the way funnels through `trigger_reconnect/2`, which
  # notifies the callback module and then queues an internal `:reconnect`
  # event. Every state that owns a connection handles that event by closing
  # the connection and returning to `:disconnected` with a backoff timer.

  require Logger

  @behaviour :gen_statem

  # Upper bound on the exponent used for exponential backoff. `:math.pow/2`
  # raises once the result no longer fits in a float, so the attempt counter
  # must not be fed to it unbounded. 1s * 2^32 is far beyond any sensible cap.
  @max_backoff_exponent 32

  @type frame ::
          {:binary, binary()}
          | {:text, String.t()}
          | :close
          | {:close, errno :: integer(), reason :: binary()}

  @type user_data :: term()

  @type reconnect_strategy ::
          :exponential
          | {:exponential, max_backoff :: pos_integer()}
          | {:custom, (attempt :: pos_integer() -> delay_ms :: pos_integer())}

  @type option ::
          {:host, String.t()}
          | {:flow, pos_integer()}
          | {:timeout, pos_integer()}
          | {:tls_opts, keyword()}
          | {:gun_opts, map()}
          | {:reconnect_strategy, reconnect_strategy()}
          | {atom(), term()}

  @callback init(opts :: keyword()) :: {:ok, user_data()} | {:error, reason :: term()}

  @callback build_path(data :: user_data()) :: String.t()

  @callback handle_frame(
              frame :: frame(),
              data :: {user_data(), conn :: pid() | nil, stream :: :gun.stream_ref() | nil}
            ) ::
              {:ok, new_data :: user_data()} | :noop | nil | {:error, reason :: term()}

  @callback handle_connected(data :: {user_data(), conn :: pid(), stream :: :gun.stream_ref()}) ::
              {:ok, new_data :: user_data()}

  @callback handle_disconnected(
              reason :: term(),
              data :: {user_data(), conn :: pid() | nil, stream :: :gun.stream_ref() | nil}
            ) ::
              {:ok, new_data :: user_data()}

  @optional_callbacks handle_connected: 1, handle_disconnected: 2

  defstruct [
    :module,
    :user_data,
    :options,
    :conn,
    :stream,
    reconnect_attempts: 0
  ]

  # Injects `start_link/2`, `child_spec/1` and no-op `handle_connected/1` /
  # `handle_disconnected/2` implementations into the using module. All of
  # them are overridable.
  defmacro __using__(_opts) do
    quote do
      @behaviour Drinkup.Socket

      def start_link(opts, statem_opts \\ [])

      def start_link(opts, statem_opts) do
        Drinkup.Socket.start_link(__MODULE__, opts, statem_opts)
      end

      defoverridable start_link: 2

      def child_spec(opts) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [opts, []]},
          type: :worker,
          restart: :permanent,
          shutdown: 500
        }
      end

      defoverridable child_spec: 1

      @impl true
      def handle_connected({user_data, _conn, _stream}), do: {:ok, user_data}

      @impl true
      def handle_disconnected(_reason, {user_data, _conn, _stream}), do: {:ok, user_data}

      defoverridable handle_connected: 1, handle_disconnected: 2
    end
  end

  @impl true
  def callback_mode, do: [:state_functions, :state_enter]

  @doc """
  Start a WebSocket connection process.

  ## Parameters

    * `module` - The module implementing the Drinkup.Socket behaviour
    * `opts` - Keyword list of options (see below); the full list is also
      handed to `module.init/1`
    * `statem_opts` - Options passed to `:gen_statem.start_link/3`

  ## Options

    * `:host` - required; parsed with `URI.new!/1` to obtain host and port
    * `:flow` - flow value given to gun for the WebSocket stream (default `10`)
    * `:timeout` - milliseconds allowed for each connection phase
      (default `5_000`)
    * `:tls_opts` - TLS options handed to gun (default: peer verification
      against the `:certifi` CA bundle)
    * `:gun_opts` - map merged over the gun options built by this module
      (default `%{}`)
    * `:reconnect_strategy` - see `t:reconnect_strategy/0`
      (default `:exponential`)
  """
  @spec start_link(module(), keyword(), list()) :: :gen_statem.start_ret()
  def start_link(module, opts, statem_opts) do
    :gen_statem.start_link(__MODULE__, {module, opts}, statem_opts)
  end

  @impl true
  def init({module, opts}) do
    case module.init(opts) do
      {:ok, user_data} ->
        options = parse_options(opts)

        data = %__MODULE__{
          module: module,
          user_data: user_data,
          options: options,
          reconnect_attempts: 0
        }

        # Kick off the first connection attempt as soon as init returns.
        {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}

      {:error, reason} ->
        {:stop, {:init_failed, reason}}
    end
  end

  # :disconnected state - waiting to connect or reconnect

  def disconnected(:enter, _from, _data) do
    Logger.debug("[Drinkup.Socket] Entering disconnected state")
    :keep_state_and_data
  end

  def disconnected(:internal, :connect, data) do
    {:next_state, :connecting_http, data}
  end

  # The backoff timer is armed as the generic timeout `{:timeout, :reconnect}`,
  # so the event type delivered here is that tuple, not the bare `:timeout`
  # (which is the unnamed event timeout).
  def disconnected({:timeout, :reconnect}, :reconnect, data) do
    {:next_state, :connecting_http, data}
  end

  # :connecting_http state - establishing HTTP connection with TLS

  def connecting_http(:enter, _from, %{options: options} = data) do
    Logger.debug("[Drinkup.Socket] Connecting to HTTP")

    %{host: host, port: port} = URI.new!(options.host)

    # Entries in the caller-supplied `:gun_opts` win over these defaults.
    gun_opts =
      Map.merge(
        %{
          retry: 0,
          protocols: [:http],
          connect_timeout: options.timeout,
          domain_lookup_timeout: options.timeout,
          tls_handshake_timeout: options.timeout,
          tls_opts: options.tls_opts
        },
        options.gun_opts
      )

    case :gun.open(:binary.bin_to_list(host), port, gun_opts) do
      {:ok, conn} ->
        {:keep_state, %{data | conn: conn}, [{:state_timeout, options.timeout, :connect_timeout}]}

      {:error, reason} ->
        Logger.error("[Drinkup.Socket] Failed to open connection: #{inspect(reason)}")
        {:stop, {:connect_failed, reason}}
    end
  end

  def connecting_http(:info, {:gun_up, _conn, :http}, data) do
    {:next_state, :connecting_ws, data}
  end

  def connecting_http(:state_timeout, :connect_timeout, data) do
    Logger.error("[Drinkup.Socket] HTTP connection timeout")
    trigger_reconnect(data)
  end

  # Queued by `trigger_reconnect/2` after a connect timeout.
  def connecting_http(:internal, :reconnect, data) do
    schedule_reconnect(data)
  end

  # :connecting_ws state - upgrading to WebSocket

  def connecting_ws(
        :enter,
        _from,
        %{module: module, user_data: user_data, options: options} = data
      ) do
    Logger.debug("[Drinkup.Socket] Upgrading connection to WebSocket")

    path = module.build_path(user_data)
    stream = :gun.ws_upgrade(data.conn, path, [], %{flow: options.flow})

    {:keep_state, %{data | stream: stream}, [{:state_timeout, options.timeout, :upgrade_timeout}]}
  end

  def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
    {:next_state, :connected, data}
  end

  def connecting_ws(:info, {:gun_response, _conn, _stream, _fin, status, _headers}, data) do
    Logger.error("[Drinkup.Socket] WebSocket upgrade failed with status: #{status}")
    trigger_reconnect(data)
  end

  def connecting_ws(:info, {:gun_error, _conn, _stream, reason}, data) do
    Logger.error("[Drinkup.Socket] WebSocket upgrade error: #{inspect(reason)}")
    trigger_reconnect(data)
  end

  def connecting_ws(:state_timeout, :upgrade_timeout, data) do
    Logger.error("[Drinkup.Socket] WebSocket upgrade timeout")
    trigger_reconnect(data)
  end

  # Queued by `trigger_reconnect/2` after a failed or timed-out upgrade.
  def connecting_ws(:internal, :reconnect, data) do
    schedule_reconnect(data)
  end

  # :connected state - active WebSocket connection

  def connected(
        :enter,
        _from,
        %{module: module, user_data: user_data, conn: conn, stream: stream} = data
      ) do
    Logger.debug("[Drinkup.Socket] WebSocket connected")

    # A successful connection resets the backoff counter either way.
    case module.handle_connected({user_data, conn, stream}) do
      {:ok, new_user_data} ->
        {:keep_state, %{data | user_data: new_user_data, reconnect_attempts: 0}}

      _ ->
        {:keep_state, %{data | reconnect_attempts: 0}}
    end
  end

  # `conn` appears in both the message and the state pattern, so this clause
  # only accepts frames coming from the connection currently held in state.
  def connected(
        :info,
        {:gun_ws, conn, _stream, frame},
        %{module: module, user_data: user_data, options: options, conn: conn, stream: stream} =
          data
      ) do
    result = module.handle_frame(frame, {user_data, conn, stream})

    # The second argument of `:gun.update_flow/3` is the stream reference.
    :ok = :gun.update_flow(conn, stream, options.flow)

    case result do
      {:ok, new_user_data} ->
        {:keep_state, %{data | user_data: new_user_data}}

      result when result in [:noop, nil] ->
        :keep_state_and_data

      {:error, reason} ->
        Logger.error("[Drinkup.Socket] Frame handler error: #{inspect(reason)}")
        :keep_state_and_data
    end
  end

  # NOTE(review): the clause above matches every frame from the current
  # connection, close frames included, so the two clauses below are only
  # reached for a `:gun_ws` message whose connection differs from the one in
  # state. Confirm whether close frames are meant to reach `handle_frame/2`
  # before reordering.
  def connected(:info, {:gun_ws, _conn, _stream, :close}, data) do
    Logger.info("[Drinkup.Socket] WebSocket closed by remote")
    trigger_reconnect(data, :remote_close)
  end

  def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, data) do
    Logger.info("[Drinkup.Socket] WebSocket closed: #{errno} - #{inspect(reason)}")
    trigger_reconnect(data, {:remote_close, errno, reason})
  end

  def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
      when old_conn != new_conn do
    Logger.debug("[Drinkup.Socket] Ignoring :gun_down for old connection")
    :keep_state_and_data
  end

  def connected(:info, {:gun_down, _conn, _proto, reason, _killed_streams}, data) do
    Logger.info("[Drinkup.Socket] Connection down: #{inspect(reason)}")
    trigger_reconnect(data, {:connection_down, reason})
  end

  def connected(:internal, :reconnect, data) do
    schedule_reconnect(data)
  end

  # Helper functions

  # Tells the callback module about the disconnect, then queues the internal
  # `:reconnect` event for the current state to act on.
  defp trigger_reconnect(data, reason \\ :unknown) do
    %{module: module, user_data: user_data, conn: conn, stream: stream} = data

    case module.handle_disconnected(reason, {user_data, conn, stream}) do
      {:ok, new_user_data} ->
        {:keep_state, %{data | user_data: new_user_data}, [{:next_event, :internal, :reconnect}]}

      _ ->
        {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
    end
  end

  # Shared `:reconnect` handling for every state that may own a connection:
  # drop the connection, bump the attempt counter and go back to
  # `:disconnected` with a named timeout that fires after the backoff.
  defp schedule_reconnect(%{conn: conn, options: options, reconnect_attempts: attempts} = data) do
    close_connection(conn)

    backoff = calculate_backoff(attempts, options.reconnect_strategy)

    Logger.info("[Drinkup.Socket] Reconnecting in #{backoff}ms (attempt #{attempts + 1})")

    {:next_state, :disconnected,
     %{data | conn: nil, stream: nil, reconnect_attempts: attempts + 1},
     [{{:timeout, :reconnect}, backoff, :reconnect}]}
  end

  # Closes the gun connection and drops its pending messages from the mailbox.
  # Closing is best effort: the result of `:gun.close/1` is ignored so that an
  # already-terminated connection does not take this process down with it.
  defp close_connection(nil), do: :ok

  defp close_connection(conn) do
    _ = :gun.close(conn)
    :ok = :gun.flush(conn)
  end

  defp parse_options(opts) do
    %{
      host: Keyword.fetch!(opts, :host),
      flow: Keyword.get(opts, :flow, 10),
      timeout: Keyword.get(opts, :timeout, :timer.seconds(5)),
      tls_opts: Keyword.get(opts, :tls_opts, default_tls_opts()),
      gun_opts: Keyword.get(opts, :gun_opts, %{}),
      reconnect_strategy: Keyword.get(opts, :reconnect_strategy, :exponential)
    }
  end

  defp default_tls_opts do
    [
      verify: :verify_peer,
      cacerts: :certifi.cacerts(),
      depth: 3,
      customize_hostname_check: [
        match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
      ]
    ]
  end

  # Delay in milliseconds before reconnect attempt number `attempt` (0-based).
  defp calculate_backoff(attempt, strategy) do
    case strategy do
      :exponential ->
        exponential_backoff(attempt, :timer.seconds(60))

      {:exponential, max_backoff} ->
        exponential_backoff(attempt, max_backoff)

      {:custom, func} when is_function(func, 1) ->
        func.(attempt)
    end
  end

  # 1s * 2^attempt, capped at `max_backoff`, plus up to 10% random jitter.
  defp exponential_backoff(attempt, max_backoff) do
    base = :timer.seconds(1)
    exponent = min(attempt, @max_backoff_exponent)
    delay = min(base * :math.pow(2, exponent), max_backoff)
    # `:rand.uniform/1` needs an argument of at least 1; very small caps would
    # otherwise truncate the 10% window down to 0.
    jitter = :rand.uniform(max(trunc(delay * 0.1), 1))
    trunc(delay) + jitter
  end
end