this repo has no description
at master 461 lines 16 kB view raw
defmodule Hobbes.Utils do
  @moduledoc """
  Hobbes utils.

  Key-space boundary constants (defined as macros so they can appear in
  guards and match patterns), key packing/unpacking helpers built on
  `Hobbes.Encoding.Keyset`, and generic list/map utilities shared across
  Hobbes components.
  """

  alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
  alias Hobbes.Encoding.Keyset

  alias Trinity.SimProcess

  import ExUnit.Assertions, only: [assert: 1]

  @type mutation :: {:write, binary, binary} | {:clear, binary} | {:clear_range, binary, binary}
  @type numbered_mutation :: {non_neg_integer, mutation}
  @type tag_list :: [integer]
  @type tagged_mutation :: {tag_list, numbered_mutation}

  # Key-space boundaries. These are macros (not functions) so they expand to
  # literals usable inside guards and function-head binary match patterns.
  defmacro normal_prefix, do: ""
  defmacro normal_end, do: "\xFF"
  defmacro meta_prefix, do: "\xFF"
  defmacro meta_end, do: "\xFF\xFF"
  defmacro database_prefix, do: normal_prefix()
  defmacro database_end, do: meta_end()

  # TODO: remove all_keys in favor of database
  defmacro all_keys_prefix, do: ""
  defmacro all_keys_end, do: meta_end()

  defmacro storage_teams_prefix, do: "\xFF/storage_teams/"
  defmacro storage_teams_end, do: "\xFF/storage_teams0"
  defmacro server_tags_prefix, do: "\xFF/st/"
  defmacro server_tags_end, do: "\xFF/st0"

  defmacro key_storage_prefix, do: "\xFF/ks/"
  defmacro key_storage_end, do: "\xFF/ks0"
  # TODO: rename to "sk/" once server_keys has been removed
  defmacro storage_keys_prefix, do: "\xFF/stk/"
  defmacro storage_keys_end, do: "\xFF/stk0"

  defmacro key_servers_prefix, do: "\xFF/key_servers/"
  defmacro key_servers_end, do: "\xFF/key_servers0"
  defmacro server_keys_prefix, do: "\xFF/sk/"
  defmacro server_keys_end, do: "\xFF/sk0"

  defmacro shard_moves_prefix, do: "\xFF/sm/"
  defmacro shard_moves_end, do: "\xFF/sm0"

  defmacro special_prefix, do: "\xFF\xFF"
  defmacro special_end, do: "\xFF\xFF\xFF\xFF"

  # TODO: rename to "sk/" once server_keys has been removed
  defmacro special_storage_keys_prefix, do: "\xFF\xFF/stk/"
  defmacro special_storage_keys_end, do: "\xFF\xFF/stk0"

  defmacro special_server_keys_prefix, do: "\xFF\xFF/sk/"
  defmacro special_server_keys_end, do: "\xFF\xFF/sk0"

  defmacro special_shard_import_prefix, do: "\xFF\xFF/si/"
  defmacro special_shard_import_end, do: "\xFF\xFF/si0"

  defmacro special_byte_sample_prefix, do: "\xFF\xFF/bs/"
  defmacro special_byte_sample_end, do: "\xFF\xFF/bs0"

  defmacro special_id_key, do: "\xFF\xFF/id"
  defmacro special_storage_team_id_key, do: special_prefix() <> "/storage_team_id"

  defmacro next_shard_move_id_key, do: "\xFF/next_shard_move_id"

  defmacro meta_tag, do: -1

  def mvcc_window, do: 5_000_000

  # These live in the Coordinator keyspace, not the database keyspace.
  defmacro coordinator_config_prefix, do: "config/"
  defmacro coordinator_config_end, do: "config0"

  defguard is_database_key(key) when is_binary(key) and key >= database_prefix() and key < database_end()

  defguard is_database_range(start_key, end_key)
           when is_binary(start_key) and is_binary(end_key)
           and start_key < end_key
           and start_key >= database_prefix() and start_key < database_end()
           and end_key <= database_end()

  @doc """
  Returns the smallest key strictly greater than `key` (appends a zero byte).
  """
  @spec next_key(binary) :: binary
  def next_key(key) when is_binary(key), do: key <> "\x00"

  # NOTE(review): only :write and :clear carry a single key; :clear_range is
  # deliberately unhandled (a range has two keys), so the spec covers only the
  # shapes with clauses. Callers passing :clear_range will FunctionClauseError.
  @spec mutation_key({:write, binary, binary} | {:clear, binary}) :: binary
  def mutation_key({:write, key, _value}), do: key
  def mutation_key({:clear, key}), do: key

  @doc """
  Returns true when the mutation touches the meta keyspace (`\\xFF` prefix).

  For a `:clear_range`, only the end key is inspected: since ranges are
  ordered, an end key in the meta keyspace means the range reaches into it.
  """
  @spec meta_mutation?(mutation) :: boolean
  def meta_mutation?({:write, meta_prefix() <> _, _value}), do: true
  def meta_mutation?({:write, _key, _value}), do: false

  def meta_mutation?({:clear, meta_prefix() <> _}), do: true
  def meta_mutation?({:clear, _key}), do: false

  def meta_mutation?({:clear_range, _sk, meta_prefix() <> _}), do: true
  def meta_mutation?({:clear_range, _sk, _ek}), do: false

  @doc """
  Returns true when the mutation targets the special (`\\xFF\\xFF`) keyspace
  entries for storage keys or server keys.
  """
  @spec special_mutation?(mutation) :: boolean
  def special_mutation?({:write, special_storage_keys_prefix() <> _key, _value}), do: true
  def special_mutation?({:write, special_server_keys_prefix() <> _key, _value}), do: true
  def special_mutation?({:write, _key, _value}), do: false

  def special_mutation?({:clear, special_storage_keys_prefix() <> _key}), do: true
  def special_mutation?({:clear, special_server_keys_prefix() <> _key}), do: true
  def special_mutation?({:clear, _key}), do: false

  # Special range clears are not currently supported
  def special_mutation?({:clear_range, _sk, _ek}), do: false

  @doc """
  Builds the `{key, value}` pair mapping a storage team id to its server ids.
  """
  @spec pack_storage_team_pair(non_neg_integer, [non_neg_integer]) :: {binary, binary}
  def pack_storage_team_pair(storage_team_id, storage_server_ids)
      when is_integer(storage_team_id) and is_list(storage_server_ids) do
    {
      storage_teams_prefix() <> Keyset.pack([storage_team_id]),
      Keyset.pack(storage_server_ids),
    }
  end

  @spec unpack_storage_team_pair({binary, binary}) :: {non_neg_integer, [non_neg_integer]}
  def unpack_storage_team_pair({storage_teams_prefix() <> key, value}) do
    [storage_team_id] = Keyset.unpack(key)
    storage_server_ids = Keyset.unpack(value)
    {storage_team_id, storage_server_ids}
  end

  @doc """
  Builds the key_storage `{key, value}` pair for a shard. `to_storage_team_id`
  is nil when the shard is not being moved.
  """
  @spec pack_key_storage_pair(binary, non_neg_integer, non_neg_integer | nil) :: {binary, binary}
  def pack_key_storage_pair(shard_start_key, from_storage_team_id, to_storage_team_id)
      when is_binary(shard_start_key) and is_integer(from_storage_team_id) and (is_integer(to_storage_team_id) or is_nil(to_storage_team_id)) do
    {
      key_storage_prefix() <> shard_start_key,
      Keyset.pack([from_storage_team_id, to_storage_team_id]),
    }
  end

  @spec unpack_key_storage_value(binary) :: [non_neg_integer | nil]
  def unpack_key_storage_value(value) when is_binary(value) do
    # Unpacks in pack order: [from_storage_team_id, to_storage_team_id]
    # (the original underscore names were swapped relative to the packer).
    [_from_storage_team_id, _to_storage_team_id] = Keyset.unpack(value)
  end

  @spec pack_key_servers([integer], [integer]) :: binary
  def pack_key_servers(from_ids, to_ids) when is_list(from_ids) and is_list(to_ids) do
    Keyset.pack([from_ids, to_ids])
  end

  @spec unpack_key_servers(binary) :: [[integer]]
  def unpack_key_servers(value) when is_binary(value) do
    [_from_ids, _to_ids] = Keyset.unpack(value)
  end

  @spec pack_server_keys_key(integer, binary) :: binary
  def pack_server_keys_key(server_id, start_key) when is_integer(server_id) and is_binary(start_key) do
    Keyset.pack([server_id, start_key])
  end

  @spec unpack_server_keys_key(binary) :: [term]
  def unpack_server_keys_key(binary) when is_binary(binary) do
    [_server_id, _start_key] = Keyset.unpack(binary)
  end

  @spec pack_server_keys_value(integer | nil, binary, String.t) :: binary
  def pack_server_keys_value(shard_move_id, end_key, status)
      when (is_integer(shard_move_id) or is_nil(shard_move_id)) and is_binary(end_key) and status in ["fetching", "complete"] do
    Keyset.pack([shard_move_id, end_key, status])
  end

  # A missing value unpacks to an all-nil triple so callers can destructure
  # uniformly whether or not the entry exists.
  @spec unpack_server_keys_value(binary | nil) :: [term]
  def unpack_server_keys_value(nil), do: [nil, nil, nil]

  def unpack_server_keys_value(binary) when is_binary(binary) do
    [_shard_move_id, _end_key, _status] = Keyset.unpack(binary)
  end

  # These TLogGeneration pairs are stored in Coordinator keyspace, not the database
  @spec pack_tlog_generation_pair(TLogGeneration.t) :: {binary, binary}
  def pack_tlog_generation_pair(%TLogGeneration{} = gen) do
    {
      "gen/" <> Keyset.pack([gen.generation]),
      Keyset.pack([gen.start_version, gen.replication_factor, gen.tlog_ids]),
    }
  end

  @doc """
  Decodes a Coordinator `gen/` pair back into a `TLogGeneration`, asserting
  the invariants of a well-formed generation record.
  """
  @spec unpack_tlog_generation_pair({binary, binary}) :: TLogGeneration.t
  def unpack_tlog_generation_pair({"gen/" <> key, value} = _pair) when is_binary(value) do
    [generation] = Keyset.unpack(key)
    [start_version, replication_factor, tlog_ids] = Keyset.unpack(value)

    assert is_integer(generation)
    assert generation > 0
    assert is_integer(start_version)
    assert start_version >= 0
    assert replication_factor in [1, 2, 3]
    assert is_list(tlog_ids)
    for id <- tlog_ids, do: assert(is_integer(id))

    %TLogGeneration{generation: generation, start_version: start_version, replication_factor: replication_factor, tlog_ids: tlog_ids}
  end

  @doc """
  Returns the cluster's servers of the given type, sorted by id.
  """
  @spec get_servers(%Cluster{}, atom) :: [%Server{}]
  def get_servers(%Cluster{} = cluster, type) when is_atom(type) do
    cluster.servers
    |> Map.values()
    |> Enum.filter(&(&1.type == type))
    |> Enum.sort_by(&(&1.id))
  end

  @doc """
  Picks a random non-nil element from `choices`, or `:error` if none exists.
  """
  @spec random_not_nil([term | nil]) :: {:ok, term} | :error
  def random_not_nil(choices) do
    case Enum.reject(choices, &is_nil/1) do
      [] -> :error
      remaining -> {:ok, Enum.random(remaining)}
    end
  end

  @doc """
  Mirrors storage-key mutations into the special (`\\xFF\\xFF`) keyspace,
  preserving the input order. Non-storage-key mutations produce nothing.
  """
  @spec compute_special_mutations([mutation]) :: [mutation]
  def compute_special_mutations(mutations) when is_list(mutations) do
    Enum.reduce(mutations, [], fn
      {:write, storage_keys_prefix() <> k, v}, acc ->
        [{:write, special_storage_keys_prefix() <> k, v} | acc]

      {:clear, storage_keys_prefix() <> k}, acc ->
        [{:clear, special_storage_keys_prefix() <> k} | acc]

      _, acc -> acc
    end)
    |> Enum.reverse()
  end

  @doc """
  Clips each `{start, end, data}` range in `ranges` to the given half-open
  `{start_key, end_key}` range, dropping ranges that do not overlap it.
  """
  @spec intersect_ranges({binary, binary}, [{binary, binary, term}]) :: [{binary, binary, term}]
  def intersect_ranges({start_key, end_key} = _range, ranges) when is_list(ranges) do
    ranges
    |> Enum.map(fn {s, e, data} ->
      case intersect({start_key, end_key}, {s, e}) do
        {:ok, {is, ie}} -> {is, ie, data}
        {:error, _error} -> nil
      end
    end)
    |> Enum.reject(&(&1 == nil))
  end

  # Intersection of two half-open ranges; empty intersections are errors.
  defp intersect({s1, e1}, {s2, e2}) do
    is = max(s1, s2)
    ie = min(e1, e2)
    case is < ie do
      true -> {:ok, {is, ie}}
      false -> {:error, :ranges_do_not_overlap}
    end
  end

  @doc """
  Deterministically assigns a TLog id to a server by hashing the server id
  modulo the number of TLogs.
  """
  @spec buddy_tlog_id([non_neg_integer], non_neg_integer) :: non_neg_integer
  def buddy_tlog_id(tlog_ids, server_id) when is_list(tlog_ids) and is_integer(server_id) do
    index = rem(server_id, length(tlog_ids))
    Enum.at(tlog_ids, index)
  end

  @doc """
  Finds the first generation whose `start_version` covers `version`.

  Assumes `generations` is ordered newest-first (the first match wins);
  raises if no generation covers the version.
  """
  @spec tlog_generation_for_version([TLogGeneration.t], non_neg_integer) :: TLogGeneration.t
  def tlog_generation_for_version(generations, version) when is_list(generations) and is_integer(version) do
    case Enum.find(generations, fn %TLogGeneration{} = tlg -> tlg.start_version <= version end) do
      %TLogGeneration{} = generation ->
        generation

      nil -> raise "Could not find a generation for version #{inspect(version)} in #{inspect(generations)}"
    end
  end

  @doc """
  Merges two sorted lists of key/value pairs.

  If both lists contain the same key, the value from the
  second list is used.

  **Warning: The lists MUST be sorted by key in advance.**

  ## Examples

      iex> l1 = [{"foo", "bar"}, {"foo2", "bar2"}, {"zoo", "baz"}]
      iex> l2 = [{"aoo", "aar"}, {"foo2", "different_bar2"}]
      iex> merge_pairs(l1, l2)
      [{"aoo", "aar"}, {"foo", "bar"}, {"foo2", "different_bar2"}, {"zoo", "baz"}]

  """
  def merge_pairs(list1, list2), do: do_merge(list1, list2, []) |> Enum.reverse()

  defp do_merge([], [], acc), do: acc
  defp do_merge([p1 | rest1], [], acc), do: do_merge(rest1, [], [p1 | acc])
  defp do_merge([], [p2 | rest2], acc), do: do_merge([], rest2, [p2 | acc])

  defp do_merge([{k1, _} = p1 | rest1] = list1, [{k2, _} = p2 | rest2] = list2, acc) do
    cond do
      k1 < k2 ->
        do_merge(rest1, list2, [p1 | acc])
      k1 > k2 ->
        do_merge(list1, rest2, [p2 | acc])
      k1 == k2 ->
        # Same key: keep the pair from the second list.
        do_merge(rest1, rest2, [p2 | acc])
    end
  end

  @doc """
  Interleaves two sorted lists of batches, combining batches with the same version number.

  **Warning: The lists MUST be sorted by version in advance.**

  ## Examples

      iex> b1 = [{1, [:a]}, {2, [:b]}]
      iex> b2 = [{2, [:c]}, {4, [:d]}]
      iex> interleave_batches(b1, b2)
      [{1, [:a], []}, {2, [:b], [:c]}, {4, [], [:d]}]

  """
  @spec interleave_batches([{integer, list}], [{integer, list}]) :: [{integer, list, list}]
  def interleave_batches(batches1, batches2) do
    interleave(batches1, batches2, [])
    |> Enum.reverse()
  end

  defp interleave([{ver1, _} = batch1 | rest1] = batches1, [{ver2, _} = batch2 | rest2] = batches2, acc) do
    cond do
      ver1 < ver2 ->
        {ver1, l1} = batch1
        interleave(rest1, batches2, [{ver1, l1, []} | acc])
      ver1 > ver2 ->
        {ver2, l2} = batch2
        interleave(batches1, rest2, [{ver2, [], l2} | acc])
      ver1 == ver2 ->
        {_, l1} = batch1
        {_, l2} = batch2
        interleave(rest1, rest2, [{ver1, l1, l2} | acc])
    end
  end
  defp interleave([{ver1, l1} | rest1], [], acc), do: interleave(rest1, [], [{ver1, l1, []} | acc])
  defp interleave([], [{ver2, l2} | rest2], acc), do: interleave([], rest2, [{ver2, [], l2} | acc])
  defp interleave([], [], acc), do: acc

  @doc """
  Merges a list of lists of versioned batches of numbered mutations (e.g. TLog peek replies).

  ## Examples

      iex> tlog_pids |> Enum.map(&TLog.peek(&1, tag, sv, ev)) |> merge_batches()
      [
        {
          100,
          [
            {0, {:write, "foo", "bar"}},
            {1, {:clear, "foo"}},
          ]
        },
        {101, ...},
        {102, ...},
      ]
  """
  @spec merge_batches([[{non_neg_integer, [numbered_mutation]}]]) :: [{non_neg_integer, [numbered_mutation]}]
  def merge_batches(batches) do
    batches
    |> Enum.concat()
    |> Enum.group_by(fn {version, _mutations} -> version end, fn {_version, mutations} -> mutations end)
    |> Enum.map(fn {version, mut_batches} ->
      # TODO: this could be done more efficiently with a K-way merge since all batches are sorted
      # But, for now, this function is not really on the hot path (only needed after TLog failure)
      mutations =
        mut_batches
        |> Enum.concat()
        |> Enum.uniq_by(fn {i, _mut} -> i end)
        |> Enum.sort()
      {version, mutations}
    end)
    |> Enum.sort()
  end

  @doc """
  Attempts to run `fun`, but backs off and retries
  if `{:error, :read_version_too_new}` is received.

  Gives up with `{:error, :too_many_retries}` after 10 attempts, with an
  exponentially growing (1.6^n) sleep between attempts.

  ## Examples

      iex> too_new_backoff(fn -> Storage.read(...) end)
      {:ok, ...}

      iex> too_new_backoff(fn -> Storage.read(...) end)
      {:error, :too_many_retries}

  """
  @spec too_new_backoff(function, non_neg_integer) :: term | {:error, :too_many_retries}
  def too_new_backoff(fun, attempt \\ 0)

  def too_new_backoff(_fun, 10), do: {:error, :too_many_retries}

  def too_new_backoff(fun, attempt) when is_integer(attempt) do
    case fun.() do
      {:error, :read_version_too_new} ->
        delay = floor(Float.pow(1.6, attempt + 1))
        SimProcess.sleep(delay)
        too_new_backoff(fun, attempt + 1)
      result ->
        result
    end
  end

  @doc """
  Generic bounded retry loop with exponential (2^n) backoff.

  `fun` receives the accumulator and returns `{:cont, acc}` to sleep and
  retry or `{:halt, acc}` to stop and return `acc`. Returns
  `{:error, :too_many_retries}` after `count` attempts.
  """
  @spec backoff(term, (term -> {:cont, term} | {:halt, term}), non_neg_integer, non_neg_integer) :: term
  def backoff(acc \\ nil, fun, count \\ 6, attempt \\ 0)

  def backoff(_acc, _fun, count, attempt) when attempt == count do
    {:error, :too_many_retries}
  end

  def backoff(acc, fun, count, attempt) when is_function(fun, 1) and is_integer(count) and is_integer(attempt) do
    case fun.(acc) do
      {:cont, acc} ->
        delay = Integer.pow(2, attempt + 1)
        SimProcess.sleep(delay)
        backoff(acc, fun, count, attempt + 1)
      {:halt, acc} ->
        acc
    end
  end

  @doc """
  Increments `key` in the `stats` map by `amount`, starting from `amount`
  when the key is absent.
  """
  @spec inc_stat(map, term, number) :: map
  def inc_stat(stats, key, amount \\ 1) when is_map(stats) do
    Map.update(stats, key, amount, &(&1 + amount))
  end

  @doc """
  Sums a list of stats maps key-wise into a single map.
  """
  @spec sum_stats([map]) :: map
  def sum_stats(stats_list) when is_list(stats_list) do
    Enum.reduce(stats_list, %{}, fn stats, acc ->
      Enum.reduce(stats, acc, fn {k, v}, acc ->
        inc_stat(acc, k, v)
      end)
    end)
  end

  @spec current_time :: integer
  def current_time do
    Trinity.SimSystem.monotonic_time(:microsecond)
  end

  @doc """
  Times a function and logs how long it takes.

  ## Examples

      iex> time_fn("hello world", fn -> :foo end)
      :foo

      $ "hello world took 0 ms"

  """
  def time_fn(label, func) when is_binary(label) and is_function(func) do
    {time, result} = :timer.tc(func)
    # IO.inspect (not puts) is intentional: the logged line is quoted, as
    # shown in the example above.
    IO.inspect "#{label} took #{time / 1000} ms"
    result
  end
end