# NOTE(review): the two lines originally here were web-viewer chrome captured
# along with the source ("this repo has no description" / "at master 327 lines
# 10 kB view raw") — they are not part of the Elixir module below.
defmodule Hobbes.Workloads.ReadWrite do
  @moduledoc """
  This workload tests reading and writing (a relatively large number of) keys.

  A number of clients are started in parallel, each of which generates and
  commits transactions containing the following:

  A random number of writes, each of which has predictable keys and values:
  `hash("key:client_number:write_number")` and likewise for the value.

  A random number of reads, each of which reads a random key already written
  by a previous transaction and ensures its value is correct.

  We use the hashing scheme simply to avoid having to keep all of the previous
  keys/values in memory on the clients, as we would have to do if they were
  truly random.

  At the end of the test, a range read is also performed to ensure that the number
  of writes reported by the clients corresponds to the number of k/v pairs actually
  found in the database.

  Note that this test does not currently produce any conflicts (at all),
  as the keys for each client are distinct (though the hashes ensure
  they are not segregated to distinct *ranges*).
  """

  @behaviour Hobbes.Workloads.Workload

  alias Hobbes.Transaction
  alias Hobbes.Structs.{Cluster, Server}

  alias Trinity.{SimProcess, SimServer}

  import Hobbes.Utils
  import Hobbes.Workloads

  defmodule Client do
    @moduledoc false
    # One simulated client: on every tick it builds, verifies and commits a
    # read/write transaction, accumulating stats until asked to stop.
    # Note: Trinity.{SimProcess, SimServer} aliases are inherited lexically
    # from the enclosing module.
    use GenServer

    defmodule State do
      @moduledoc false
      @enforce_keys [
        :cluster,
        :id,
        :tick_ms
      ]
      defstruct @enforce_keys ++
                  [
                    # number of keys this client has written so far
                    write_count: 0,
                    # counters keyed by :transactions / :reads / :writes / ...
                    stats: %{},
                    # set once :stop has been handled; suppresses further work
                    stopped: false
                  ]
    end

    def start_link(opts) when is_map(opts) do
      SimServer.start_link(__MODULE__, opts)
    end

    @impl true
    def init(%{cluster: %Cluster{} = cluster, id: id, tick_ms: tick_ms}) do
      # Schedule the first tick; subsequent ticks are re-armed in handle_info/2.
      SimProcess.send_after(self(), :tick, tick_ms)

      {:ok,
       %State{
         cluster: cluster,
         id: id,
         tick_ms: tick_ms
       }}
    end

    # Stop doing work and hand the accumulated stats back to the caller.
    @impl true
    def handle_call(:stop, _from, %State{} = state) do
      {:reply, state.stats, %{state | stopped: true}}
    end

    # A tick may already be in flight when we stop; ignore it.
    @impl true
    def handle_info(:tick, %State{stopped: true} = state), do: {:noreply, state}

    def handle_info(:tick, state) do
      state = work(state)
      SimProcess.send_after(self(), :tick, state.tick_ms)
      {:noreply, state}
    end

    # Run one transaction: verify some previously-written keys, write some new
    # ones, and commit. Failed transactions are counted rather than raised,
    # since commit conflicts/errors are an expected part of the workload.
    defp work(state) do
      with {:ok, txn} <- Transaction.new(state.cluster),
           {:ok, {txn, state}} <- check_reads(txn, state),
           {txn, state} = write_keys(txn, state),
           {:ok, _txn} <- Transaction.commit(txn) do
        %{state | stats: inc_stat(state.stats, :transactions)}
      else
        {:error, _err} ->
          %{state | stats: inc_stat(state.stats, :failed_transactions)}
      end
    end

    # Nothing has been written yet, so there is nothing to read back.
    defp check_reads(txn, %State{write_count: 0} = state), do: {:ok, {txn, state}}

    # Read a random sample of this client's previously-written keys and raise
    # if any value does not match the deterministic expected value.
    defp check_reads(%Transaction.TxnState{} = txn, %State{} = state) do
      num_reads = Enum.random(1..10)

      read_keys =
        Enum.map(1..num_reads, fn _i ->
          write_i = Enum.random(0..(state.write_count - 1))
          {write_i, key_for(state.id, write_i)}
        end)

      case Transaction.read(txn, Enum.map(read_keys, &elem(&1, 1))) do
        {:ok, {results, txn}} ->
          Enum.each(read_keys, fn {write_i, key} ->
            expected_value = value_for(state.id, write_i)
            received_value = Map.fetch!(results, key)

            if expected_value != received_value do
              raise """
              Received unexpected value!
              Key: #{inspect(key)}
              Expected: #{inspect(expected_value)}
              Received: #{inspect(received_value)}
              """
            end
          end)

          {:ok, {txn, %{state | stats: inc_stat(state.stats, :reads, num_reads)}}}

        {:error, _err} = error ->
          error
      end
    end

    # Append a random number of new writes to the transaction, advancing
    # write_count so future reads can target them.
    defp write_keys(%Transaction.TxnState{} = txn, %State{} = state) do
      writes = Enum.random(1..10)
      write_range = state.write_count..(state.write_count + writes - 1)

      txn =
        Enum.reduce(write_range, txn, fn i, txn ->
          Transaction.write(txn, key_for(state.id, i), value_for(state.id, i))
        end)

      {txn,
       %{
         state
         | write_count: state.write_count + writes,
           stats: inc_stat(state.stats, :writes, writes)
       }}
    end

    defp key_for(id, i), do: hash("key:#{id}:#{i}")
    defp value_for(id, i), do: hash("value:#{id}:#{i}")

    # Deterministic "pseudo-random" key/value material: hashing spreads keys
    # across the keyspace while keeping them reproducible without storing them.
    # The original string is appended to keep the data human-debuggable.
    defp hash(string) when is_binary(string) do
      hash = :crypto.hash(:sha256, string) |> Base.encode16()
      hash <> " " <> string
    end
  end

  @doc """
  Runs the workload against `cluster`.

  Options:

    * `:clients` — number of parallel clients (default 10)
    * `:client_tick_ms` — delay between client transactions (default 100)
    * `:duration_ms` — total run time (default 10,000)
    * `:profiling` — include per-server reductions/memory tables (default false)
    * `:check` — verify the database key count at the end (default true)

  Returns `{:ok, report}` where `report` is a human-readable summary.
  """
  @impl true
  def run(%{cluster: %Cluster{} = cluster}, opts) do
    num_clients = Keyword.get(opts, :clients, 10)
    client_tick_ms = Keyword.get(opts, :client_tick_ms, 100)
    duration_ms = Keyword.get(opts, :duration_ms, 10_000)
    profiling = Keyword.get(opts, :profiling, false)
    check = Keyword.get(opts, :check, true)

    start_time = current_time()

    clients =
      Enum.map(1..num_clients, fn id ->
        {:ok, pid} = Client.start_link(%{cluster: cluster, id: id, tick_ms: client_tick_ms})
        pid
      end)

    SimProcess.sleep(duration_ms)

    # Stop all clients and collect their stats maps.
    client_stats =
      clients
      |> Enum.map(&SimServer.send_request(&1, :stop))
      |> Enum.map(&SimServer.receive_response(&1, 30_000))
      |> Enum.map(fn {:reply, reply} -> reply end)

    end_time = current_time()
    # NOTE(review): assumes current_time/0 (Hobbes.Utils) is in microseconds —
    # TODO confirm; the /1_000_000 conversion to seconds depends on it.
    duration_s = (end_time - start_time) / 1_000_000

    total_stats = sum_stats(client_stats)
    per_second = Map.new(total_stats, fn {k, v} -> {k, v / duration_s} end)

    client_stats_table =
      client_stats
      |> Enum.with_index()
      |> Enum.map(fn {stats, i} ->
        [
          to_string(i),
          pretty_number(Map.get(stats, :transactions, 0)),
          pretty_number(Map.get(stats, :reads, 0)),
          pretty_number(Map.get(stats, :writes, 0))
        ]
      end)
      |> table(
        cols: ["i", "Txns", "Reads", "Writes"],
        align: [:right, :right, :right, :right],
        limit: 10
      )

    total_stats_table =
      table(
        [
          ([:transactions, :reads, :writes]
           |> Enum.map(&Map.get(total_stats, &1, 0))
           |> Enum.map(&pretty_number/1)) ++ [to_string(duration_s)]
        ],
        cols: ["Txns", "Reads", "Writes", "Duration (s)"]
      )

    per_second_table =
      table(
        [
          [:transactions, :reads, :writes]
          |> Enum.map(&Map.get(per_second, &1, 0))
          |> Enum.map(&pretty_number/1)
        ],
        cols: ["Txns / s", "Reads / s", "Writes / s"]
      )

    # Per-server reductions/memory, only gathered when profiling is requested.
    # (The original built an intermediate {id, type, pretty_stats(stats)} tuple
    # here whose value was discarded — dead code, removed.)
    profiling_stats =
      if profiling do
        cluster.servers
        |> Map.values()
        |> Enum.sort_by(& &1.id)
        |> Enum.map(fn %Server{pid: pid} = server ->
          {:reductions, reductions} = Process.info(pid, :reductions)
          {:memory, memory} = Process.info(pid, :memory)

          [
            to_string(server.id),
            to_string(server.type) |> String.split(".") |> List.last(),
            pretty_number(reductions),
            pretty_number(memory)
          ]
        end)
        |> table(
          cols: ["ID", "Type", "Reductions", "Memory (B)"],
          align: [:right, :left, :right, :right]
        )
      end

    shards = list_shards(cluster)

    shards_table =
      shards
      |> Enum.map(fn {shard_key, {from_ids, to_ids}, size_bytes} ->
        [
          inspect(shard_key),
          "#{inspect(from_ids)} -> #{inspect(to_ids)}",
          pretty_number(size_bytes / 1_000_000) <> " MB"
        ]
      end)
      |> Hobbes.Workloads.table(
        cols: ["Shard", "Servers", "Size (MB)"],
        align: [:left, :left, :right]
      )

    # Group shards by the team (set of server ids) currently serving them.
    teams_table =
      shards
      |> Enum.group_by(fn {_shard_key, {from_ids, _to_ids}, _size_bytes} -> from_ids end)
      |> Enum.sort_by(&elem(&1, 0))
      |> Enum.map(fn {from_ids, team_shards} ->
        shard_count = length(team_shards)
        total_size = Enum.sum_by(team_shards, fn {_, _, size_bytes} -> size_bytes end)

        [
          inspect(from_ids),
          pretty_number(shard_count),
          pretty_number(total_size / 1_000_000) <> " MB"
        ]
      end)
      |> Hobbes.Workloads.table(
        cols: ["Team", "Shards", "MB (Total)"],
        align: [:left, :right, :right]
      )

    if check do
      check_database(cluster, Map.fetch!(total_stats, :writes))
    end

    {
      :ok,
      """
      Client stats:
      #{client_stats_table}

      Total stats:
      #{total_stats_table}

      Per second:
      #{per_second_table}

      Profiling:
      #{profiling_stats || "Disabled"}

      Shards (#{length(shards)}):
      #{shards_table}

      Teams
      #{teams_table}
      """
    }
  end

  # Range-read the whole keyspace and assert the pair count matches the number
  # of writes the clients reported. Raises on mismatch.
  defp check_database(%Cluster{} = cluster, num_writes) do
    {:ok, txn} = Transaction.new(cluster)
    {:ok, {pairs, _txn}} = Transaction.read_range(txn, "", "\xFF")

    if length(pairs) != num_writes do
      # "found" rather than "only found": the count can be higher as well as lower.
      raise """
      Expected #{num_writes} pairs, but found #{length(pairs)}!
      """
    end

    :ok
  end

  @doc """
  Reads the system keyspace (`\\xFF/key_servers/`) and returns one
  `{start_key, {from_ids, to_ids}, size_bytes}` tuple per shard, where size is
  queried from the first server currently holding the shard.
  """
  def list_shards(%Cluster{} = cluster) do
    {:ok, txn} = Transaction.new(cluster)

    {:ok, {result, _txn}} =
      Transaction.read_range(txn, "\xFF/key_servers/", "\xFF/key_servers/\xFF\xFF")

    result
    |> Enum.map(fn {"\xFF/key_servers/" <> shard_key, servers} ->
      {shard_key, servers}
    end)
    # Append a sentinel end key so chunking yields each shard's [start, end) pair.
    |> then(fn shards -> shards ++ [{"\xFF\xFF", nil}] end)
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.map(fn [{start_key, value}, {end_key, _}] ->
      [from_ids, to_ids] = unpack_key_servers(value)

      %{pid: pid} = Map.fetch!(cluster.servers, hd(from_ids))

      {:ok, %{size_bytes: size_bytes}} =
        Hobbes.Servers.Storage.get_shard_stats(pid, start_key, end_key)

      {start_key, {from_ids, to_ids}, size_bytes}
    end)
  end
end