this repo has no description
1defmodule Hobbes.Workloads.Cycle do
2 @moduledoc """
3 A "cycle test" workload, where we create a cycle of keys, like this
4 (but with more keys in practice):
5
6 1 -> 2 -> 3 -> 1
7
8 Then, we randomly and concurrently swap keys within transactions,
9 while preserving the cycle within each transaction.
10
11 Such as:
12 1 -> 3 -> 2 -> 1
13
14 If serializable isolation holds, the cycle should still be valid at the end.
15 But, if isolation is violated, two transactions will eventually
16 modify the same keys concurrently and break the cycle, causing this workload
17 to fail its checks.
18 """
19
20 alias Hobbes.Transaction
21 alias Hobbes.Structs.Cluster
22
23 alias Trinity.{SimProcess, SimServer}
24 import Hobbes.Workloads
25
26 @behaviour Hobbes.Workloads.Workload
27
28 defmodule Client do
29 use GenServer
30 alias Trinity.{SimProcess, SimServer}
31
32 import Hobbes.Utils, only: [current_time: 0]
33
defmodule State do
  # Server state of a single workload client. Every field is mandatory,
  # so the same list feeds both the key enforcement and the struct definition.
  @fields ~w(cluster count tick_ms stopped stats commit_latencies)a

  @enforce_keys @fields
  defstruct @fields
end
38
@doc """
Starts a client process through `SimServer`, passing `opts` (a map) on to `init/1`.
"""
def start_link(opts) when is_map(opts), do: SimServer.start_link(__MODULE__, opts)
42
# Server callback: schedules the first `:tick` immediately and builds the
# initial state with empty statistics and no recorded commit latencies.
def init(%{cluster: %Cluster{} = cluster, count: count, tick_ms: tick_ms}) when is_integer(count) do
  SimProcess.send_after(self(), :tick, 0)

  initial_state = %State{
    cluster: cluster,
    count: count,
    tick_ms: tick_ms,
    stopped: false,
    stats: %{},
    commit_latencies: []
  }

  {:ok, initial_state}
end
55
# Server callback for `:stop`: replies with the collected statistics and
# commit latencies, and marks the client as stopped so later ticks do nothing.
def handle_call(:stop, _from, %State{stats: stats, commit_latencies: latencies} = state) do
  reply = %{stats: stats, commit_latencies: latencies}
  {:reply, reply, %{state | stopped: true}}
end
63
# Server callback for `:tick`. A stopped client ignores the tick (and does
# not schedule another one); otherwise one unit of work is performed and the
# next tick is scheduled `tick_ms` later.
def handle_info(:tick, %State{stopped: stopped} = state) do
  case stopped do
    true ->
      {:noreply, state}

    _ ->
      next_state = work(state)
      SimProcess.send_after(self(), :tick, next_state.tick_ms)
      {:noreply, next_state}
  end
