defmodule Hobbes.Servers.CommitBuffer do
  @moduledoc """
  Buffers client commit requests and commits them in batches.

  A batch is committed when the buffer is full, when a non-empty buffer has waited
  long enough, or (as an empty batch) when no commit has happened for a while.
  Committing a batch:

    1. obtains a commit version from the Sequencer,
    2. rejects transactions whose read version fell out of the MVCC window,
    3. asks the Resolver which of the remaining transactions may commit,
    4. writes the tagged mutations to every TLog of the current generation,
    5. notifies the Sequencer that the version is committed,
    6. replies to every buffered client.

  If any of the Sequencer/Resolver/TLog calls fails, the process exits with
  `:shutdown` without replying to the buffered clients.
  """

  use GenServer
  alias Trinity.{SimProcess, SimServer, SimLogger}
  require SimLogger
  import ExUnit.Assertions, only: [assert: 1]

  alias Hobbes.ShardTagMap
  alias Hobbes.Structs.{Cluster, TLogGeneration, Server, ResolveBatch, CommitTxn, LogBatch}
  alias Hobbes.Servers.{Sequencer, Resolver, TLog}

  import Hobbes.Utils

  # TODO: centralize types?
  # A pid is `nil` when its storage server id is missing from the storage server map
  # (the pids are looked up with `Map.get/2`).
  @type shard :: {binary, binary, {[integer], [pid | nil]}}
  @type commit_info :: %{commit_version: non_neg_integer, batch_index: non_neg_integer}

  # Period of the `:flush` timer which re-evaluates whether a batch should be committed
  @flush_interval_ms 1
  # Minimum time since the last commit before a non-empty buffer is committed
  @commit_interval_us 1000
  # Minimum time since the last commit before an empty batch is committed
  @empty_commit_interval_us 10_000
  # Buffer size at which a batch is committed regardless of elapsed time
  @max_buffer_size 300

  defmodule State do
    @moduledoc false

    @type t :: %__MODULE__{
      id: non_neg_integer,
      cluster: Cluster.t,
      shard_map: ShardTagMap.t,
      last_committed_version: non_neg_integer,
      buffer: list,
      buffer_size: non_neg_integer,
      last_commit_timestamp: integer,
      storage_servers: map,
    }
    @enforce_keys [
      :id,
      :cluster,
      :shard_map,
      :last_committed_version,
      :buffer,
      :buffer_size,
      :last_commit_timestamp,
      :storage_servers,
    ]
    defstruct @enforce_keys
  end

  @doc """
  Starts the commit buffer via `SimServer.start_link/2`.

  `arg` is passed to `init/1` and must be a map with the keys `:id`, `:cluster`,
  `:storage_teams_pairs` and `:key_storage_pairs`.
  """
  def start_link(arg), do: SimServer.start_link(__MODULE__, arg)

  @doc ~S"""
  Returns the shards for a key or key range.

  Each shard is `{start_key, end_key, {storage_server_ids, storage_pids}}`. A pid is
  `nil` when its server id is not in the storage server map currently held by the server.

  Returns `{:error, :timeout}` when the server does not reply in time.

  ## Examples

      iex> get_shards(server, "foo")
      {:ok, [{"", "\xFF", {[1, 2, 3], [pid1, pid2, pid3]}}]}

      iex> get_shards(server, {"foo", "zoo"})
      {:ok, [
        {"", "goo", {[1, 2, 3], [pid1, pid2, pid3]}},
        {"goo", "\xFF", {[4, 5, 6], [pid4, pid5, pid6]}},
      ]}
  """
  @spec get_shards(pid, binary | {binary, binary}) :: {:ok, [shard]} | {:error, :timeout}
  def get_shards(server, key_or_range) when is_binary(key_or_range) or is_tuple(key_or_range) do
    case get_shards_multi(server, [key_or_range]) do
      # Exactly one input was sent, so exactly one shard list is expected back
      {:ok, [shards]} -> {:ok, shards}
      # Propagate the timeout instead of crashing on a failed match
      {:error, :timeout} = error -> error
    end
  end

  @doc ~S"""
  Returns the shards for each key or key range, in the same order as the input.

  See `get_shards/2` for the shape of a shard.

  Returns `{:error, :timeout}` when the server does not reply in time.

  ## Examples

      iex> get_shards_multi(server, ["foo", "bar"])
      {:ok, [
        [{"f", "\xFF", {[4, 5, 6], [pid4, pid5, pid6]}}],
        [{"", "f", {[1, 2, 3], [pid1, pid2, pid3]}}],
      ]}
  """
  @spec get_shards_multi(pid, [binary | {binary, binary}]) :: {:ok, [[shard]]} | {:error, :timeout}
  def get_shards_multi(server, keys_or_ranges) when is_list(keys_or_ranges) do
    try do
      SimServer.call(server, {:get_shards, keys_or_ranges}, 1000)
    catch
      :exit, {:timeout, _} -> {:error, :timeout}
    end
  end

  @doc """
  Submits a transaction for commit and waits for the outcome of its batch.

  Returns `{:ok, commit_info}` when the transaction was committed. Returns
  `{:error, {reason, commit_info}}` when it was rejected, and `{:error, :timeout}`
  when the server does not reply in time.
  """
  @spec commit(pid, CommitTxn.t) ::
    {:ok, commit_info}
    | {:error, :timeout | {:transaction_too_old | :read_conflict | :database_locked, commit_info}}
  def commit(server, %CommitTxn{} = txn) do
    try do
      SimServer.call(server, {:commit, txn})
    catch
      :exit, {:timeout, _} -> {:error, :timeout}
    end
  end

  @impl true
  def init(%{id: id, cluster: %Cluster{} = cluster, storage_teams_pairs: storage_teams_pairs, key_storage_pairs: key_storage_pairs}) do
    # The head of the generation list must be the cluster's current generation
    %TLogGeneration{} = current_generation = hd(cluster.tlog_generations)
    assert current_generation.generation == cluster.generation

    state = %State{
      id: id,
      cluster: cluster,
      shard_map: ShardTagMap.new(current_generation),
      last_committed_version: 0,
      buffer: [],
      buffer_size: 0,
      last_commit_timestamp: current_time(),
      storage_servers: %{},
    }

    # NOTE(review): the return values are discarded, so this presumably mutates the
    # shard map in place (e.g. ETS-backed) - confirm against ShardTagMap
    ShardTagMap.load_meta_pairs(state.shard_map, storage_teams_pairs)
    ShardTagMap.load_meta_pairs(state.shard_map, key_storage_pairs)

    SimProcess.send_after(self(), :flush, @flush_interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_call({:get_shards, keys_or_ranges}, _from, state) when is_list(keys_or_ranges) do
    stm = state.shard_map
    storage_servers = state.storage_servers

    shard_lists = Enum.map(keys_or_ranges, fn key_or_range ->
      ShardTagMap.shards_for_key_or_range(stm, key_or_range)
      |> Enum.map(fn {sk, ek, storage_server_ids} ->
        # Unknown server ids resolve to `nil` pids
        storage_pids = Enum.map(storage_server_ids, &Map.get(storage_servers, &1))
        {sk, ek, {storage_server_ids, storage_pids}}
      end)
    end)

    {:reply, {:ok, shard_lists}, state}
  end

  def handle_call({:commit, %CommitTxn{} = txn}, from, state) do
    # Remember the caller so the reply can be sent once the batch has been committed.
    # The batch index is the transaction's position within the current buffer.
    txn = %{txn | from: from, batch_index: state.buffer_size}
    # The buffer is kept newest-first (prepend); commit_batch/1 restores the order
    state = %{state | buffer: [txn | state.buffer], buffer_size: state.buffer_size + 1}

    state = maybe_commit_batch(state)
    {:noreply, state}
  end

  @impl true
  def handle_info(:flush, %State{} = state) do
    state = maybe_commit_batch(state)
    SimProcess.send_after(self(), :flush, @flush_interval_ms)
    {:noreply, state}
  end

  def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
    cond do
      # Stale update from an older generation: ignore
      cluster.generation < state.cluster.generation -> {:noreply, state}
      cluster.generation == state.cluster.generation -> {:noreply, %{state | cluster: cluster}}
      # A newer generation exists, so this process shuts down
      cluster.generation > state.cluster.generation -> exit(:shutdown)
    end
  end

  def handle_info({:update_storage_servers, storage_map}, %State{} = state) when is_map(storage_map) do
    # Validate for sanity
    Enum.each(storage_map, fn {id, pid} ->
      # Note: an assert failure here would actually be nondeterministic WRT which pair triggers it but it doesn't really matter
      assert is_integer(id)
      assert is_pid(pid)
    end)

    {:noreply, %{state | storage_servers: storage_map}}
  end

  # Commits the current buffer if it is full, if a non-empty buffer has waited at least
  # @commit_interval_us, or if nothing has been committed for @empty_commit_interval_us
  # (in which case an empty batch is committed). Otherwise returns the state unchanged.
  defp maybe_commit_batch(%State{} = state) do
    buffer_size = state.buffer_size
    elapsed_us = current_time() - state.last_commit_timestamp

    cond do
      buffer_size >= @max_buffer_size -> commit_batch(state)
      buffer_size > 0 and elapsed_us >= @commit_interval_us -> commit_batch(state)
      elapsed_us >= @empty_commit_interval_us -> commit_batch(state)
      true -> state
    end
  end

  # If we have not recovered there are not yet any TLogs to write to
  # (the buffer is left untouched, so buffered transactions stay pending)
  defp commit_batch(%State{} = state) when state.cluster.status != :normal do
    state
  end

  # Commits every buffered transaction as one batch and replies to all callers.
  # Exits with :shutdown if the Sequencer, Resolver or any TLog call fails.
  # Returns the state with an empty buffer and updated commit version/timestamp.
  defp commit_batch(%State{} = state) do
    assert state.cluster.status == :normal

    [%Server{pid: seq_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer)
    {commit_version, prev_commit_version} =
      case Sequencer.get_commit_version(seq_pid) do
        {:ok, {_cv, _pcv} = versions} -> versions
        {:error, :timeout} ->
          SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to get commit version, exiting"
          exit(:shutdown)
      end

    transactions_reversed = state.buffer

    # Filter transactions which started outside the mvcc window
    read_version_floor = commit_version - mvcc_window()
    {transactions_reversed, old_transactions} = Enum.split_with(transactions_reversed, fn %CommitTxn{} = txn ->
      # TODO: maybe validate instead of asserting?
      assert is_integer(txn.read_version) or txn.read_version == :write_only
      # TODO: make sure this is not off by one anywhere (e.g. WRT resolvers, storage)
      # Note: (:write_only > integer, so :write_only transactions always pass)
      txn.read_version > read_version_floor
    end)

    # Buffer is reversed, so these are in the correct order
    resolver_txns = Enum.reduce(transactions_reversed, [], fn %CommitTxn{} = txn, acc ->
      [{
        txn.read_version,
        txn.read_conflicts,
        txn.write_conflicts,
        Enum.filter(txn.mutations, &meta_mutation?/1),
      } | acc]
    end)

    batch = %ResolveBatch{
      commit_buffer_id: state.id,
      commit_version: commit_version,
      prev_commit_version: prev_commit_version,
      transactions: resolver_txns,
    }

    [%Server{pid: resolver_pid}] = get_servers(state.cluster, Hobbes.Servers.Resolver)
    {txn_results_reversed, meta_log} =
      case Resolver.resolve_batch(resolver_pid, batch) do
        {:ok, result} -> result
        {:error, :timeout} ->
          SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to resolve batch, exiting"
          exit(:shutdown)
      end

    # Apply meta mutations received, including our own from this batch
    Enum.each(meta_log, fn {_commit_version, mutations} ->
      ShardTagMap.apply_metadata_mutations(state.shard_map, mutations)
    end)

    {allowed_transactions, rejected_transactions} =
      # Both are reversed, so they come out of this the right way around
      Enum.zip(transactions_reversed, txn_results_reversed)
      |> Enum.reduce({[], []}, fn
        {txn, true}, {a, r} -> {[txn | a], r}
        {txn, false}, {a, r} -> {a, [txn | r]}
      end)

    # If the database is locked, filter out all non-meta transactions
    # TODO: use a lock-aware flag instead like FDB
    # TODO: removed for now, bring back later without MetaStore
    #{allowed_transactions, locked_transactions} =
    #  case MetaStore.locked?(state.meta_store, commit_version) do
    #    true -> Enum.split_with(allowed_transactions, fn txn -> has_meta?(txn.mutations) end)
    #    false -> {allowed_transactions, []}
    #  end
    locked_transactions = []

    # Add storage tags to each mutation (including special meta tag for meta mutations)
    tlog_mutations =
      allowed_transactions
      |> Enum.flat_map(fn %CommitTxn{} = txn -> txn.mutations end)
      |> then(fn mutations -> mutations ++ compute_special_mutations(mutations) end)
      |> then(fn mutations -> ShardTagMap.tag_and_slice_mutations(state.shard_map, mutations) end)

    tlog_ids = hd(state.cluster.tlog_generations).tlog_ids

    # Send sliced mutations to each tlog
    # (all sends are issued before any reply is awaited, so the writes overlap)
    tlog_ids
    |> Enum.map(fn tlog_id ->
      tagged_mutations = Map.fetch!(tlog_mutations, tlog_id)

      log_batch = %LogBatch{
        commit_buffer_id: state.id,
        commit_version: commit_version,
        prev_commit_version: prev_commit_version,
        tagged_mutations: tagged_mutations,
        last_committed_version: state.last_committed_version,
      }

      %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, tlog_id)
      TLog.write_batch_send(tlog_pid, log_batch)
    end)
    |> Enum.each(fn req_id ->
      case TLog.write_batch_receive(req_id) do
        :ok -> :noop
        # If a commit fails, we trigger recovery (or are already recovering)
        {:error, err} when err in [:tlog_locked, :timeout] ->
          SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to write batch to TLog (error=#{err}), exiting"
          exit(:shutdown)
      end
    end)

    # Once all tlogs have replied (made durable), notify sequencer this version is committed
    case Sequencer.notify_committed(seq_pid, commit_version) do
      :ok -> :noop
      {:error, :timeout} ->
        SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to notify sequencer of commit, exiting"
        exit(:shutdown)
    end

    # Reply to clients
    Enum.each(allowed_transactions, fn %CommitTxn{} = txn ->
      SimServer.reply(txn.from, {:ok, %{commit_version: commit_version, batch_index: txn.batch_index}})
    end)

    Enum.each(locked_transactions, fn %CommitTxn{} = txn ->
      # TODO: use a different error message
      SimServer.reply(txn.from, {:error, {:database_locked, %{commit_version: commit_version, batch_index: txn.batch_index}}})
    end)

    Enum.each(old_transactions, fn %CommitTxn{} = txn ->
      SimServer.reply(txn.from, {:error, {:transaction_too_old, %{commit_version: commit_version, batch_index: txn.batch_index}}})
    end)

    Enum.each(rejected_transactions, fn %CommitTxn{} = txn ->
      SimServer.reply(txn.from, {:error, {:read_conflict, %{commit_version: commit_version, batch_index: txn.batch_index}}})
    end)

    case length(allowed_transactions) do
      0 -> SimLogger.debug "CommitBuffer (id=#{state.id}) committed empty batch"
      batch_size -> SimLogger.debug "CommitBuffer (id=#{state.id}) committed batch (batch_size=#{batch_size} commit_version=#{commit_version})"
    end

    %{state |
      last_committed_version: commit_version,
      buffer: [],
      buffer_size: 0,
      last_commit_timestamp: current_time(),
    }
  end
end