this repo has no description
1defmodule Hobbes.Servers.CommitBuffer do
2 use GenServer
3 alias Trinity.{SimProcess, SimServer, SimLogger}
4 require SimLogger
5
6 import ExUnit.Assertions, only: [assert: 1]
7
8 alias Hobbes.ShardTagMap
9 alias Hobbes.Structs.{Cluster, TLogGeneration, Server, ResolveBatch, CommitTxn, LogBatch}
10
11 alias Hobbes.Servers.{Sequencer, Resolver, TLog}
12
13 import Hobbes.Utils
14
# TODO: centralize types?
# A shard as handed back by the :get_shards call:
# `{start_key, end_key, {storage_server_ids, storage_pids}}`.
# NOTE(review): the pids are looked up with `Map.get/2`, so an id without a known
# pid yields `nil` in the pid list — confirm whether callers expect that.
@type shard :: {binary, binary, {[integer], [pid]}}
# Metadata included in every reply this server sends for a commit request.
@type commit_info :: %{commit_version: non_neg_integer, batch_index: non_neg_integer}

# Delay handed to `SimProcess.send_after/3` between `:flush` ticks.
@flush_interval_ms 1
# Elapsed time since the last commit after which a non-empty buffer is committed.
# NOTE(review): compared against `current_time() - last_commit_timestamp`; the `_us`
# suffix suggests microseconds — confirm `current_time/0` uses the same unit.
@commit_interval_us 1000
# Elapsed time since the last commit after which a batch is committed even if the
# buffer is empty.
@empty_commit_interval_us 10_000

# Buffer length at which a batch is committed regardless of elapsed time.
@max_buffer_size 300
24
defmodule State do
  # Server state of the commit buffer.
  #
  # Every field is mandatory: the struct is defined directly from the list of
  # enforced keys, so there are no defaults.
  @enforce_keys ~w(
    id
    cluster
    shard_map
    last_committed_version
    buffer
    buffer_size
    last_commit_timestamp
    storage_servers
  )a
  defstruct @enforce_keys

  # `buffer` holds pending transactions newest-first (they are prepended on
  # arrival) and `buffer_size` tracks its length.
  @type t :: %__MODULE__{
          id: non_neg_integer(),
          cluster: Cluster.t(),
          shard_map: ShardTagMap.t(),
          last_committed_version: non_neg_integer(),
          buffer: list(),
          buffer_size: non_neg_integer(),
          last_commit_timestamp: integer(),
          storage_servers: map()
        }
end
53
@doc """
Starts a commit buffer through `SimServer.start_link/2`.

`arg` is forwarded unchanged to `init/1`, which expects a map with the keys
`:id`, `:cluster`, `:storage_teams_pairs` and `:key_storage_pairs`.
"""
def start_link(arg) do
  SimServer.start_link(__MODULE__, arg)
end
55
@doc ~S"""
Returns the shards covering a single key or key range.

This is a thin wrapper around `get_shards_multi/2` for exactly one key or range.
Each shard has the form `{start_key, end_key, {storage_server_ids, storage_pids}}`.

Returns `{:error, :timeout}` when the underlying call times out.

## Examples

    iex> get_shards(server, "foo")
    {:ok, [{"", "\xFF", {[1, 2, 3], [pid1, pid2, pid3]}}]}

    iex> get_shards(server, {"foo", "zoo"})
    {:ok,
     [
       {"", "goo", {[1, 2, 3], [pid1, pid2, pid3]}},
       {"goo", "\xFF", {[4, 5, 6], [pid4, pid5, pid6]}}
     ]}

"""
@spec get_shards(pid, binary | {binary, binary}) :: {:ok, [shard]} | {:error, :timeout}
def get_shards(server, key_or_range) when is_binary(key_or_range) or is_tuple(key_or_range) do
  case get_shards_multi(server, [key_or_range]) do
    # One input produces exactly one shard list; unwrap it.
    {:ok, [shards]} -> {:ok, shards}
    # Propagate the timeout instead of crashing the caller with a MatchError.
    {:error, :timeout} = error -> error
  end
end
76
@doc ~S"""
Returns one list of shards per requested key or key range, in request order.

Each shard has the form `{start_key, end_key, {storage_server_ids, storage_pids}}`.
The request is made with a call timeout of `1000`; if the call exits with a
timeout, `{:error, :timeout}` is returned instead.

## Examples

    iex> get_shards_multi(server, ["foo", "bar"])
    {:ok,
     [
       [{"f", "\xFF", {[4, 5, 6], [pid4, pid5, pid6]}}],
       [{"", "f", {[1, 2, 3], [pid1, pid2, pid3]}}]
     ]}

"""
@spec get_shards_multi(pid, [binary | {binary, binary}]) :: {:ok, [[shard]]} | {:error, :timeout}
def get_shards_multi(server, keys_or_ranges) when is_list(keys_or_ranges) do
  SimServer.call(server, {:get_shards, keys_or_ranges}, 1000)