end
71
# Attempts a single swap and returns the updated state.
#
# Picks a random key A, follows the chain A -> B -> C -> D inside one
# transaction and rewrites it to A -> C -> B -> D. Every attempt bumps
# `:swaps`; a committed attempt bumps `:success` and records its duration,
# while each known error bumps its own counter.
defp work(%State{} = state) do
  state = inc_stat(state, :swaps)

  started_at = current_time()

  key_a = "key" <> pad(Enum.random(0..(state.count - 1)))

  with {:ok, txn} <- Transaction.new(state.cluster),
       {:ok, {val_b, txn}} <- Transaction.read(txn, key_a),
       key_b = "key" <> val_b,
       {:ok, {val_c, txn}} <- Transaction.read(txn, key_b),
       key_c = "key" <> val_c,
       {:ok, {val_d, txn}} <- Transaction.read(txn, key_c),

       # Swaps (A -> B -> C -> D) to (A -> C -> B -> D)
       # clear(key_a) is meant to catch bugs in clear implementation or mutation ordering
       txn = Transaction.clear(txn, key_a),
       txn = Transaction.write(txn, [{key_a, val_c}, {key_c, val_b}, {key_b, val_d}]),
       #txn = Transaction.write(txn, [{"\xFF" <> key_a, val_c}, {"\xFF" <> key_c, val_b}, {"\xFF" <> key_b, val_d}]),

       # Random delay between reads/commit for additional concurrency
       SimProcess.sleep(Enum.random(100..300)),
       {:ok, committed} <- Transaction.commit(txn) do
    elapsed = current_time() - started_at

    verify_cycle_after_commit!(state.cluster, committed.commit_version)

    inc_stat(%State{state | commit_latencies: [elapsed | state.commit_latencies]}, :success)
  else
    {:error, :timeout} ->
      # Refresh the cluster handle before counting the timeout
      refreshed = Hobbes.refresh_cluster(state.cluster)
      inc_stat(%{state | cluster: refreshed}, :timeout)

    {:error, :read_version_too_old} ->
      inc_stat(state, :rv_old)

    {:error, :read_version_too_new} ->
      inc_stat(state, :rv_new)

    {:error, :wrong_generation} ->
      inc_stat(state, :wrng_gen)

    {:error, {:transaction_too_old, _txn}} ->
      inc_stat(state, :txn_old)

    {:error, {:read_conflict, _txn}} ->
      inc_stat(state, :rd_cflt)

    {:error, {:database_locked, _txn}} ->
      inc_stat(state, :db_lk)

    {:error, :too_many_retries} ->
      inc_stat(state, :tm_rtry)
  end
end

# Re-checks the cycle at the version the swap committed at. A failed read is
# ignored; a cycle that is actually broken raises with the offending pairs.
defp verify_cycle_after_commit!(cluster, version) do
  case Hobbes.Workloads.Cycle.check_cycle_at_version(cluster, version) do
    {:error, error, pairs} ->
      raise """
      Cycle check error at version #{version}: #{error}
      Pairs: #{inspect(pairs, pretty: true, limit: :infinity)}\
      """

    {:ok, _pairs} ->
      :noop

    {:error, _error} ->
      :noop
  end
end
124
# Increments the counter stored under `key` in the stats map, starting at 1
# when the counter does not exist yet.
defp inc_stat(%State{stats: stats} = state, key) do
  bumped = Map.update(stats, key, 1, fn current -> current + 1 end)
  %State{state | stats: bumped}
end
128
@doc """
Renders `num` as a decimal string, left-padded with zeros to at least two characters.
"""
def pad(num) when is_integer(num) do
  num
  |> Integer.to_string()
  |> String.pad_leading(2, "0")
