defmodule Hobbes.Workloads.Model do alias Hobbes.Transaction alias Hobbes.Transaction.TxnState alias Hobbes.Structs.Cluster alias Hobbes.Workloads.Model.DatabaseModel alias Hobbes.Workloads.Model.DatabaseModel.HistoryTxn alias Trinity.{SimProcess, SimServer} @behaviour Hobbes.Workloads.Workload defmodule Client do use GenServer alias Trinity.{SimProcess, SimServer} import Hobbes.Utils defmodule State do @enforce_keys [ :cluster, :tick_ms, :history, :stopped, ] defstruct @enforce_keys end def start_link(opts), do: SimServer.start_link(__MODULE__, opts) def init(%{cluster: %Cluster{} = cluster, tick_ms: tick_ms}) do state = %State{ cluster: cluster, tick_ms: tick_ms, history: [], stopped: false, } SimProcess.send self(), :tick {:ok, state} end def handle_call(:stop, _from, %State{} = state) do {:reply, %{history: state.history}, %{state | stopped: true}} end def handle_info(:tick, %State{stopped: true} = state), do: {:noreply, state} def handle_info(:tick, %State{} = state) do state = tick(state) SimProcess.send_after self(), :tick, state.tick_ms {:noreply, state} end defp tick(%State{} = state) do reads = random_reads(state) mutations = random_mutations(state) with {:ok, %TxnState{} = txn} <- Transaction.new(state.cluster), {:ok, {%TxnState{} = txn, read_results}} <- do_reads(txn, reads) do txn = add_mutations(txn, mutations) # Ensure some transactions receive :transaction_too_old if Enum.random(1..10) == 1 do SimProcess.sleep(6_000) end case Transaction.commit(txn) do {:ok, %TxnState{} = txn} -> append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations)) {:error, {err, %TxnState{} = txn}} when err in [:transaction_too_old, :read_conflict] -> append_history(state, HistoryTxn.new(err, txn, read_results, mutations)) # TODO {:error, :timeout} -> raise "Timeout not supported" end else # Transaction.new() timed out {:error, :timeout} -> state # Read errors # TODO: we still want to check these reads {:error, {%TxnState{} = _txn, _results}} -> state end end defp append_history(%State{} = state, %HistoryTxn{} = ht) do %{state | history: [ht | state.history]} end @spec add_mutations(TxnState.t, list) :: TxnState.t defp add_mutations(%TxnState{} = txn, mutations) do Enum.reduce(mutations, txn, fn {:write, key, value}, txn -> Transaction.write(txn, key, value) {:clear_range, sk, ek}, txn -> Transaction.clear_range(txn, sk, ek) end) end @spec do_reads(TxnState.t, [{binary, binary} | binary]) :: {:ok, {TxnState.t, list}} | {:error, {TxnState.t, list}} defp do_reads(%TxnState{} = txn, reads) when is_list(reads) do Enum.reduce_while(reads, {txn, []}, fn {:read_range, {sk, ek}} = read, {txn, acc} -> case Transaction.read_range(txn, sk, ek) do {:ok, {pairs, txn}} -> acc = [{read, pairs} | acc] {:cont, {txn, acc}} {:error, _err} -> {:halt, {:error, {txn, acc}}} end {:read, keys} = read, {txn, acc} -> case Transaction.read(txn, keys) do {:ok, {result, txn}} -> acc = [{read, result} | acc] {:cont, {txn, acc}} {:error, _err} -> {:halt, {:error, {txn, acc}}} end end) |> case do {:error, {_txn, _acc}} = error -> error {%TxnState{} = txn, results} when is_list(results) -> {:ok, {txn, results}} end end defp random_mutations(%State{} = state) do count = Enum.random(1..10) Enum.map(1..count, fn _i -> case Enum.random(1..4) do 1 -> {sk, ek} = random_range(state) {:clear_range, sk, ek} # TODO: :clear _ -> {:write, random_key(state), random_key(state)} end end) end defp random_reads(%State{} = state) do count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10]) Enum.map(1..count, fn _i -> case Enum.random(1..4) do 1 -> {:read_range, random_range(state)} _ -> key_count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10]) {:read, Enum.map(1..key_count, fn _i -> random_key(state) end)} end end) end defp random_range(%State{} = state) do k1 = random_key(state) k2 = random_key(state) cond do k1 < k2 -> {k1, k2} k1 > k2 -> {k2, k1} k1 == k2 -> {k1, next_key(k1)} end end defp random_key(%State{} = _state) do case Enum.random(1..10) do 1 -> "" 2 -> "\xFF" _ -> Enum.random(1..100) |> Integer.to_string() |> String.pad_leading(3, "0") end end end def run(%{cluster: %Cluster{} = cluster}, opts) do client_count = Keyword.get(opts, :clients, 10) duration_ms = Keyword.get(opts, :duration_ms, 10_000) client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) clients = Enum.map(1..client_count, fn _i -> {:ok, pid} = Client.start_link(%{cluster: cluster, tick_ms: client_tick_ms}) pid end) SimProcess.sleep(duration_ms) results = clients |> Enum.map(&SimServer.send_request(&1, :stop)) |> Enum.map(&SimServer.receive_response(&1, 300_000)) |> Enum.map(fn {:reply, reply} -> reply end) history = results |> Enum.map(fn %{history: history} -> history end) |> Enum.concat() db_model = DatabaseModel.new() DatabaseModel.validate_history(db_model, history) { :ok, """ Checked #{length(history)} transactions from #{client_count} clients. """ } end end