catch
  # Only a call timeout is translated; any other exit still propagates.
  :exit, {:timeout, _} -> {:error, :timeout}
end
97
@doc """
Submits a transaction for commit and waits for the outcome.

The server buffers the transaction and replies only once the batch containing
it has been processed. The reply is `{:ok, commit_info}` on success or
`{:error, {reason, commit_info}}` when the transaction is refused. If the call
exits with a timeout, `{:error, :timeout}` is returned.
"""
@spec commit(pid, CommitTxn.t) :: {:ok, commit_info} | {:error, :timeout | {:transaction_too_old | :read_conflict | :database_locked, commit_info}}
def commit(server, %CommitTxn{} = txn) do
  SimServer.call(server, {:commit, txn})
catch
  # Only a call timeout is translated; any other exit still propagates.
  :exit, {:timeout, _} -> {:error, :timeout}
end
106
# Builds the initial state from the start argument, seeds the shard map with the
# supplied metadata pairs and schedules the first :flush tick.
def init(%{id: id, cluster: %Cluster{} = cluster, storage_teams_pairs: storage_teams_pairs, key_storage_pairs: key_storage_pairs}) do
  # The head of the generation list must be the cluster's current generation.
  head_generation = hd(cluster.tlog_generations)
  %TLogGeneration{} = head_generation
  assert head_generation.generation == cluster.generation

  shard_map = ShardTagMap.new(head_generation)
  started_at = current_time()

  state = %State{
    id: id,
    cluster: cluster,
    shard_map: shard_map,
    last_committed_version: 0,
    buffer: [],
    buffer_size: 0,
    last_commit_timestamp: started_at,
    storage_servers: %{}
  }

  # The return values are discarded, as in the rest of this module.
  # NOTE(review): presumably ShardTagMap is updated in place — confirm.
  Enum.each([storage_teams_pairs, key_storage_pairs], fn pairs ->
    ShardTagMap.load_meta_pairs(shard_map, pairs)
  end)

  SimProcess.send_after(self(), :flush, @flush_interval_ms)
  {:ok, state}
end
130
# Resolves every requested key or range to its shards and pairs each shard's
# storage server ids with the pids currently known for them.
def handle_call({:get_shards, keys_or_ranges}, _from, state) when is_list(keys_or_ranges) do
  shard_map = state.shard_map
  pid_by_id = state.storage_servers

  # Ids without a known pid map to nil (Map.get/2 with no default).
  attach_pids = fn {start_key, end_key, server_ids} ->
    server_pids = Enum.map(server_ids, fn server_id -> Map.get(pid_by_id, server_id) end)
    {start_key, end_key, {server_ids, server_pids}}
  end

  shard_lists =
    for key_or_range <- keys_or_ranges do
      shard_map
      |> ShardTagMap.shards_for_key_or_range(key_or_range)
      |> Enum.map(attach_pids)
    end

  {:reply, {:ok, shard_lists}, state}
end
146
# Queues a transaction for the next batch. The caller is not answered here: the
# reply is sent when the batch is committed, so the transaction remembers `from`
# and its position in the buffer.
def handle_call({:commit, %CommitTxn{} = txn}, from, state) do
  queued_txn = %{txn | from: from, batch_index: state.buffer_size}

  queued_state = %{
    state
    | buffer: [queued_txn | state.buffer],
      buffer_size: state.buffer_size + 1
  }

  {:noreply, maybe_commit_batch(queued_state)}
end
154
# Periodic tick: commits a batch if one is due, then re-arms the timer.
def handle_info(:flush, %State{} = state) do
  after_flush = maybe_commit_batch(state)
  SimProcess.send_after(self(), :flush, @flush_interval_ms)

  {:noreply, after_flush}
end
160
# Handles a cluster update by comparing its generation with the one we hold:
# a newer generation shuts this process down, the same generation replaces the
# stored cluster, and an older generation is ignored.
def handle_info({:update_cluster, %Cluster{} = cluster}, %State{} = state) do
  held_generation = state.cluster.generation

  case cluster.generation do
    incoming when incoming > held_generation -> exit(:shutdown)
    incoming when incoming == held_generation -> {:noreply, %{state | cluster: cluster}}
    incoming when incoming < held_generation -> {:noreply, state}
  end
