this repo has no description
1defmodule Hobbes.Servers.ServerSupervisor do
2 use GenServer
3 alias Trinity.{Sim, SimProcess, SimServer, SimFile}
4
5 import ExUnit.Assertions, only: [assert: 1]
6
7 alias Hobbes.Structs.{Cluster, SupervisorStatus}
8 alias Hobbes.Servers.{Coordinator, Manager}
9
10 import Hobbes.Utils
11
defmodule State do
  @moduledoc """
  Internal state of the server supervisor.

  `coordinators`, `slots`, `open_stateless`, `open_tlog` and `open_storage` are
  required when building the struct; every other field has a default.
  """

  # `slots` holds the configured capacity, while the `open_*` fields track what
  # is still available. Slot entries for tlog/storage are used as directory
  # paths (they are concatenated with file names), hence `String.t`.
  # NOTE(review): `last_manager_message_timestamp` is whatever `current_time()`
  # returns; it is only used in arithmetic, so it is typed as `number` here.
  @type t :: %__MODULE__{
    coordinators: [pid],
    slots: %{stateless: integer, tlog: [String.t], storage: [String.t]},
    open_stateless: integer,
    open_tlog: [String.t],
    open_storage: [String.t],
    manager_pid: pid | nil,
    manager_generation: non_neg_integer | -1,
    cluster: Cluster.t | nil,
    children: [{pid, map}],
    child_generations: %{pid => non_neg_integer | nil},
    generation_failed: boolean,
    last_manager_message_timestamp: number | nil,
  }

  @enforce_keys [
    :coordinators,
    :slots,

    :open_stateless,
    :open_tlog,
    :open_storage,
  ]
  defstruct [
    manager_pid: nil,
    manager_generation: -1,
    cluster: nil,
    children: [],
    child_generations: %{},
    generation_failed: false,

    last_manager_message_timestamp: nil,
  ] ++ @enforce_keys
end
41
# Delay, in milliseconds, passed to `SimProcess.send_after/3` when this server
# schedules its next `:tick` message to itself.
@tick_interval_ms 100

# Manager keepalive timeout. `tick/1` multiplies this value by 1000 before
# comparing it with a `current_time()` delta.
# NOTE(review): that implies `current_time()` is in microseconds — confirm against Hobbes.Utils.
@manager_timeout_ms 1000
45
@doc """
Starts the supervisor through `SimServer.start_link/2`.

`arg` is handed over together with this module; `init/1` expects it to be a
map with `:coordinators` and `:slots` keys.
"""
def start_link(arg) do
  SimServer.start_link(__MODULE__, arg)
end
47
@doc """
Asks the supervisor `server` to start a child of `module` for `generation`.

The request is a synchronous `SimServer.call/2`. The reply is `{:ok, pid}` or
one of the `{:error, reason}` tuples listed in the spec. A call that exits
with `{:timeout, _}` is converted into `{:error, :timeout}`; any other exit
propagates to the caller.
"""
@spec start_child_server(term, non_neg_integer, module, term) :: {:ok, pid} | {:error, :no_cluster | :wrong_generation | :slots_full | :timeout}
def start_child_server(server, generation, module, arg \\ nil) when is_integer(generation) do
  request = {:start_child_server, generation, module, arg}
  SimServer.call(server, request)
catch
  :exit, {:timeout, _mfa} -> {:error, :timeout}
end
56
@doc """
Delivers a "current manager" `response` to the supervisor `server`.

The response is wrapped in a `{:reply_current_manager, response}` message and
sent with `SimServer.cast/2`.
"""
def reply_current_manager(server, response) do
  message = {:reply_current_manager, response}
  SimServer.cast(server, message)
