this repo has no description
at master 105 lines 3.0 kB view raw
1defmodule Hobbes.ClusterNode do 2 use GenServer 3 alias Trinity.{SimServer, SimProcess} 4 5 alias Hobbes.ClusterConfig 6 alias Hobbes.Servers.{Coordinator, ServerSupervisor} 7 8 import ExUnit.Assertions, only: [assert: 1] 9 10 @spec start_link(keyword) :: term 11 def start_link(config) when is_list(config), do: SimServer.start_link(__MODULE__, config) 12 13 defmodule NodeConfig do 14 @enforce_keys [ 15 :cluster_name, 16 17 :coordinator_names, 18 :coordinator_id, 19 :initial_cluster_opts, 20 21 :slots, 22 ] 23 defstruct @enforce_keys 24 25 def from_opts(opts) do 26 # TODO: validate config with helpful error messages 27 %NodeConfig{ 28 cluster_name: Keyword.fetch!(opts, :cluster), 29 coordinator_names: Keyword.fetch!(opts, :coordinators), 30 coordinator_id: Keyword.get(opts, :coordinator_id), 31 initial_cluster_opts: Keyword.fetch!(opts, :initial_cluster_opts), 32 slots: Keyword.fetch!(opts, :slots), 33 } 34 end 35 end 36 37 defmodule State do 38 @enforce_keys [ 39 :node_config, 40 :initial_cluster_config, 41 :coordinator_pid, 42 :server_supervisor_pid, 43 ] 44 defstruct @enforce_keys 45 end 46 47 def init(opts) do 48 #SimProcess.flag(:trap_exit, true) 49 node_config = NodeConfig.from_opts(opts) 50 initial_cluster_config = ClusterConfig.from_opts(node_config.initial_cluster_opts) 51 52 coordinator_pid = start_coordinator(node_config) 53 server_supervisor_pid = start_server_supervisor(node_config) 54 55 state = %State{ 56 node_config: node_config, 57 initial_cluster_config: initial_cluster_config, 58 coordinator_pid: coordinator_pid, 59 server_supervisor_pid: server_supervisor_pid, 60 } 61 62 :ok = ensure_cluster_config(state) 63 {:ok, state} 64 end 65 66 defp start_coordinator(%{coordinator_id: nil}), do: nil 67 68 defp start_coordinator(%{coordinator_id: id, coordinator_names: coordinator_names}) 69 when is_integer(id) and is_list(coordinator_names) do 70 assert id < length(coordinator_names) 71 72 # TODO: use cluster name 73 name = String.to_atom("coordinator-#{id}") 74 {:ok, pid} = Coordinator.start_link(id, coordinator_names, name: name) 75 pid 76 end 77 78 defp start_server_supervisor(%{coordinator_names: coordinators, slots: slots}) do 79 {:ok, pid} = ServerSupervisor.start_link(%{coordinators: coordinators, slots: slots}) 80 pid 81 end 82 83 defp ensure_cluster_config(%State{} = state) do 84 %{ 85 initial_cluster_config: initial_cluster_config, 86 coordinator_pid: coordinator_pid, 87 } = state 88 89 SimProcess.spawn_link(fn -> 90 do_ensure_cluster_config(coordinator_pid, initial_cluster_config) 91 end) 92 :ok 93 end 94 95 defp do_ensure_cluster_config(coordinator_pid, %ClusterConfig{} = config) do 96 case Coordinator.write_initial_config(coordinator_pid, config) do 97 :ok -> :noop 98 {:error, {:not_primary, _}} -> :noop 99 {:error, :config_already_written} -> :noop 100 {:error, :timeout} -> 101 SimProcess.sleep(100) 102 do_ensure_cluster_config(coordinator_pid, config) 103 end 104 end 105end