end
132 end
133
@typedoc """
Options accepted by `run/2`:

  * `:keys` - number of keys in the cycle
  * `:clients` - number of concurrent client processes
  * `:client_tick_ms` - delay between two swap attempts of one client
  * `:duration_ms` - how long the clients are left running
"""
@type opts :: [
  keys: non_neg_integer,
  clients: non_neg_integer,
  client_tick_ms: non_neg_integer,
  duration_ms: non_neg_integer
]
139
@doc """
Runs the cycle workload against `cluster` and returns `{:ok, report}`.

Builds the initial cycle, starts the clients, lets them swap keys for
`:duration_ms`, stops them, verifies the final cycle and renders a report
with the final pairs plus per-client and total statistics.

## Options

  * `:keys` - number of keys in the cycle (default `20`, at least `3`)
  * `:clients` - number of concurrent clients (default `10`)
  * `:client_tick_ms` - delay between swap attempts of a client (default `100`)
  * `:duration_ms` - how long the clients run (default `10_000`)

Raises `ArgumentError` when `:keys` is not an integer of at least 3, and
raises when the final cycle check fails.
"""
def run(%{cluster: %Cluster{} = cluster}, opts) do
  key_count = Keyword.get(opts, :keys, 20)
  client_count = Keyword.get(opts, :clients, 10)
  client_tick_ms = Keyword.get(opts, :client_tick_ms, 100)
  duration_ms = Keyword.get(opts, :duration_ms, 10_000)

  # A swap rewrites three consecutive links of the cycle. With fewer than
  # three keys those links alias the same key, so the swap itself would break
  # the cycle and the workload would report a bogus isolation failure.
  if not is_integer(key_count) or key_count < 3 do
    raise ArgumentError, "expected :keys to be an integer >= 3, got: #{inspect(key_count)}"
  end

  build_cycle(cluster, key_count)

  client_args = %{cluster: cluster, count: key_count, tick_ms: client_tick_ms}

  # List.duplicate/2 (rather than 1..client_count) so that zero clients
  # really starts zero processes; 1..0 is a descending two-element range.
  clients =
    Enum.map(List.duplicate(client_args, client_count), fn args ->
      {:ok, pid} = Client.start_link(args)
      pid
    end)

  SimProcess.sleep(duration_ms)

  # Send all stop requests first, then collect the replies, so clients are
  # stopped concurrently instead of one after another.
  client_results =
    clients
    |> Enum.map(&SimServer.send_request(&1, :stop))
    |> Enum.map(&SimServer.receive_response(&1, 30_000))
    |> Enum.map(fn {:reply, reply} -> reply end)

  client_stats = Enum.map(client_results, fn %{stats: stats, commit_latencies: latencies} ->
    Map.put(stats, :avg_ltcy, mean(latencies) / 1_000_000)
  end)

  # One column per counter the client can record, plus the average latency.
  cols = [
    :swaps, :success, :txn_old, :rd_cflt, :rv_old, :rv_new,
    :wrng_gen, :db_lk, :tm_rtry, :timeout, :avg_ltcy
  ]

  client_stats_table =
    client_stats
    |> Enum.with_index()
    |> Enum.map(fn {stats, i} ->
      rows =
        cols
        |> Enum.map(&Map.get(stats, &1, 0))
        |> Enum.map(&pretty_number/1)

      [pretty_number(i) | rows]
    end)
    |> table(
      cols: ["i"] ++ Enum.map(cols, &to_string/1),
      align: List.duplicate(:right, length(cols) + 1),
      limit: 20
    )

  total_stats = Hobbes.Utils.sum_stats(client_stats)

  # The average latency column is left out of the totals table.
  total_cols = List.delete(cols, :avg_ltcy)
  total_stats_table = table(
    [Enum.map(total_cols, &Map.get(total_stats, &1, 0)) |> Enum.map(&to_string/1)],
    cols: Enum.map(total_cols, &to_string/1)
  )

  # Final verification on a refreshed cluster handle; raises if the cycle is broken.
  cluster = Hobbes.refresh_cluster(cluster)
  pairs = check_cycle(cluster, key_count)

  pretty_pairs = Enum.map_join(pairs, ", ", fn {k, v} -> "#{k}->#{v}" end)

  report = """
  Pairs: #{pretty_pairs}

  Stats (per client):
  #{client_stats_table}

  Stats (total):
  #{total_stats_table}
  """

  {:ok, report}
end
213
# Writes the initial cycle key0 -> key1 -> ... -> key(count - 1) -> key0 in a
# single transaction and returns the `{:ok, txn}` result of the commit.
#
# Requires a positive `count`; a failed commit raises a MatchError because
# the workload cannot run without its initial cycle.
defp build_cycle(%Cluster{} = cluster, count) when is_integer(count) and count > 0 do
  {:ok, txn} = Transaction.new(cluster)

  txn =
    Enum.reduce(0..(count - 1), txn, fn i, txn ->
      # rem/2 makes the last key point back at key 0, which closes the cycle
      # (and keeps a one-key cycle pointing at itself).
      Transaction.write(txn, "key" <> Client.pad(i), Client.pad(rem(i + 1, count)))
    end)

  {:ok, _txn} = Transaction.commit(txn)