end
60
# Builds the initial state from the start argument.
#
# `config_slots` is a keyword list; missing entries default to 0 stateless
# slots and empty tlog/storage slot lists. All configured slots start out open.
# Exit trapping is enabled so that child exits arrive as `{:EXIT, pid, reason}`
# messages, and the first `:tick` is scheduled before returning.
def init(%{coordinators: coordinators, slots: config_slots}) do
  SimProcess.flag(:trap_exit, true)

  stateless_count = Keyword.get(config_slots, :stateless, 0)
  tlog_slots = Keyword.get(config_slots, :tlog, [])
  storage_slots = Keyword.get(config_slots, :storage, [])

  assert is_integer(stateless_count)
  assert is_list(tlog_slots)
  assert is_list(storage_slots)

  initial_state = %State{
    coordinators: coordinators,
    slots: %{stateless: stateless_count, tlog: tlog_slots, storage: storage_slots},
    last_manager_message_timestamp: current_time(),
    open_stateless: stateless_count,
    open_tlog: tlog_slots,
    open_storage: storage_slots,
  }

  SimProcess.send_after(self(), :tick, @tick_interval_ms)
  {:ok, initial_state}
end
86
# Synchronous request to start a child server; the actual decision is made by
# `on_start_child_server/4`, whose result becomes the reply.
def handle_call({:start_child_server, generation, module, arg}, _from, %State{} = state) do
  {reply, new_state} = on_start_child_server(state, generation, module, arg)
  {:reply, reply, new_state}
end
91
# A coordinator answered a "current manager" request; fold the answer into the state.
def handle_cast({:reply_current_manager, response}, %State{} = state) do
  new_state = on_coordinator_reply_manager(state, response)
  {:noreply, new_state}
end
95
# Periodic timer: run one tick, then schedule the next one.
def handle_info(:tick, %State{} = state) do
  next_state = tick(state)

  SimProcess.send_after(self(), :tick, @tick_interval_ms)
  {:noreply, next_state}
end
102
# A `%Cluster{}` update arrived as a plain message; delegate to `on_update_cluster/2`.
def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
  new_state = on_update_cluster(state, cluster)
  {:noreply, new_state}
end
106
# Handles the exit of a linked process (exits are trapped in `init/1`).
#
# Exits from pids that are not tracked in `state.children` are ignored.
# For a tracked child:
#   * if the child belonged to the current manager generation, the generation
#     is flagged as failed (reported to the manager by `send_manager_ping/1`);
#   * the child is removed from `state.children`;
#   * `maybe_restart_child/2` decides whether it is started again.
#
# The exit reason is not inspected and nothing is logged here.
def handle_info({:EXIT, pid, _reason}, %State{} = state) do
  # Single lookup: `List.keyfind/3` returns the entry or `nil`.
  case List.keyfind(state.children, pid, 0) do
    {^pid, info} ->
      state =
        if info.generation == state.manager_generation do
          %{state | generation_failed: true}
        else
          state
        end

      state = %{state | children: List.keydelete(state.children, pid, 0)}
      {:noreply, maybe_restart_child(state, info)}

    nil ->
      {:noreply, state}
  end
end
132
133 # Tick
134
# One tick of the supervisor loop.
#
# When no manager is known, or the last manager message is older than the
# timeout, the coordinators are asked for the current manager. Otherwise the
# known manager is pinged with this supervisor's status.
defp tick(%State{manager_pid: manager_pid, last_manager_message_timestamp: last_seen} = state) do
  elapsed = current_time() - last_seen
  timed_out? = elapsed > @manager_timeout_ms * 1000

  if manager_pid == nil or timed_out? do
    send_coordinator_request(state)
  else
    send_manager_ping(state)
  end
end
147
148 # Events
149
# Coordinator replies that do not advance the known manager generation are dropped.
defp on_coordinator_reply_manager(%State{manager_generation: known_generation} = state, %{generation: generation})
     when generation <= known_generation do
  state
end

