this repo has no description
defmodule Hobbes.ClusterNode do
  @moduledoc """
  Boots the Hobbes processes that live on a single cluster node.

  On startup (`init/1`) the node:

    * parses its options into a `NodeConfig` and turns the
      `:initial_cluster_opts` into a `Hobbes.ClusterConfig`,
    * starts a local `Coordinator` when a `:coordinator_id` was supplied,
    * starts a `ServerSupervisor`, and
    * if a local coordinator was started, spawns a linked process that writes
      the initial cluster config through it.
  """

  use GenServer
  alias Trinity.{SimServer, SimProcess}

  alias Hobbes.ClusterConfig
  alias Hobbes.Servers.{Coordinator, ServerSupervisor}

  # NOTE(review): `assert/1` is used below as a runtime sanity check, so ExUnit
  # presumably has to be loadable wherever this module runs - confirm.
  import ExUnit.Assertions, only: [assert: 1]

  @doc """
  Starts a cluster node through `SimServer.start_link/2`.

  `config` must be a keyword list; see `NodeConfig.from_opts/1` for the keys
  that are read from it. The return value is whatever `SimServer.start_link/2`
  returns.
  """
  @spec start_link(keyword) :: term
  def start_link(config) when is_list(config), do: SimServer.start_link(__MODULE__, config)

  defmodule NodeConfig do
    @moduledoc """
    Per-node settings extracted from the options passed to
    `Hobbes.ClusterNode.start_link/1`.
    """

    @enforce_keys [
      :cluster_name,
      :coordinator_names,
      :coordinator_id,
      :initial_cluster_opts,
      :slots
    ]
    defstruct @enforce_keys

    @doc """
    Builds a `NodeConfig` from a keyword list.

    Required keys (a `KeyError` is raised by `Keyword.fetch!/2` when one is
    missing): `:cluster`, `:coordinators`, `:initial_cluster_opts`, `:slots`.

    Optional key: `:coordinator_id` (defaults to `nil`, meaning this node does
    not start a coordinator).
    """
    def from_opts(opts) do
      # TODO: validate config with helpful error messages
      %__MODULE__{
        cluster_name: Keyword.fetch!(opts, :cluster),
        coordinator_names: Keyword.fetch!(opts, :coordinators),
        coordinator_id: Keyword.get(opts, :coordinator_id),
        initial_cluster_opts: Keyword.fetch!(opts, :initial_cluster_opts),
        slots: Keyword.fetch!(opts, :slots)
      }
    end
  end

  defmodule State do
    @moduledoc """
    Runtime state of a `Hobbes.ClusterNode`.

    `coordinator_pid` is `nil` when the node was started without a
    `:coordinator_id`.
    """

    @enforce_keys [
      :node_config,
      :initial_cluster_config,
      :coordinator_pid,
      :server_supervisor_pid
    ]
    defstruct @enforce_keys
  end

  @doc """
  Parses `opts`, starts the node's child processes and kicks off the initial
  cluster-config write.

  Raises (and therefore fails startup) if the options are incomplete or a
  child process does not return `{:ok, pid}`.
  """
  @impl true
  def init(opts) do
    # SimProcess.flag(:trap_exit, true)
    node_config = NodeConfig.from_opts(opts)
    initial_cluster_config = ClusterConfig.from_opts(node_config.initial_cluster_opts)

    # The coordinator is started before the server supervisor; keep this order.
    coordinator_pid = start_coordinator(node_config)
    server_supervisor_pid = start_server_supervisor(node_config)

    state = %State{
      node_config: node_config,
      initial_cluster_config: initial_cluster_config,
      coordinator_pid: coordinator_pid,
      server_supervisor_pid: server_supervisor_pid
    }

    :ok = ensure_cluster_config(state)
    {:ok, state}
  end

  # Starts the local coordinator and returns its pid, or returns `nil` when the
  # node was configured without a `:coordinator_id`.
  defp start_coordinator(%{coordinator_id: nil}), do: nil

  defp start_coordinator(%{coordinator_id: id, coordinator_names: coordinator_names})
       when is_integer(id) and is_list(coordinator_names) do
    # The id must fall inside 0..length-1; a negative id would otherwise slip
    # past the upper-bound check alone.
    assert id >= 0
    assert id < length(coordinator_names)

    # TODO: use cluster name
    # The atom is built from an id that was just bounds-checked, so the set of
    # atoms created here is limited by the number of coordinators.
    name = String.to_atom("coordinator-#{id}")
    {:ok, pid} = Coordinator.start_link(id, coordinator_names, name: name)
    pid
  end

  # Starts the server supervisor with the coordinator names and slots from the
  # node config and returns its pid.
  defp start_server_supervisor(%{coordinator_names: coordinators, slots: slots}) do
    {:ok, pid} = ServerSupervisor.start_link(%{coordinators: coordinators, slots: slots})
    pid
  end

  # Spawns a linked process that writes the initial cluster config through the
  # local coordinator. Always returns `:ok` immediately; the write itself runs
  # in the background.
  #
  # Without a local coordinator there is no pid to write through, so nothing is
  # spawned. Previously the writer was spawned with a `nil` pid.
  defp ensure_cluster_config(%State{coordinator_pid: nil}), do: :ok

  defp ensure_cluster_config(%State{} = state) do
    %{
      initial_cluster_config: initial_cluster_config,
      coordinator_pid: coordinator_pid
    } = state

    SimProcess.spawn_link(fn ->
      do_ensure_cluster_config(coordinator_pid, initial_cluster_config)
    end)

    :ok
  end

  # Asks the coordinator to write the initial config and retries on timeout.
  #
  # Stops (returning `:noop`) when the write succeeds, when the coordinator
  # reports it is not the primary, or when a config has already been written.
  # Any other reply is unmatched and crashes this process.
  defp do_ensure_cluster_config(coordinator_pid, %ClusterConfig{} = config) do
    case Coordinator.write_initial_config(coordinator_pid, config) do
      :ok ->
        :noop

      {:error, {:not_primary, _}} ->
        :noop

      {:error, :config_already_written} ->
        :noop

      {:error, :timeout} ->
        # Back off before retrying (100 is presumably milliseconds - confirm
        # against SimProcess.sleep/1).
        SimProcess.sleep(100)
        do_ensure_cluster_config(coordinator_pid, config)
    end
  end
end