This repository has no description.
1defmodule Hobbes.Utils do
2 @moduledoc """
3 Hobbes utils.
4 """
5
6 alias Hobbes.Structs.{Cluster, TLogGeneration, Server}
7 alias Hobbes.Encoding.Keyset
8
9 alias Trinity.SimProcess
10
11 import ExUnit.Assertions, only: [assert: 1]
12
  # Mutation shapes used throughout Hobbes: point writes, point clears, and
  # range clears (start/end key pair).
  @type mutation :: {:write, binary, binary} | {:clear, binary} | {:clear_range, binary, binary}
  # A mutation paired with its position within a version batch.
  @type numbered_mutation :: {non_neg_integer, mutation}
  @type tag_list :: [integer]
  @type tagged_mutation :: {tag_list, numbered_mutation}

  # Keyspace boundary constants. These are macros rather than functions so
  # they expand to binary literals and can therefore be used inside pattern
  # matches and guards (see meta_mutation?/1 and is_database_key/1 below).
  #
  # Byte ordering of the regions: normal keys sort below "\xFF", metadata keys
  # start with "\xFF", and special keys start with "\xFF\xFF".
  defmacro normal_prefix, do: ""
  defmacro normal_end, do: "\xFF"
  defmacro meta_prefix, do: "\xFF"
  defmacro meta_end, do: "\xFF\xFF"
  # The database range spans both the normal and the metadata regions.
  defmacro database_prefix, do: normal_prefix()
  defmacro database_end, do: meta_end()

  # TODO: remove all_keys in favor of database
  defmacro all_keys_prefix, do: ""
  defmacro all_keys_end, do: meta_end()

  # For each "<p>/" prefix below, the matching end key replaces the trailing
  # "/" with "0" ("/" + 1 in byte order), so [prefix, end) covers exactly the
  # keys underneath that prefix.
  defmacro storage_teams_prefix, do: "\xFF/storage_teams/"
  defmacro storage_teams_end, do: "\xFF/storage_teams0"
  defmacro server_tags_prefix, do: "\xFF/st/"
  defmacro server_tags_end, do: "\xFF/st0"

  defmacro key_storage_prefix, do: "\xFF/ks/"
  defmacro key_storage_end, do: "\xFF/ks0"
  # TODO: rename to "sk/" once server_keys has been removed
  defmacro storage_keys_prefix, do: "\xFF/stk/"
  defmacro storage_keys_end, do: "\xFF/stk0"

  defmacro key_servers_prefix, do: "\xFF/key_servers/"
  defmacro key_servers_end, do: "\xFF/key_servers0"
  defmacro server_keys_prefix, do: "\xFF/sk/"
  defmacro server_keys_end, do: "\xFF/sk0"

  defmacro shard_moves_prefix, do: "\xFF/sm/"
  defmacro shard_moves_end, do: "\xFF/sm0"

  # Special keyspace: sorts above all metadata keys.
  defmacro special_prefix, do: "\xFF\xFF"
  defmacro special_end, do: "\xFF\xFF\xFF\xFF"

  # TODO: rename to "sk/" once server_keys has been removed
  defmacro special_storage_keys_prefix, do: "\xFF\xFF/stk/"
  defmacro special_storage_keys_end, do: "\xFF\xFF/stk0"

  defmacro special_server_keys_prefix, do: "\xFF\xFF/sk/"
  defmacro special_server_keys_end, do: "\xFF\xFF/sk0"

  defmacro special_shard_import_prefix, do: "\xFF\xFF/si/"
  defmacro special_shard_import_end, do: "\xFF\xFF/si0"

  defmacro special_byte_sample_prefix, do: "\xFF\xFF/bs/"
  defmacro special_byte_sample_end, do: "\xFF\xFF/bs0"

  # Well-known single keys (not ranges).
  defmacro special_id_key, do: "\xFF\xFF/id"
  defmacro special_storage_team_id_key, do: special_prefix() <> "/storage_team_id"

  defmacro next_shard_move_id_key, do: "\xFF/next_shard_move_id"

  # Sentinel tag for metadata mutations (distinct from real non-negative tags).
  defmacro meta_tag, do: -1

  # Size of the MVCC window; units not established here (presumably versions
  # or microseconds, matching current_time/0) — TODO confirm against callers.
  def mvcc_window, do: 5_000_000

  # Coordinator-local keyspace (plain "config/" prefix, no "\xFF" region).
  defmacro coordinator_config_prefix, do: "config/"
  defmacro coordinator_config_end, do: "config0"