# A newer manager generation was announced: adopt the new manager, refresh the
# keepalive timestamp and clear the failure flag of the previous generation.
defp on_coordinator_reply_manager(%State{} = state, %{generation: generation, manager_pid: manager_pid}) do
  assert is_integer(generation)
  assert generation > state.manager_generation
  assert is_pid(manager_pid)

  %State{
    state
    | manager_pid: manager_pid,
      manager_generation: generation,
      last_manager_message_timestamp: current_time(),
      generation_failed: false
  }
end
168
# Applies a cluster update. The clauses are tried in order:
#   1. cluster older than the manager generation      -> ignored
#   2. no cluster stored yet                          -> store, load stateful slots, broadcast
#   3. cluster from a newer generation than stored    -> store, reset stateless/tlog slots, broadcast
#   4. cluster equal to the stored one                -> keepalive only
#   5. any other update while a cluster is stored     -> store, broadcast

# (1) Stale update from a previous generation.
defp on_update_cluster(%State{manager_generation: manager_generation} = state, %Cluster{generation: generation})
     when generation < manager_generation do
  state
end

# (2) Very first cluster received by this supervisor.
defp on_update_cluster(%State{cluster: nil} = state, %Cluster{} = cluster) do
  assert cluster.generation == state.manager_generation

  with_cluster = %{state | cluster: cluster, last_manager_message_timestamp: current_time()}

  with_cluster
  |> load_stateful_slots()
  |> broadcast_cluster_to_children()
end

# (3) The cluster moved on to a newer generation than the stored one.
defp on_update_cluster(%State{cluster: %Cluster{generation: stored_generation}} = state, %Cluster{generation: generation} = cluster)
     when generation > stored_generation do
  assert cluster.generation == state.manager_generation

  # Stateless and tlog slots are reset for the new generation; storage slots
  # are left alone because storage servers are not tied to the transaction
  # system generation.
  updated = %{
    state
    | cluster: cluster,
      last_manager_message_timestamp: current_time(),
      open_stateless: state.slots.stateless,
      open_tlog: state.slots.tlog
  }

  broadcast_cluster_to_children(updated)
end

# (4) Nothing changed: only refresh the keepalive timestamp.
defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when cluster == state.cluster do
  %{state | last_manager_message_timestamp: current_time()}
end

# (5) The cluster changed within the current generation.
defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when not is_nil(state.cluster) do
  assert cluster.generation == state.cluster.generation
  assert cluster.generation == state.manager_generation

  updated = %{state | cluster: cluster, last_manager_message_timestamp: current_time()}
  broadcast_cluster_to_children(updated)
end
212
# Determines which tlog/storage slots are open once the first cluster is known.
#
# Under simulation the slot directories are scanned (existing servers are
# started again as a side effect). Otherwise every configured slot is open.
defp load_stateful_slots(%State{} = state) do
  # TODO: check if cluster is in-memory instead
  if Sim.simulated?() do
    state
    |> load_tlog_slots()
    |> load_storage_slots()
  else
    %State{slots: configured} = state
    %{state | open_tlog: configured.tlog, open_storage: configured.storage}
  end
end
228
# Scans every configured tlog slot directory.
#
# Each `tlog_gen_<N>.kv` file found gets a TLog child started for generation N.
# A slot stays open only when none of its files belongs to the current cluster
# generation. `open_tlog` ends up in the same order as the configured slots.
defp load_tlog_slots(%State{} = state) do
  {open_slots, state} =
    Enum.reduce(state.slots.tlog, {[], state}, fn slot_path, {open_acc, state} ->
      SimFile.mkdir_p(slot_path)
      {:ok, names} = SimFile.ls(slot_path)

      # [{path, generation}, ...]
      kv_files =
        Enum.reduce(names, [], fn name, found ->
          case Regex.run(~r"^tlog_gen_(\d+)\.kv$", name) do
            [^name, gen_str] ->
              kv_path = slot_path <> "/" <> name
              [{kv_path, String.to_integer(gen_str)} | found]

            nil ->
              found
          end
        end)

      %State{} = state =
        Enum.reduce(kv_files, state, fn {kv_path, gen}, state ->
          child_arg = %{path: kv_path, cluster: state.cluster}
          {state, _pid} = spawn_child(state, gen, Hobbes.Servers.TLog, child_arg, %{path: kv_path})
          state
        end)

      # Only mark this slot as open if there is no TLog for the current generation
      # TODO: in practice this is essentially unreachable as a ServerSupervisor rebooting during the current generation should cause a recovery
      # Maybe assert this away or find a way to guarantee that recovery occurs?
      if Enum.any?(kv_files, &(elem(&1, 1) == state.cluster.generation)) do
        {open_acc, state}
      else
        {[slot_path | open_acc], state}
      end
    end)

  %{state | open_tlog: Enum.reverse(open_slots)}
