defmodule Hobbes.Utils do
  @moduledoc """
  Hobbes utils: keyspace layout constants, key/value (un)packing helpers,
  and small generic algorithms (sorted merges, backoff, stats) shared
  across the Hobbes components.
  """

  alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
  alias Hobbes.Encoding.Keyset
  alias Trinity.SimProcess

  import ExUnit.Assertions, only: [assert: 1]

  @type mutation :: {:write, binary, binary} | {:clear, binary} | {:clear_range, binary, binary}
  @type numbered_mutation :: {non_neg_integer, mutation}
  @type tag_list :: [integer]
  @type tagged_mutation :: {tag_list, numbered_mutation}

  # Keyspace boundaries. These are macros (not functions) so they can be used
  # inside guards and match patterns below.
  defmacro normal_prefix, do: ""
  defmacro normal_end, do: "\xFF"
  defmacro meta_prefix, do: "\xFF"
  defmacro meta_end, do: "\xFF\xFF"
  defmacro database_prefix, do: normal_prefix()
  defmacro database_end, do: meta_end()
  # TODO: remove all_keys in favor of database
  defmacro all_keys_prefix, do: ""
  defmacro all_keys_end, do: meta_end()
  defmacro storage_teams_prefix, do: "\xFF/storage_teams/"
  defmacro storage_teams_end, do: "\xFF/storage_teams0"
  defmacro server_tags_prefix, do: "\xFF/st/"
  defmacro server_tags_end, do: "\xFF/st0"
  defmacro key_storage_prefix, do: "\xFF/ks/"
  defmacro key_storage_end, do: "\xFF/ks0"
  # TODO: rename to "sk/" once server_keys has been removed
  defmacro storage_keys_prefix, do: "\xFF/stk/"
  defmacro storage_keys_end, do: "\xFF/stk0"
  defmacro key_servers_prefix, do: "\xFF/key_servers/"
  defmacro key_servers_end, do: "\xFF/key_servers0"
  defmacro server_keys_prefix, do: "\xFF/sk/"
  defmacro server_keys_end, do: "\xFF/sk0"
  defmacro shard_moves_prefix, do: "\xFF/sm/"
  defmacro shard_moves_end, do: "\xFF/sm0"
  defmacro special_prefix, do: "\xFF\xFF"
  defmacro special_end, do: "\xFF\xFF\xFF\xFF"
  # TODO: rename to "sk/" once server_keys has been removed
  defmacro special_storage_keys_prefix, do: "\xFF\xFF/stk/"
  defmacro special_storage_keys_end, do: "\xFF\xFF/stk0"
  defmacro special_server_keys_prefix, do: "\xFF\xFF/sk/"
  defmacro special_server_keys_end, do: "\xFF\xFF/sk0"
  defmacro special_shard_import_prefix, do: "\xFF\xFF/si/"
  defmacro special_shard_import_end, do: "\xFF\xFF/si0"
  defmacro special_byte_sample_prefix, do: "\xFF\xFF/bs/"
  defmacro special_byte_sample_end, do: "\xFF\xFF/bs0"
  defmacro special_id_key, do: "\xFF\xFF/id"
  defmacro special_storage_team_id_key, do: special_prefix() <> "/storage_team_id"
  defmacro next_shard_move_id_key, do: "\xFF/next_shard_move_id"
  defmacro meta_tag, do: -1

  # MVCC read window, in microseconds (matches current_time/0's unit).
  def mvcc_window, do: 5_000_000

  defmacro coordinator_config_prefix, do: "config/"
  defmacro coordinator_config_end, do: "config0"

  defguard is_database_key(key)
           when is_binary(key) and key >= database_prefix() and key < database_end()

  defguard is_database_range(start_key, end_key)
           when is_binary(start_key) and is_binary(end_key) and start_key < end_key and
                  start_key >= database_prefix() and start_key < database_end() and
                  end_key <= database_end()

  @doc """
  Returns the smallest key strictly greater than `key` (appends a NUL byte).
  """
  @spec next_key(binary) :: binary
  def next_key(key) when is_binary(key), do: key <> "\x00"

  @doc """
  Returns the key targeted by a point mutation.

  Note: `:clear_range` mutations have no single key and are intentionally
  not handled here — passing one raises `FunctionClauseError`.
  """
  @spec mutation_key(mutation) :: binary
  def mutation_key({:write, key, _value}), do: key
  def mutation_key({:clear, key}), do: key

  @doc """
  True if the mutation touches the meta keyspace (`\\xFF`-prefixed keys).
  For `:clear_range`, only the end key's prefix is inspected.
  """
  @spec meta_mutation?(mutation) :: boolean
  def meta_mutation?({:write, meta_prefix() <> _, _value}), do: true
  def meta_mutation?({:write, _key, _value}), do: false
  def meta_mutation?({:clear, meta_prefix() <> _}), do: true
  def meta_mutation?({:clear, _key}), do: false
  def meta_mutation?({:clear_range, _sk, meta_prefix() <> _}), do: true
  def meta_mutation?({:clear_range, _sk, _ek}), do: false

  @doc """
  True if the mutation targets a special (`\\xFF\\xFF`-prefixed) storage-keys
  or server-keys entry.
  """
  @spec special_mutation?(mutation) :: boolean
  def special_mutation?({:write, special_storage_keys_prefix() <> _key, _value}), do: true
  def special_mutation?({:write, special_server_keys_prefix() <> _key, _value}), do: true
  def special_mutation?({:write, _key, _value}), do: false
  def special_mutation?({:clear, special_storage_keys_prefix() <> _key}), do: true
  def special_mutation?({:clear, special_server_keys_prefix() <> _key}), do: true
  def special_mutation?({:clear, _key}), do: false
  # Special range clears are not currently supported
  def special_mutation?({:clear_range, _sk, _ek}), do: false

  @doc """
  Packs a storage team id and its member server ids into a `{key, value}`
  pair under the storage-teams keyspace.
  """
  @spec pack_storage_team_pair(non_neg_integer, [non_neg_integer]) :: {binary, binary}
  def pack_storage_team_pair(storage_team_id, storage_server_ids)
      when is_integer(storage_team_id) and is_list(storage_server_ids) do
    {
      storage_teams_prefix() <> Keyset.pack([storage_team_id]),
      Keyset.pack(storage_server_ids)
    }
  end

  @doc """
  Inverse of `pack_storage_team_pair/2`.
  """
  @spec unpack_storage_team_pair({binary, binary}) :: {non_neg_integer, [non_neg_integer]}
  def unpack_storage_team_pair({storage_teams_prefix() <> key, value}) do
    [storage_team_id] = Keyset.unpack(key)
    storage_server_ids = Keyset.unpack(value)
    {storage_team_id, storage_server_ids}
  end

  @doc """
  Packs a `{key, value}` pair mapping a shard's start key to its current
  storage team and (optionally) the team it is moving to.
  """
  @spec pack_key_storage_pair(binary, non_neg_integer, non_neg_integer | nil) :: {binary, binary}
  def pack_key_storage_pair(shard_start_key, from_storage_team_id, to_storage_team_id)
      when is_binary(shard_start_key) and is_integer(from_storage_team_id) and
             (is_integer(to_storage_team_id) or is_nil(to_storage_team_id)) do
    {
      key_storage_prefix() <> shard_start_key,
      Keyset.pack([from_storage_team_id, to_storage_team_id])
    }
  end

  @doc """
  Unpacks a key-storage value into `[from_storage_team_id, to_storage_team_id]`
  (same order as `pack_key_storage_pair/3` packs them).
  """
  @spec unpack_key_storage_value(binary) :: [non_neg_integer | nil]
  def unpack_key_storage_value(value) when is_binary(value) do
    # Must mirror the pack order in pack_key_storage_pair/3: [from, to].
    [_from_storage_team_id, _to_storage_team_id] = Keyset.unpack(value)
  end

  @spec pack_key_servers([integer], [integer]) :: binary
  def pack_key_servers(from_ids, to_ids) when is_list(from_ids) and is_list(to_ids) do
    Keyset.pack([from_ids, to_ids])
  end

  @spec unpack_key_servers(binary) :: [[integer]]
  def unpack_key_servers(value) when is_binary(value) do
    [_from_ids, _to_ids] = Keyset.unpack(value)
  end

  def pack_server_keys_key(server_id, start_key)
      when is_integer(server_id) and is_binary(start_key) do
    Keyset.pack([server_id, start_key])
  end

  def unpack_server_keys_key(binary) when is_binary(binary) do
    [_server_id, _start_key] = Keyset.unpack(binary)
  end

  def pack_server_keys_value(shard_move_id, end_key, status)
      when (is_integer(shard_move_id) or is_nil(shard_move_id)) and is_binary(end_key) and
             status in ["fetching", "complete"] do
    Keyset.pack([shard_move_id, end_key, status])
  end

  # `nil` stands for "no entry"; keep the shape of the unpacked triple.
  def unpack_server_keys_value(nil), do: [nil, nil, nil]

  def unpack_server_keys_value(binary) when is_binary(binary) do
    [_shard_move_id, _end_key, _status] = Keyset.unpack(binary)
  end

  # These TLogGeneration pairs are stored in Coordinator keyspace, not the database
  @spec pack_tlog_generation_pair(TLogGeneration.t) :: {binary, binary}
  def pack_tlog_generation_pair(%TLogGeneration{} = gen) do
    {
      "gen/" <> Keyset.pack([gen.generation]),
      Keyset.pack([gen.start_version, gen.replication_factor, gen.tlog_ids])
    }
  end

  @doc """
  Inverse of `pack_tlog_generation_pair/1`, asserting invariants on the
  decoded fields before rebuilding the struct.
  """
  @spec unpack_tlog_generation_pair({binary, binary}) :: TLogGeneration.t
  def unpack_tlog_generation_pair({"gen/" <> key, value} = _pair) when is_binary(value) do
    [generation] = Keyset.unpack(key)
    [start_version, replication_factor, tlog_ids] = Keyset.unpack(value)

    assert is_integer(generation)
    assert generation > 0
    assert is_integer(start_version)
    assert start_version >= 0
    assert replication_factor in [1, 2, 3]
    assert is_list(tlog_ids)
    for id <- tlog_ids, do: assert(is_integer(id))

    %TLogGeneration{
      generation: generation,
      start_version: start_version,
      replication_factor: replication_factor,
      tlog_ids: tlog_ids
    }
  end

  @doc """
  Returns the cluster's servers of the given `type`, sorted by id.
  """
  @spec get_servers(%Cluster{}, module) :: [%Server{}]
  def get_servers(%Cluster{} = cluster, type) when is_atom(type) do
    cluster.servers
    |> Map.values()
    |> Enum.filter(&(&1.type == type))
    |> Enum.sort_by(& &1.id)
  end

  @doc """
  Picks a random non-nil element from `choices`.
  Returns `:error` if every element is nil (or the list is empty).
  """
  @spec random_not_nil([term | nil]) :: {:ok, term} | :error
  def random_not_nil(choices) do
    case Enum.reject(choices, &is_nil/1) do
      [] -> :error
      remaining -> {:ok, Enum.random(remaining)}
    end
  end

  @doc """
  Mirrors storage-keys mutations into the special keyspace, preserving the
  input order. Mutations outside the storage-keys prefix are dropped.
  """
  def compute_special_mutations(mutations) when is_list(mutations) do
    Enum.reduce(mutations, [], fn
      {:write, storage_keys_prefix() <> k, v}, acc ->
        [{:write, special_storage_keys_prefix() <> k, v} | acc]

      {:clear, storage_keys_prefix() <> k}, acc ->
        [{:clear, special_storage_keys_prefix() <> k} | acc]

      _, acc ->
        acc
    end)
    |> Enum.reverse()
  end

  @doc """
  Clips each `{start, end, data}` range in `ranges` to its overlap with
  `range`, dropping ranges that do not overlap.
  """
  @spec intersect_ranges({binary, binary}, [{binary, binary, term}]) :: [{binary, binary, term}]
  def intersect_ranges({start_key, end_key} = _range, ranges) when is_list(ranges) do
    ranges
    |> Enum.map(fn {s, e, data} ->
      case intersect({start_key, end_key}, {s, e}) do
        {:ok, {is, ie}} -> {is, ie, data}
        {:error, _error} -> nil
      end
    end)
    |> Enum.reject(&is_nil/1)
  end

  # Half-open interval intersection; empty overlap is an error.
  defp intersect({s1, e1}, {s2, e2}) do
    is = max(s1, s2)
    ie = min(e1, e2)

    case is < ie do
      true -> {:ok, {is, ie}}
      false -> {:error, :ranges_do_not_overlap}
    end
  end

  @doc """
  Deterministically assigns a "buddy" TLog to a server by hashing the
  server id onto the TLog id list.
  """
  @spec buddy_tlog_id([non_neg_integer], non_neg_integer) :: non_neg_integer
  def buddy_tlog_id(tlog_ids, server_id) when is_list(tlog_ids) and is_integer(server_id) do
    index = rem(server_id, length(tlog_ids))
    Enum.at(tlog_ids, index)
  end

  @doc """
  Finds the first generation (in list order) whose start_version covers
  `version`. Raises if none matches.
  """
  @spec tlog_generation_for_version([TLogGeneration.t], non_neg_integer) :: TLogGeneration.t
  def tlog_generation_for_version(generations, version)
      when is_list(generations) and is_integer(version) do
    case Enum.find(generations, fn %TLogGeneration{} = tlg -> tlg.start_version <= version end) do
      %TLogGeneration{} = generation ->
        generation

      nil ->
        raise "Could not find a generation for version #{inspect(version)} in #{inspect(generations)}"
    end
  end

  @doc """
  Merges two sorted lists of key/value pairs. If both lists contain the same
  key, the value from the second list is used.

  **Warning: The lists MUST be sorted by key in advance.**

  ## Examples

      iex> l1 = [{"foo", "bar"}, {"foo2", "bar2"}, {"zoo", "baz"}]
      iex> l2 = [{"aoo", "aar"}, {"foo2", "different_bar2"}]
      iex> merge_pairs(l1, l2)
      [{"aoo", "aar"}, {"foo", "bar"}, {"foo2", "different_bar2"}, {"zoo", "baz"}]

  """
  def merge_pairs(list1, list2), do: do_merge(list1, list2, []) |> Enum.reverse()

  defp do_merge([], [], acc), do: acc
  defp do_merge([p1 | rest1], [], acc), do: do_merge(rest1, [], [p1 | acc])
  defp do_merge([], [p2 | rest2], acc), do: do_merge([], rest2, [p2 | acc])

  defp do_merge([{k1, _} = p1 | rest1] = list1, [{k2, _} = p2 | rest2] = list2, acc) do
    cond do
      k1 < k2 -> do_merge(rest1, list2, [p1 | acc])
      k1 > k2 -> do_merge(list1, rest2, [p2 | acc])
      # Equal keys: the second list's pair wins.
      k1 == k2 -> do_merge(rest1, rest2, [p2 | acc])
    end
  end

  @doc """
  Interleaves two sorted lists of batches, combining batches with the same
  version number.

  **Warning: The lists MUST be sorted by version in advance.**

  ## Examples

      iex> b1 = [{1, [:a]}, {2, [:b]}]
      iex> b2 = [{2, [:c]}, {4, [:d]}]
      iex> interleave_batches(b1, b2)
      [{1, [:a], []}, {2, [:b], [:c]}, {4, [], [:d]}]

  """
  @spec interleave_batches([{integer, list}], [{integer, list}]) :: [{integer, list, list}]
  def interleave_batches(batches1, batches2) do
    interleave(batches1, batches2, []) |> Enum.reverse()
  end

  defp interleave(
         [{ver1, _} = batch1 | rest1] = batches1,
         [{ver2, _} = batch2 | rest2] = batches2,
         acc
       ) do
    cond do
      ver1 < ver2 ->
        {ver1, l1} = batch1
        interleave(rest1, batches2, [{ver1, l1, []} | acc])

      ver1 > ver2 ->
        {ver2, l2} = batch2
        interleave(batches1, rest2, [{ver2, [], l2} | acc])

      ver1 == ver2 ->
        {_, l1} = batch1
        {_, l2} = batch2
        interleave(rest1, rest2, [{ver1, l1, l2} | acc])
    end
  end

  defp interleave([{ver1, l1} | rest1], [], acc),
    do: interleave(rest1, [], [{ver1, l1, []} | acc])

  defp interleave([], [{ver2, l2} | rest2], acc),
    do: interleave([], rest2, [{ver2, [], l2} | acc])

  defp interleave([], [], acc), do: acc

  @doc """
  Merges a list of lists of versioned batches of numbered mutations
  (e.g. TLog peek replies).

  ## Examples

      iex> tlog_pids |> Enum.map(&TLog.peek(&1, tag, sv, ev)) |> merge_batches()
      [
        {
          100,
          [
            {0, {:write, "foo", "bar"}},
            {1, {:clear, "foo"}},
          ]
        },
        {101, ...},
        {102, ...},
      ]

  """
  @spec merge_batches([[{non_neg_integer, [numbered_mutation]}]]) ::
          [{non_neg_integer, [numbered_mutation]}]
  def merge_batches(batches) do
    batches
    |> Enum.concat()
    |> Enum.group_by(
      fn {version, _mutations} -> version end,
      fn {_version, mutations} -> mutations end
    )
    |> Enum.map(fn {version, mut_batches} ->
      # TODO: this could be done more efficiently with a K-way merge since all batches are sorted
      # But, for now, this function is not really on the hot path (only needed after TLog failure)
      mutations =
        mut_batches
        |> Enum.concat()
        |> Enum.uniq_by(fn {i, _mut} -> i end)
        |> Enum.sort()

      {version, mutations}
    end)
    |> Enum.sort()
  end

  @doc """
  Attempts to run `fun`, but backs off and retries if
  `{:error, :read_version_too_new}` is received.

  ## Examples

      iex> too_new_backoff(fn -> Storage.read(...) end)
      {:ok, ...}

      iex> too_new_backoff(fn -> Storage.read(...) end)
      {:error, :too_many_retries}

  """
  @spec too_new_backoff(function, non_neg_integer) :: term | {:error, :too_many_retries}
  def too_new_backoff(fun, attempt \\ 0)

  # Give up after 10 attempts.
  def too_new_backoff(_fun, 10), do: {:error, :too_many_retries}

  def too_new_backoff(fun, attempt) when is_integer(attempt) do
    case fun.() do
      {:error, :read_version_too_new} ->
        # Exponential backoff with base 1.6, truncated to an integer delay.
        delay = floor(Float.pow(1.6, attempt + 1))
        SimProcess.sleep(delay)
        too_new_backoff(fun, attempt + 1)

      result ->
        result
    end
  end

  @doc """
  Generic bounded retry loop: `fun` receives the accumulator and returns
  `{:cont, acc}` to sleep (exponentially) and retry, or `{:halt, acc}` to
  return `acc`. Gives up with `{:error, :too_many_retries}` after `count`
  attempts.
  """
  @spec backoff(term, ({:cont | :halt, term} -> term), non_neg_integer, non_neg_integer) :: term
  def backoff(acc \\ nil, fun, count \\ 6, attempt \\ 0)

  def backoff(_acc, _fun, count, attempt) when attempt == count do
    {:error, :too_many_retries}
  end

  def backoff(acc, fun, count, attempt)
      when is_function(fun, 1) and is_integer(count) and is_integer(attempt) do
    case fun.(acc) do
      {:cont, acc} ->
        delay = Integer.pow(2, attempt + 1)
        SimProcess.sleep(delay)
        backoff(acc, fun, count, attempt + 1)

      {:halt, acc} ->
        acc
    end
  end

  @doc """
  Increments the counter under `key` in `stats` by `amount`, initializing
  it to `amount` when absent.
  """
  @spec inc_stat(map, term, number) :: map
  def inc_stat(stats, key, amount \\ 1) when is_map(stats) do
    Map.update(stats, key, amount, &(&1 + amount))
  end

  @doc """
  Sums a list of stats maps key-wise into a single map.
  """
  @spec sum_stats([map]) :: map
  def sum_stats(stats_list) when is_list(stats_list) do
    Enum.reduce(stats_list, %{}, fn stats, acc ->
      Enum.reduce(stats, acc, fn {k, v}, acc -> inc_stat(acc, k, v) end)
    end)
  end

  @doc """
  Current (simulated) monotonic time in microseconds.
  """
  @spec current_time :: integer
  def current_time do
    Trinity.SimSystem.monotonic_time(:microsecond)
  end

  @doc """
  Times a function and logs how long it takes.

  ## Examples

      iex> time_fn("hello world", fn -> :foo end)
      :foo
      $ "hello world took 0 ms"

  """
  def time_fn(label, func) when is_binary(label) and is_function(func) do
    {time, result} = :timer.tc(func)
    # Intentionally IO.inspect (prints with quotes, as documented above).
    IO.inspect("#{label} took #{time / 1000} ms")
    result
  end
end