75
  # True when `key` is a binary inside the database keyspace
  # [database_prefix(), database_end()). Guard-safe because the boundary
  # macros expand to binary literals.
  defguard is_database_key(key) when is_binary(key) and key >= database_prefix() and key < database_end()

  # True for a non-empty half-open range [start_key, end_key) that lies
  # entirely within the database keyspace.
  defguard is_database_range(start_key, end_key)
    when is_binary(start_key) and is_binary(end_key)
    and start_key < end_key
    and start_key >= database_prefix() and start_key < database_end()
    and end_key <= database_end()
83
84 @spec next_key(binary) :: binary
85 def next_key(key) when is_binary(key), do: key <> "\x00"
86
87 @spec mutation_key(mutation) :: binary
88 def mutation_key({:write, key, _value}), do: key
89 def mutation_key({:clear, key}), do: key
90
91 @spec meta_mutation?(mutation) :: boolean
92 def meta_mutation?({:write, meta_prefix() <> _, _value}), do: true
93 def meta_mutation?({:write, _key, _value}), do: false
94
95 def meta_mutation?({:clear, meta_prefix() <> _}), do: true
96 def meta_mutation?({:clear, _key}), do: false
97
98 def meta_mutation?({:clear_range, _sk, meta_prefix() <> _}), do: true
99 def meta_mutation?({:clear_range, _sk, _ek}), do: false
100
101 @spec special_mutation?(mutation) :: boolean
102 def special_mutation?({:write, special_storage_keys_prefix() <> _key, _value}), do: true
103 def special_mutation?({:write, special_server_keys_prefix() <> _key, _value}), do: true
104 def special_mutation?({:write, _key, _value}), do: false
105
106 def special_mutation?({:clear, special_storage_keys_prefix() <> _key}), do: true
107 def special_mutation?({:clear, special_server_keys_prefix() <> _key}), do: true
108 def special_mutation?({:clear, _key}), do: false
109
110 # Special range clears are not currently supported
111 def special_mutation?({:clear_range, _sk, _ek}), do: false
112
113 @spec pack_storage_team_pair(non_neg_integer, [non_neg_integer]) :: {binary, binary}
114 def pack_storage_team_pair(storage_team_id, storage_server_ids)
115 when is_integer(storage_team_id) and is_list(storage_server_ids) do
116 {
117 storage_teams_prefix() <> Keyset.pack([storage_team_id]),
118 Keyset.pack(storage_server_ids),
119 }
120 end
121
122 @spec unpack_storage_team_pair({binary, binary}) :: {non_neg_integer, [non_neg_integer]}
123 def unpack_storage_team_pair({storage_teams_prefix() <> key, value}) do
124 [storage_team_id] = Keyset.unpack(key)
125 storage_server_ids = Keyset.unpack(value)
126 {storage_team_id, storage_server_ids}
127 end
128
129 @spec pack_key_storage_pair(binary, non_neg_integer, non_neg_integer | nil) :: {binary, binary}
130 def pack_key_storage_pair(shard_start_key, from_storage_team_id, to_storage_team_id)
131 when is_binary(shard_start_key) and is_integer(from_storage_team_id) and (is_integer(to_storage_team_id) or is_nil(to_storage_team_id)) do
132 {
133 key_storage_prefix() <> shard_start_key,
134 Keyset.pack([from_storage_team_id, to_storage_team_id]),
135 }
136 end
137
138 @spec unpack_key_storage_value(binary) :: [non_neg_integer | nil]
139 def unpack_key_storage_value(value) when is_binary(value) do
140 [_to_storage_team_id, _from_storage_team_id] = Keyset.unpack(value)
141 end
142
  @spec pack_key_servers([integer], [integer]) :: binary
  # Packs the current ("from") and destination ("to") server-id lists for a
  # key range into a single Keyset-encoded binary.
  def pack_key_servers(from_ids, to_ids) when is_list(from_ids) and is_list(to_ids) do
    Keyset.pack([from_ids, to_ids])
  end

  @spec unpack_key_servers(binary) :: [[integer]]
  # Inverse of pack_key_servers/2; the match asserts the two-list shape and
  # returns the unpacked [from_ids, to_ids].
  def unpack_key_servers(value) when is_binary(value) do
    [_from_ids, _to_ids] = Keyset.unpack(value)
  end
