this repo has no description
1defmodule Hobbes.Workloads do
2 require Logger
3
4 alias Trinity.{Sim, SimServer, SimLogger}
5 alias Hobbes.Structs.Cluster
6
7 import Hobbes.Utils
8
defmodule Workload do
  @moduledoc """
  Behaviour implemented by workload modules.

  A workload exposes a single `run/2` callback that receives the shared
  context map and the workload's own keyword options.
  """

  @doc """
  Runs the workload with the given `context` and `opts`.

  NOTE(review): `run_one/3` in the enclosing module matches every result as
  `{:ok, message}` with a binary `message`, so implementations presumably
  need to return a string as the second element - confirm before widening.
  """
  @callback run(context :: map(), opts :: keyword()) :: {:ok, term()}
end
12
defmodule Runner do
  @moduledoc """
  Server process that runs one workload module and keeps its result.

  The process is started through `SimServer.start_link/2` and driven with
  `SimServer.cast/2` / `SimServer.call/3`. `run/1` asks the process to execute
  the workload; `get_results/1` returns whatever `run/2` of the workload
  module returned (or `nil` if the workload has not been run yet).
  """

  use GenServer
  alias Trinity.SimServer

  defmodule State do
    @moduledoc false
    @enforce_keys [:workload_module, :opts, :context, :results]
    defstruct @enforce_keys
  end

  @doc """
  Starts a runner for `workload_module`, which will later be invoked as
  `workload_module.run(context, opts)`.
  """
  @spec start_link(module, keyword, map) :: GenServer.on_start()
  def start_link(workload_module, opts, context) do
    SimServer.start_link(__MODULE__, {workload_module, opts, context})
  end

  @doc """
  Asynchronously asks `server` to execute its workload.
  """
  def run(server), do: SimServer.cast(server, :run)

  @doc """
  Fetches the stored workload result from `server`, waiting without a timeout.
  """
  def get_results(server), do: SimServer.call(server, :get_results, :infinity)

  @impl true
  def init({workload_module, opts, context})
      when is_atom(workload_module) and is_list(opts) and is_map(context) do
    {:ok,
     %State{
       workload_module: workload_module,
       opts: opts,
       context: context,
       results: nil
     }}
  end

  @impl true
  def handle_call(:get_results, _from, %State{} = state) do
    {:reply, state.results, state}
  end

  @impl true
  def handle_cast(:run, %State{} = state) do
    # The workload runs inside the server process itself, so the callback only
    # returns once the workload has finished.
    results = state.workload_module.run(state.context, state.opts)
    {:noreply, %State{state | results: results}}
  end
end
50
@typedoc """
Keyword options for a workload run.

`:simulated` is the option `run/2` checks to decide whether each seed is
executed inside `Sim.run_simulation/2`.
"""
@type workload_opts :: [
  simulated: boolean,
]

@doc """
Runs `workloads` once per seed and logs a summary of the outcomes.

## Options

  * `:count` - number of seeds to run when `:seed` is not given (default `1`).
    Seeds are numbered consecutively starting at 100; a count of zero or
    less runs nothing.
  * `:seed` - run exactly this one seed; `:count` is then ignored.
  * `:simulated` - when truthy, each seed runs inside `Sim.run_simulation/2`.
  * `:name` - label used in the report header (default `"Untitled"`).

All options except `:count` and `:seed` are also passed on to `run_one/3`.

Returns `:ok` when every seed succeeded and raises a `RuntimeError` when at
least one seed failed.
"""
@spec run([{module, keyword}], keyword) :: :ok
def run(workloads, opts \\ []) when is_list(workloads) do
  {count, opts} = Keyword.pop(opts, :count, 1)
  {seed, opts} = Keyword.pop(opts, :seed, nil)

  seeds = seeds_for(seed, count)

  {:ok, supervisor} = Task.Supervisor.start_link()

  results =
    seeds
    |> Enum.map(fn seed ->
      # async_nolink: a crashing seed must be reported, not take the caller down.
      Task.Supervisor.async_nolink(supervisor, fn ->
        if opts[:simulated] do
          Sim.run_simulation(fn -> run_one(seed, workloads, opts) end, seed: seed)
        else
          run_one(seed, workloads, opts)
        end
      end)
    end)
    |> Task.yield_many(:infinity)
    |> Enum.map(fn {_task, result} -> result end)
    |> Enum.zip(seeds)

  {succeeded, failed} =
    Enum.split_with(results, fn
      {{:ok, _value}, _seed} -> true
      {{:exit, _error}, _seed} -> false
    end)

  succeeded_message = Enum.map_join(succeeded, "\n", &format_success/1)
  failed_message = Enum.map_join(failed, "\n", &format_failure/1)

  num_failed = length(failed)

  header = """

  Results for test #{inspect(opts[:name] || "Untitled")}

  #{length(results)} test workloads, #{num_failed} failures\
  """

  # Only the failures are shown when there are any; otherwise the successes.
  details = if num_failed > 0, do: failed_message, else: succeeded_message

  Logger.info """
  #{color(header, :cyan)}

  #{indent(details)}\
  """

  if num_failed > 0, do: raise "At least one workload failed!"

  :ok