end
168
# Replaces the id => pid map used to resolve storage servers for :get_shards.
def handle_info({:update_storage_servers, storage_map}, %State{} = state) when is_map(storage_map) do
  # Sanity check only. Which entry trips an assert first is nondeterministic,
  # but that does not matter for the outcome.
  for {server_id, server_pid} <- storage_map do
    assert is_integer(server_id)
    assert is_pid(server_pid)
  end

  {:noreply, %{state | storage_servers: storage_map}}
end
179
# Commits the buffered batch when any of these holds, otherwise returns the
# state untouched:
#   * the buffer has reached @max_buffer_size
#   * the buffer is non-empty and @commit_interval_us has elapsed
#   * @empty_commit_interval_us has elapsed (commits even an empty batch)
defp maybe_commit_batch(%State{} = state) do
  pending = state.buffer_size
  since_last_commit = current_time() - state.last_commit_timestamp

  due? =
    pending >= @max_buffer_size or
      (pending > 0 and since_last_commit >= @commit_interval_us) or
      since_last_commit >= @empty_commit_interval_us

  if due? do
    commit_batch(state)
  else
    state
  end
end
197
# If we have not recovered there are not yet any TLogs to write to
defp commit_batch(%State{} = state) when state.cluster.status != :normal do
  state
end

# Commits the current buffer as one batch. Steps, in order:
#   1. obtain a commit version from the sequencer
#   2. drop transactions whose read version fell out of the MVCC window
#   3. have the resolver decide which of the rest may commit
#   4. apply the metadata mutations returned by the resolver to the shard map
#   5. write the allowed mutations to every TLog of the current generation
#   6. tell the sequencer the version is committed
#   7. reply to every buffered client
#   8. return the state with an empty buffer and the new committed version
# Any timeout or TLog failure along the way exits the process with :shutdown,
# in which case no replies are sent.
defp commit_batch(%State{} = state) do
  assert state.cluster.status == :normal

  [%Server{pid: seq_pid}] = get_servers(state.cluster, Hobbes.Servers.Sequencer)
  {commit_version, prev_commit_version} = acquire_commit_versions(state, seq_pid)

  # The buffer is newest-first; both halves of the split keep that order
  {transactions_reversed, old_transactions} =
    split_expired_transactions(state.buffer, commit_version)

  {txn_results_reversed, meta_log} =
    resolve_batch_or_exit(state, transactions_reversed, commit_version, prev_commit_version)

  # Apply meta mutations received, including our own from this batch
  Enum.each(meta_log, fn {_commit_version, mutations} ->
    ShardTagMap.apply_metadata_mutations(state.shard_map, mutations)
  end)

  {allowed_transactions, rejected_transactions} =
    partition_resolved(transactions_reversed, txn_results_reversed)

  # If the database is locked, filter out all non-meta transactions
  # TODO: use a lock-aware flag instead like FDB
  # TODO: removed for now, bring back later without MetaStore
  #{allowed_transactions, locked_transactions} =
  #  case MetaStore.locked?(state.meta_store, commit_version) do
  #    true -> Enum.split_with(allowed_transactions, fn txn -> has_meta?(txn.mutations) end)
  #    false -> {allowed_transactions, []}
  #  end
  locked_transactions = []

  write_batch_to_tlogs(state, allowed_transactions, commit_version, prev_commit_version)

  # Once all tlogs have replied (made durable), notify sequencer this version is committed
  notify_sequencer_committed(state, seq_pid, commit_version)

  # Reply to clients
  reply_to_transactions(allowed_transactions, commit_version, :ok)
  # TODO: use a different error message
  reply_to_transactions(locked_transactions, commit_version, :database_locked)
  reply_to_transactions(old_transactions, commit_version, :transaction_too_old)
  reply_to_transactions(rejected_transactions, commit_version, :read_conflict)

  case length(allowed_transactions) do
    0 -> SimLogger.debug "CommitBuffer (id=#{state.id}) committed empty batch"
    batch_size -> SimLogger.debug "CommitBuffer (id=#{state.id}) committed batch (batch_size=#{batch_size} commit_version=#{commit_version})"
  end

  %{state |
    last_committed_version: commit_version,
    buffer: [],
    buffer_size: 0,
    last_commit_timestamp: current_time(),
  }
end

# Asks the sequencer for `{commit_version, prev_commit_version}`; exits on timeout.
defp acquire_commit_versions(%State{} = state, seq_pid) do
  case Sequencer.get_commit_version(seq_pid) do
    {:ok, {_cv, _pcv} = versions} ->
      versions

    {:error, :timeout} ->
      SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to get commit version, exiting"
      exit(:shutdown)
  end
end

