this repo has no description
at master 434 lines 13 kB view raw
defmodule Hobbes.Servers.ServerSupervisor do
  @moduledoc """
  Per-node supervisor that hosts Hobbes servers in a fixed set of slots.

  On every tick it either asks the coordinators for the current manager (when
  no manager is known, or the manager has been silent for too long) or pings
  the known manager with a `Hobbes.Structs.SupervisorStatus` describing its
  free slots.

  Slots are configured through `init/1`:

    * `:stateless` - how many stateless servers may be started per generation
    * `:tlog` - directories that can each hold one TLog per generation
    * `:storage` - directories that can each hold one Storage server

  Children are started through `start_child_server/4`. TLog and Storage
  children are restarted whenever they exit; stateless children are not.
  """

  use GenServer
  alias Trinity.{Sim, SimProcess, SimServer, SimFile}

  import ExUnit.Assertions, only: [assert: 1]

  alias Hobbes.Structs.{Cluster, SupervisorStatus}
  alias Hobbes.Servers.{Coordinator, Manager}

  import Hobbes.Utils

  defmodule State do
    @moduledoc false

    @typedoc "Slot capacity exactly as configured in `init/1`."
    @type slots :: %{
            stateless: non_neg_integer,
            tlog: [String.t],
            storage: [String.t],
          }

    @typedoc """
    Bookkeeping for one supervised child.

    `generation` is `nil` for Storage servers recovered from disk at startup;
    `restart_arg` is `nil` for stateless servers, which are never restarted.
    """
    @type child_info :: %{
            module: module,
            generation: non_neg_integer | nil,
            restart_arg: map | nil,
          }

    @type t :: %__MODULE__{
            coordinators: [pid],
            slots: slots,
            manager_pid: pid | nil,
            manager_generation: non_neg_integer | -1,
            # `nil` until the first `{:update_cluster, cluster}` message arrives
            cluster: Cluster.t | nil,
            children: [{pid, child_info}],
            # NOTE(review): not read or written anywhere in this module
            child_generations: %{pid => non_neg_integer | nil},
            generation_failed: boolean,
            # value of `current_time/0` at the last contact with the manager
            last_manager_message_timestamp: number | nil,
            open_stateless: non_neg_integer,
            open_tlog: [String.t],
            open_storage: [String.t],
          }
    @enforce_keys [
      :coordinators,
      :slots,

      :open_stateless,
      :open_tlog,
      :open_storage,
    ]
    defstruct [
      manager_pid: nil,
      manager_generation: -1,
      cluster: nil,
      children: [],
      child_generations: %{},
      generation_failed: false,

      last_manager_message_timestamp: nil,
    ] ++ @enforce_keys
  end

  @tick_interval_ms 100

  @manager_timeout_ms 1000
  # Compared against differences of `current_time/0`, so that function is
  # assumed to return microseconds. NOTE(review): confirm in Hobbes.Utils.
  @manager_timeout_us @manager_timeout_ms * 1000

  # Client API

  @doc """
  Starts a supervisor. `arg` must be a map with `:coordinators` (a list of
  coordinator pids) and `:slots` (a keyword list, see the module doc).
  """
  def start_link(arg), do: SimServer.start_link(__MODULE__, arg)

  @doc """
  Asks `server` to start a child running `module` for cluster `generation`.

  `Hobbes.Servers.TLog` and `Hobbes.Servers.Storage` each take a free TLog or
  storage slot; for them `arg` must be a map, and its `:path` is set by the
  supervisor. Any other module takes a stateless slot and receives `arg` as-is.

  Returns `{:ok, pid}`, or `{:error, reason}` when the supervisor has no cluster
  yet (`:no_cluster`), `generation` is not the current cluster generation
  (`:wrong_generation`), no matching slot is free (`:slots_full`), or the call
  times out (`:timeout`).
  """
  @spec start_child_server(term, non_neg_integer, module, term) :: {:ok, pid} | {:error, :no_cluster | :wrong_generation | :slots_full | :timeout}
  def start_child_server(server, generation, module, arg \\ nil) when is_integer(generation) do
    try do
      SimServer.call(server, {:start_child_server, generation, module, arg})
    catch
      :exit, {:timeout, _mfa} -> {:error, :timeout}
    end
  end

  @doc """
  Delivers an answer to a current-manager request (presumably sent by a
  coordinator in reply to `Coordinator.request_current_manager/1`).

  `response` must be a map with `:generation`; replies for a generation newer
  than the known one must also carry `:manager_pid`.
  """
  def reply_current_manager(server, response) do
    SimServer.cast(server, {:reply_current_manager, response})
  end

  # Callbacks

  @impl true
  def init(%{coordinators: coordinators, slots: config_slots}) do
    # Children are linked via start_link; trapping exits turns their deaths into
    # `{:EXIT, pid, reason}` messages (handled below) instead of killing us.
    SimProcess.flag(:trap_exit, true)

    slots = %{
      stateless: Keyword.get(config_slots, :stateless, 0),
      tlog: Keyword.get(config_slots, :tlog, []),
      storage: Keyword.get(config_slots, :storage, []),
    }

    assert is_integer(slots.stateless)
    # `start_child/4` only handles a positive or zero stateless count; reject
    # negative config here instead of crashing on the first start request.
    assert slots.stateless >= 0
    assert is_list(slots.tlog)
    assert is_list(slots.storage)

    state = %State{
      coordinators: coordinators,
      slots: slots,
      last_manager_message_timestamp: current_time(),

      open_stateless: slots.stateless,
      open_tlog: slots.tlog,
      open_storage: slots.storage,
    }

    SimProcess.send_after(self(), :tick, @tick_interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_call({:start_child_server, generation, module, arg}, _from, %State{} = state) do
    {result, state} = on_start_child_server(state, generation, module, arg)
    {:reply, result, state}
  end

  @impl true
  def handle_cast({:reply_current_manager, response}, %State{} = state) do
    {:noreply, on_coordinator_reply_manager(state, response)}
  end

  @impl true
  def handle_info(:tick, %State{} = state) do
    state = tick(state)

    SimProcess.send_after(self(), :tick, @tick_interval_ms)
    {:noreply, state}
  end

  def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
    {:noreply, on_update_cluster(state, cluster)}
  end

  def handle_info({:EXIT, pid, _reason}, %State{} = state) do
    case List.keyfind(state.children, pid, 0) do
      {^pid, info} ->
        # Crash logging is currently disabled. To re-enable it, `require Logger`
        # and log non-`:shutdown` reasons with `Exception.format_exit/1`.

        # Losing any child of the manager's generation marks that generation as
        # failed; the flag is reported to the manager in `send_manager_ping/1`.
        state =
          if info.generation == state.manager_generation do
            %{state | generation_failed: true}
          else
            state
          end

        state = %{state | children: List.keydelete(state.children, pid, 0)}
        {:noreply, maybe_restart_child(state, info)}

      nil ->
        # Exit of a linked process that is not (or no longer) a tracked child.
        {:noreply, state}
    end
  end

  # Tick

  # Contacts the coordinators when there is no manager or it has been silent for
  # longer than @manager_timeout_ms; otherwise reports status to the manager.
  # Coordinator replies for the already-known generation are ignored, so once
  # timed out this keeps asking the coordinators until a newer generation is
  # announced (or an `{:update_cluster, _}` message refreshes the timestamp).
  defp tick(%State{} = state) do
    timed_out? = current_time() - state.last_manager_message_timestamp > @manager_timeout_us

    if state.manager_pid == nil or timed_out? do
      send_coordinator_request(state)
    else
      send_manager_ping(state)
    end
  end

  # Events

  defp on_coordinator_reply_manager(%State{} = state, %{generation: generation}) when generation <= state.manager_generation do
    # Ignore coordinator replies from old (or the current) generations
    state
  end

  defp on_coordinator_reply_manager(%State{} = state, %{generation: generation, manager_pid: manager_pid}) do
    assert is_integer(generation)
    assert generation > state.manager_generation
    assert is_pid(manager_pid)

    # Adopt the newer manager and start a fresh failure/keepalive window
    %{state |
      manager_pid: manager_pid,
      manager_generation: generation,
      last_manager_message_timestamp: current_time(),
      generation_failed: false,
    }
  end

  defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when cluster.generation < state.manager_generation do
    # Ignore responses from previous generations
    state
  end

  defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when state.cluster == nil do
    # Cluster received for the first time: recover stateful servers found on
    # disk, then tell every child about the cluster.
    assert cluster.generation == state.manager_generation

    %{state | cluster: cluster, last_manager_message_timestamp: current_time()}
    |> load_stateful_slots()
    |> broadcast_cluster_to_children()
  end

  defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when cluster.generation > state.cluster.generation do
    # Update cluster to new generation
    assert cluster.generation == state.manager_generation

    %{state |
      cluster: cluster,
      last_manager_message_timestamp: current_time(),

      # Reset stateless and tlog slots for the new generation
      # (but not storage slots as storage servers are not tied to the transaction system generation)
      open_stateless: state.slots.stateless,
      open_tlog: state.slots.tlog,
    }
    |> broadcast_cluster_to_children()
  end

  defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when cluster == state.cluster do
    # Ignore identical cluster but update keepalive timestamp
    %{state | last_manager_message_timestamp: current_time()}
  end

  defp on_update_cluster(%State{} = state, %Cluster{} = cluster) when state.cluster != nil do
    # Update cluster within this generation
    assert cluster.generation == state.cluster.generation
    assert cluster.generation == state.manager_generation

    %{state | cluster: cluster, last_manager_message_timestamp: current_time()}
    |> broadcast_cluster_to_children()
  end

  # Recovers TLog/Storage servers from their slot directories (simulation only)
  # and recomputes which stateful slots are still free.
  defp load_stateful_slots(%State{} = state) do
    # TODO: check if cluster is in-memory instead
    if Sim.simulated?() do
      state
      |> load_tlog_slots()
      |> load_storage_slots()
    else
      %{state | open_tlog: state.slots.tlog, open_storage: state.slots.storage}
    end
  end

  # Starts a TLog for every `tlog_gen_<N>.kv` file found in the TLog slots. A
  # slot stays open unless it already holds a TLog for the current generation.
  defp load_tlog_slots(%State{} = state) do
    {open_slots, state} =
      Enum.reduce(state.slots.tlog, {[], state}, fn slot_path, {open_acc, state} ->
        SimFile.mkdir_p(slot_path)
        kv_files = tlog_kv_files(slot_path)

        state =
          Enum.reduce(kv_files, state, fn {kv_path, gen}, state ->
            {state, _pid} = spawn_child(state, gen, Hobbes.Servers.TLog, %{path: kv_path, cluster: state.cluster}, %{path: kv_path})
            state
          end)

        # TODO: in practice this is essentially unreachable as a ServerSupervisor rebooting during the current generation should cause a recovery
        # Maybe assert this away or find a way to guarantee that recovery occurs?
        if Enum.any?(kv_files, fn {_path, gen} -> gen == state.cluster.generation end) do
          {open_acc, state}
        else
          {[slot_path | open_acc], state}
        end
      end)

    %{state | open_tlog: Enum.reverse(open_slots)}
  end

  # Lists `{kv_path, generation}` for every `tlog_gen_<N>.kv` file in `slot_path`.
  defp tlog_kv_files(slot_path) do
    {:ok, names} = SimFile.ls(slot_path)

    Enum.reduce(names, [], fn name, acc ->
      case Regex.run(~r"^tlog_gen_(\d+)\.kv$", name) do
        [^name, gen_str] -> [{slot_path <> "/" <> name, String.to_integer(gen_str)} | acc]
        nil -> acc
      end
    end)
  end

  # Starts a Storage server for every storage slot that already has a
  # `storage.kv` file; slots without one stay open.
  defp load_storage_slots(%State{} = state) do
    {open_slots, state} =
      Enum.reduce(state.slots.storage, {[], state}, fn slot_path, {open_acc, state} ->
        SimFile.mkdir_p(slot_path)
        kv_path = slot_path <> "/storage.kv"

        if SimFile.exists?(kv_path) do
          # Recovered storage servers are not tied to a generation
          {state, _pid} = spawn_child(state, nil, Hobbes.Servers.Storage, %{path: kv_path, cluster: state.cluster}, %{path: kv_path})
          {open_acc, state}
        else
          {[slot_path | open_acc], state}
        end
      end)

    %{state | open_storage: Enum.reverse(open_slots)}
  end

  defp on_start_child_server(%State{} = state, _generation, _module, _arg) when state.cluster == nil do
    {{:error, :no_cluster}, state}
  end

  defp on_start_child_server(%State{} = state, generation, _module, _arg) when generation != state.cluster.generation do
    {{:error, :wrong_generation}, state}
  end

  defp on_start_child_server(%State{} = state, generation, module, arg) do
    assert %Cluster{} = state.cluster
    assert generation == state.cluster.generation

    case start_child(state, generation, module, arg) do
      {:ok, {state, pid}} -> {{:ok, pid}, state}
      :error -> {{:error, :slots_full}, state}
    end
  end

  # Takes a free slot matching `module` and starts the child in it.
  # Returns `{:ok, {state, pid}}`, or `:error` when no such slot is free.
  defp start_child(%State{} = state, generation, Hobbes.Servers.TLog = module, arg) do
    case state.open_tlog do
      [slot_path | rest] ->
        state = %{state | open_tlog: rest}
        file_name = "tlog_gen_#{Integer.to_string(generation)}.kv"
        {:ok, spawn_stateful_child(state, generation, module, arg, slot_path, file_name)}

      [] ->
        :error
    end
  end

  defp start_child(%State{} = state, generation, Hobbes.Servers.Storage = module, arg) do
    case state.open_storage do
      [slot_path | rest] ->
        state = %{state | open_storage: rest}
        {:ok, spawn_stateful_child(state, generation, module, arg, slot_path, "storage.kv")}

      [] ->
        :error
    end
  end

  defp start_child(%State{} = state, generation, module, arg) do
    case state.open_stateless do
      open when open > 0 ->
        state = %{state | open_stateless: open - 1}
        {:ok, spawn_child(state, generation, module, arg, nil)}

      0 ->
        :error
    end
  end

  # Starts a stateful (TLog/Storage) child for `slot_path`. In simulation its
  # data lives in `<slot_path>/<file_name>`, which must not exist yet; otherwise
  # the path is `:memory`. The path is kept as the child's restart argument.
  defp spawn_stateful_child(%State{} = state, generation, module, arg, slot_path, file_name) do
    # TODO: check for in-memory cluster instead
    kv_path =
      if Sim.simulated?() do
        kv_path = slot_path <> "/" <> file_name
        assert not SimFile.exists?(kv_path)
        kv_path
      else
        :memory
      end

    spawn_child(state, generation, module, Map.put(arg, :path, kv_path), %{path: kv_path})
  end

  defp maybe_restart_child(%State{} = state, %{module: module})
       when module not in [Hobbes.Servers.TLog, Hobbes.Servers.Storage] do
    # Do not restart stateless servers
    state
  end

  defp maybe_restart_child(%State{} = state, %{module: module, generation: generation, restart_arg: restart_arg}) do
    assert is_struct(state.cluster, Cluster)

    # TODO: after a node restart cluster might be able to go back a generation here,
    # which may be a correctness issue
    arg = Map.put(restart_arg, :cluster, state.cluster)
    {state, _pid} = spawn_child(state, generation, module, arg, restart_arg)
    state
  end

  # Starts `module` linked to this process and records it in `children`.
  defp spawn_child(%State{} = state, generation, module, arg, restart_arg) do
    {:ok, pid} = module.start_link(arg)

    info = %{module: module, generation: generation, restart_arg: restart_arg}
    {%{state | children: [{pid, info} | state.children]}, pid}
  end

  # Send

  defp send_coordinator_request(%State{} = state) do
    Enum.each(state.coordinators, &Coordinator.request_current_manager/1)
    state
  end

  # Reports liveness, generation failure and free-slot counts to the manager.
  # Slot counts are only reported once the cluster for the manager's generation
  # is known; until then `slots` is `nil`.
  defp send_manager_ping(%State{} = state) do
    assert is_pid(state.manager_pid)

    manager_generation = state.manager_generation
    assert is_integer(manager_generation)

    slots =
      case state.cluster do
        %Cluster{generation: ^manager_generation} ->
          %{
            stateless: state.open_stateless,
            tlog: length(state.open_tlog),
            storage: length(state.open_storage),
          }

        _ ->
          nil
      end

    status = %SupervisorStatus{
      node: SimProcess.node(),
      pid: self(),
      generation_failed?: state.generation_failed,
      slots: slots,
    }

    Manager.supervisor_ping(state.manager_pid, status)
    state
  end

  defp broadcast_cluster_to_children(%State{} = state) do
    assert state.cluster != nil

    Enum.each(state.children, fn {pid, _info} ->
      SimProcess.send(pid, {:update_cluster, state.cluster})
    end)

    state
  end
end