152
  # Builds the server_keys index key for `server_id`/`start_key`.
  def pack_server_keys_key(server_id, start_key) when is_integer(server_id) and is_binary(start_key) do
    Keyset.pack([server_id, start_key])
  end

  # Inverse of pack_server_keys_key/2; the match asserts the two-element
  # shape and returns [server_id, start_key].
  def unpack_server_keys_key(binary) when is_binary(binary) do
    [_server_id, _start_key] = Keyset.unpack(binary)
  end

  # Packs a server_keys value. `shard_move_id` is nil when no move is in
  # flight; `status` is restricted by the guard to the two known states.
  def pack_server_keys_value(shard_move_id, end_key, status)
      when (is_integer(shard_move_id) or is_nil(shard_move_id)) and is_binary(end_key) and status in ["fetching", "complete"] do
    Keyset.pack([shard_move_id, end_key, status])
  end

  # A missing value unpacks to a triple of nils so callers can destructure
  # uniformly without first checking for nil.
  def unpack_server_keys_value(nil), do: [nil, nil, nil]

  def unpack_server_keys_value(binary) when is_binary(binary) do
    [_shard_move_id, _end_key, _status] = Keyset.unpack(binary)
  end
171
172 # These TLogGeneration pairs are stored in Coordinator keyspace, not the database
173 @spec pack_tlog_generation_pair(TLogGeneration.t) :: {binary, binary}
174 def pack_tlog_generation_pair(%TLogGeneration{} = gen) do
175 {
176 "gen/" <> Keyset.pack([gen.generation]),
177 Keyset.pack([gen.start_version, gen.replication_factor, gen.tlog_ids]),
178 }
179 end
180
  @spec unpack_tlog_generation_pair({binary, binary}) :: TLogGeneration.t
  # Inverse of pack_tlog_generation_pair/1. Validates every decoded field
  # with ExUnit assertions (imported at the top of this module) before
  # building the struct, so a corrupt pair fails loudly here rather than
  # propagating bad data.
  def unpack_tlog_generation_pair({"gen/" <> key, value} = _pair) when is_binary(value) do
    [generation] = Keyset.unpack(key)
    [start_version, replication_factor, tlog_ids] = Keyset.unpack(value)

    # Sanity checks: positive generation, non-negative start version,
    # replication factor limited to the supported 1..3 range, integer ids.
    assert is_integer(generation)
    assert generation > 0
    assert is_integer(start_version)
    assert start_version >= 0
    assert replication_factor in [1, 2, 3]
    assert is_list(tlog_ids)
    for id <- tlog_ids, do: assert is_integer(id)

    %TLogGeneration{generation: generation, start_version: start_version, replication_factor: replication_factor, tlog_ids: tlog_ids}
  end