end

# Picks the seeds to run. An explicit seed wins over the count. The `//1` step
# keeps the range empty for a count of zero or less; a bare `100..99` would
# count downwards and yield the two seeds 100 and 99.
defp seeds_for(seed, count) do
  if seed do
    [seed]
  else
    100..(100 + count - 1)//1
  end
end

# Renders the report entry for a seed whose task returned `{:ok, result}`.
defp format_success({{:ok, {:ok, result}}, seed}) do
  """
  Seed #{seed} succeeded with results:
  #{indent(result.message)}\
  """
end

# Renders the report entry for a seed whose task exited.
defp format_failure({{:exit, {:timeout, _mfa} = reason}, seed}) do
  """
  - Seed #{inspect(seed)} timed out
  #{Exception.format_exit(reason) |> color(:red) |> indent()}\
  """
end

# The is_list/1 guard restricts this clause to `{exception, stacktrace}` exits.
# Other two-element reasons (e.g. `{:shutdown, term}`) fall through to the
# generic clause instead of crashing inside Exception.format_stacktrace/1.
defp format_failure({{:exit, {exception, stacktrace}}, seed}) when is_list(stacktrace) do
  """
  - Seed #{inspect(seed)} failed with error:
  #{Exception.format_banner(:error, exception, stacktrace) |> color(:red) |> indent()}
  #{"stacktrace:" |> color(:cyan) |> indent()}
  #{Exception.format_stacktrace(stacktrace)}
  """
end

defp format_failure({{:exit, reason}, seed}) do
  """
  - Seed #{inspect(seed)} failed with reason:
  #{inspect(reason) |> color(:red)}
  """
end
144
@doc """
Starts a sandbox cluster, runs every workload against it and builds a report.

`opts[:cluster_opts]` (default `[]`) is handed to
`Hobbes.Sandbox.start_cluster/1` with `:distributed` set to the value of
`Sim.simulated?/0`. Each `{module, opts}` entry of `workloads` gets its own
`Runner`; all runners are asked to run before any result is collected.

Returns `{:ok, %{message: message}}` where `message` lists the logged event
count, the log hash and one entry per workload.
"""
@spec run_one(term, [{module, keyword}], keyword) :: {:ok, %{message: String.t()}}
def run_one(_seed, workloads, opts \\ []) do
  # Use a distributed cluster in simulation
  cluster_opts =
    opts
    |> Keyword.get(:cluster_opts, [])
    |> Keyword.put(:distributed, Sim.simulated?())

  {:ok, coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts)

  # Keep asking until the coordinators can hand out a cluster.
  # NOTE(review): the meaning of the trailing 20 is defined by backoff/3,
  # which is not visible here - presumably a retry limit; confirm.
  fetch_cluster = fn _acc ->
    case Hobbes.get_cluster(coordinator_pids) do
      {:ok, _cluster} = found -> {:halt, found}
      {:error, _err} -> {:cont, nil}
    end
  end

  {:ok, %Cluster{} = cluster} = backoff(nil, fetch_cluster, 20)

  context = %{cluster: cluster}

  runners =
    Enum.map(workloads, fn {module, workload_opts}
                           when is_atom(module) and is_list(workload_opts) ->
      {:ok, runner} = Runner.start_link(module, workload_opts, context)
      runner
    end)

  # Kick off every workload first, then collect the results in order.
  Enum.each(runners, &Runner.run/1)
  results = Enum.map(runners, &Runner.get_results/1)

  result_logs =
    workloads
    |> Enum.zip(results)
    |> Enum.map_join("\n\n", fn {{module, workload_opts}, {:ok, text}} when is_binary(text) ->
      """
      - #{inspect(module)} with opts #{inspect(workload_opts)}
      #{text |> color(:light_black) |> indent(4)}\
      """
    end)

  # The simulation logger only has data to report inside a simulation.
  {hash, log_size} =
    case Sim.simulated?() do
      false -> {0, 0}
      true -> {SimLogger.get_hash(), SimLogger.get_log_size()}
    end

  #Logger.debug SimLogger.log_tail(10_000) |> Enum.join("\n")

  # Short, human-comparable fingerprint of the 32-bit encoding of the hash.
  digest = :crypto.hash(:sha256, <<hash::integer-32>>)
  encoded = Base.encode32(digest, padding: false)
  readable_hash = String.slice(encoded, 0, 8)

  message = """
  Logged #{log_size} events with hash #{hash}
  SHA256: #{readable_hash}

  #{result_logs}
  """

  {:ok, %{message: message}}