end
262
# Scans every configured storage slot directory.
#
# A slot that already contains `storage.kv` gets a Storage child started for it
# (with a `nil` generation) and is treated as taken; every other slot is open.
# `open_storage` ends up in the same order as the configured slots.
defp load_storage_slots(%State{} = state) do
  {open_slots, state} =
    Enum.reduce(state.slots.storage, {[], state}, fn slot_path, {open_acc, state} ->
      SimFile.mkdir_p(slot_path)
      kv_path = slot_path <> "/storage.kv"

      if SimFile.exists?(kv_path) do
        child_arg = %{path: kv_path, cluster: state.cluster}
        {state, _pid} = spawn_child(state, nil, Hobbes.Servers.Storage, child_arg, %{path: kv_path})
        {open_acc, state}
      else
        {[slot_path | open_acc], state}
      end
    end)

  %{state | open_storage: Enum.reverse(open_slots)}
end
282
# Decides how to answer a `start_child_server` request.
# Returns `{reply, state}` where reply is `{:ok, pid}` or `{:error, reason}`.

# No cluster has been received yet.
defp on_start_child_server(%State{cluster: nil} = state, _generation, _module, _arg) do
  {{:error, :no_cluster}, state}
end

# The request targets a generation other than the stored cluster's.
defp on_start_child_server(%State{cluster: %Cluster{generation: current}} = state, generation, _module, _arg)
     when generation != current do
  {{:error, :wrong_generation}, state}
end

# Generation matches: try to claim a slot and start the child.
defp on_start_child_server(%State{} = state, generation, module, arg) do
  assert %Cluster{} = state.cluster
  assert generation == state.cluster.generation

  case start_child(state, generation, module, arg) do
    :error ->
      {{:error, :slots_full}, state}

    {:ok, {new_state, pid}} ->
      {{:ok, pid}, new_state}
  end
end
302
# Claims a free slot for `module` and starts the child in it.
#
# Returns `{:ok, {state, pid}}` with the slot removed from the open pool, or
# `:error` when no slot of the required kind is available.
#
#   * TLog servers take a path from `open_tlog` and use a per-generation file
#     `tlog_gen_<generation>.kv` inside it.
#   * Storage servers take a path from `open_storage` and use `storage.kv`.
#   * Every other module consumes one unit of the `open_stateless` counter.
defp start_child(%State{} = state, generation, module, arg) when module == Hobbes.Servers.TLog do
  file_name = "tlog_gen_#{Integer.to_string(generation)}.kv"
  start_stateful_child(state, :open_tlog, file_name, generation, module, arg)
end

defp start_child(%State{} = state, generation, module, arg) when module == Hobbes.Servers.Storage do
  start_stateful_child(state, :open_storage, "storage.kv", generation, module, arg)
end

defp start_child(%State{open_stateless: open_stateless} = state, generation, module, arg) do
  # Any non-positive counter means "no slot left"; this also covers a
  # misconfigured negative count instead of raising on it.
  if open_stateless > 0 do
    state = %{state | open_stateless: open_stateless - 1}

    {state, pid} = spawn_child(state, generation, module, arg, nil)
    {:ok, {state, pid}}
  else
    :error
  end
end

