Elixir ATProtocol ingestion and sync library.
at main 105 lines 3.4 kB view raw
1defmodule Drinkup.Tap.Event do 2 @moduledoc """ 3 Event handling and dispatch for Tap events. 4 5 Parses incoming JSON events from Tap and dispatches them to the configured 6 consumer via Task.Supervisor. After successful processing, sends an ack 7 message back to the socket. 8 """ 9 10 require Logger 11 alias Drinkup.Tap.{Event, Options} 12 13 @type t() :: Event.Record.t() | Event.Identity.t() 14 15 @doc """ 16 Parse a JSON map into an event struct. 17 18 Returns the appropriate event struct based on the "type" field. 19 """ 20 @spec from(map()) :: t() | nil 21 def from(%{"type" => "record"} = payload), do: Event.Record.from(payload) 22 def from(%{"type" => "identity"} = payload), do: Event.Identity.from(payload) 23 def from(_payload), do: nil 24 25 @doc """ 26 Dispatch an event to the consumer via Task.Supervisor. 27 28 Spawns a task that: 29 1. Processes the event via the consumer's handle_event/1 callback 30 2. Sends an ack to Tap if acks are enabled and the consumer returns :ok, {:ok, _}, or nil 31 3. Does not ack if the consumer returns an error-like value or raises an exception 32 33 Consumer return value semantics (when acks are enabled): 34 - `:ok` or `{:ok, any()}` or `nil` -> Success, send ack 35 - `{:error, _}` or any error-like tuple -> Failure, don't ack (Tap will retry) 36 - Exception raised -> Failure, don't ack (Tap will retry) 37 38 If `disable_acks: true` is set in options, no acks are sent regardless of 39 consumer return value. 40 """ 41 @spec dispatch(t(), Options.t(), pid(), :gun.stream_ref()) :: :ok 42 def dispatch( 43 event, 44 %Options{consumer: consumer, name: name, disable_acks: disable_acks}, 45 conn, 46 stream 47 ) do 48 supervisor_name = {:via, Registry, {Drinkup.Registry, {name, TapTasks}}} 49 event_id = get_event_id(event) 50 51 {:ok, _pid} = 52 Task.Supervisor.start_child(supervisor_name, fn -> 53 try do 54 result = consumer.handle_event(event) 55 56 unless disable_acks do 57 case result do 58 :ok -> 59 send_ack(conn, stream, event_id) 60 61 {:ok, _} -> 62 send_ack(conn, stream, event_id) 63 64 nil -> 65 send_ack(conn, stream, event_id) 66 67 :error -> 68 Logger.error("Consumer returned error for event #{event_id}, not acking.") 69 70 {:error, reason} -> 71 Logger.error( 72 "Consumer returned error for event #{event_id}, not acking: #{inspect(reason)}" 73 ) 74 75 _ -> 76 Logger.warning( 77 "Consumer returned unexpected value for event #{event_id}, acking anyway: #{inspect(result)}" 78 ) 79 80 send_ack(conn, stream, event_id) 81 end 82 end 83 rescue 84 e -> 85 Logger.error( 86 "Error in Tap event handler (event #{event_id}), not acking: #{Exception.format(:error, e, __STACKTRACE__)}" 87 ) 88 end 89 end) 90 91 :ok 92 end 93 94 @spec send_ack(pid(), :gun.stream_ref(), integer()) :: :ok 95 defp send_ack(conn, stream, event_id) do 96 ack_message = Jason.encode!(%{type: "ack", id: event_id}) 97 98 :ok = :gun.ws_send(conn, stream, {:text, ack_message}) 99 Logger.debug("[Drinkup.Tap] Acked event #{event_id}") 100 end 101 102 @spec get_event_id(t()) :: integer() 103 defp get_event_id(%Event.Record{id: id}), do: id 104 defp get_event_id(%Event.Identity{id: id}), do: id 105end