end
199
# Prefixes every line of `string` (including empty ones) with `spaces` spaces.
defp indent(string, spaces \\ 2) when is_binary(string) and is_integer(spaces) do
  prefix = String.duplicate(" ", spaces)

  string
  |> String.split("\n")
  |> Enum.map_join("\n", fn line -> prefix <> line end)
end
204
# Wraps `string` in the ANSI formatting named by `color` and returns a binary.
defp color(string, color) when is_atom(color) do
  formatted = IO.ANSI.format([color, string])
  IO.iodata_to_binary(formatted)
end
209
@doc """
Renders `rows` (a list of rows, each a list of strings) as a plain-text table.

The output is a separator line, the header, another separator and then one
line per row, with cells joined by `" | "` and padded to the widest cell of
their column.

## Options

  * `:cols` - header labels. Defaults to `"1"`, `"2"`, ... for every cell of
    the first row.
  * `:align` - list of `:left` / `:right`, one per column. Defaults to
    `:left` for every column.
  * `:limit` - positive integer capping the number of data rows, or
    `:infinity` (default). When rows are cut off, a final
    `"N more rows..."` line is appended.
"""
@spec table([[String.t()]], keyword) :: String.t()
def table(rows, opts \\ []) do
  cols =
    Keyword.get_lazy(opts, :cols, fn ->
      rows
      |> hd()
      |> Enum.with_index(1)
      |> Enum.map(fn {_cell, position} -> Integer.to_string(position) end)
    end)

  # The default alignment follows the header, not the first data row, so a
  # table with explicit columns but no rows still renders.
  align = Keyword.get_lazy(opts, :align, fn -> Enum.map(cols, fn _col -> :left end) end)

  limit =
    case Keyword.get(opts, :limit, :infinity) do
      :infinity -> :infinity
      # +3 for header
      limit when is_integer(limit) and limit > 0 -> limit + 3
    end

  rows = [cols | rows]

  # Widest cell per column, header included. Iterating over the header with
  # its index avoids a descending `0..-1` range when there are no columns.
  col_sizes =
    cols
    |> Enum.with_index()
    |> Enum.map(fn {_col, c_i} ->
      rows
      |> Enum.map(fn r -> r |> Enum.at(c_i) |> String.length() end)
      |> Enum.max()
    end)

  col_meta = Enum.zip(col_sizes, align)

  [header | body] =
    Enum.map(rows, fn r ->
      r
      |> Enum.zip(col_meta)
      |> Enum.map_join(" | ", fn {v, {c_size, c_align}} ->
        case c_align do
          :left -> String.pad_trailing(v, c_size)
          :right -> String.pad_leading(v, c_size)
        end
      end)
    end)

  sep = String.duplicate("-", String.length(header))
  lines = [sep, header, sep | body]
  line_count = length(lines)

  if is_integer(limit) and line_count > limit do
    shown = lines |> Enum.take(limit) |> Enum.join("\n")
    shown <> "\n#{line_count - limit} more rows..."
  else
    Enum.join(lines, "\n")
  end
end
262
@doc """
Returns the arithmetic mean of a list of numbers.

An empty list yields the integer `0`; any other list yields a float.
"""
def mean(numbers) do
  case numbers do
    [] ->
      0

    _ ->
      total = Enum.sum(numbers)
      total / length(numbers)
  end
end
265
@doc """
Formats a number with thousands separators.

Integers are rendered as-is; floats are rendered with exactly two decimals,
with separators applied to the integer part only.
"""
def pretty_number(number) when is_integer(number), do: commas(Integer.to_string(number))

def pretty_number(number) when is_float(number) do
  [whole, fraction] =
    number
    |> :erlang.float_to_binary(decimals: 2)
    |> String.split(".")

  "#{commas(whole)}.#{fraction}"
end
277
# Inserts a comma between every group of three characters, counted from the
# right, e.g. "1234567" -> "1,234,567".
#
# A leading minus sign is split off first so that it is never grouped as if
# it were a digit (otherwise "-123" would come out as "-,123").
defp commas("-" <> digits), do: "-" <> commas(digits)

defp commas(number_string) when is_binary(number_string) do
  number_string
  |> String.to_charlist()
  # Group from the least significant end, then restore the original order.
  |> Enum.reverse()
  |> Enum.chunk_every(3)
  |> Enum.reverse()
  |> Enum.map_join(",", &Enum.reverse/1)
end
287end