# Shared implementation for the stateful (TLog / Storage) clauses above.
# `open_slots_key` names the state field holding the open slot paths and
# `file_name` is the key-value file the child uses inside the claimed slot.
defp start_stateful_child(%State{} = state, open_slots_key, file_name, generation, module, arg) do
  case Map.fetch!(state, open_slots_key) do
    [slot_path | remaining] ->
      state = Map.replace!(state, open_slots_key, remaining)

      # TODO: check for in-memory cluster instead
      kv_path =
        if Sim.simulated?() do
          path = slot_path <> "/" <> file_name
          # An open slot must not already contain a file for this server.
          assert not SimFile.exists?(path)
          path
        else
          :memory
        end

      # The restart argument only carries the path; the cluster is added
      # again by `maybe_restart_child/2` at restart time.
      child_arg = Map.put(arg, :path, kv_path)
      {state, pid} = spawn_child(state, generation, module, child_arg, %{path: kv_path})

      {:ok, {state, pid}}

    [] ->
      :error
  end
end
358
# Restart policy for an exited child, given its bookkeeping `info` map.

# Stateless servers (anything other than TLog / Storage) are not restarted.
defp maybe_restart_child(%State{} = state, %{module: module})
     when module != Hobbes.Servers.TLog and module != Hobbes.Servers.Storage do
  state
end

# TLog and Storage servers are started again from their stored restart
# argument, with the currently known cluster merged in.
defp maybe_restart_child(%State{} = state, %{module: module, generation: generation, restart_arg: restart_arg}) do
  assert is_struct(state.cluster, Cluster)

  # TODO: after a node restart cluster might be able to go back a generation here,
  # which may be a correctness issue
  child_arg = Map.put(restart_arg, :cluster, state.cluster)
  {new_state, _pid} = spawn_child(state, generation, module, child_arg, restart_arg)

  new_state
end
375
# Starts `module` via its `start_link/1` and records the new pid, together with
# its module, generation and restart argument, at the head of `state.children`.
# Returns `{state, pid}`; a start result other than `{:ok, pid}` raises a match error.
defp spawn_child(%State{children: children} = state, generation, module, arg, restart_arg) do
  {:ok, pid} = module.start_link(arg)

  entry = {pid, %{module: module, generation: generation, restart_arg: restart_arg}}

  {%{state | children: [entry | children]}, pid}
end
384
385 # Send
386
# Asks every configured coordinator for the current manager.
# The state is returned unchanged so the call can be used in a pipeline.
defp send_coordinator_request(%State{coordinators: coordinators} = state) do
  Enum.each(coordinators, &Coordinator.request_current_manager/1)

  state
end
394
# Pings the known manager with this supervisor's status.
#
# Open slot counts are reported only when the stored cluster belongs to the
# manager's generation; otherwise `slots` is `nil`. The state is returned
# unchanged.
defp send_manager_ping(%State{manager_pid: manager_pid, manager_generation: manager_generation} = state) do
  assert is_pid(manager_pid)
  assert is_integer(manager_generation)

  open_slots =
    if match?(%Cluster{generation: ^manager_generation}, state.cluster) do
      %{
        stateless: state.open_stateless,
        tlog: length(state.open_tlog),
        storage: length(state.open_storage),
      }
    else
      nil
    end

  status = %SupervisorStatus{
    node: SimProcess.node(),
    pid: self(),
    generation_failed?: state.generation_failed,
    slots: open_slots,
  }

  Manager.supervisor_ping(manager_pid, status)
  state
end
424
# Sends the stored cluster to every tracked child as an `{:update_cluster, cluster}`
# message. Requires a cluster to be present; returns the state unchanged.
defp broadcast_cluster_to_children(%State{cluster: cluster, children: children} = state) do
  assert cluster != nil

  message = {:update_cluster, cluster}

  Enum.each(children, fn {child_pid, _info} ->
    SimProcess.send(child_pid, message)
  end)

  state
end
434end