# Splits transactions into `{inside_mvcc_window, too_old}`, preserving input order.
defp split_expired_transactions(transactions_reversed, commit_version) do
  read_version_floor = commit_version - mvcc_window()

  Enum.split_with(transactions_reversed, fn %CommitTxn{} = txn ->
    # TODO: maybe validate instead of asserting?
    assert is_integer(txn.read_version) or txn.read_version == :write_only

    # TODO: make sure this is not off by one anywhere (e.g. WRT resolvers, storage)
    # Note: (:write_only > integer, so :write_only transactions always pass)
    txn.read_version > read_version_floor
  end)
end

# Sends the batch to the resolver and returns `{txn_results_reversed, meta_log}`; exits on timeout.
defp resolve_batch_or_exit(%State{} = state, transactions_reversed, commit_version, prev_commit_version) do
  # Prepending while walking the reversed buffer restores arrival order
  resolver_txns =
    Enum.reduce(transactions_reversed, [], fn %CommitTxn{} = txn, acc ->
      resolver_txn = {
        txn.read_version,
        txn.read_conflicts,
        txn.write_conflicts,
        Enum.filter(txn.mutations, &meta_mutation?/1),
      }

      [resolver_txn | acc]
    end)

  batch = %ResolveBatch{
    commit_buffer_id: state.id,
    commit_version: commit_version,
    prev_commit_version: prev_commit_version,
    transactions: resolver_txns,
  }

  [%Server{pid: resolver_pid}] = get_servers(state.cluster, Hobbes.Servers.Resolver)

  case Resolver.resolve_batch(resolver_pid, batch) do
    {:ok, result} ->
      result

    {:error, :timeout} ->
      SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to resolve batch, exiting"
      exit(:shutdown)
  end
end

# Pairs each transaction with its resolver verdict and returns `{allowed, rejected}`.
defp partition_resolved(transactions_reversed, txn_results_reversed) do
  # Both are reversed, so they come out of this the right way around
  transactions_reversed
  |> Enum.zip(txn_results_reversed)
  |> Enum.reduce({[], []}, fn
    {txn, true}, {allowed, rejected} -> {[txn | allowed], rejected}
    {txn, false}, {allowed, rejected} -> {allowed, [txn | rejected]}
  end)
end

# Tags and slices the allowed mutations, writes one LogBatch to every TLog of the
# current generation and waits for all of them; exits if any write fails.
defp write_batch_to_tlogs(%State{} = state, allowed_transactions, commit_version, prev_commit_version) do
  mutations = Enum.flat_map(allowed_transactions, fn %CommitTxn{} = txn -> txn.mutations end)

  # Add storage tags to each mutation (including special meta tag for meta mutations)
  tlog_mutations =
    ShardTagMap.tag_and_slice_mutations(
      state.shard_map,
      mutations ++ compute_special_mutations(mutations)
    )

  tlog_ids = hd(state.cluster.tlog_generations).tlog_ids

  # Send sliced mutations to each tlog; all requests go out before any reply is awaited
  tlog_ids
  |> Enum.map(fn tlog_id ->
    log_batch = %LogBatch{
      commit_buffer_id: state.id,
      commit_version: commit_version,
      prev_commit_version: prev_commit_version,
      tagged_mutations: Map.fetch!(tlog_mutations, tlog_id),
      last_committed_version: state.last_committed_version,
    }

    %Server{pid: tlog_pid} = Map.fetch!(state.cluster.servers, tlog_id)
    TLog.write_batch_send(tlog_pid, log_batch)
  end)
  |> Enum.each(fn req_id ->
    case TLog.write_batch_receive(req_id) do
      :ok ->
        :noop

      # If a commit fails, we trigger recovery (or are already recovering)
      {:error, err} when err in [:tlog_locked, :timeout] ->
        SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to write batch to TLog (error=#{err}), exiting"
        exit(:shutdown)
    end
  end)
end

# Tells the sequencer that `commit_version` is committed; exits on timeout.
defp notify_sequencer_committed(%State{} = state, seq_pid, commit_version) do
  case Sequencer.notify_committed(seq_pid, commit_version) do
    :ok ->
      :noop

    {:error, :timeout} ->
      SimLogger.debug "CommitBuffer (id=#{state.id}, generation=#{state.cluster.generation})): failed to notify sequencer of commit, exiting"
      exit(:shutdown)
  end
end

# Replies to the caller of every given transaction: `{:ok, info}` when `outcome`
# is :ok, otherwise `{:error, {outcome, info}}`.
defp reply_to_transactions(transactions, commit_version, outcome) do
  Enum.each(transactions, fn %CommitTxn{} = txn ->
    info = %{commit_version: commit_version, batch_index: txn.batch_index}

    reply =
      case outcome do
        :ok -> {:ok, info}
        reason -> {:error, {reason, info}}
      end

    SimServer.reply(txn.from, reply)
  end)
end
361end