end
223
# Reads all `count` keys in one transaction and walks the chain from key 0,
# following each value to the next key for `count` steps.
#
# Returns the visited `{key, value}` pairs in walk order. Raises when a key
# has no value, when a key is reached twice, or when the walk does not end
# back at the starting key.
defp check_cycle(%Cluster{} = cluster, count) do
  {:ok, txn} = Transaction.new(cluster)

  all_keys = Enum.map(0..(count - 1), fn i -> "key" <> Client.pad(i) end)
  {:ok, {key_values, _txn}} = Transaction.read(txn, all_keys)

  key0 = "key" <> Client.pad(0)

  # One step per key; the key itself is unused, only the number of steps matters.
  {_visited, pairs, last} =
    Enum.reduce(all_keys, {MapSet.new(), [], key0}, fn _key, {visited, pairs, prev} ->
      value = key_values[prev]

      if !value do
        raise "Expected value for key #{inspect(prev)}, got: #{inspect(value)}"
      end

      if MapSet.member?(visited, prev) do
        raise "Already visited #{inspect(prev)}!"
      end

      {MapSet.put(visited, prev), [{prev, value} | pairs], "key" <> value}
    end)

  pairs = Enum.reverse(pairs)

  # TODO: better error handling
  if last != key0 do
    raise """
    Cycle broken! Expected #{inspect(key0)} at end, got: #{inspect(last)}

    Pairs:
    #{inspect(pairs, pretty: true, limit: :infinity)}
    """
  end

  pairs
end
257
@doc """
Checks the cycle as it was at `version`.

Range-reads every pair between `"key"` and `"key\\xFF"` with the
transaction's read version set to `version`, then validates the pairs.

Returns `{:ok, pairs}` when the check passes, `{:error, message, pairs}`
when it fails, and passes `{:error, reason}` through unchanged when the
transaction could not be created or the read failed.
"""
def check_cycle_at_version(%Cluster{} = cluster, version) when is_integer(version) do
  read_at_version = fn fresh_txn ->
    fresh_txn
    |> Map.put(:read_version, version)
    |> Transaction.read_range("key", "key\xFF")
  end

  with {:ok, fresh_txn} <- Transaction.new(cluster),
       {:ok, {pairs, _txn}} <- read_at_version.(fresh_txn) do
    check_pairs(pairs)
  else
    {:error, _err} = failure -> failure
  end
end
268
# Validates that `pairs` (a list of `{key, value}` tuples) forms one cycle
# through every key.
#
# Starting at key 0, follows each value to the next key for as many steps as
# there are pairs. Returns `{:ok, pairs}` when every step lands on a key
# that exists and was not seen before AND the last step leads back to the
# start; otherwise returns `{:error, message, pairs}`.
defp check_pairs(pairs) do
  pairs_map = Map.new(pairs)
  start = "key" <> Client.pad(0)

  # At least one step, so that an empty pair list is reported as a missing
  # start key rather than passing vacuously.
  steps = max(length(pairs), 1)

  walk =
    Enum.reduce_while(1..steps, {MapSet.new(), start}, fn _step, {visited, prev} ->
      case MapSet.member?(visited, prev) do
        true ->
          {:halt, {:error, "Already visited key #{inspect(prev)}"}}

        false ->
          case Map.get(pairs_map, prev) do
            nil ->
              {:halt, {:error, "Key #{inspect(prev)} not found!"}}

            value ->
              {:cont, {MapSet.put(visited, prev), "key" <> value}}
          end
      end
    end)

  case walk do
    {:error, error} ->
      {:error, error, pairs}

    # Every key was visited exactly once and the chain closes on the start key.
    {_visited, ^start} ->
      {:ok, pairs}

    # All keys were visited, but the last one does not point back at the
    # start, so the chain is not a cycle.
    {_visited, last} ->
      {:error, "Cycle broken! Expected #{inspect(start)} at end, got: #{inspect(last)}", pairs}
  end
end
290end