defmodule Hobbes.ClusterNode do
  @moduledoc """
  Process representing a single node of a Hobbes cluster.

  On start the node:

    * parses its options into a `NodeConfig` and builds the initial
      `Hobbes.ClusterConfig` from `:initial_cluster_opts`,
    * starts a local `Coordinator` when a `:coordinator_id` is configured,
    * starts the `ServerSupervisor`,
    * spawns a linked helper process that tries to write the initial cluster
      config through the local coordinator (skipped when there is none).
  """

  use GenServer

  alias Trinity.{SimServer, SimProcess}

  alias Hobbes.ClusterConfig
  alias Hobbes.Servers.{Coordinator, ServerSupervisor}

  import ExUnit.Assertions, only: [assert: 1]

  # Delay between attempts to write the initial cluster config after a timeout.
  @config_retry_interval_ms 100

  @doc """
  Starts a cluster node through `SimServer.start_link/2`.

  `config` is a keyword list read by `NodeConfig.from_opts/1`:

    * `:cluster` (required)
    * `:coordinators` (required)
    * `:coordinator_id` (optional, defaults to `nil`, i.e. no local coordinator)
    * `:initial_cluster_opts` (required)
    * `:slots` (required)
  """
  @spec start_link(keyword) :: term
  def start_link(config) when is_list(config), do: SimServer.start_link(__MODULE__, config)

  defmodule NodeConfig do
    @moduledoc """
    Static, per-node configuration parsed from the options given to
    `Hobbes.ClusterNode.start_link/1`.
    """

    @enforce_keys [
      :cluster_name,
      :coordinator_names,
      :coordinator_id,
      :initial_cluster_opts,
      :slots
    ]
    defstruct @enforce_keys

    @doc """
    Builds a `NodeConfig` from a keyword list.

    Raises `KeyError` when `:cluster`, `:coordinators`, `:initial_cluster_opts`
    or `:slots` is missing. `:coordinator_id` is optional and defaults to `nil`.
    """
    def from_opts(opts) do
      # TODO: validate config with helpful error messages
      %__MODULE__{
        cluster_name: Keyword.fetch!(opts, :cluster),
        coordinator_names: Keyword.fetch!(opts, :coordinators),
        coordinator_id: Keyword.get(opts, :coordinator_id),
        initial_cluster_opts: Keyword.fetch!(opts, :initial_cluster_opts),
        slots: Keyword.fetch!(opts, :slots)
      }
    end
  end

  defmodule State do
    @moduledoc """
    Runtime state of a cluster node. `coordinator_pid` is `nil` when the node
    does not run a coordinator.
    """

    @enforce_keys [
      :node_config,
      :initial_cluster_config,
      :coordinator_pid,
      :server_supervisor_pid
    ]
    defstruct @enforce_keys
  end

  @impl true
  def init(opts) do
    # SimProcess.flag(:trap_exit, true)

    node_config = NodeConfig.from_opts(opts)
    initial_cluster_config = ClusterConfig.from_opts(node_config.initial_cluster_opts)

    coordinator_pid = start_coordinator(node_config)
    server_supervisor_pid = start_server_supervisor(node_config)

    state = %State{
      node_config: node_config,
      initial_cluster_config: initial_cluster_config,
      coordinator_pid: coordinator_pid,
      server_supervisor_pid: server_supervisor_pid
    }

    :ok = ensure_cluster_config(state)

    {:ok, state}
  end

  # Starts the local coordinator, or returns `nil` when this node has no
  # `coordinator_id` configured.
  defp start_coordinator(%{coordinator_id: nil}), do: nil

  defp start_coordinator(%{coordinator_id: id, coordinator_names: coordinator_names})
       when is_integer(id) and is_list(coordinator_names) do
    # The id is used as an index into the coordinator list, so it must fall
    # inside it; a negative id is rejected as well.
    assert id >= 0 and id < length(coordinator_names)

    # TODO: use cluster name
    # The atom is derived from an id bounded by the assertion above, so the set
    # of atoms created here is limited by the size of the coordinator list.
    name = String.to_atom("coordinator-#{id}")

    {:ok, pid} = Coordinator.start_link(id, coordinator_names, name: name)
    pid
  end

  # Starts the server supervisor for this node and returns its pid.
  defp start_server_supervisor(%{coordinator_names: coordinators, slots: slots}) do
    {:ok, pid} = ServerSupervisor.start_link(%{coordinators: coordinators, slots: slots})
    pid
  end

  # Kicks off the initial-config write in a linked helper process so `init/1`
  # does not block on it. Always returns `:ok`.
  #
  # A node without a local coordinator has nothing to write through, so no
  # helper is spawned in that case (previously `nil` was handed to
  # `Coordinator.write_initial_config/2`).
  defp ensure_cluster_config(%State{coordinator_pid: nil}), do: :ok

  defp ensure_cluster_config(%State{
         coordinator_pid: coordinator_pid,
         initial_cluster_config: initial_cluster_config
       }) do
    SimProcess.spawn_link(fn ->
      do_ensure_cluster_config(coordinator_pid, initial_cluster_config)
    end)

    :ok
  end

  # Asks the coordinator to write the initial config, retrying after
  # `@config_retry_interval_ms` for as long as the call times out.
  #
  # `:ok`, `{:error, {:not_primary, _}}` and `{:error, :config_already_written}`
  # all end the loop with `:noop`. Any other reply raises `CaseClauseError` in
  # the helper process.
  # NOTE(review): presumably a non-primary coordinator can stop because the
  # primary performs the write - confirm against Coordinator.
  defp do_ensure_cluster_config(coordinator_pid, %ClusterConfig{} = config) do
    case Coordinator.write_initial_config(coordinator_pid, config) do
      :ok ->
        :noop

      {:error, {:not_primary, _}} ->
        :noop

      {:error, :config_already_written} ->
        :noop

      {:error, :timeout} ->
        SimProcess.sleep(@config_retry_interval_ms)
        do_ensure_cluster_config(coordinator_pid, config)
    end
  end
end