defmodule Hobbes.Workloads.Model.DatabaseModel do
  @moduledoc """
  Reference model used to validate a recorded history of transactions.

  The model holds a `TestKV` and a `TestVersionMap`. `validate_history/2`
  replays the history in `{commit_version, batch_index}` order; for every
  transaction it first compares the recorded read results with what the model
  returns at the transaction's read version, and then checks that the recorded
  status is consistent with the model.
  """

  alias Hobbes.Transaction.TxnState
  alias Hobbes.Structs.RangeResult
  alias Hobbes.KV.TestKV
  alias Hobbes.TestVersionMap

  import Hobbes.Utils

  alias Hobbes.Workloads.Model.DatabaseModel

  import ExUnit.Assertions, only: [assert: 1]

  defmodule HistoryTxn do
    @moduledoc """
    One recorded transaction: its status, the versions and batch index copied
    from the `TxnState`, the reads it performed together with their results,
    and the mutations it submitted.
    """

    @enforce_keys [
      :status,
      :read_version,
      :commit_version,
      :batch_index,
      :read_results,
      :mutations
    ]
    defstruct @enforce_keys

    @doc """
    Builds a history entry from a status atom, the transaction state, the list
    of read results and the list of mutations.
    """
    def new(status, %TxnState{} = txn, read_results, mutations)
        when is_atom(status) and is_list(read_results) and is_list(mutations) do
      read_version = txn.read_version
      commit_version = txn.commit_version
      batch_index = txn.batch_index

      %__MODULE__{
        status: status,
        read_version: read_version,
        commit_version: commit_version,
        batch_index: batch_index,
        read_results: read_results,
        mutations: mutations
      }
    end
  end

  @type t :: %__MODULE__{
          kv: TestKV.t(),
          vm: TestVersionMap.t()
        }

  @enforce_keys [:kv, :vm]
  defstruct @enforce_keys

  @doc """
  Returns a model backed by a fresh `TestKV` and a fresh `TestVersionMap`.
  """
  @spec new() :: t()
  def new do
    %DatabaseModel{kv: TestKV.new(), vm: TestVersionMap.new()}
  end

  @doc """
  Replays `history` against the model and returns the resulting model.

  Entries are processed in ascending `{commit_version, batch_index}` order.
  Raises when a recorded read result or a recorded status does not agree with
  the model.
  """
  @spec validate_history(t(), list()) :: t()
  def validate_history(%DatabaseModel{} = dm, history) when is_list(history) do
    history
    |> Enum.sort_by(fn %HistoryTxn{commit_version: version, batch_index: index} ->
      {version, index}
    end)
    |> Enum.reduce(dm, fn %HistoryTxn{} = txn, model ->
      validate_transaction(model, txn)
    end)
  end

  # Checks the reads of a single transaction first, then its status.
  # Returns the (possibly updated) model.
  defp validate_transaction(%DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    :ok = validate_reads(dm.kv, txn)
    validate_status(txn.status, dm, txn)
  end

  # :committed - applies the mutations to the KV at the commit version and
  # records the written keys/ranges in the version map.
  defp validate_status(:committed, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    commit_version = txn.commit_version
    assert commit_version

    # TODO: check for read conflict

    updated_kv =
      Enum.reduce(txn.mutations, dm.kv, fn
        {:write, key, value}, acc ->
          TestKV.put(acc, commit_version, key, value)

        # TODO: clear
        {:clear_range, start_key, end_key}, acc ->
          TestKV.delete_range(acc, commit_version, start_key, end_key)
      end)

    written =
      Enum.map(txn.mutations, fn
        {:write, key, _value} -> key
        # TODO: :clear
        {:clear_range, start_key, end_key} -> {start_key, end_key}
      end)

    updated_vm = TestVersionMap.add_writes(dm.vm, commit_version, written)

    %DatabaseModel{dm | kv: updated_kv, vm: updated_vm}
  end

  # :read_conflict - at least one key or range read by the transaction must
  # have been written after its read version; otherwise the rejection is bogus.
  defp validate_status(:read_conflict, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    reads =
      Enum.flat_map(txn.read_results, fn
        {{:read, keys}, _results} -> keys
        {{:read_range, {_sk, _ek} = range}, _result} -> [range]
      end)

    read_version = txn.read_version
    version_map = dm.vm

    conflicting? =
      Enum.any?(reads, &TestVersionMap.written_after?(version_map, read_version, &1))

    # TODO: this check will probably have to be disabled for sharded resolvers
    if not conflicting? do
      raise """
      Transaction was rejected for :read_conflict but there was no read conflict!

      Read version: #{inspect(txn.read_version)}
      Commit version: #{inspect(txn.commit_version)}
      Reads: #{inspect(reads, pretty: true)}
      """
    end

    dm
  end

  # :transaction_too_old - the read version must not be above
  # `commit_version - mvcc_window()`; otherwise the rejection is bogus.
  defp validate_status(:transaction_too_old, %DatabaseModel{} = dm, %HistoryTxn{} = txn) do
    read_version_floor = txn.commit_version - mvcc_window()

    if txn.read_version > read_version_floor do
      raise """
      Transaction was rejected for :transaction_too_old but the read version was not too old!

      Read version: #{inspect(txn.read_version)}
      RV floor: #{inspect(read_version_floor)}
      Commit version: #{inspect(txn.commit_version)}
      """
    end

    dm
  end

  # Compares every recorded read result with what the model KV returns at the
  # transaction's read version. Returns :ok, or raises via read_error/5.
  defp validate_reads(%TestKV{} = kv, %HistoryTxn{} = txn) do
    read_version = txn.read_version

    Enum.each(txn.read_results, fn
      {{:read, keys}, results} ->
        Enum.each(keys, fn key ->
          expected = Map.fetch!(results, key)
          received = TestKV.get(kv, read_version, key)

          if expected == received do
            :ok
          else
            read_error(kv, txn, key, expected, received)
          end
        end)

      {{:read_range, {sk, ek} = range}, expected} ->
        %RangeResult{pairs: received} = TestKV.scan(kv, read_version, sk, ek)

        if expected == received do
          :ok
        else
          read_error(kv, txn, range, expected, received)
        end
    end)

    :ok
  end

  # Raises with a description of a mismatching read. `expected` is the value
  # taken from the recorded results, `received` the value produced by the KV.
  defp read_error(%TestKV{} = _kv, %HistoryTxn{} = txn, read, expected, received) do
    raise """
    Error while validating transaction: read results do not match!

    Read version: #{inspect(txn.read_version)}
    Key/Range: #{inspect(read)}
    Expected: #{inspect(expected, pretty: true)}
    Received: #{inspect(received, pretty: true)}
    """
  end
end