Elixir ATProtocol ingestion and sync library.
1defmodule Drinkup.Tap.Event do
2 @moduledoc """
3 Event handling and dispatch for Tap events.
4
5 Parses incoming JSON events from Tap and dispatches them to the configured
6 consumer via Task.Supervisor. After successful processing, sends an ack
7 message back to the socket.
8 """
9
10 require Logger
11 alias Drinkup.Tap.{Event, Options}
12
13 @type t() :: Event.Record.t() | Event.Identity.t()
14
15 @doc """
16 Parse a JSON map into an event struct.
17
18 Returns the appropriate event struct based on the "type" field.
19 """
20 @spec from(map()) :: t() | nil
21 def from(%{"type" => "record"} = payload), do: Event.Record.from(payload)
22 def from(%{"type" => "identity"} = payload), do: Event.Identity.from(payload)
23 def from(_payload), do: nil
24
25 @doc """
26 Dispatch an event to the consumer via Task.Supervisor.
27
28 Spawns a task that:
29 1. Processes the event via the consumer's handle_event/1 callback
30 2. Sends an ack to Tap if acks are enabled and the consumer returns :ok, {:ok, _}, or nil
31 3. Does not ack if the consumer returns an error-like value or raises an exception
32
33 Consumer return value semantics (when acks are enabled):
34 - `:ok` or `{:ok, any()}` or `nil` -> Success, send ack
35 - `{:error, _}` or any error-like tuple -> Failure, don't ack (Tap will retry)
36 - Exception raised -> Failure, don't ack (Tap will retry)
37
38 If `disable_acks: true` is set in options, no acks are sent regardless of
39 consumer return value.
40 """
41 @spec dispatch(t(), Options.t(), pid(), :gun.stream_ref()) :: :ok
42 def dispatch(
43 event,
44 %Options{consumer: consumer, name: name, disable_acks: disable_acks},
45 conn,
46 stream
47 ) do
48 supervisor_name = {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}
49 event_id = get_event_id(event)
50
51 {:ok, _pid} =
52 Task.Supervisor.start_child(supervisor_name, fn ->
53 try do
54 result = consumer.handle_event(event)
55
56 unless disable_acks do
57 case result do
58 :ok ->
59 send_ack(conn, stream, event_id)
60
61 {:ok, _} ->
62 send_ack(conn, stream, event_id)
63
64 nil ->
65 send_ack(conn, stream, event_id)
66
67 :error ->
68 Logger.error("Consumer returned error for event #{event_id}, not acking.")
69
70 {:error, reason} ->
71 Logger.error(
72 "Consumer returned error for event #{event_id}, not acking: #{inspect(reason)}"
73 )
74
75 _ ->
76 Logger.warning(
77 "Consumer returned unexpected value for event #{event_id}, acking anyway: #{inspect(result)}"
78 )
79
80 send_ack(conn, stream, event_id)
81 end
82 end
83 rescue
84 e ->
85 Logger.error(
86 "Error in Tap event handler (event #{event_id}), not acking: #{Exception.format(:error, e, __STACKTRACE__)}"
87 )
88 end
89 end)
90
91 :ok
92 end
93
94 @spec send_ack(pid(), :gun.stream_ref(), integer()) :: :ok
95 defp send_ack(conn, stream, event_id) do
96 ack_message = Jason.encode!(%{type: "ack", id: event_id})
97
98 :ok = :gun.ws_send(conn, stream, {:text, ack_message})
99 Logger.debug("[Drinkup.Tap] Acked event #{event_id}")
100 end
101
102 @spec get_event_id(t()) :: integer()
103 defp get_event_id(%Event.Record{id: id}), do: id
104 defp get_event_id(%Event.Identity{id: id}), do: id
105end