this repo has no description
at master 211 lines 6.1 kB view raw
1defmodule Hobbes.Workloads.Model do 2 alias Hobbes.Transaction 3 alias Hobbes.Transaction.TxnState 4 alias Hobbes.Structs.Cluster 5 6 alias Hobbes.Workloads.Model.DatabaseModel 7 alias Hobbes.Workloads.Model.DatabaseModel.HistoryTxn 8 9 alias Trinity.{SimProcess, SimServer} 10 11 @behaviour Hobbes.Workloads.Workload 12 13 defmodule Client do 14 use GenServer 15 alias Trinity.{SimProcess, SimServer} 16 17 import Hobbes.Utils 18 19 defmodule State do 20 @enforce_keys [ 21 :cluster, 22 :tick_ms, 23 24 :history, 25 :stopped, 26 ] 27 defstruct @enforce_keys 28 end 29 30 def start_link(opts), do: SimServer.start_link(__MODULE__, opts) 31 32 def init(%{cluster: %Cluster{} = cluster, tick_ms: tick_ms}) do 33 state = %State{ 34 cluster: cluster, 35 tick_ms: tick_ms, 36 history: [], 37 stopped: false, 38 } 39 SimProcess.send self(), :tick 40 {:ok, state} 41 end 42 43 def handle_call(:stop, _from, %State{} = state) do 44 {:reply, %{history: state.history}, %{state | stopped: true}} 45 end 46 47 def handle_info(:tick, %State{stopped: true} = state), do: {:noreply, state} 48 49 def handle_info(:tick, %State{} = state) do 50 state = tick(state) 51 SimProcess.send_after self(), :tick, state.tick_ms 52 {:noreply, state} 53 end 54 55 defp tick(%State{} = state) do 56 reads = random_reads(state) 57 mutations = random_mutations(state) 58 59 with {:ok, %TxnState{} = txn} <- Transaction.new(state.cluster), 60 {:ok, {%TxnState{} = txn, read_results}} <- do_reads(txn, reads) 61 do 62 txn = add_mutations(txn, mutations) 63 64 # Ensure some transactions receive :transaction_too_old 65 if Enum.random(1..10) == 1 do 66 SimProcess.sleep(6_000) 67 end 68 69 case Transaction.commit(txn) do 70 {:ok, %TxnState{} = txn} -> 71 append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations)) 72 73 {:error, {err, %TxnState{} = txn}} when err in [:transaction_too_old, :read_conflict] -> 74 append_history(state, HistoryTxn.new(err, txn, read_results, mutations)) 75 76 # TODO 77 {:error, :timeout} -> raise "Timeout not supported" 78 end 79 else 80 # Transaction.new() timed out 81 {:error, :timeout} -> state 82 83 # Read errors 84 # TODO: we still want to check these reads 85 {:error, {%TxnState{} = _txn, _results}} -> state 86 end 87 end 88 89 defp append_history(%State{} = state, %HistoryTxn{} = ht) do 90 %{state | history: [ht | state.history]} 91 end 92 93 @spec add_mutations(TxnState.t, list) :: TxnState.t 94 defp add_mutations(%TxnState{} = txn, mutations) do 95 Enum.reduce(mutations, txn, fn 96 {:write, key, value}, txn -> Transaction.write(txn, key, value) 97 {:clear_range, sk, ek}, txn -> Transaction.clear_range(txn, sk, ek) 98 end) 99 end 100 101 @spec do_reads(TxnState.t, [{binary, binary} | binary]) :: {:ok, {TxnState.t, list}} | {:error, {TxnState.t, list}} 102 defp do_reads(%TxnState{} = txn, reads) when is_list(reads) do 103 Enum.reduce_while(reads, {txn, []}, fn 104 {:read_range, {sk, ek}} = read, {txn, acc} -> 105 case Transaction.read_range(txn, sk, ek) do 106 {:ok, {pairs, txn}} -> 107 acc = [{read, pairs} | acc] 108 {:cont, {txn, acc}} 109 {:error, _err} -> 110 {:halt, {:error, {txn, acc}}} 111 end 112 113 {:read, keys} = read, {txn, acc} -> 114 case Transaction.read(txn, keys) do 115 {:ok, {result, txn}} -> 116 acc = [{read, result} | acc] 117 {:cont, {txn, acc}} 118 {:error, _err} -> 119 {:halt, {:error, {txn, acc}}} 120 end 121 end) 122 |> case do 123 {:error, {_txn, _acc}} = error -> 124 error 125 {%TxnState{} = txn, results} when is_list(results) -> 126 {:ok, {txn, results}} 127 end 128 end 129 130 defp random_mutations(%State{} = state) do 131 count = Enum.random(1..10) 132 Enum.map(1..count, fn _i -> 133 case Enum.random(1..4) do 134 1 -> 135 {sk, ek} = random_range(state) 136 {:clear_range, sk, ek} 137 138 # TODO: :clear 139 _ -> 140 {:write, random_key(state), random_key(state)} 141 end 142 end) 143 end 144 145 defp random_reads(%State{} = state) do 146 count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10]) 147 Enum.map(1..count, fn _i -> 148 case Enum.random(1..4) do 149 1 -> 150 {:read_range, random_range(state)} 151 _ -> 152 key_count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10]) 153 {:read, Enum.map(1..key_count, fn _i -> random_key(state) end)} 154 end 155 end) 156 end 157 158 defp random_range(%State{} = state) do 159 k1 = random_key(state) 160 k2 = random_key(state) 161 cond do 162 k1 < k2 -> {k1, k2} 163 k1 > k2 -> {k2, k1} 164 k1 == k2 -> {k1, next_key(k1)} 165 end 166 end 167 168 defp random_key(%State{} = _state) do 169 case Enum.random(1..10) do 170 1 -> "" 171 2 -> "\xFF" 172 _ -> Enum.random(1..100) |> Integer.to_string() |> String.pad_leading(3, "0") 173 end 174 end 175 end 176 177 def run(%{cluster: %Cluster{} = cluster}, opts) do 178 client_count = Keyword.get(opts, :clients, 10) 179 duration_ms = Keyword.get(opts, :duration_ms, 10_000) 180 client_tick_ms = Keyword.get(opts, :client_tick_ms, 100) 181 182 clients = 183 Enum.map(1..client_count, fn _i -> 184 {:ok, pid} = Client.start_link(%{cluster: cluster, tick_ms: client_tick_ms}) 185 pid 186 end) 187 188 SimProcess.sleep(duration_ms) 189 190 results = 191 clients 192 |> Enum.map(&SimServer.send_request(&1, :stop)) 193 |> Enum.map(&SimServer.receive_response(&1, 300_000)) 194 |> Enum.map(fn {:reply, reply} -> reply end) 195 196 history = 197 results 198 |> Enum.map(fn %{history: history} -> history end) 199 |> Enum.concat() 200 201 db_model = DatabaseModel.new() 202 DatabaseModel.validate_history(db_model, history) 203 204 { 205 :ok, 206 """ 207 Checked #{length(history)} transactions from #{client_count} clients. 208 """ 209 } 210 end 211end