this repo has no description
1defmodule Hobbes.Workloads.Model do
2 alias Hobbes.Transaction
3 alias Hobbes.Transaction.TxnState
4 alias Hobbes.Structs.Cluster
5
6 alias Hobbes.Workloads.Model.DatabaseModel
7 alias Hobbes.Workloads.Model.DatabaseModel.HistoryTxn
8
9 alias Trinity.{SimProcess, SimServer}
10
11 @behaviour Hobbes.Workloads.Workload
12
13 defmodule Client do
14 use GenServer
15 alias Trinity.{SimProcess, SimServer}
16
17 import Hobbes.Utils
18
19 defmodule State do
20 @enforce_keys [
21 :cluster,
22 :tick_ms,
23
24 :history,
25 :stopped,
26 ]
27 defstruct @enforce_keys
28 end
29
30 def start_link(opts), do: SimServer.start_link(__MODULE__, opts)
31
32 def init(%{cluster: %Cluster{} = cluster, tick_ms: tick_ms}) do
33 state = %State{
34 cluster: cluster,
35 tick_ms: tick_ms,
36 history: [],
37 stopped: false,
38 }
39 SimProcess.send self(), :tick
40 {:ok, state}
41 end
42
43 def handle_call(:stop, _from, %State{} = state) do
44 {:reply, %{history: state.history}, %{state | stopped: true}}
45 end
46
47 def handle_info(:tick, %State{stopped: true} = state), do: {:noreply, state}
48
49 def handle_info(:tick, %State{} = state) do
50 state = tick(state)
51 SimProcess.send_after self(), :tick, state.tick_ms
52 {:noreply, state}
53 end
54
55 defp tick(%State{} = state) do
56 reads = random_reads(state)
57 mutations = random_mutations(state)
58
59 with {:ok, %TxnState{} = txn} <- Transaction.new(state.cluster),
60 {:ok, {%TxnState{} = txn, read_results}} <- do_reads(txn, reads)
61 do
62 txn = add_mutations(txn, mutations)
63
64 # Ensure some transactions receive :transaction_too_old
65 if Enum.random(1..10) == 1 do
66 SimProcess.sleep(6_000)
67 end
68
69 case Transaction.commit(txn) do
70 {:ok, %TxnState{} = txn} ->
71 append_history(state, HistoryTxn.new(:committed, txn, read_results, mutations))
72
73 {:error, {err, %TxnState{} = txn}} when err in [:transaction_too_old, :read_conflict] ->
74 append_history(state, HistoryTxn.new(err, txn, read_results, mutations))
75
76 # TODO
77 {:error, :timeout} -> raise "Timeout not supported"
78 end
79 else
80 # Transaction.new() timed out
81 {:error, :timeout} -> state
82
83 # Read errors
84 # TODO: we still want to check these reads
85 {:error, {%TxnState{} = _txn, _results}} -> state
86 end
87 end
88
89 defp append_history(%State{} = state, %HistoryTxn{} = ht) do
90 %{state | history: [ht | state.history]}
91 end
92
93 @spec add_mutations(TxnState.t, list) :: TxnState.t
94 defp add_mutations(%TxnState{} = txn, mutations) do
95 Enum.reduce(mutations, txn, fn
96 {:write, key, value}, txn -> Transaction.write(txn, key, value)
97 {:clear_range, sk, ek}, txn -> Transaction.clear_range(txn, sk, ek)
98 end)
99 end
100
101 @spec do_reads(TxnState.t, [{binary, binary} | binary]) :: {:ok, {TxnState.t, list}} | {:error, {TxnState.t, list}}
102 defp do_reads(%TxnState{} = txn, reads) when is_list(reads) do
103 Enum.reduce_while(reads, {txn, []}, fn
104 {:read_range, {sk, ek}} = read, {txn, acc} ->
105 case Transaction.read_range(txn, sk, ek) do
106 {:ok, {pairs, txn}} ->
107 acc = [{read, pairs} | acc]
108 {:cont, {txn, acc}}
109 {:error, _err} ->
110 {:halt, {:error, {txn, acc}}}
111 end
112
113 {:read, keys} = read, {txn, acc} ->
114 case Transaction.read(txn, keys) do
115 {:ok, {result, txn}} ->
116 acc = [{read, result} | acc]
117 {:cont, {txn, acc}}
118 {:error, _err} ->
119 {:halt, {:error, {txn, acc}}}
120 end
121 end)
122 |> case do
123 {:error, {_txn, _acc}} = error ->
124 error
125 {%TxnState{} = txn, results} when is_list(results) ->
126 {:ok, {txn, results}}
127 end
128 end
129
130 defp random_mutations(%State{} = state) do
131 count = Enum.random(1..10)
132 Enum.map(1..count, fn _i ->
133 case Enum.random(1..4) do
134 1 ->
135 {sk, ek} = random_range(state)
136 {:clear_range, sk, ek}
137
138 # TODO: :clear
139 _ ->
140 {:write, random_key(state), random_key(state)}
141 end
142 end)
143 end
144
145 defp random_reads(%State{} = state) do
146 count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10])
147 Enum.map(1..count, fn _i ->
148 case Enum.random(1..4) do
149 1 ->
150 {:read_range, random_range(state)}
151 _ ->
152 key_count = Enum.random([1, 1, 1, 2, 2, 2, 3, 3, 4, 10])
153 {:read, Enum.map(1..key_count, fn _i -> random_key(state) end)}
154 end
155 end)
156 end
157
158 defp random_range(%State{} = state) do
159 k1 = random_key(state)
160 k2 = random_key(state)
161 cond do
162 k1 < k2 -> {k1, k2}
163 k1 > k2 -> {k2, k1}
164 k1 == k2 -> {k1, next_key(k1)}
165 end
166 end
167
168 defp random_key(%State{} = _state) do
169 case Enum.random(1..10) do
170 1 -> ""
171 2 -> "\xFF"
172 _ -> Enum.random(1..100) |> Integer.to_string() |> String.pad_leading(3, "0")
173 end
174 end
175 end
176
177 def run(%{cluster: %Cluster{} = cluster}, opts) do
178 client_count = Keyword.get(opts, :clients, 10)
179 duration_ms = Keyword.get(opts, :duration_ms, 10_000)
180 client_tick_ms = Keyword.get(opts, :client_tick_ms, 100)
181
182 clients =
183 Enum.map(1..client_count, fn _i ->
184 {:ok, pid} = Client.start_link(%{cluster: cluster, tick_ms: client_tick_ms})
185 pid
186 end)
187
188 SimProcess.sleep(duration_ms)
189
190 results =
191 clients
192 |> Enum.map(&SimServer.send_request(&1, :stop))
193 |> Enum.map(&SimServer.receive_response(&1, 300_000))
194 |> Enum.map(fn {:reply, reply} -> reply end)
195
196 history =
197 results
198 |> Enum.map(fn %{history: history} -> history end)
199 |> Enum.concat()
200
201 db_model = DatabaseModel.new()
202 DatabaseModel.validate_history(db_model, history)
203
204 {
205 :ok,
206 """
207 Checked #{length(history)} transactions from #{client_count} clients.
208 """
209 }
210 end
211end