# This repo has no description.
1defmodule Hobbes.Workloads.ReadWrite do
2 @moduledoc """
3 This workload tests reading and writing (a relatively large number of) keys.
4
5 A number of clients are started in parallel, each of which generates and
6 commits transactions containing the following:
7
8 A random number of writes, each of which has predictable keys and values:
9 `hash("key:client_number:write_number")` and likewise for the value.
10
11 A random number of reads, each of which reads a random key already written
12 by a previous transaction and ensures its value is correct.
13
14 We use the hashing scheme simply to avoid having to keep all of the previous
15 keys/values in memory on the clients, as we would have to do if they were
16 truly random.
17
18 At the end of the test, a range read is also performed to ensure that the number
19 of writes reported by the clients corresponds to the number of k/v pairs actually
20 found in the database.
21
22 Note that this test does not currently produce any conflicts (at all),
23 as the keys for each client are distinct (though the hashes ensure
24 they are not segregated to distinct *ranges*).
25 """
26
27 @behaviour Hobbes.Workloads.Workload
28
29 alias Hobbes.Transaction
30 alias Hobbes.Structs.{Cluster, Server}
31
32 alias Trinity.{SimProcess, SimServer}
33
34 import Hobbes.Utils
35 import Hobbes.Workloads
36
37 defmodule Client do
38 use GenServer
39 alias Trinity.{SimProcess, SimServer}
40
41 defmodule State do
42 @enforce_keys [
43 :cluster,
44 :id,
45 :tick_ms,
46 ]
47 defstruct @enforce_keys ++ [
48 write_count: 0,
49 stats: %{},
50 stopped: false,
51 ]
52 end
53
54 def start_link(opts) when is_map(opts) do
55 SimServer.start_link(__MODULE__, opts)
56 end
57
58 def init(%{cluster: %Cluster{} = cluster, id: id, tick_ms: tick_ms}) do
59 SimProcess.send_after(self(), :tick, tick_ms)
60
61 {:ok, %State{
62 cluster: cluster,
63 id: id,
64 tick_ms: tick_ms,
65 }}
66 end
67
68 def handle_call(:stop, _from, %State{} = state) do
69 {:reply, state.stats, %{state | stopped: true}}
70 end
71
72 def handle_info(:tick, %State{stopped: true} = state), do: {:noreply, state}
73
74 def handle_info(:tick, state) do
75 state = work(state)
76 SimProcess.send_after(self(), :tick, state.tick_ms)
77 {:noreply, state}
78 end
79
80 defp work(state) do
81 with {:ok, txn} <- Transaction.new(state.cluster),
82 {:ok, {txn, state}} <- check_reads(txn, state),
83 {txn, state} = write_keys(txn, state),
84 {:ok, _txn} <- Transaction.commit(txn)
85 do
86 %{state | stats: inc_stat(state.stats, :transactions)}
87 else
88 {:error, _err} ->
89 %{state | stats: inc_stat(state.stats, :failed_transactions)}
90 end
91 end
92
93 defp check_reads(txn, %State{write_count: 0} = state), do: {:ok, {txn, state}}
94
95 defp check_reads(%Transaction.TxnState{} = txn, %State{} = state) do
96 num_reads = Enum.random(1..10)
97
98 read_keys = Enum.map(1..num_reads, fn _i ->
99 write_i = Enum.random(0..(state.write_count - 1))
100 {write_i, key_for(state.id, write_i)}
101 end)
102
103 case Transaction.read(txn, Enum.map(read_keys, &elem(&1, 1))) do
104 {:ok, {results, txn}} ->
105 Enum.each(read_keys, fn {write_i, key} ->
106 expected_value = value_for(state.id, write_i)
107 received_value = Map.fetch!(results, key)
108
109 if expected_value != received_value do
110 raise """
111 Received unexpected value!
112 Key: #{inspect(key)}
113 Expected: #{inspect(expected_value)}
114 Received: #{inspect(received_value)}
115 """
116 end
117 end)
118
119 {:ok, {txn, %{state | stats: inc_stat(state.stats, :reads, num_reads)}}}
120
121 {:error, _err} = error -> error
122 end
123 end
124
125 defp write_keys(%Transaction.TxnState{} = txn, %State{} = state) do
126 writes = Enum.random(1..10)
127 write_range = state.write_count..(state.write_count + writes - 1)
128
129 txn = Enum.reduce(write_range, txn, fn i, txn ->
130 Transaction.write(txn, key_for(state.id, i), value_for(state.id, i))
131 end)
132 {txn, %{state | write_count: state.write_count + writes, stats: inc_stat(state.stats, :writes, writes)}}
133 end
134
135 defp key_for(id, i), do: hash("key:#{id}:#{i}")
136 defp value_for(id, i), do: hash("value:#{id}:#{i}")
137
138 defp hash(string) when is_binary(string) do
139 hash = :crypto.hash(:sha256, string) |> Base.encode16()
140 hash <> " " <> string
141 end
142 end
143
144 def run(%{cluster: %Cluster{} = cluster}, opts) do
145 num_clients = Keyword.get(opts, :clients, 10)
146 client_tick_ms = Keyword.get(opts, :client_tick_ms, 100)
147 duration_ms = Keyword.get(opts, :duration_ms, 10_000)
148 profiling = Keyword.get(opts, :profiling, false)
149 check = Keyword.get(opts, :check, true)
150
151 start_time = current_time()
152
153 clients = Enum.map(1..num_clients, fn id ->
154 {:ok, pid} = Client.start_link(%{cluster: cluster, id: id, tick_ms: client_tick_ms})
155 pid
156 end)
157
158 SimProcess.sleep(duration_ms)
159
160 client_stats =
161 clients
162 |> Enum.map(&SimServer.send_request(&1, :stop))
163 |> Enum.map(&SimServer.receive_response(&1, 30_000))
164 |> Enum.map(fn {:reply, reply} -> reply end)
165
166 end_time = current_time()
167 duration_s = (end_time - start_time) / 1_000_000
168
169 total_stats = sum_stats(client_stats)
170 per_second = Map.new(total_stats, fn {k, v} -> {k, v / duration_s} end)
171
172 client_stats_table =
173 client_stats
174 |> Enum.with_index()
175 |> Enum.map(fn {stats, i} ->
176 [
177 to_string(i),
178 pretty_number(Map.get(stats, :transactions, 0)),
179 pretty_number(Map.get(stats, :reads, 0)),
180 pretty_number(Map.get(stats, :writes, 0)),
181 ]
182 end)
183 |> table(
184 cols: ["i", "Txns", "Reads", "Writes"],
185 align: [:right, :right, :right, :right],
186 limit: 10
187 )
188
189 total_stats_table = table(
190 [([:transactions, :reads, :writes] |> Enum.map(&Map.get(total_stats, &1, 0)) |> Enum.map(&pretty_number/1)) ++ [to_string(duration_s)]],
191 cols: ["Txns", "Reads", "Writes", "Duration (s)"]
192 )
193 per_second_table = table(
194 [[:transactions, :reads, :writes] |> Enum.map(&Map.get(per_second, &1, 0)) |> Enum.map(&pretty_number/1)],
195 cols: ["Txns / s", "Reads / s", "Writes / s"]
196 )
197
198 profiling_stats =
199 case profiling do
200 true ->
201 cluster.servers
202 |> Map.values()
203 |> Enum.sort_by(&(&1.id))
204 |> Enum.map(fn %Server{pid: pid} = server ->
205 {:reductions, reductions} = Process.info(pid, :reductions)
206 {:memory, memory} = Process.info(pid, :memory)
207
208 stats = %{
209 reds: reductions,
210 mem: memory,
211 }
212 {server.id, server.type, pretty_stats(stats)}
213 [
214 to_string(server.id),
215 to_string(server.type) |> String.split(".") |> List.last(),
216 pretty_number(reductions),
217 pretty_number(memory),
218 ]
219 end)
220 |> table(
221 cols: ["ID", "Type", "Reductions", "Memory (B)"],
222 align: [:right, :left, :right, :right]
223 )
224
225 false ->
226 nil
227 end
228
229
230 shards = list_shards(cluster)
231 shards_table =
232 shards
233 |> Enum.map(fn {shard_key, {from_ids, to_ids}, size_bytes} ->
234 [
235 inspect(shard_key),
236 "#{inspect(from_ids)} -> #{inspect(to_ids)}",
237 pretty_number(size_bytes / 1_000_000) <> " MB",
238 ]
239 end)
240 |> Hobbes.Workloads.table(
241 cols: ["Shard", "Servers", "Size (MB)"],
242 align: [:left, :left, :right]
243 )
244
245 teams_table =
246 shards
247 |> Enum.group_by(fn {_shard_key, {from_ids, _to_ids}, _size_bytes} -> from_ids end)
248 |> Enum.sort_by(&elem(&1, 0))
249 |> Enum.map(fn {from_ids, shards} ->
250 shard_count = length(shards)
251 total_size = Enum.sum_by(shards, fn {_, _, size_bytes} -> size_bytes end)
252 [
253 inspect(from_ids),
254 pretty_number(shard_count),
255 pretty_number(total_size / 1_000_000) <> " MB",
256 ]
257 end)
258 |> Hobbes.Workloads.table(
259 cols: ["Team", "Shards", "MB (Total)"],
260 align: [:left, :right, :right]
261 )
262
263 if check do
264 check_database(cluster, Map.fetch!(total_stats, :writes))
265 end
266
267 {
268 :ok,
269 """
270 Client stats:
271 #{client_stats_table}
272
273 Total stats:
274 #{total_stats_table}
275
276 Per second:
277 #{per_second_table}
278
279 Profiling:
280 #{profiling_stats || "Disabled"}
281
282 Shards (#{length(shards)}):
283 #{shards_table}
284
285 Teams
286 #{teams_table}
287 """,
288 }
289 end
290
291 defp check_database(%Cluster{} = cluster, num_writes) do
292 {:ok, txn} = Transaction.new(cluster)
293 {:ok, {pairs, _txn}} = Transaction.read_range(txn, "", "\xFF")
294
295 if length(pairs) != num_writes do
296 raise """
297 Expected #{num_writes} pairs, but only found #{length(pairs)}!
298 """
299 end
300
301 :ok
302 end
303
304 def list_shards(%Cluster{} = cluster) do
305 {:ok, txn} = Transaction.new(cluster)
306 {:ok, {result, _txn}} = Transaction.read_range(txn, "\xFF/key_servers/", "\xFF/key_servers/\xFF\xFF")
307
308 result
309 |> Enum.map(fn {"\xFF/key_servers/" <> shard_key, servers} ->
310 {shard_key, servers}
311 end)
312 |> then(fn shards -> shards ++ [{"\xFF\xFF", nil}] end)
313 |> Enum.chunk_every(2, 1, :discard)
314 |> Enum.map(fn [{start_key, value}, {end_key, _}] ->
315 [from_ids, to_ids] = unpack_key_servers(value)
316
317 %{pid: pid} = Map.fetch!(cluster.servers, hd(from_ids))
318 {:ok, %{size_bytes: size_bytes}} = Hobbes.Servers.Storage.get_shard_stats(pid, start_key, end_key)
319
320 {start_key, {from_ids, to_ids}, size_bytes}
321 end)
322 end
323
324 defp pretty_stats(stats) when is_map(stats) do
325 Map.new(stats, fn {k, v} -> {k, pretty_number(v)} end)
326 end
327end