196
197 @spec get_servers(%Cluster{}, module) :: [%Server{}]
198 def get_servers(%Cluster{} = cluster, type) when is_atom(type) do
199 cluster.servers
200 |> Map.values()
201 |> Enum.filter(&(&1.type == type))
202 |> Enum.sort_by(&(&1.id))
203 end
204
205 @spec random_not_nil([term | nil]) :: {:ok, term} | :error
206 def random_not_nil(choices) do
207 case Enum.reject(choices, &is_nil/1) do
208 [] -> :error
209 remaining -> {:ok, Enum.random(remaining)}
210 end
211 end
212
213 def compute_special_mutations(mutations) when is_list(mutations) do
214 Enum.reduce(mutations, [], fn
215 {:write, storage_keys_prefix() <> k, v}, acc ->
216 [{:write, special_storage_keys_prefix() <> k, v} | acc]
217
218 {:clear, storage_keys_prefix() <> k}, acc ->
219 [{:clear, special_storage_keys_prefix() <> k} | acc]
220
221 _, acc -> acc
222 end)
223 |> Enum.reverse()
224 end
225
226 @spec intersect_ranges({binary, binary}, [{binary, binary, term}]) :: [{binary, binary, term}]
227 def intersect_ranges({start_key, end_key} = _range, ranges) when is_list(ranges) do
228 ranges
229 |> Enum.map(fn {s, e, data} ->
230 case intersect({start_key, end_key}, {s, e}) do
231 {:ok, {is, ie}} -> {is, ie, data}
232 {:error, _error} -> nil
233 end
234 end)
235 |> Enum.reject(&(&1 == nil))
236 end
237
238 defp intersect({s1, e1}, {s2, e2}) do
239 is = max(s1, s2)
240 ie = min(e1, e2)
241 case is < ie do
242 true -> {:ok, {is, ie}}
243 false -> {:error, :ranges_do_not_overlap}
244 end
245 end
246
247 @spec buddy_tlog_id([non_neg_integer], non_neg_integer) :: non_neg_integer
248 def buddy_tlog_id(tlog_ids, server_id) when is_list(tlog_ids) and is_integer(server_id) do
249 index = rem(server_id, length(tlog_ids))
250 Enum.at(tlog_ids, index)
251 end
252
253 @spec tlog_generation_for_version([TLogGeneration.t], non_neg_integer) :: TLogGeneration.t
254 def tlog_generation_for_version(generations, version) when is_list(generations) and is_integer(version) do
255 case Enum.find(generations, fn %TLogGeneration{} = tlg -> tlg.start_version <= version end) do
256 %TLogGeneration{} = generation ->
257 generation
258
259 nil -> raise "Could not find a generation for version #{inspect(version)} in #{inspect(generations)}"
260 end
261 end
262
263 @doc """
264 Merges two sorted lists of key/value pairs.
265
266 If both lists contain the same key, the value from the
267 second list is used.
268
269 **Warning: The lists MUST be sorted by key in advance.**
270
271 ## Examples
272
273 iex> l1 = [{"foo", "bar"}, {"foo2", "bar2"}, {"zoo", "baz"}]
274 iex> l2 = [{"aoo", "aar"}, {"foo2", "different_bar2"}]
275 iex> merge_pairs(l1, l2)
276 [
277 {"aoo", "aar"},
278 {"foo", "bar"},
279 {"foo2", "different_bar2"},
280 {"zoo", "baz"},
281 ]
282
283 """
284 def merge_pairs(list1, list2), do: do_merge(list1, list2, []) |> Enum.reverse()
285
286 defp do_merge([], [], acc), do: acc
287 defp do_merge([p1 | rest1], [], acc), do: do_merge(rest1, [], [p1 | acc])
288 defp do_merge([], [p2 | rest2], acc), do: do_merge([], rest2, [p2 | acc])
289
290 defp do_merge([{k1, _} = p1 | rest1] = list1, [{k2, _} = p2 | rest2] = list2, acc) do
291 cond do
292 k1 < k2 ->
293 do_merge(rest1, list2, [p1 | acc])
294 k1 > k2 ->
295 do_merge(list1, rest2, [p2 | acc])
296 k1 == k2 ->
297 do_merge(rest1, rest2, [p2 | acc])
298 end
299 end
300
301 @doc """
302 Interleaves two sorted lists of batches, combining batches with the same version number.
303
304 **Warning: The lists MUST be sorted by version in advance.**
305
306
307 ## Examples
308
309 iex> b1 = [{1, [:a], {2, [:b]}]
310 iex> b2 = [{2, [:c]}, {4, [:d]}]
311 iex> interleave_batches(b1, b2)
312 [
313 {1, [:a], []},
314 {2, [:b], [:c]},
315 {4, [], [:d]},
316 ]
317
318 """
319 @spec interleave_batches([{integer, list}], [{integer, list}]) :: [{integer, list}]
320 def interleave_batches(batches1, batches2) do
321 interleave(batches1, batches2, [])
322 |> Enum.reverse()
323 end
324
325 defp interleave([{ver1, _} = batch1 | rest1] = batches1, [{ver2, _} = batch2 | rest2] = batches2, acc) do
326 cond do
327 ver1 < ver2 ->
328 {ver1, l1} = batch1
329 interleave(rest1, batches2, [{ver1, l1, []} | acc])
330 ver1 > ver2 ->
331 {ver2, l2} = batch2
332 interleave(batches1, rest2, [{ver2, [], l2} | acc])
333 ver1 == ver2 ->
334 {_, l1} = batch1
335 {_, l2} = batch2
336 interleave(rest1, rest2, [{ver1, l1, l2} | acc])
337 end
338 end
339 defp interleave([{ver1, l1} | rest1], [], acc), do: interleave(rest1, [], [{ver1, l1, []} | acc])
340 defp interleave([], [{ver2, l2} | rest2], acc), do: interleave([], rest2, [{ver2, [], l2} | acc])
341 defp interleave([], [], acc), do: acc
342
343 @doc """
344 Merges a list of lists of versioned batches of numbered mutations (e.g. TLog peek replies).
345
346 ## Examples
347
348 iex> tlog_pids |> Enum.map(&TLog.peek(&1, tag, sv, ev)) |> merge_batches()
349 [
350 {
351 100,
352 [
353 {0, {:write, "foo", "bar"}},
354 {1, {:clear, "foo"}},
355 ]
356 },
357 {101, ...},
358 {102, ...},
359 ]
360 """
361 @spec merge_batches([[{non_neg_integer, [Utils.numbered_mutation]}]]) :: [{non_neg_integer, [Utils.numbered_mutation]}]
362 def merge_batches(batches) do
363 batches
364 |> Enum.concat()
365 |> Enum.group_by(fn {version, _mutations} -> version end, fn {_version, mutations} -> mutations end)
366 |> Enum.map(fn {version, mut_batches} ->
367 # TODO: this could be done more efficiently with a K-way merge since all batches are sorted
368 # But, for now, this function is not really on the hot path (only needed after TLog failure)
369 mutations =
370 mut_batches
371 |> Enum.concat()
372 |> Enum.uniq_by(fn {i, _mut} -> i end)
373 |> Enum.sort()
374 {version, mutations}
375 end)
376 |> Enum.sort()
377 end
378
379 @doc """
380 Attempts to run `fun`, but backs off and retries
381 if `{:error, :read_version_too_new}` is received.
382
383 ## Examples
384
385 iex> too_new_backoff(fn -> Storage.read(...) end)
386 {:ok, ...}
387
388 iex> too_new_backoff(fn -> Storage.read(...) end)
389 {:error, :too_many_retries}
390
391 """
392 @spec too_new_backoff(function, non_neg_integer) :: term | {:error, :too_many_retries}
393 def too_new_backoff(fun, attempt \\ 0)
394
395 def too_new_backoff(_fun, 10), do: {:error, :too_many_retries}
396
397 def too_new_backoff(fun, attempt) when is_integer(attempt) do
398 case fun.() do
399 {:error, :read_version_too_new} ->
400 delay = floor(Float.pow(1.6, attempt + 1))
401 SimProcess.sleep(delay)
402 too_new_backoff(fun, attempt + 1)
403 result ->
404 result
405 end
406 end
407
408 @spec backoff(term, ({:cont | :halt, term} -> term), non_neg_integer, non_neg_integer) :: term
409 def backoff(acc \\ nil, fun, count \\ 6, attempt \\ 0)
410
411 def backoff(_acc, _fun, count, attempt) when attempt == count do
412 {:error, :too_many_retries}
413 end
414
415 def backoff(acc, fun, count, attempt) when is_function(fun, 1) and is_integer(count) and is_integer(attempt) do
416 case fun.(acc) do
417 {:cont, acc} ->
418 delay = Integer.pow(2, attempt + 1)
419 SimProcess.sleep(delay)
420 backoff(acc, fun, count, attempt + 1)
421 {:halt, acc} ->
422 acc
423 end
424 end
425
426 @spec inc_stat(map, term, number) :: map
427 def inc_stat(stats, key, amount \\ 1) when is_map(stats) do
428 Map.update(stats, key, amount, &(&1 + amount))
429 end
430
431 @spec sum_stats([map]) :: map
432 def sum_stats(stats_list) when is_list(stats_list) do
433 Enum.reduce(stats_list, %{}, fn stats, acc ->
434 Enum.reduce(stats, acc, fn {k, v}, acc ->
435 inc_stat(acc, k, v)
436 end)
437 end)
438 end
439
  @spec current_time :: integer
  # Monotonic timestamp in microseconds, via Trinity.SimSystem (presumably a
  # simulation-aware clock wrapper). Suitable for measuring intervals, not
  # for wall-clock time.
  def current_time do
    Trinity.SimSystem.monotonic_time(:microsecond)
  end
444
445 @doc """
446 Times a function and logs how long it takes.
447
448 ## Examples
449
450 iex> time_fn("hello world", fn -> :foo end)
451 :foo
452
453 $ "hello world took 0 ms"
454
455 """
456 def time_fn(label, func) when is_binary(label) and is_function(func) do
457 {time, result} = :timer.tc(func)
458 IO.inspect "#{label} took #{time / 1000} ms"
459 result
460 end
461end