this repo has no description
defmodule Hobbes.Workloads.Model.DatabaseModel do
  @moduledoc """
  Reference model used to validate a recorded transaction history.

  The model holds a `TestKV` (versioned key/value state) and a
  `TestVersionMap` (record of which keys/ranges were written at which
  version). `validate_history/2` replays a list of `HistoryTxn` entries in
  commit order, checking every recorded read against the model and checking
  that the recorded status of each transaction is justified.

  Validation failures are reported by raising.
  """

  alias Hobbes.Transaction.TxnState
  alias Hobbes.Structs.RangeResult
  alias Hobbes.KV.TestKV
  alias Hobbes.TestVersionMap

  import Hobbes.Utils

  alias Hobbes.Workloads.Model.DatabaseModel

  import ExUnit.Assertions, only: [assert: 1]

  defmodule HistoryTxn do
    @moduledoc """
    One finished transaction as recorded in a history: its final status, the
    versions copied from its `TxnState`, the reads it performed together with
    the results it observed, and the mutations it attempted.
    """

    @type t :: %__MODULE__{
            status: atom(),
            read_version: term(),
            commit_version: term(),
            batch_index: term(),
            read_results: list(),
            mutations: list()
          }

    @enforce_keys [
      :status,
      :read_version,
      :commit_version,
      :batch_index,
      :read_results,
      :mutations
    ]
    defstruct @enforce_keys

    @doc """
    Builds a history entry from a finished transaction.

    `read_version`, `commit_version` and `batch_index` are copied from `txn`;
    `status`, `read_results` and `mutations` are stored as given.
    """
    @spec new(atom(), %TxnState{}, list(), list()) :: t()
    def new(status, %TxnState{} = txn, read_results, mutations)
        when is_atom(status) and is_list(read_results) and is_list(mutations) do
      %__MODULE__{
        status: status,
        read_version: txn.read_version,
        commit_version: txn.commit_version,
        batch_index: txn.batch_index,
        read_results: read_results,
        mutations: mutations
      }
    end
  end

  @type t :: %__MODULE__{
          kv: TestKV.t(),
          vm: TestVersionMap.t()
        }
  @enforce_keys [:kv, :vm]
  defstruct @enforce_keys

  @doc """
  Returns a model built from `TestKV.new/0` and `TestVersionMap.new/0`.
  """
  @spec new() :: t()
  def new do
    %DatabaseModel{
      kv: TestKV.new(),
      vm: TestVersionMap.new()
    }
  end

  @doc """
  Validates every transaction in `history` against the model.

  The history is first sorted by `{commit_version, batch_index}` and then
  replayed in that order. For each transaction the recorded reads are checked
  against the model at the transaction's read version, after which the
  recorded status is checked; committed transactions have their mutations
  applied to the model.

  Returns the model after all committed mutations have been applied.

  Raises if a recorded read does not match the model, or if a rejection
  status (`:read_conflict`, `:transaction_too_old`) is not justified.
  """
  @spec validate_history(t(), [HistoryTxn.t()]) :: t()
  def validate_history(%DatabaseModel{} = dm, history) when is_list(history) do
    history
    |> Enum.sort_by(fn %HistoryTxn{} = ht -> {ht.commit_version, ht.batch_index} end)
    |> Enum.reduce(dm, fn %HistoryTxn{} = txn, model -> validate_transaction(model, txn) end)
  end

  # Checks one transaction: reads first (against the model as it stands before
  # this transaction is applied), then the status-specific check.
  defp validate_transaction(%DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    :ok = validate_reads(dm.kv, txn)

    validate_status(txn.status, dm, txn)
  end

  # Status-specific validation. Returns the (possibly updated) model.
  #
  # :committed - applies the mutations to the KV at the commit version and
  # records the written keys/ranges in the version map.
  defp validate_status(:committed, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    commit_version = txn.commit_version
    assert commit_version
    # TODO: check for read conflict

    kv =
      Enum.reduce(txn.mutations, dm.kv, fn
        {:write, k, v}, kv -> TestKV.put(kv, commit_version, k, v)
        # TODO: clear
        {:clear_range, sk, ek}, kv -> TestKV.delete_range(kv, commit_version, sk, ek)
      end)

    # Single keys are recorded as-is, cleared ranges as {start_key, end_key}.
    write_conflicts =
      Enum.map(txn.mutations, fn
        {:write, k, _v} -> k
        # TODO: :clear
        {:clear_range, sk, ek} -> {sk, ek}
      end)

    vm = TestVersionMap.add_writes(dm.vm, commit_version, write_conflicts)

    %{dm | kv: kv, vm: vm}
  end

  # :read_conflict - the rejection is justified only if at least one key or
  # range the transaction read was written after its read version.
  defp validate_status(:read_conflict, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    # Flatten all reads into one list of keys and {start_key, end_key} ranges.
    reads =
      Enum.flat_map(txn.read_results, fn
        {{:read, keys}, _results} -> keys
        {{:read_range, {_sk, _ek} = range}, _result} -> [range]
      end)

    read_version = txn.read_version
    vm = dm.vm

    read_conflict? =
      Enum.any?(reads, fn key_or_range ->
        TestVersionMap.written_after?(vm, read_version, key_or_range)
      end)

    # TODO: this check will probably have to be disabled for sharded resolvers
    if not read_conflict? do
      raise """
      Transaction was rejected for :read_conflict but there was no read conflict!

      Read version: #{inspect(txn.read_version)}
      Commit version: #{inspect(txn.commit_version)}

      Reads:
      #{inspect(reads, pretty: true)}
      """
    end

    dm
  end

  # :transaction_too_old - the rejection is justified only if the read version
  # is at or below `commit_version - mvcc_window()`.
  defp validate_status(:transaction_too_old, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    # NOTE(review): assumes commit_version is a number here; a nil
    # commit_version would raise ArithmeticError on this subtraction - confirm
    # that rejected transactions always carry one.
    read_version_floor = txn.commit_version - mvcc_window()

    if txn.read_version > read_version_floor do
      raise """
      Transaction was rejected for :transaction_too_old but the read version was not too old!

      Read version: #{inspect(txn.read_version)}
      RV floor: #{inspect(read_version_floor)}

      Commit version: #{inspect(txn.commit_version)}
      """
    end

    dm
  end

  # Compares every recorded read result with what the model KV returns at the
  # transaction's read version. Returns :ok, or raises on the first mismatch.
  #
  # Note on the error report: "Expected" is the value recorded in the history
  # and "Received" is the value produced by the model.
  defp validate_reads(%TestKV{} = kv, %HistoryTxn{} = txn) do
    read_version = txn.read_version

    Enum.each(txn.read_results, fn
      {{:read, keys}, results} ->
        Enum.each(keys, fn key ->
          # Every requested key must be present in the recorded results.
          recorded = Map.fetch!(results, key)
          modeled = TestKV.get(kv, read_version, key)

          if recorded != modeled do
            read_error(txn, key, recorded, modeled)
          end
        end)

      {{:read_range, {sk, ek} = range}, recorded} ->
        %RangeResult{pairs: modeled} = TestKV.scan(kv, read_version, sk, ek)

        if recorded != modeled do
          read_error(txn, range, recorded, modeled)
        end
    end)

    :ok
  end

  # Raises a RuntimeError describing a mismatch between a recorded read result
  # (`expected`) and the model's result (`received`) for `read`, which is
  # either a key or a {start_key, end_key} range.
  defp read_error(%HistoryTxn{} = txn, read, expected, received) do
    raise """
    Error while validating transaction: read results do not match!

    Read version: #{inspect(txn.read_version)}

    Key/Range: #{inspect(read)}

    Expected: #{inspect(expected, pretty: true)}
    Received: #{inspect(received, pretty: true)}
    """
  end
end