# this repo has no description
# at master 290 lines 9.0 kB view raw
defmodule Hobbes.Workloads.Cycle do
  @moduledoc """
  A "cycle test" workload, where we create a cycle of keys, like this
  (but with more keys in practice):

      1 -> 2 -> 3 -> 1

  Then, we randomly and concurrently swap keys within transactions,
  while preserving the cycle within each transaction.

  Such as:

      1 -> 3 -> 2 -> 1

  If serializable isolation holds, the cycle should still be valid at the end.
  But, if isolation is violated, two transactions will eventually
  modify the same keys concurrently and break the cycle, causing this workload
  to fail its checks.

  Keys are stored as `"key" <> suffix` and each value is the suffix of the next
  key in the cycle (see `Client.pad/1` for how suffixes are built).
  """

  alias Hobbes.Transaction
  alias Hobbes.Structs.Cluster

  alias Trinity.{SimProcess, SimServer}
  import Hobbes.Workloads

  @behaviour Hobbes.Workloads.Workload

  defmodule Client do
    @moduledoc """
    A single workload client.

    On every `:tick` message it attempts one swap transaction, then re-arms the
    tick timer with `tick_ms`. Once it has received a `:stop` call it ignores
    further ticks; the reply to `:stop` carries the collected `:stats` and
    `:commit_latencies`.
    """

    use GenServer
    alias Trinity.{SimProcess, SimServer}

    import Hobbes.Utils, only: [current_time: 0]

    defmodule State do
      @moduledoc false

      @enforce_keys [:cluster, :count, :tick_ms, :stopped, :stats, :commit_latencies]
      defstruct @enforce_keys
    end

    @doc """
    Starts a client through `SimServer.start_link/2`.

    `opts` must be a map with the `:cluster`, `:count` and `:tick_ms` keys
    matched by `init/1`.
    """
    def start_link(opts) when is_map(opts) do
      SimServer.start_link(__MODULE__, opts)
    end

    @impl true
    def init(%{cluster: %Cluster{} = cluster, count: count, tick_ms: tick_ms})
        when is_integer(count) do
      # Schedule the first tick with a zero delay; later ticks are re-armed by handle_info/2.
      SimProcess.send_after(self(), :tick, 0)

      {:ok,
       %State{
         cluster: cluster,
         count: count,
         tick_ms: tick_ms,
         stopped: false,
         stats: %{},
         commit_latencies: []
       }}
    end

    @impl true
    def handle_call(:stop, _from, %State{} = state) do
      # Reply with the results and mark the client stopped so that any tick
      # still in flight becomes a no-op.
      {:reply, Map.take(state, [:stats, :commit_latencies]), %{state | stopped: true}}
    end

    @impl true
    def handle_info(:tick, %State{stopped: true} = state), do: {:noreply, state}

    def handle_info(:tick, %State{} = state) do
      state = work(state)
      SimProcess.send_after(self(), :tick, state.tick_ms)
      {:noreply, state}
    end

    # Attempts one swap transaction and returns the state with updated stats.
    #
    # Starting from a random key k1 it reads three links (k1 -> k2 -> k3 -> k4)
    # and rewrites them as (k1 -> k3 -> k2 -> k4). After a successful commit the
    # whole cycle is re-checked at the commit version; a structural failure
    # raises. Known transaction errors are only counted; any other non-matching
    # result raises a WithClauseError.
    defp work(%State{} = state) do
      state = inc_stat(state, :swaps)

      start_time = current_time()

      k1 = "key" <> pad(Enum.random(0..(state.count - 1)))

      with {:ok, txn} <- Transaction.new(state.cluster),
           {:ok, {v2, txn}} <- Transaction.read(txn, k1),
           k2 = "key" <> v2,
           {:ok, {v3, txn}} <- Transaction.read(txn, k2),
           k3 = "key" <> v3,
           {:ok, {v4, txn}} <- Transaction.read(txn, k3),

           # Swaps (1 -> 2 -> 3 -> 4) to (1 -> 3 -> 2 -> 4)
           # clear(k1) is meant to catch bugs in clear implementation or mutation ordering
           txn =
             txn
             |> Transaction.clear(k1)
             |> Transaction.write([{k1, v3}, {k3, v2}, {k2, v4}]),
           # txn = Transaction.write(txn, [{"\xFF" <> k1, v3}, {"\xFF" <> k3, v2}, {"\xFF" <> k2, v4}]),

           # Random delay between reads/commit for additional concurrency
           SimProcess.sleep(Enum.random(100..300)),
           {:ok, txn} <- Transaction.commit(txn) do
        duration = current_time() - start_time

        case Hobbes.Workloads.Cycle.check_cycle_at_version(state.cluster, txn.commit_version) do
          {:ok, _pairs} ->
            :noop

          # The verification read itself failed; this says nothing about the cycle.
          {:error, _error} ->
            :noop

          {:error, error, pairs} ->
            raise """
            Cycle check error at version #{txn.commit_version}: #{error}
            Pairs: #{inspect(pairs, pretty: true, limit: :infinity)}\
            """
        end

        state = update_in(state.commit_latencies, fn list -> [duration | list] end)
        inc_stat(state, :success)
      else
        {:error, :timeout} ->
          # Refresh the cluster before the next attempt.
          cluster = Hobbes.refresh_cluster(state.cluster)
          state = %{state | cluster: cluster}
          inc_stat(state, :timeout)

        {:error, :read_version_too_old} -> inc_stat(state, :rv_old)
        {:error, :read_version_too_new} -> inc_stat(state, :rv_new)
        {:error, :wrong_generation} -> inc_stat(state, :wrng_gen)

        {:error, {:transaction_too_old, _txn}} -> inc_stat(state, :txn_old)
        {:error, {:read_conflict, _txn}} -> inc_stat(state, :rd_cflt)
        {:error, {:database_locked, _txn}} -> inc_stat(state, :db_lk)

        {:error, :too_many_retries} -> inc_stat(state, :tm_rtry)
      end
    end

    # Increments the counter stored under `key` in `state.stats`, starting at 1.
    defp inc_stat(%State{} = state, key) do
      %State{state | stats: Map.update(state.stats, key, 1, &(&1 + 1))}
    end

    @doc """
    Formats `num` as a decimal string, left-padded with `"0"` to at least two characters.

        pad(3)   #=> "03"
        pad(123) #=> "123"
    """
    @spec pad(integer) :: String.t()
    def pad(num) when is_integer(num) do
      String.pad_leading(Integer.to_string(num), 2, "0")
    end
  end

  @type opts :: [
          keys: non_neg_integer,
          clients: non_neg_integer,
          client_tick_ms: non_neg_integer,
          duration_ms: non_neg_integer
        ]

  # Columns of the per-client stats table; the totals table drops :avg_ltcy.
  # NOTE(review): clients also count :rv_new and :db_lk, which are not listed
  # here and therefore never shown — confirm whether that is intentional.
  @stat_cols [:swaps, :success, :txn_old, :rd_cflt, :rv_old, :wrng_gen, :tm_rtry, :timeout, :avg_ltcy]

  @doc """
  Runs the workload against `cluster`.

  Builds the initial cycle, starts the clients, sleeps for the configured
  duration, stops the clients, and finally re-reads and validates the cycle.

  ## Options

    * `:keys` - number of keys in the cycle (default `20`, must be at least 3)
    * `:clients` - number of concurrent clients (default `10`)
    * `:client_tick_ms` - delay between two swap attempts of a client (default `100`)
    * `:duration_ms` - how long the clients run (default `10_000`)

  Returns `{:ok, report}` where `report` is a human-readable string. Raises if
  the final cycle is broken, and raises `ArgumentError` if `:keys` is not an
  integer of at least 3.
  """
  def run(%{cluster: %Cluster{} = cluster}, opts) do
    key_count = Keyword.get(opts, :keys, 20)
    client_count = Keyword.get(opts, :clients, 10)
    client_tick_ms = Keyword.get(opts, :client_tick_ms, 100)
    duration_ms = Keyword.get(opts, :duration_ms, 10_000)

    validate_key_count!(key_count)

    build_cycle(cluster, key_count)

    # NOTE(review): `1..client_count` counts down when client_count < 1, so
    # `clients: 0` still starts clients — callers should pass at least 1.
    clients =
      Enum.map(1..client_count, fn _i ->
        {:ok, pid} =
          Client.start_link(%{cluster: cluster, count: key_count, tick_ms: client_tick_ms})

        pid
      end)

    SimProcess.sleep(duration_ms)

    # Send every :stop request first, then collect the replies, so the clients
    # wind down concurrently rather than one after another.
    client_results =
      clients
      |> Enum.map(&SimServer.send_request(&1, :stop))
      |> Enum.map(&SimServer.receive_response(&1, 30_000))
      |> Enum.map(fn {:reply, reply} -> reply end)

    client_stats =
      Enum.map(client_results, fn %{stats: stats, commit_latencies: latencies} ->
        Map.put(stats, :avg_ltcy, average_latency(latencies))
      end)

    client_stats_table = client_stats_table(client_stats)
    total_stats_table = total_stats_table(client_stats)

    cluster = Hobbes.refresh_cluster(cluster)
    pairs = check_cycle(cluster, key_count)

    pretty_pairs = Enum.map_join(pairs, ", ", fn {k, v} -> "#{k}->#{v}" end)

    {
      :ok,
      """
      Pairs: #{pretty_pairs}

      Stats (per client):
      #{client_stats_table}

      Stats (total):
      #{total_stats_table}
      """
    }
  end

  # The swap in Client rewrites three links (k1, k2, k3). With fewer than three
  # keys k3 coincides with k1 or k2, so the writes overwrite each other and the
  # cycle collapses even under perfect isolation.
  defp validate_key_count!(key_count) when is_integer(key_count) and key_count >= 3, do: :ok

  defp validate_key_count!(key_count) do
    raise ArgumentError,
          "the cycle workload needs an integer :keys >= 3, got: #{inspect(key_count)}"
  end

  # Mean commit latency divided by 1_000_000 (presumably ns -> ms; the unit of
  # current_time/0 is not visible here). mean/1 is defined outside this file,
  # so the empty case (a client with no successful commit) is handled here.
  defp average_latency([]), do: 0.0
  defp average_latency(latencies), do: mean(latencies) / 1_000_000

  # Renders one row per client (prefixed with the client index) for @stat_cols.
  defp client_stats_table(client_stats) do
    client_stats
    |> Enum.with_index()
    |> Enum.map(fn {stats, i} ->
      cells = Enum.map(@stat_cols, fn col -> pretty_number(Map.get(stats, col, 0)) end)
      [pretty_number(i) | cells]
    end)
    |> table(
      cols: ["i"] ++ Enum.map(@stat_cols, &to_string/1),
      align: List.duplicate(:right, length(@stat_cols) + 1),
      limit: 20
    )
  end

  # Renders a single row with the stats summed over all clients (no :avg_ltcy).
  defp total_stats_table(client_stats) do
    totals = Hobbes.Utils.sum_stats(client_stats)
    total_cols = List.delete(@stat_cols, :avg_ltcy)
    row = Enum.map(total_cols, fn col -> to_string(Map.get(totals, col, 0)) end)

    table([row], cols: Enum.map(total_cols, &to_string/1))
  end

  # Writes the initial cycle 0 -> 1 -> ... -> (count - 1) -> 0 in one transaction.
  # The commit is asserted: without the seed data every client read would fail.
  defp build_cycle(%Cluster{} = cluster, count) do
    {:ok, txn} = Transaction.new(cluster)

    txn =
      Enum.reduce(0..(count - 1), txn, fn i, txn ->
        # rem/2 wraps the last key back to key 0.
        Transaction.write(txn, "key" <> Client.pad(i), Client.pad(rem(i + 1, count)))
      end)

    {:ok, _committed} = Transaction.commit(txn)
  end

  # Reads all `count` keys in a fresh transaction and follows the links from
  # key 0 for `count` steps. Returns the visited `{key, value}` pairs in walk
  # order. Raises if a key has no value, if a key is reached twice, or if the
  # walk does not end back at key 0.
  defp check_cycle(%Cluster{} = cluster, count) do
    {:ok, txn} = Transaction.new(cluster)

    all_keys = Enum.map(0..(count - 1), fn i -> "key" <> Client.pad(i) end)
    {:ok, {key_values, _txn}} = Transaction.read(txn, all_keys)

    key0 = "key" <> Client.pad(0)

    # all_keys only serves as a step counter here: one step per key.
    {_visited, reversed_pairs, last} =
      Enum.reduce(all_keys, {MapSet.new(), [], key0}, fn _key, {visited, pairs, current} ->
        value = key_values[current]

        if !value do
          raise "Expected value for key #{inspect(current)}, got: #{inspect(value)}"
        end

        if MapSet.member?(visited, current) do
          raise "Already visited #{inspect(current)}!"
        end

        {MapSet.put(visited, current), [{current, value} | pairs], "key" <> value}
      end)

    pairs = Enum.reverse(reversed_pairs)

    # TODO: better error handling
    if last == key0 do
      pairs
    else
      raise """
      Cycle broken! Expected #{inspect(key0)} at end, got: #{inspect(last)}

      Pairs:
      #{inspect(pairs, pretty: true, limit: :infinity)}
      """
    end
  end

  @doc """
  Reads every key in the range `"key"` to `"key\\xFF"` at read version `version`
  and checks that the pairs form a single cycle through key 0.

  Returns:

    * `{:ok, pairs}` - the pairs form a valid cycle
    * `{:error, reason}` - creating the transaction or reading the range failed
    * `{:error, message, pairs}` - the read succeeded but the cycle is broken
  """
  def check_cycle_at_version(%Cluster{} = cluster, version) when is_integer(version) do
    with {:ok, txn} <- Transaction.new(cluster),
         # Pin the read version so the range read observes the state at `version`.
         txn = Map.put(txn, :read_version, version),
         {:ok, {pairs, _txn}} <- Transaction.read_range(txn, "key", "key\xFF") do
      check_pairs(pairs)
    else
      {:error, _err} = error -> error
    end
  end

  # Follows the links in `pairs` from key 0, one step per pair.
  # Returns {:ok, pairs} when every step hits an unvisited, existing key and the
  # walk ends back at key 0; otherwise {:error, message, pairs}.
  defp check_pairs([]) do
    {:error, "Key #{inspect("key" <> Client.pad(0))} not found!", []}
  end

  defp check_pairs(pairs) do
    start_key = "key" <> Client.pad(0)
    pairs_map = Map.new(pairs)

    walk =
      Enum.reduce_while(pairs, {MapSet.new(), start_key}, fn _pair, {visited, current} ->
        if MapSet.member?(visited, current) do
          {:halt, {:error, "Already visited key #{inspect(current)}"}}
        else
          case Map.get(pairs_map, current) do
            nil -> {:halt, {:error, "Key #{inspect(current)} not found!"}}
            value -> {:cont, {MapSet.put(visited, current), "key" <> value}}
          end
        end
      end)

    case walk do
      {:error, error} ->
        {:error, error, pairs}

      # Every key was visited exactly once and the last link closes the cycle.
      {_visited, ^start_key} ->
        {:ok, pairs}

      # All keys were visited but the last link does not lead back to the start
      # (it points into the middle of the chain or at a missing key).
      {_visited, last} ->
        {:error, "Expected #{inspect(start_key)} at end, got: #{inspect(last)}", pairs}
    end
  end
end