open Types module Member = struct type t = { node : node_info; state : member_state Kcas.Loc.t; incarnation : incarnation Kcas.Loc.t; state_change_time : Mtime.span Kcas.Loc.t; last_ack_time : Mtime.span Kcas.Loc.t; } let create ?(initial_state : member_state = Types.Alive) ?(initial_incarnation = zero_incarnation) ~(now : Mtime.span) (node : node_info) = { node; state = Kcas.Loc.make initial_state; incarnation = Kcas.Loc.make initial_incarnation; state_change_time = Kcas.Loc.make now; last_ack_time = Kcas.Loc.make now; } let node t = t.node let get_state ~xt t = Kcas.Xt.get ~xt t.state let get_incarnation ~xt t = Kcas.Xt.get ~xt t.incarnation let get_state_change_time ~xt t = Kcas.Xt.get ~xt t.state_change_time let get_last_ack_time ~xt t = Kcas.Xt.get ~xt t.last_ack_time let set_state ~xt t state ~now = Kcas.Xt.set ~xt t.state state; Kcas.Xt.set ~xt t.state_change_time now let set_incarnation ~xt t inc = Kcas.Xt.set ~xt t.incarnation inc let set_alive ~xt t ~incarnation ~now = set_state ~xt t Alive ~now; set_incarnation ~xt t incarnation let set_suspect ~xt t ~incarnation ~now = set_state ~xt t Suspect ~now; set_incarnation ~xt t incarnation let set_dead ~xt t ~incarnation ~now = set_state ~xt t Dead ~now; set_incarnation ~xt t incarnation let record_ack ~xt t ~now = Kcas.Xt.set ~xt t.last_ack_time now let snapshot ~xt t : member_snapshot = { node = t.node; state = get_state ~xt t; incarnation = get_incarnation ~xt t; state_change = get_state_change_time ~xt t; } let snapshot_now t : member_snapshot = Kcas.Xt.commit { tx = (fun ~xt -> snapshot ~xt t) } end type t = { table : (string, Member.t) Kcas_data.Hashtbl.t; count : int Kcas.Loc.t; } let create () = { table = Kcas_data.Hashtbl.create (); count = Kcas.Loc.make 0 } let key_of_id (Node_id s) = s let add t (member : Member.t) = Kcas.Xt.commit { tx = (fun ~xt -> let key = key_of_id member.node.id in match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with | Some _ -> () | None -> Kcas_data.Hashtbl.Xt.replace ~xt t.table key member; Kcas.Xt.modify ~xt t.count succ); } let remove t id = Kcas.Xt.commit { tx = (fun ~xt -> let key = key_of_id id in match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with | None -> false | Some _ -> Kcas_data.Hashtbl.Xt.remove ~xt t.table key; Kcas.Xt.modify ~xt t.count pred; true); } let find t id = Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id)); } let mem t id = Kcas.Xt.commit { tx = (fun ~xt -> Option.is_some (Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id))); } let to_list t = Kcas_data.Hashtbl.to_seq t.table |> Seq.map snd |> List.of_seq let to_node_list t = to_list t |> List.map Member.node let count t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.count) } type member_updater = { update : 'x. Member.t -> xt:'x Kcas.Xt.t -> unit } let update_member t id updater = match Kcas_data.Hashtbl.find_opt t.table (key_of_id id) with | None -> false | Some member -> Kcas.Xt.commit { tx = (fun ~xt -> updater.update member ~xt) }; true let iter_alive t f = to_list t |> List.iter (fun m -> let snap = Member.snapshot_now m in if snap.state = Alive then f m snap) let snapshot_all t = to_list t |> List.map Member.snapshot_now