defmodule Hobbes.Servers.BeginBuffer do
  @moduledoc """
  Batches read-version requests and answers each batch with a single read version.

  Callers of `get_read_version/1` are parked in a buffer without an immediate
  reply. On every `:tick` (rescheduled every `@tick_interval_ms` ms), if the
  cluster status is `:normal`, the buffer is non-empty and no batch is already
  in flight, the buffer becomes the current batch:

    1. a read version is fetched from the Sequencer, and
    2. `TLog.check_locked/2` is sent, tagged with a fresh nonce, to every TLog of
       the current TLog generation (the head of `cluster.tlog_generations`).

  Once `replication_factor` of those TLogs have replied "not locked", every
  caller in the batch receives `{:ok, read_version}`.

  The process exits with `:shutdown` when a TLog reports itself locked, when
  the Sequencer call times out, or when it receives a cluster with a newer
  generation.
  """

  use GenServer

  alias Trinity.{SimProcess, SimServer, SimLogger}
  require SimLogger
  import ExUnit.Assertions, only: [assert: 1]

  alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
  alias Hobbes.Servers.{Sequencer, TLog}

  import Hobbes.Utils

  defmodule State do
    @moduledoc false

    # Fields:
    #   id                     - server id, used in log messages
    #   cluster                - latest %Cluster{} accepted for this generation
    #   buffer                 - `from` refs of callers waiting for the next batch
    #   batch_buffer           - `from` refs in the batch being confirmed, or nil
    #   batch_read_version     - read version fetched for the in-flight batch, or nil
    #   check_locked_nonce     - ref tagging the in-flight check_locked round, or nil
    #   check_locked_reply_ids - TLog ids that replied "not locked" this round, or nil
    @enforce_keys [
      :id,
      :cluster,
      :buffer,
      :batch_buffer,
      :batch_read_version,
      :check_locked_nonce,
      :check_locked_reply_ids
    ]
    defstruct @enforce_keys
  end

  @tick_interval_ms 1

  # --- Client API -----------------------------------------------------------

  @doc "Starts a BeginBuffer under `SimServer`; `arg` is passed to `init/1`."
  def start_link(arg), do: SimServer.start_link(__MODULE__, arg)

  @doc """
  Requests a read version from the BeginBuffer `pid`.

  Returns `{:ok, read_version}`, or `{:error, :timeout}` if the call exits
  with a timeout. Any other exit (e.g. the server shutting down) propagates.
  """
  @spec get_read_version(GenServer.server()) :: {:ok, non_neg_integer} | {:error, :timeout}
  def get_read_version(pid) do
    SimServer.call(pid, :get_read_version)
  catch
    :exit, {:timeout, _mfa} -> {:error, :timeout}
  end

  # --- Server callbacks -----------------------------------------------------

  @impl true
  def init(%{id: id, cluster: %Cluster{} = cluster}) do
    state = %State{
      id: id,
      cluster: cluster,
      buffer: [],
      batch_buffer: nil,
      batch_read_version: nil,
      check_locked_nonce: nil,
      check_locked_reply_ids: nil
    }

    # The first tick fires immediately; each tick reschedules the next one.
    SimProcess.send_after(self(), :tick, 0)
    {:ok, state}
  end

  # No reply here: the caller is parked in the buffer and answered by
  # maybe_complete_flush/1 once its batch has been confirmed.
  @impl true
  def handle_call(:get_read_version, from, %State{} = state) do
    {:noreply, %{state | buffer: [from | state.buffer]}}
  end

  @impl true
  def handle_info(:tick, %State{} = state) do
    state = flush(state)
    SimProcess.send_after(self(), :tick, @tick_interval_ms)
    {:noreply, state}
  end

  # Reply belonging to the check_locked round currently in flight.
  def handle_info({:check_locked_reply, nonce, tlog_id, locked?}, %State{} = state)
      when nonce == state.check_locked_nonce do
    assert is_integer(tlog_id)
    assert tlog_id not in state.check_locked_reply_ids

    case locked? do
      false ->
        state = %{state | check_locked_reply_ids: [tlog_id | state.check_locked_reply_ids]}
        state = maybe_complete_flush(state)
        {:noreply, state}

      # If a TLog is locked in this generation there is an ongoing recovery,
      # so we may as well just die instead of trying to handle this case
      true ->
        SimLogger.debug(
          "BeginBuffer (id=#{state.id}, generation=#{state.cluster.generation}): TLog reported locked in reply, exiting"
        )

        exit(:shutdown)
    end
  end

  # Reply to an older (already completed) round, or one arriving when no round
  # is in flight: ignore it.
  def handle_info({:check_locked_reply, _nonce, _tlog_id, _locked?}, %State{} = state) do
    {:noreply, state}
  end

  def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
    cond do
      # Stale update from an older generation.
      cluster.generation < state.cluster.generation ->
        {:noreply, state}

      cluster.generation == state.cluster.generation ->
        {:noreply, %{state | cluster: cluster}}

      # This process does not carry over into a newer generation.
      cluster.generation > state.cluster.generation ->
        exit(:shutdown)
    end
  end

  # --- Internals ------------------------------------------------------------

  # Starts a new batch from the buffer. No-op when the cluster is not :normal,
  # nothing is buffered, or a previous batch is still awaiting check_locked replies.
  defp flush(%State{} = state) when state.cluster.status != :normal, do: state
  defp flush(%State{} = state) when state.buffer == [], do: state
  defp flush(%State{} = state) when state.check_locked_nonce != nil, do: state

  defp flush(%State{} = state) do
    assert state.cluster.status == :normal
    assert state.buffer != []
    assert state.batch_read_version == nil
    assert state.check_locked_nonce == nil

    [%Server{pid: seq_pid}] = get_servers(state.cluster, Sequencer)

    read_version =
      case Sequencer.get_read_version(seq_pid) do
        {:ok, read_version} when is_integer(read_version) -> read_version
        {:error, :timeout} -> exit(:shutdown)
      end

    # A fresh nonce per round lets handle_info/2 tell replies for this round
    # apart from late replies to earlier rounds.
    nonce = make_ref()

    state = %{
      state
      | buffer: [],
        batch_buffer: state.buffer,
        batch_read_version: read_version,
        check_locked_nonce: nonce,
        check_locked_reply_ids: []
    }

    # Ask every TLog of the current generation whether it is locked; replies
    # arrive asynchronously as {:check_locked_reply, nonce, tlog_id, locked?}
    # (as matched in handle_info/2).
    %TLogGeneration{tlog_ids: tlog_ids} = hd(state.cluster.tlog_generations)

    Enum.each(tlog_ids, fn id ->
      %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, id)
      TLog.check_locked(tlog_pid, nonce)
    end)

    state
  end

  # Completes the in-flight batch once enough TLogs have replied "not locked":
  # replies {:ok, read_version} to every batched caller and clears the batch
  # fields, which lets the next tick start a new batch. Otherwise returns
  # state unchanged.
  defp maybe_complete_flush(%State{} = state) do
    # We use the replication_factor for the current generation to do the
    # liveness check. NOTE(review): presumably any replication_factor TLogs
    # must overlap with the set a recovery locks — confirm against recovery.
    %TLogGeneration{replication_factor: replication_factor} =
      hd(state.cluster.tlog_generations)

    if length(state.check_locked_reply_ids) >= replication_factor do
      read_version = state.batch_read_version

      Enum.each(state.batch_buffer, fn from ->
        SimServer.reply(from, {:ok, read_version})
      end)

      %{
        state
        | batch_buffer: nil,
          batch_read_version: nil,
          check_locked_nonce: nil,
          check_locked_reply_ids: nil
      }
    else
      state
    end
  end
end