this repo has no description
at master 162 lines 4.6 kB view raw
defmodule Hobbes.Servers.BeginBuffer do
  @moduledoc """
  Batches read-version requests.

  Callers of `get_read_version/1` are queued in the server state instead of
  being answered immediately. On every `:tick` (scheduled every 1 ms) the
  server starts a flush if the cluster status is `:normal`, at least one
  caller is queued, and no earlier batch is still in flight. A flush:

    1. obtains a single read version from the Sequencer,
    2. sends a `check_locked` request, tagged with a fresh nonce, to every
       TLog listed in the head of `cluster.tlog_generations`,
    3. replies `{:ok, read_version}` to every caller in the batch once at
       least `replication_factor` TLogs have answered "not locked".

  The process exits with `:shutdown` when the Sequencer call times out, when
  a TLog reports that it is locked, or when a cluster update with a newer
  generation arrives.
  """

  use GenServer
  alias Trinity.{SimProcess, SimServer, SimLogger}
  require SimLogger

  import ExUnit.Assertions, only: [assert: 1]

  alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
  alias Hobbes.Servers.{Sequencer, TLog}

  import Hobbes.Utils

  defmodule State do
    # Server state. Every key is required at construction time.
    #
    #   id                     - identifier of this BeginBuffer (used in log output)
    #   cluster                - the `Cluster` struct this server currently operates on
    #   buffer                 - `from` tags of callers waiting for the next batch
    #   batch_buffer           - `from` tags of the batch in flight, `nil` when idle
    #   batch_read_version     - read version fetched for the batch in flight, `nil` when idle
    #   check_locked_nonce     - reference identifying the in-flight check_locked round,
    #                            `nil` when idle
    #   check_locked_reply_ids - ids of TLogs that answered "not locked" in this round,
    #                            `nil` when idle
    @enforce_keys [
      :id,
      :cluster,
      :buffer,
      :batch_buffer,
      :batch_read_version,
      :check_locked_nonce,
      :check_locked_reply_ids
    ]
    defstruct @enforce_keys
  end

  @tick_interval_ms 1

  @doc """
  Starts the server through `SimServer.start_link/2`.

  `arg` must be a map with `:id` and `:cluster` keys (see `init/1`).
  """
  def start_link(arg), do: SimServer.start_link(__MODULE__, arg)

  @doc """
  Requests a read version from the BeginBuffer at `pid`.

  The request is queued and answered when the batch it belongs to completes.
  Returns `{:ok, read_version}` on success, or `{:error, :timeout}` when the
  underlying call exits with a timeout. Any other exit propagates to the caller.
  """
  @spec get_read_version(GenServer.server()) :: {:ok, non_neg_integer} | {:error, :timeout}
  def get_read_version(pid) do
    SimServer.call(pid, :get_read_version)
  catch
    :exit, {:timeout, _mfa} -> {:error, :timeout}
  end

  @impl true
  def init(%{id: id, cluster: %Cluster{} = cluster}) do
    state = %State{
      id: id,
      cluster: cluster,
      buffer: [],
      batch_buffer: nil,
      batch_read_version: nil,
      check_locked_nonce: nil,
      check_locked_reply_ids: nil
    }

    # Kick off the tick loop immediately; each tick re-arms itself.
    SimProcess.send_after(self(), :tick, 0)
    {:ok, state}
  end

  # Queue the caller; the reply is sent later from maybe_complete_flush/1.
  @impl true
  def handle_call(:get_read_version, from, %State{} = state) do
    state = %{state | buffer: [from | state.buffer]}
    {:noreply, state}
  end

  # Periodic tick: try to start a flush, then schedule the next tick.
  @impl true
  def handle_info(:tick, %State{} = state) do
    state = flush(state)

    SimProcess.send_after(self(), :tick, @tick_interval_ms)
    {:noreply, state}
  end

  # Reply belonging to the check_locked round currently in flight.
  def handle_info({:check_locked_reply, nonce, tlog_id, locked?}, %State{} = state)
      when nonce == state.check_locked_nonce do
    assert is_integer(tlog_id)
    assert tlog_id not in state.check_locked_reply_ids

    case locked? do
      false ->
        state = %{state | check_locked_reply_ids: [tlog_id | state.check_locked_reply_ids]}
        state = maybe_complete_flush(state)
        {:noreply, state}

      # If a TLog is locked in this generation there is an ongoing recovery,
      # so we may as well just die instead of trying to handle this case
      true ->
        SimLogger.debug("BeginBuffer (id=#{state.id}, generation=#{state.cluster.generation}): TLog reported locked in reply, exiting")
        exit(:shutdown)
    end
  end

  # Reply whose nonce does not match the round in flight (for example a late
  # reply to a round that has already completed): ignore it.
  def handle_info({:check_locked_reply, _nonce, _tlog_id, _locked?}, %State{} = state) do
    {:noreply, state}
  end

  # Cluster update: an older generation is ignored, the same generation
  # replaces the stored cluster, and a newer generation makes this process exit.
  def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
    cond do
      cluster.generation < state.cluster.generation -> {:noreply, state}
      cluster.generation == state.cluster.generation -> {:noreply, %{state | cluster: cluster}}
      cluster.generation > state.cluster.generation -> exit(:shutdown)
    end
  end

  # Starts a new batch. The first three clauses make this a no-op unless the
  # cluster status is :normal, callers are queued, and no batch is in flight.
  defp flush(%State{} = state) when state.cluster.status != :normal, do: state
  defp flush(%State{} = state) when state.buffer == [], do: state
  defp flush(%State{} = state) when state.check_locked_nonce != nil, do: state

  defp flush(%State{} = state) do
    assert state.cluster.status == :normal
    assert state.buffer != []
    assert state.batch_read_version == nil
    assert state.check_locked_nonce == nil

    # Exactly one Sequencer is expected; any other count fails this match.
    [%Server{pid: seq_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer)

    read_version =
      case Sequencer.get_read_version(seq_pid) do
        {:ok, read_version} when is_integer(read_version) -> read_version
        {:error, :timeout} -> exit(:shutdown)
      end

    # Move the queued callers into the in-flight batch and open a new
    # check_locked round identified by a fresh nonce.
    nonce = make_ref()

    state = %{
      state
      | buffer: [],
        batch_buffer: state.buffer,
        batch_read_version: read_version,
        check_locked_nonce: nonce,
        check_locked_reply_ids: []
    }

    %TLogGeneration{tlog_ids: tlog_ids} = hd(state.cluster.tlog_generations)

    # Answers arrive later as {:check_locked_reply, nonce, tlog_id, locked?}
    # messages handled in handle_info/2.
    Enum.each(tlog_ids, fn id ->
      %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, id)
      TLog.check_locked(tlog_pid, nonce)
    end)

    state
  end

  # Completes the in-flight batch once enough TLogs have answered "not locked":
  # replies to every caller in the batch and resets the batch fields to nil.
  # Otherwise returns the state unchanged.
  defp maybe_complete_flush(%State{} = state) do
    # We use the replication_factor for the current generation to do the liveness check
    %TLogGeneration{replication_factor: replication_factor} = hd(state.cluster.tlog_generations)

    if length(state.check_locked_reply_ids) >= replication_factor do
      read_version = state.batch_read_version

      Enum.each(state.batch_buffer, fn from ->
        SimServer.reply(from, {:ok, read_version})
      end)

      %{
        state
        | batch_buffer: nil,
          batch_read_version: nil,
          check_locked_nonce: nil,
          check_locked_reply_ids: nil
      }
    else
      state
    end
  end
end