this repo has no description
at master 361 lines 12 kB view raw
defmodule Hobbes.Servers.CommitBuffer do
  @moduledoc """
  Buffers client commit transactions and commits them in batches.

  Incoming `CommitTxn`s are accumulated in the server state. A batch is
  committed when the buffer is full, when a non-empty buffer has waited long
  enough, or (as an empty batch) when no commit has happened for a while.

  Committing a batch:

    1. obtains a commit version from the sequencer,
    2. rejects transactions whose read version is outside the MVCC window,
    3. sends the remaining transactions to the resolver,
    4. applies the metadata mutations returned by the resolver to the shard map,
    5. writes the mutations of accepted transactions to every TLog of the
       current generation,
    6. notifies the sequencer that the version is committed,
    7. replies to every client in the batch.

  Any timeout or TLog error along the way makes the process exit with
  `:shutdown`.
  """

  use GenServer
  alias Trinity.{SimProcess, SimServer, SimLogger}
  require SimLogger

  import ExUnit.Assertions, only: [assert: 1]

  alias Hobbes.ShardTagMap
  alias Hobbes.Structs.{Cluster, TLogGeneration, Server, ResolveBatch, CommitTxn, LogBatch}

  alias Hobbes.Servers.{Sequencer, Resolver, TLog}

  import Hobbes.Utils

  # TODO: centralize types?
  @type shard :: {binary, binary, {[integer], [pid]}}
  @type commit_info :: %{commit_version: non_neg_integer, batch_index: non_neg_integer}

  # How often the server sends itself a :flush message.
  @flush_interval_ms 1
  # Minimum age of a non-empty buffer before it is committed.
  @commit_interval_us 1000
  # Maximum time between commits; after this an (empty) batch is committed anyway.
  @empty_commit_interval_us 10_000

  # A buffer of this many transactions is committed immediately.
  @max_buffer_size 300

  defmodule State do
    @moduledoc false

    @type t :: %__MODULE__{
      id: non_neg_integer,
      cluster: Cluster.t,
      shard_map: ShardTagMap.t,
      last_committed_version: non_neg_integer,

      buffer: list,
      buffer_size: non_neg_integer,
      last_commit_timestamp: integer,

      storage_servers: map,
    }

    @enforce_keys [
      :id,
      :cluster,
      :shard_map,
      :last_committed_version,

      :buffer,
      :buffer_size,
      :last_commit_timestamp,

      :storage_servers,
    ]
    defstruct @enforce_keys
  end

  @doc """
  Starts the commit buffer via `SimServer.start_link/2`.

  `arg` is passed to `init/1` and must be a map with the keys `:id`,
  `:cluster`, `:storage_teams_pairs` and `:key_storage_pairs`.
  """
  def start_link(arg), do: SimServer.start_link(__MODULE__, arg)

  @doc ~S"""
  Returns shards for a key or key range.

  Each shard is a `{start_key, end_key, {storage_server_ids, storage_server_pids}}`
  tuple. Returns `{:error, :timeout}` if the server does not reply in time.

  ## Examples

      iex> get_shards(server, "foo")
      {:ok, [{"", "\xFF", {[1, 2, 3], [pid1, pid2, pid3]}}]}

      iex> get_shards(server, {"foo", "zoo"})
      {:ok, [
        {"", "goo", {[1, 2, 3], [pid1, pid2, pid3]}},
        {"goo", "\xFF", {[4, 5, 6], [pid4, pid5, pid6]}},
      ]}

  """
  @spec get_shards(pid, binary | {binary, binary}) :: {:ok, [shard]} | {:error, :timeout}
  def get_shards(server, key_or_range) when is_binary(key_or_range) or is_tuple(key_or_range) do
    # Propagate a timeout as an error tuple instead of failing the match.
    case get_shards_multi(server, [key_or_range]) do
      {:ok, [shards]} -> {:ok, shards}
      {:error, :timeout} = error -> error
    end
  end

  @doc ~S"""
  Returns shards for each key or key range.

  The result contains one list of shards per input element, in input order.
  Returns `{:error, :timeout}` if the server does not reply within 1000 ms.

  ## Examples

      iex> get_shards_multi(server, ["foo", "bar"])
      {:ok, [
        [{"f", "\xFF", {[4, 5, 6], [pid4, pid5, pid6]}}],
        [{"", "f", {[1, 2, 3], [pid1, pid2, pid3]}}],
      ]}

  """
  @spec get_shards_multi(pid, [binary | {binary, binary}]) :: {:ok, [[shard]]} | {:error, :timeout}
  def get_shards_multi(server, keys_or_ranges) when is_list(keys_or_ranges) do
    try do
      SimServer.call(server, {:get_shards, keys_or_ranges}, 1000)
    catch
      :exit, {:timeout, _} -> {:error, :timeout}
    end
  end

  @doc """
  Submits `txn` for commit and waits for the batch containing it to finish.

  Returns `{:ok, commit_info}` when the transaction was committed, or
  `{:error, {reason, commit_info}}` when it was rejected. A call timeout is
  returned as `{:error, :timeout}`.
  """
  @spec commit(pid, CommitTxn.t) :: {:ok, commit_info} | {:error, :timeout | {:transaction_too_old | :read_conflict | :database_locked, commit_info}}
  def commit(server, %CommitTxn{} = txn) do
    try do
      SimServer.call(server, {:commit, txn})
    catch
      :exit, {:timeout, _} -> {:error, :timeout}
    end
  end

  # Builds the initial state from the current (head) TLog generation, loads the
  # initial metadata pairs into the shard map and schedules the first flush.
  @impl true
  def init(%{id: id, cluster: %Cluster{} = cluster, storage_teams_pairs: storage_teams_pairs, key_storage_pairs: key_storage_pairs}) do
    %TLogGeneration{} = current_generation = hd(cluster.tlog_generations)
    assert current_generation.generation == cluster.generation

    state = %State{
      id: id,
      cluster: cluster,
      shard_map: ShardTagMap.new(current_generation),
      last_committed_version: 0,

      buffer: [],
      buffer_size: 0,
      last_commit_timestamp: current_time(),

      storage_servers: %{},
    }

    # NOTE(review): the return values are discarded, so the shard map is
    # presumably updated in place (e.g. backed by ETS) - confirm.
    ShardTagMap.load_meta_pairs(state.shard_map, storage_teams_pairs)
    ShardTagMap.load_meta_pairs(state.shard_map, key_storage_pairs)

    SimProcess.send_after(self(), :flush, @flush_interval_ms)
    {:ok, state}
  end

  # Looks up the shards for every requested key/range and pairs each shard's
  # storage server ids with the pids currently known for them.
  @impl true
  def handle_call({:get_shards, keys_or_ranges}, _from, state) when is_list(keys_or_ranges) do
    stm = state.shard_map
    storage_servers = state.storage_servers

    shard_lists =
      Enum.map(keys_or_ranges, fn key_or_range ->
        ShardTagMap.shards_for_key_or_range(stm, key_or_range)
        |> Enum.map(fn {sk, ek, storage_server_ids} ->
          # Map.get/2 yields nil for ids that are missing from storage_servers
          storage_pids = Enum.map(storage_server_ids, &Map.get(storage_servers, &1))
          {sk, ek, {storage_server_ids, storage_pids}}
        end)
      end)

    {:reply, {:ok, shard_lists}, state}
  end

  # Buffers the transaction (newest first) and replies later, from commit_batch/1.
  def handle_call({:commit, %CommitTxn{} = txn}, from, state) do
    txn = %{txn | from: from, batch_index: state.buffer_size}
    state = %{state | buffer: [txn | state.buffer], buffer_size: state.buffer_size + 1}

    state = maybe_commit_batch(state)
    {:noreply, state}
  end

  # Periodic tick: commits a batch if one is due, then re-arms the timer.
  @impl true
  def handle_info(:flush, %State{} = state) do
    state = maybe_commit_batch(state)
    SimProcess.send_after(self(), :flush, @flush_interval_ms)
    {:noreply, state}
  end

  # Cluster updates: older generations are ignored, the same generation replaces
  # the stored cluster, and a newer generation makes this process shut down.
  def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
    cond do
      cluster.generation < state.cluster.generation -> {:noreply, state}
      cluster.generation == state.cluster.generation -> {:noreply, %{state | cluster: cluster}}
      cluster.generation > state.cluster.generation -> exit(:shutdown)
    end
  end

  # Replaces the id => pid map used to resolve storage servers in :get_shards.
  def handle_info({:update_storage_servers, storage_map}, %State{} = state) when is_map(storage_map) do
    # Validate for sanity
    Enum.each(storage_map, fn {id, pid} ->
      # Note: an assert failure here would actually be nondeterministic WRT which pair triggers it but it doesn't really matter
      assert is_integer(id)
      assert is_pid(pid)
    end)

    {:noreply, %{state | storage_servers: storage_map}}
  end

  # Commits the buffer when it is full, when it is non-empty and old enough, or
  # when nothing has been committed for @empty_commit_interval_us.
  # NOTE(review): assumes current_time/0 returns microseconds - confirm.
  defp maybe_commit_batch(%State{} = state) do
    buffer_size = state.buffer_size
    elapsed_us = current_time() - state.last_commit_timestamp

    cond do
      buffer_size >= @max_buffer_size ->
        commit_batch(state)

      buffer_size > 0 and elapsed_us >= @commit_interval_us ->
        commit_batch(state)

      elapsed_us >= @empty_commit_interval_us ->
        commit_batch(state)

      true -> state
    end
  end

  # If we have not recovered there are not yet any TLogs to write to.
  # The state is returned untouched, so buffered transactions stay buffered.
  defp commit_batch(%State{} = state) when state.cluster.status != :normal do
    state
  end

  # Runs the full commit pipeline for the current buffer and returns the state
  # with an emptied buffer. Exits with :shutdown on any timeout or TLog error.
  defp commit_batch(%State{} = state) do
    assert state.cluster.status == :normal

    [%Server{pid: seq_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer)

    {commit_version, prev_commit_version} =
      case Sequencer.get_commit_version(seq_pid) do
        {:ok, {_cv, _pcv} = versions} ->
          versions
        {:error, :timeout} ->
          SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation}): failed to get commit version, exiting"
          exit(:shutdown)
      end

    transactions_reversed = state.buffer

    # Filter transactions which started outside the mvcc window
    read_version_floor = commit_version - mvcc_window()
    {transactions_reversed, old_transactions} =
      Enum.split_with(transactions_reversed, fn %CommitTxn{} = txn ->
        # TODO: maybe validate instead of asserting?
        assert is_integer(txn.read_version) or txn.read_version == :write_only

        # TODO: make sure this is not off by one anywhere (e.g. WRT resolvers, storage)
        # Note: (:write_only > integer, so :write_only transactions always pass)
        txn.read_version > read_version_floor
      end)

    # Buffer is reversed, so these are in the correct order
    resolver_txns =
      Enum.reduce(transactions_reversed, [], fn %CommitTxn{} = txn, acc ->
        [{
          txn.read_version,
          txn.read_conflicts,
          txn.write_conflicts,
          Enum.filter(txn.mutations, &meta_mutation?/1),
        } | acc]
      end)

    batch = %ResolveBatch{
      commit_buffer_id: state.id,
      commit_version: commit_version,
      prev_commit_version: prev_commit_version,
      transactions: resolver_txns,
    }

    [%Server{pid: resolver_pid}] = get_servers(state.cluster, Hobbes.Servers.Resolver)
    {txn_results_reversed, meta_log} =
      case Resolver.resolve_batch(resolver_pid, batch) do
        {:ok, result} ->
          result
        {:error, :timeout} ->
          SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation}): failed to resolve batch, exiting"
          exit(:shutdown)
      end

    # Apply meta mutations received, including our own from this batch
    Enum.each(meta_log, fn {_commit_version, mutations} ->
      ShardTagMap.apply_metadata_mutations(state.shard_map, mutations)
    end)

    # NOTE(review): Enum.zip/2 stops at the shorter list, so this relies on the
    # resolver returning exactly one result per submitted transaction - confirm.
    {allowed_transactions, rejected_transactions} =
      # Both are reversed, so they come out of this the right way around
      Enum.zip(transactions_reversed, txn_results_reversed)
      |> Enum.reduce({[], []}, fn
        {txn, true}, {a, r} -> {[txn | a], r}
        {txn, false}, {a, r} -> {a, [txn | r]}
      end)

    # If the database is locked, filter out all non-meta transactions
    # TODO: use a lock-aware flag instead like FDB
    # TODO: removed for now, bring back later without MetaStore
    #{allowed_transactions, locked_transactions} =
    #  case MetaStore.locked?(state.meta_store, commit_version) do
    #    true -> Enum.split_with(allowed_transactions, fn txn -> has_meta?(txn.mutations) end)
    #    false -> {allowed_transactions, []}
    #  end
    locked_transactions = []

    # Add storage tags to each mutation (including special meta tag for meta mutations)
    tlog_mutations =
      allowed_transactions
      |> Enum.flat_map(fn %CommitTxn{} = txn -> txn.mutations end)
      |> then(fn mutations ->
        mutations ++ compute_special_mutations(mutations)
      end)
      |> then(fn mutations ->
        ShardTagMap.tag_and_slice_mutations(state.shard_map, mutations)
      end)

    tlog_ids = hd(state.cluster.tlog_generations).tlog_ids

    # Send sliced mutations to each tlog.
    # Enum.map/2 is eager, so every request is sent before any reply is awaited.
    tlog_ids
    |> Enum.map(fn tlog_id ->
      tagged_mutations = Map.fetch!(tlog_mutations, tlog_id)

      log_batch = %LogBatch{
        commit_buffer_id: state.id,
        commit_version: commit_version,
        prev_commit_version: prev_commit_version,
        tagged_mutations: tagged_mutations,
        last_committed_version: state.last_committed_version,
      }

      %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, tlog_id)
      TLog.write_batch_send(tlog_pid, log_batch)
    end)
    |> Enum.each(fn req_id ->
      case TLog.write_batch_receive(req_id) do
        :ok -> :noop
        # If a commit fails, we trigger recovery (or are already recovering)
        {:error, err} when err in [:tlog_locked, :timeout] ->
          SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation}): failed to write batch to TLog (error=#{err}), exiting"
          exit(:shutdown)
      end
    end)

    # Once all tlogs have replied (made durable), notify sequencer this version is committed
    case Sequencer.notify_committed(seq_pid, commit_version) do
      :ok -> :noop
      {:error, :timeout} ->
        SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation}): failed to notify sequencer of commit, exiting"
        exit(:shutdown)
    end

    # Reply to clients

    Enum.each(allowed_transactions, fn %CommitTxn{} = txn ->
      SimServer.reply(txn.from, {:ok, %{commit_version: commit_version, batch_index: txn.batch_index}})
    end)

    Enum.each(locked_transactions, fn %CommitTxn{} = txn ->
      # TODO: use a different error message
      SimServer.reply(txn.from, {:error, {:database_locked, %{commit_version: commit_version, batch_index: txn.batch_index}}})
    end)

    Enum.each(old_transactions, fn %CommitTxn{} = txn ->
      SimServer.reply(txn.from, {:error, {:transaction_too_old, %{commit_version: commit_version, batch_index: txn.batch_index}}})
    end)

    Enum.each(rejected_transactions, fn %CommitTxn{} = txn ->
      SimServer.reply(txn.from, {:error, {:read_conflict, %{commit_version: commit_version, batch_index: txn.batch_index}}})
    end)

    case length(allowed_transactions) do
      0 -> SimLogger.debug "CommitBuffer (id=#{state.id}) committed empty batch"
      batch_size -> SimLogger.debug "CommitBuffer (id=#{state.id}) committed batch (batch_size=#{batch_size} commit_version=#{commit_version})"
    end

    # Reset the buffer and restart the commit timer.
    %{state |
      last_committed_version: commit_version,
      buffer: [],
      buffer_size: 0,
      last_commit_timestamp: current_time()
    }
  end
end