···103103 case acc || get_shards.() do
104104 {:ok, shards} = shard_result ->
105105 case do_multi_read(txn, keys, shards) do
106106+ # Successful read, return results immediately
106107 {:ok, _map} = read_result -> {:halt, read_result}
108108+ # Shard information is outdated, retry get_shards
107109 {:error, :wrong_server} = error -> {:cont, get_shards.(), error}
110110+ # These are retryable, retry with the same shards
108111 {:error, :timeout} = error -> {:cont, shard_result, error}
109112 {:error, :read_version_too_new} = error -> {:cont, shard_result, error}
113113+ # These are not retryable, halt immediately
110114 {:error, :read_version_too_old} = error -> {:halt, error}
111115 {:error, :wrong_generation} = error -> {:halt, error}
112116 end
···189193 case acc || get_ranges.() do
190194 {:ok, ranges} = acc ->
191195 case do_read_split_range(txn, ranges) do
196196+ # Successful read, return results immediately
192197 {:ok, _pairs} = result -> {:halt, result}
198198+ # Shard information is outdated, retry get_ranges
193199 {:error, :wrong_server} = error -> {:cont, nil, error}
200200+ # These are retryable, retry with the same shards
194201 {:error, :timeout} = error -> {:cont, acc, error}
195202 {:error, :read_version_too_new} = error -> {:cont, acc, error}
203203+ # These are not retryable, halt immediately
196204 {:error, :read_version_too_old} = error -> {:halt, error}
197205 {:error, :wrong_generation} = error -> {:halt, error}
198206 end
+9
lib/utils.ex
···376376 |> Enum.sort()
377377 end
378378379379+ @doc """
380380+ Retries a function `fun` with an accumulator.
381381+382382+ The function should return:
383383+ - `{:cont, acc, error}` - where `error` is the return value if we're out of attempts
384384+ - `{:halt, result}` - where `result` should be returned immediately
385385+386386+ The `acc` will be passed along to each invocation as long as `{:cont, ...}` is returned.
387387+ """
379388 @spec retry_acc(term, (term -> {:cont, term, term} | {:halt, term}), pos_integer) :: term
380389 def retry_acc(acc, fun, count \\ 6) when is_function(fun, 1) and is_integer(count) and count > 0 do
381390 do_retry_acc(acc, fun, count, 1)
+8-6
lib/workloads.ex
···149149 cluster_opts = Keyword.put(cluster_opts, :distributed, Sim.simulated?())
150150 {:ok, coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts)
151151152152- {:ok, %Cluster{} = cluster} = retry_acc(nil, fn _acc ->
153153- case Hobbes.get_cluster(coordinator_pids) do
154154- {:ok, _cluster} = result -> {:halt, result}
155155- {:error, _err} = error -> {:cont, nil, error}
156156- end
157157- end, 20)
152152+ # Keep retrying get_cluster until coordinators are connected
153153+ {:ok, %Cluster{} = cluster} =
154154+ retry_acc(nil, fn nil ->
155155+ case Hobbes.get_cluster(coordinator_pids) do
156156+ {:ok, _cluster} = result -> {:halt, result}
157157+ {:error, _err} = error -> {:cont, nil, error}
158158+ end
159159+ end, 20)
158160159161 context = %{cluster: cluster}
160162