this repo has no description
at master 287 lines 7.8 kB view raw
1defmodule Hobbes.Workloads do 2 require Logger 3 4 alias Trinity.{Sim, SimServer, SimLogger} 5 alias Hobbes.Structs.Cluster 6 7 import Hobbes.Utils 8 9 defmodule Workload do 10 @callback run(context :: map, opts :: keyword) :: {:ok, term} 11 end 12 13 defmodule Runner do 14 use GenServer 15 alias Trinity.SimServer 16 17 defmodule State do 18 @enforce_keys [:workload_module, :opts, :context, :results] 19 defstruct @enforce_keys 20 end 21 22 @spec start_link(module, keyword, map) :: GenServer.on_start 23 def start_link(workload_module, opts, context) do 24 SimServer.start_link(__MODULE__, {workload_module, opts, context}) 25 end 26 27 def run(server), do: SimServer.cast(server, :run) 28 29 def get_results(server), do: SimServer.call(server, :get_results, :infinity) 30 31 def init({workload_module, opts, context}) 32 when is_atom(workload_module) and is_list(opts) and is_map(context) do 33 {:ok, %State{ 34 workload_module: workload_module, 35 opts: opts, 36 context: context, 37 results: nil, 38 }} 39 end 40 41 def handle_call(:get_results, _from, %State{} = state) do 42 {:reply, state.results, state} 43 end 44 45 def handle_cast(:run, %State{} = state) do 46 results = state.workload_module.run(state.context, state.opts) 47 {:noreply, %State{state | results: results}} 48 end 49 end 50 51 @type workload_opts :: [ 52 simulated: boolean, 53 ] 54 55 @spec run([{module, keyword}], keyword) :: :ok 56 def run(workloads, opts \\ []) when is_list(workloads) do 57 {count, opts} = Keyword.pop(opts, :count, 1) 58 {seed, opts} = Keyword.pop(opts, :seed, nil) 59 60 seeds = 61 if seed do 62 [seed] 63 else 64 100..(100 + count - 1) 65 end 66 67 {:ok, supervisor} = Task.Supervisor.start_link() 68 69 results = 70 Enum.map(seeds, fn seed -> 71 Task.Supervisor.async_nolink(supervisor, fn -> 72 if opts[:simulated] do 73 Sim.run_simulation(fn -> 74 run_one(seed, workloads, opts) 75 end, seed: seed) 76 else 77 run_one(seed, workloads, opts) 78 end 79 end) 80 end) 81 |> Task.yield_many(:infinity) 82 |> Enum.map(fn {_task, result} -> result end) 83 |> Enum.zip(seeds) 84 85 {succeeded, failed} = Enum.split_with(results, fn 86 {{:ok, _value}, _seed} -> true 87 {{:exit, _error}, _seed} -> false 88 end) 89 90 succeeded_message = 91 succeeded 92 |> Enum.map(fn {{:ok, {:ok, result}}, seed} -> 93 """ 94 Seed #{seed} succeeded with results: 95 #{indent(result.message)}\ 96 """ 97 end) 98 |> Enum.join("\n") 99 100 failed_message = 101 failed 102 |> Enum.map(fn 103 {{:exit, {:timeout, _mfa} = reason}, seed} -> 104 """ 105 - Seed #{inspect(seed)} timed out 106 #{Exception.format_exit(reason) |> color(:red) |> indent()}\ 107 """ 108 109 {{:exit, {exception, stacktrace}}, seed} -> 110 """ 111 - Seed #{inspect(seed)} failed with error: 112 #{Exception.format_banner(:error, exception, stacktrace) |> color(:red) |> indent()} 113 #{"stacktrace:" |> color(:cyan) |> indent()} 114 #{Exception.format_stacktrace(stacktrace)} 115 """ 116 117 {{:exit, reason}, seed} -> 118 """ 119 - Seed #{inspect(seed)} failed with reason: 120 #{inspect(reason) |> color(:red)} 121 """ 122 end) 123 |> Enum.join("\n") 124 125 num_failed = length(failed) 126 127 header = """ 128 129 Results for test #{inspect(opts[:name] || "Untitled")} 130 131 #{length(results)} test workloads, #{num_failed} failures\ 132 """ 133 134 Logger.info """ 135 #{color(header, :cyan)} 136 137 #{(if num_failed > 0, do: failed_message, else: succeeded_message) |> indent()}\ 138 """ 139 140 if num_failed > 0, do: raise "At least one workload failed!" 141 142 :ok 143 end 144 145 def run_one(_seed, workloads, opts \\ []) do 146 {cluster_opts, _opts} = Keyword.pop(opts, :cluster_opts, []) 147 148 # Use a distributed cluster in simulation 149 cluster_opts = Keyword.put(cluster_opts, :distributed, Sim.simulated?()) 150 {:ok, coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts) 151 152 {:ok, %Cluster{} = cluster} = backoff(nil, fn _acc -> 153 case Hobbes.get_cluster(coordinator_pids) do 154 {:ok, _cluster} = result -> {:halt, result} 155 {:error, _err} -> {:cont, nil} 156 end 157 end, 20) 158 159 context = %{cluster: cluster} 160 161 runners = Enum.map(workloads, fn {module, opts} when is_atom(module) and is_list(opts) -> 162 {:ok, pid} = Runner.start_link(module, opts, context) 163 pid 164 end) 165 166 Enum.each(runners, &Runner.run/1) 167 results = Enum.map(runners, &Runner.get_results/1) 168 169 result_logs = 170 Enum.zip(workloads, results) 171 |> Enum.map(fn {{module, opts}, {:ok, message}} when is_binary(message) -> 172 """ 173 - #{inspect(module)} with opts #{inspect(opts)} 174 #{message |> color(:light_black) |> indent(4)}\ 175 """ 176 end) 177 |> Enum.join("\n\n") 178 179 {hash, log_size} = 180 case Sim.simulated?() do 181 true -> {SimLogger.get_hash, SimLogger.get_log_size()} 182 false -> {0, 0} 183 end 184 #Logger.debug SimLogger.log_tail(10_000) |> Enum.join("\n") 185 186 readable_hash = 187 :crypto.hash(:sha256, <<hash::integer-32>>) 188 |> Base.encode32(padding: false) 189 |> String.slice(0, 8) 190 191 message = """ 192 Logged #{log_size} events with hash #{hash} 193 SHA256: #{readable_hash} 194 195 #{result_logs} 196 """ 197 {:ok, %{message: message}} 198 end 199 200 defp indent(string, spaces \\ 2) when is_binary(string) and is_integer(spaces) do 201 whitespace = String.duplicate(" ", spaces) 202 whitespace <> String.replace(string, "\n", "\n" <> whitespace) 203 end 204 205 defp color(string, color) when is_atom(color) do 206 IO.ANSI.format([color, string]) 207 |> IO.iodata_to_binary() 208 end 209 210 def table(rows, opts \\ []) do 211 cols = Keyword.get_lazy(opts, :cols, fn -> 212 Enum.map(1..length(hd(rows)), &Integer.to_string/1) 213 end) 214 215 align = Keyword.get_lazy(opts, :align, fn -> 216 Enum.map(1..length(hd(rows)), fn _i -> :left end) 217 end) 218 219 limit = case Keyword.get(opts, :limit, :infinity) do 220 :infinity -> :infinity 221 # +3 for header 222 limit when is_integer(limit) and limit > 0 -> limit + 3 223 end 224 225 rows = [cols | rows] 226 col_sizes = Enum.map(0..(length(hd(rows)) - 1), fn c_i -> 227 rows 228 |> Enum.map(fn r -> Enum.at(r, c_i) |> String.length() end) 229 |> Enum.max() 230 end) 231 col_meta = Enum.zip(col_sizes, align) 232 233 rows 234 |> Enum.map(fn r -> 235 r 236 |> Enum.zip(col_meta) 237 |> Enum.map(fn {v, {c_size, c_align}} -> 238 case c_align do 239 :left -> String.pad_trailing(v, c_size) 240 :right -> String.pad_leading(v, c_size) 241 end 242 end) 243 |> Enum.join(" | ") 244 end) 245 |> then(fn [header | rest] -> 246 sep = String.duplicate("-", String.length(header)) 247 [sep | [header | [sep | rest]]] 248 end) 249 |> then(fn rows -> 250 row_count = length(rows) 251 252 case row_count > limit do 253 true -> 254 rows = Enum.take(rows, limit) 255 Enum.join(rows, "\n") <> "\n#{row_count - limit} more rows..." 256 257 false -> 258 Enum.join(rows, "\n") 259 end 260 end) 261 end 262 263 def mean([]), do: 0 264 def mean(numbers), do: Enum.sum(numbers) / length(numbers) 265 266 def pretty_number(number) when is_integer(number) do 267 number 268 |> Integer.to_string() 269 |> commas() 270 end 271 272 def pretty_number(number) when is_float(number) do 273 string = :erlang.float_to_binary(number, decimals: 2) 274 [i, f] = String.split(string, ".") 275 commas(i) <> "." <> f 276 end 277 278 defp commas(number_string) when is_binary(number_string) do 279 number_string 280 |> String.to_charlist() 281 |> Enum.reverse() 282 |> Enum.chunk_every(3) 283 |> Enum.map(&Enum.reverse/1) 284 |> Enum.reverse() 285 |> Enum.join(",") 286 end 287end