defmodule Hobbes.Workloads do
  @moduledoc """
  Runs test workloads against a sandbox Hobbes cluster and logs a report.

  `run/2` executes a list of `{workload_module, workload_opts}` pairs once per seed
  (optionally inside a `Trinity.Sim` simulation) and raises if any seed failed.
  The module also contains a few formatting helpers (`table/2`, `mean/1`,
  `pretty_number/1`) for use when building workload result messages.
  """

  require Logger
  alias Trinity.{Sim, SimServer, SimLogger}
  alias Hobbes.Structs.Cluster
  import Hobbes.Utils

  defmodule Workload do
    @moduledoc """
    Behaviour implemented by workload modules.

    `run/2` receives the context map built by `Hobbes.Workloads.run_one/3`
    (currently `%{cluster: cluster}`) and the workload's own options.

    Note that `Hobbes.Workloads.run_one/3` only accepts `{:ok, message}` results
    where `message` is a binary.
    """

    @callback run(context :: map, opts :: keyword) :: {:ok, term}
  end

  defmodule Runner do
    @moduledoc """
    Server process that executes one workload module and keeps its result.

    The process is started and addressed through `Trinity.SimServer`.
    """

    use GenServer
    alias Trinity.SimServer

    defmodule State do
      @moduledoc false
      @enforce_keys [:workload_module, :opts, :context, :results]
      defstruct @enforce_keys
    end

    @doc """
    Starts a runner for `workload_module` with the given workload `opts` and `context`.
    """
    @spec start_link(module, keyword, map) :: GenServer.on_start()
    def start_link(workload_module, opts, context) do
      SimServer.start_link(__MODULE__, {workload_module, opts, context})
    end

    @doc """
    Asks the runner to execute its workload (sent with `SimServer.cast/2`).
    """
    def run(server), do: SimServer.cast(server, :run)

    @doc """
    Returns the stored workload result (`nil` if the workload has not been run).

    Uses `SimServer.call/3` with an `:infinity` timeout. Presumably a call issued
    after `run/1` is only answered once the workload has finished, because the
    workload executes inside `handle_cast/2` - confirm against `SimServer`.
    """
    def get_results(server), do: SimServer.call(server, :get_results, :infinity)

    @impl true
    def init({workload_module, opts, context})
        when is_atom(workload_module) and is_list(opts) and is_map(context) do
      state = %State{
        workload_module: workload_module,
        opts: opts,
        context: context,
        results: nil
      }

      {:ok, state}
    end

    @impl true
    def handle_call(:get_results, _from, %State{} = state) do
      {:reply, state.results, state}
    end

    @impl true
    def handle_cast(:run, %State{} = state) do
      # The workload runs inside the server process; its return value is stored as-is.
      results = state.workload_module.run(state.context, state.opts)
      {:noreply, %State{state | results: results}}
    end
  end

  @type workload_opts :: [
          simulated: boolean
        ]

  @doc """
  Runs `workloads` once per seed and logs a report of the outcome.

  `workloads` is a list of `{module, opts}` pairs (see `run_one/3`).

  ## Options

    * `:count` - number of seeds to run, starting at seed 100 (default `1`)
    * `:seed` - run only this seed; when set, `:count` is ignored
    * `:simulated` - when truthy, each seed runs inside `Sim.run_simulation/2`
    * `:name` - test name shown in the report (default `"Untitled"`)

  `:count` and `:seed` are removed from `opts`; the rest is passed on to `run_one/3`.

  Returns `:ok` when every seed succeeded and raises a `RuntimeError` otherwise.
  """
  @spec run([{module, keyword}], keyword) :: :ok
  def run(workloads, opts \\ []) when is_list(workloads) do
    {count, opts} = Keyword.pop(opts, :count, 1)
    {seed, opts} = Keyword.pop(opts, :seed, nil)

    seeds =
      if seed do
        [seed]
      else
        # Explicit step: without `//1`, `count: 0` yields the descending range
        # 100..99 and would run two seeds instead of none.
        100..(100 + count - 1)//1
      end

    {:ok, supervisor} = Task.Supervisor.start_link()

    results =
      seeds
      |> Enum.map(fn seed ->
        # `async_nolink` so that a crashing seed is reported instead of taking the caller down.
        Task.Supervisor.async_nolink(supervisor, fn ->
          if opts[:simulated] do
            Sim.run_simulation(fn -> run_one(seed, workloads, opts) end, seed: seed)
          else
            run_one(seed, workloads, opts)
          end
        end)
      end)
      |> Task.yield_many(:infinity)
      |> Enum.map(fn {_task, result} -> result end)
      |> Enum.zip(seeds)

    # Every task has been collected at this point, so the supervisor is no longer needed.
    Supervisor.stop(supervisor)

    {succeeded, failed} =
      Enum.split_with(results, fn
        {{:ok, _value}, _seed} -> true
        {{:exit, _error}, _seed} -> false
      end)

    succeeded_message = Enum.map_join(succeeded, "\n", &format_succeeded/1)
    failed_message = Enum.map_join(failed, "\n", &format_failed/1)

    num_failed = length(failed)

    header = """
    Results for test #{inspect(opts[:name] || "Untitled")}
    #{length(results)} test workloads, #{num_failed} failures\
    """

    # Only failures are shown when there are any; otherwise the per-seed results.
    body = if num_failed > 0, do: failed_message, else: succeeded_message

    report = """
    #{color(header, :cyan)}
    #{indent(body)}\
    """

    Logger.info(report)

    if num_failed > 0, do: raise("At least one workload failed!")
    :ok
  end

  # Formats the report entry of a seed whose task returned `{:ok, result}`.
  defp format_succeeded({{:ok, {:ok, result}}, seed}) do
    """
    Seed #{seed} succeeded with results:
    #{indent(result.message)}\
    """
  end

  # Formats the report entry of a seed whose task exited.
  defp format_failed({{:exit, {:timeout, _mfa} = reason}, seed}) do
    """
    - Seed #{inspect(seed)} timed out
    #{Exception.format_exit(reason) |> color(:red) |> indent()}\
    """
  end

  # Only treat the second element as a stacktrace when it is a list; other
  # 2-tuple exit reasons (e.g. `{:noproc, {mod, fun, args}}`) fall through to
  # the generic clause instead of crashing `Exception.format_stacktrace/1`.
  defp format_failed({{:exit, {exception, stacktrace}}, seed}) when is_list(stacktrace) do
    """
    - Seed #{inspect(seed)} failed with error:
    #{Exception.format_banner(:error, exception, stacktrace) |> color(:red) |> indent()}
    #{"stacktrace:" |> color(:cyan) |> indent()}
    #{Exception.format_stacktrace(stacktrace)}
    """
  end

  defp format_failed({{:exit, reason}, seed}) do
    """
    - Seed #{inspect(seed)} failed with reason:
    #{inspect(reason) |> color(:red)}
    """
  end

  @doc """
  Runs every workload once against a freshly started sandbox cluster.

  Starts a cluster with `opts[:cluster_opts]` (the `:distributed` option is set to
  `Sim.simulated?()`), waits until `Hobbes.get_cluster/1` succeeds, starts one
  `Runner` per `{module, opts}` pair, triggers all of them and then collects
  their results in order.

  Each workload must return `{:ok, message}` with a binary `message`.

  Returns `{:ok, %{message: message}}` where `message` summarises the simulation
  log (size and hash; both `0` outside of simulation) and every workload result.
  """
  @spec run_one(term, [{module, keyword}], keyword) :: {:ok, %{message: String.t()}}
  def run_one(_seed, workloads, opts \\ []) do
    {cluster_opts, _opts} = Keyword.pop(opts, :cluster_opts, [])
    # Use a distributed cluster in simulation
    cluster_opts = Keyword.put(cluster_opts, :distributed, Sim.simulated?())

    {:ok, coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts)

    # Retry until the cluster is available.
    # NOTE(review): the meaning of the trailing `20` is defined by `Hobbes.Utils.backoff/3`.
    {:ok, %Cluster{} = cluster} =
      backoff(
        nil,
        fn _acc ->
          case Hobbes.get_cluster(coordinator_pids) do
            {:ok, _cluster} = result -> {:halt, result}
            {:error, _err} -> {:cont, nil}
          end
        end,
        20
      )

    context = %{cluster: cluster}

    runners =
      Enum.map(workloads, fn {module, opts} when is_atom(module) and is_list(opts) ->
        {:ok, pid} = Runner.start_link(module, opts, context)
        pid
      end)

    # Trigger every runner first, then collect, so the workloads can overlap.
    Enum.each(runners, &Runner.run/1)
    results = Enum.map(runners, &Runner.get_results/1)

    result_logs =
      workloads
      |> Enum.zip(results)
      |> Enum.map_join("\n\n", fn {{module, opts}, {:ok, message}} when is_binary(message) ->
        """
        - #{inspect(module)} with opts #{inspect(opts)}
        #{message |> color(:light_black) |> indent(4)}\
        """
      end)

    {hash, log_size} =
      case Sim.simulated?() do
        true -> {SimLogger.get_hash(), SimLogger.get_log_size()}
        false -> {0, 0}
      end

    #Logger.debug SimLogger.log_tail(10_000) |> Enum.join("\n")

    # NOTE(review): the binary expression passed to :crypto.hash/2 was garbled in
    # the reviewed source (it read `<>`). `<<hash::64>>` is a reconstruction -
    # confirm the intended integer width.
    readable_hash =
      :crypto.hash(:sha256, <<hash::64>>)
      |> Base.encode32(padding: false)
      |> String.slice(0, 8)

    message = """
    Logged #{log_size} events with hash #{hash}
    SHA256: #{readable_hash}

    #{result_logs}
    """

    {:ok, %{message: message}}
  end

  # Prefixes every line of `string` with `spaces` spaces.
  defp indent(string, spaces \\ 2) when is_binary(string) and is_integer(spaces) do
    whitespace = String.duplicate(" ", spaces)
    whitespace <> String.replace(string, "\n", "\n" <> whitespace)
  end

  # Wraps `string` in the ANSI sequence for `color` (subject to `IO.ANSI.format/1`'s
  # emit setting) and returns a binary.
  defp color(string, color) when is_atom(color) do
    IO.ANSI.format([color, string]) |> IO.iodata_to_binary()
  end

  @doc """
  Renders `rows` (a list of rows, each a list of strings) as a text table.

  ## Options

    * `:cols` - list of header strings (default: `"1"`, `"2"`, ... based on the first row)
    * `:align` - list of `:left` / `:right`, one per column (default: all `:left`)
    * `:limit` - maximum number of data rows to print, a positive integer or
      `:infinity` (default). When rows are cut off, a final "N more rows..." line
      is appended.

  Raises `ArgumentError` when `rows` is empty and `:cols` is not given.
  """
  @spec table([[String.t()]], keyword) :: String.t()
  def table(rows, opts \\ []) do
    cols =
      Keyword.get_lazy(opts, :cols, fn ->
        Enum.map(1..length(hd(rows))//1, &Integer.to_string/1)
      end)

    # Derived from `cols` (not from the first row) so that an empty `rows` list
    # still works when `:cols` is given.
    align = Keyword.get_lazy(opts, :align, fn -> Enum.map(cols, fn _col -> :left end) end)

    limit =
      case Keyword.get(opts, :limit, :infinity) do
        :infinity -> :infinity
        # +3 for header
        limit when is_integer(limit) and limit > 0 -> limit + 3
      end

    all_rows = [cols | rows]

    # Width of each column = longest cell in that column, header included.
    col_sizes =
      Enum.map(0..(length(cols) - 1)//1, fn c_i ->
        all_rows
        |> Enum.map(fn r -> r |> Enum.at(c_i) |> String.length() end)
        |> Enum.max()
      end)

    col_meta = Enum.zip(col_sizes, align)

    [header | body] =
      Enum.map(all_rows, fn r ->
        r
        |> Enum.zip(col_meta)
        |> Enum.map_join(" | ", fn {v, {c_size, c_align}} ->
          case c_align do
            :left -> String.pad_trailing(v, c_size)
            :right -> String.pad_leading(v, c_size)
          end
        end)
      end)

    sep = String.duplicate("-", String.length(header))
    lines = [sep, header, sep | body]
    line_count = length(lines)

    if limit != :infinity and line_count > limit do
      Enum.join(Enum.take(lines, limit), "\n") <> "\n#{line_count - limit} more rows..."
    else
      Enum.join(lines, "\n")
    end
  end

  @doc """
  Returns the arithmetic mean of `numbers`, or `0` for an empty list.
  """
  @spec mean([number]) :: number
  def mean([]), do: 0
  def mean(numbers), do: Enum.sum(numbers) / length(numbers)

  @doc """
  Formats a number with thousands separators.

  Integers are printed as-is (`1234567` -> `"1,234,567"`); floats are rounded to
  two decimals (`1234.5` -> `"1,234.50"`). Negative numbers keep their sign in
  front of the first digit group (`-123` -> `"-123"`).
  """
  @spec pretty_number(number) :: String.t()
  def pretty_number(number) when is_integer(number) do
    number |> Integer.to_string() |> commas()
  end

  def pretty_number(number) when is_float(number) do
    string = :erlang.float_to_binary(number, decimals: 2)
    [i, f] = String.split(string, ".")
    commas(i) <> "." <> f
  end

  # Inserts a comma between every group of three digits, counted from the right.
  # The sign is split off first so that it is never treated as a digit
  # (otherwise "-123" would become "-,123").
  defp commas("-" <> digits), do: "-" <> commas(digits)

  defp commas(number_string) when is_binary(number_string) do
    number_string
    |> String.to_charlist()
    |> Enum.reverse()
    |> Enum.chunk_every(3)
    |> Enum.map(&Enum.reverse/1)
    |> Enum.reverse()
    |> Enum.join(",")
  end
end