this repo has no description
at master 182 lines 4.8 kB view raw
defmodule Hobbes.Workloads.Model.DatabaseModel do
  @moduledoc """
  Reference model used to check a recorded transaction history.

  The model holds a `TestKV` and a `TestVersionMap`. `validate_history/2` replays
  history entries ordered by `{commit_version, batch_index}`; for each entry it
  first compares the recorded read results with what the model's KV returns at the
  entry's read version, then checks the entry's reported status.
  """

  alias Hobbes.Transaction.TxnState
  alias Hobbes.Structs.RangeResult
  alias Hobbes.KV.TestKV
  alias Hobbes.TestVersionMap

  import Hobbes.Utils

  alias Hobbes.Workloads.Model.DatabaseModel

  import ExUnit.Assertions, only: [assert: 1]

  defmodule HistoryTxn do
    @moduledoc """
    A single history entry: the reported status of a transaction, its versions and
    batch index, the read results it observed, and the mutations it issued.
    """

    @enforce_keys [
      :status,
      :read_version,
      :commit_version,
      :batch_index,
      :read_results,
      :mutations
    ]
    defstruct @enforce_keys

    @doc """
    Builds a history entry.

    `read_version`, `commit_version` and `batch_index` are copied from the given
    `TxnState`; `status` must be an atom and `read_results` / `mutations` must be
    lists.
    """
    def new(
          status,
          %TxnState{
            read_version: read_version,
            commit_version: commit_version,
            batch_index: batch_index
          },
          read_results,
          mutations
        )
        when is_atom(status) and is_list(read_results) and is_list(mutations) do
      %__MODULE__{
        status: status,
        read_version: read_version,
        commit_version: commit_version,
        batch_index: batch_index,
        read_results: read_results,
        mutations: mutations
      }
    end
  end

  @type t :: %__MODULE__{
          kv: TestKV.t(),
          vm: TestVersionMap.t()
        }
  @enforce_keys [:kv, :vm]
  defstruct @enforce_keys

  @doc """
  Returns a model holding a new `TestKV` and a new `TestVersionMap`.
  """
  def new do
    kv = TestKV.new()
    vm = TestVersionMap.new()

    %DatabaseModel{kv: kv, vm: vm}
  end

  @doc """
  Validates every entry of `history` against the model and returns the resulting model.

  Entries are processed in ascending `{commit_version, batch_index}` order. Raises
  when an entry's recorded reads differ from the model, or when a rejection status
  is not justified by the model.
  """
  def validate_history(%DatabaseModel{} = dm, history) when is_list(history) do
    history
    |> Enum.sort_by(fn %HistoryTxn{commit_version: version, batch_index: index} ->
      {version, index}
    end)
    |> Enum.reduce(dm, fn %HistoryTxn{} = entry, model ->
      validate_transaction(model, entry)
    end)
  end

  # Checks the entry's reads against the current KV, then dispatches on its status.
  defp validate_transaction(%DatabaseModel{kv: kv} = dm, %HistoryTxn{status: status} = txn) do
    :ok = validate_reads(kv, txn)

    validate_status(status, dm, txn)
  end

  # A committed entry must carry a commit version; its mutations are applied to the
  # KV and recorded in the version map at that version.
  defp validate_status(:committed, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    %HistoryTxn{commit_version: commit_version, mutations: mutations} = txn
    assert commit_version
    # TODO: check for read conflict

    # TODO: clear
    updated_kv =
      Enum.reduce(mutations, dm.kv, fn
        {:write, key, value}, acc ->
          TestKV.put(acc, commit_version, key, value)

        {:clear_range, start_key, end_key}, acc ->
          TestKV.delete_range(acc, commit_version, start_key, end_key)
      end)

    # TODO: :clear
    written =
      Enum.map(mutations, fn
        {:write, key, _value} -> key
        {:clear_range, start_key, end_key} -> {start_key, end_key}
      end)

    updated_vm = TestVersionMap.add_writes(dm.vm, commit_version, written)

    %{dm | kv: updated_kv, vm: updated_vm}
  end

  # A :read_conflict rejection is only accepted when the version map reports that at
  # least one key or range the entry read was written after its read version.
  defp validate_status(:read_conflict, %DatabaseModel{vm: vm} = dm, %HistoryTxn{} = txn) do
    reads =
      Enum.flat_map(txn.read_results, fn
        {{:read, keys}, _results} -> keys
        {{:read_range, {_sk, _ek} = range}, _result} -> [range]
      end)

    read_version = txn.read_version

    conflict_found? =
      Enum.any?(reads, &TestVersionMap.written_after?(vm, read_version, &1))

    # TODO: this check will probably have to be disabled for sharded resolvers
    if not conflict_found? do
      raise """
      Transaction was rejected for :read_conflict but there was no read conflict!

      Read version: #{inspect(txn.read_version)}
      Commit version: #{inspect(txn.commit_version)}

      Reads:
      #{inspect(reads, pretty: true)}
      """
    end

    dm
  end

  # A :transaction_too_old rejection is only accepted when the read version is not
  # above `commit_version - mvcc_window()`.
  defp validate_status(:transaction_too_old, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    read_version_floor = txn.commit_version - mvcc_window()

    if txn.read_version > read_version_floor do
      raise """
      Transaction was rejected for :transaction_too_old but the read version was not too old!

      Read version: #{inspect(txn.read_version)}
      RV floor: #{inspect(read_version_floor)}

      Commit version: #{inspect(txn.commit_version)}
      """
    end

    dm
  end

  # Compares every recorded read result of the entry with the model KV at the
  # entry's read version; raises via read_error/5 on the first mismatch.
  defp validate_reads(%TestKV{} = kv, %HistoryTxn{read_results: read_results} = txn) do
    Enum.each(read_results, fn read_result -> check_read(read_result, kv, txn) end)

    :ok
  end

  # Point reads: `results` is a map that must contain every requested key.
  defp check_read({{:read, keys}, results}, kv, txn) do
    Enum.each(keys, fn key ->
      expected = Map.fetch!(results, key)
      received = TestKV.get(kv, txn.read_version, key)

      if expected != received do
        read_error(kv, txn, key, expected, received)
      end
    end)
  end

  # Range reads: the recorded value is compared with the `pairs` of the model's scan.
  defp check_read({{:read_range, {sk, ek} = range}, expected}, kv, txn) do
    %RangeResult{pairs: received} = TestKV.scan(kv, txn.read_version, sk, ek)

    if expected != received do
      read_error(kv, txn, range, expected, received)
    end
  end

  # Raises a descriptive error for a read whose recorded result differs from the model.
  defp read_error(%TestKV{} = _kv, %HistoryTxn{} = txn, read, expected, received) do
    raise """
    Error while validating transaction: read results do not match!

    Read version: #{inspect(txn.read_version)}

    Key/Range: #{inspect(read)}

    Expected: #{inspect(expected, pretty: true)}
    Received: #{inspect(received, pretty: true)}
    """
  end
end