this repo has no description
1defmodule Hobbes.Servers.BeginBuffer do
2 use GenServer
3 alias Trinity.{SimProcess, SimServer, SimLogger}
4 require SimLogger
5
6 import ExUnit.Assertions, only: [assert: 1]
7
8 alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
9 alias Hobbes.Servers.{Sequencer, TLog}
10
11 import Hobbes.Utils
12
defmodule State do
  # Server state for the enclosing BeginBuffer process. All fields are
  # enforced, so every one of them has to be given when the struct is built.
  #
  #   id, cluster             - taken from the argument map given to init/1
  #   buffer                  - callers (`from` terms) queued by handle_call/3
  #   batch_buffer            - callers belonging to the batch being confirmed
  #   batch_read_version      - read version that batch will be answered with
  #   check_locked_nonce      - ref tagging the in-flight check (nil when idle)
  #   check_locked_reply_ids  - TLog ids that already answered "not locked"
  @fields ~w(
    id
    cluster
    buffer
    batch_buffer
    batch_read_version
    check_locked_nonce
    check_locked_reply_ids
  )a

  @enforce_keys @fields
  defstruct @fields
end
28
# Delay handed to SimProcess.send_after/3 each time the :tick timer is re-armed.
@tick_interval_ms 1

# Starts this module through SimServer with `arg` as the start argument.
# init/1 only accepts a map carrying :id and a %Cluster{} under :cluster.
def start_link(arg) do
  SimServer.start_link(__MODULE__, arg)
end
32
@doc """
Requests a read version from the BeginBuffer server `pid`.

Returns whatever the server replies to the `:get_read_version` call. If the
call exits with a timeout, `{:error, :timeout}` is returned instead. Exits
for any other reason are not caught here and propagate to the caller.
"""
@spec get_read_version(GenServer.server) :: {:ok, non_neg_integer} | {:error, :timeout}
def get_read_version(pid) do
  SimServer.call(pid, :get_read_version)
catch
  # Only the timeout exit is translated; anything else keeps crashing the caller.
  :exit, {:timeout, _mfa} -> {:error, :timeout}
end
41
# Builds the initial state (nothing queued, no batch in flight) and schedules
# the first :tick with a delay of 0.
def init(%{id: id, cluster: %Cluster{} = cluster}) do
  SimProcess.send_after(self(), :tick, 0)

  {:ok,
   %State{
     id: id,
     cluster: cluster,
     # Callers waiting for the next flush
     buffer: [],
     # Batch currently being confirmed (none yet)
     batch_buffer: nil,
     batch_read_version: nil,
     # In-flight check_locked round (none yet)
     check_locked_nonce: nil,
     check_locked_reply_ids: nil
   }}
end
59
# Queues the caller instead of answering right away: the reply is sent later
# with SimServer.reply/2 once the batch containing this caller completes.
def handle_call(:get_read_version, from, %State{buffer: waiting} = state),
  do: {:noreply, %{state | buffer: [from | waiting]}}
64
# Periodic timer: tries to start a flush of the queued callers, then re-arms
# itself @tick_interval_ms later.
def handle_info(:tick, %State{} = state) do
  flushed = flush(state)

  SimProcess.send_after(self(), :tick, @tick_interval_ms)
  {:noreply, flushed}
end
71
# Reply to the check_locked round that is currently in flight (the nonce
# matches). An unlocked reply is recorded and may complete the batch; a
# locked reply makes this process exit.
def handle_info({:check_locked_reply, nonce, tlog_id, locked?}, %State{} = state) when nonce == state.check_locked_nonce do
  assert is_integer(tlog_id)
  assert tlog_id not in state.check_locked_reply_ids

  case locked? do
    # If a TLog is locked in this generation there is an ongoing recovery,
    # so we may as well just die instead of trying to handle this case
    true ->
      SimLogger.debug "BeginBuffer (id=#{state.id}, generation=#{state.cluster.generation}): TLog reported locked in reply, exiting"
      exit(:shutdown)

    false ->
      replied = [tlog_id | state.check_locked_reply_ids]
      {:noreply, maybe_complete_flush(%{state | check_locked_reply_ids: replied})}
  end
end
89
# A check_locked reply whose nonce is not the current check_locked_nonce
# (it did not match the guarded clause) is dropped without touching state.
def handle_info({:check_locked_reply, _nonce, _tlog_id, _locked?}, %State{} = state),
  do: {:noreply, state}
93
# Cluster update, handled by comparing generations:
#   older   -> ignored
#   same    -> the stored cluster is replaced
#   newer   -> this process exits with :shutdown
def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
  current_generation = state.cluster.generation

  case cluster.generation do
    incoming when incoming < current_generation -> {:noreply, state}
    incoming when incoming == current_generation -> {:noreply, %{state | cluster: cluster}}
    incoming when incoming > current_generation -> exit(:shutdown)
  end
end
101
# Starts a new batch out of the queued callers, unless there is a reason not to.
#
# Nothing happens (state returned untouched) when the cluster status is not
# :normal, when no caller is queued, or when a check_locked round is already
# in flight.
defp flush(%State{} = state) when state.cluster.status != :normal, do: state
defp flush(%State{buffer: []} = state), do: state
defp flush(%State{} = state) when not is_nil(state.check_locked_nonce), do: state

# Otherwise: fetch a read version from the Sequencer, move the queued callers
# into the batch, and ask every TLog of the head TLog generation whether it
# is locked. The replies arrive later as :check_locked_reply messages.
defp flush(%State{} = state) do
  assert state.cluster.status == :normal
  assert state.buffer != []
  assert state.batch_read_version == nil
  assert state.check_locked_nonce == nil

  # Exactly one Sequencer is expected; anything else fails the match.
  [%Server{pid: sequencer_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer)

  version =
    case Sequencer.get_read_version(sequencer_pid) do
      {:error, :timeout} -> exit(:shutdown)
      {:ok, fetched} when is_integer(fetched) -> fetched
    end

  # Fresh ref so replies to this round can be told apart from other rounds.
  round_nonce = make_ref()

  %TLogGeneration{tlog_ids: tlog_ids} = hd(state.cluster.tlog_generations)

  for tlog_id <- tlog_ids do
    %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, tlog_id)
    TLog.check_locked(tlog_pid, round_nonce)
  end

  %{state |
    buffer: [],
    batch_buffer: state.buffer,
    batch_read_version: version,
    check_locked_nonce: round_nonce,
    check_locked_reply_ids: []
  }
end
139
# Completes the in-flight batch once enough TLogs have answered "not locked":
# every caller in the batch is replied to with {:ok, batch_read_version} and
# the batch/check fields are reset to nil. Until then the state is returned
# unchanged.
defp maybe_complete_flush(%State{} = state) do
  # We use the replication_factor for the current generation to do the liveness check
  %TLogGeneration{replication_factor: needed_replies} = hd(state.cluster.tlog_generations)

  if length(state.check_locked_reply_ids) >= needed_replies do
    version = state.batch_read_version

    for caller <- state.batch_buffer do
      SimServer.reply(caller, {:ok, version})
    end

    %{state |
      batch_buffer: nil,
      batch_read_version: nil,
      check_locked_nonce: nil,
      check_locked_reply_ids: nil
    }
  else
    state
  end
end
162end