(* Re-exports of the library's sub-modules under this top-level module. *)
module Types = Types
module Codec = Codec
module Crypto = Crypto
module Lzw = Lzw
module Buffer_pool = Buffer_pool
module Protocol_pure = Protocol_pure
module Membership = Membership
module Dissemination = Dissemination
module Pending_acks = Pending_acks
module Transport = Transport
module Protocol = Protocol

(** High-level cluster handle. Almost every function here is a thin wrapper
    that forwards to the same-purpose function in [Protocol]. *)
module Cluster = struct
  (* [protocol] is the underlying protocol instance; [sw] is the switch the
     sockets and the daemon fibers started by [start] are attached to. *)
  type t = { protocol : Protocol.t; sw : Eio.Switch.t }

  (** [create ~sw ~env ~config] opens a UDP socket and a TCP listener on
      [config.bind_addr] / [config.bind_port], builds the local node
      description (empty metadata, [`Udp] address made from the bind address
      and port) and passes everything to [Protocol.create].

      The node name is [config.node_name] when it is [Some _]; otherwise it
      is ["node-<n>"] with [n] drawn from [Random.int 100000].

      @return [Ok t] on success, or [Error `Invalid_key] when
      [Protocol.create] reports that error. *)
  let create ~sw ~(env : _ Types.env) ~config =
    let net = env.stdenv#net in
    let clock = env.stdenv#clock in
    let mono_clock = env.stdenv#mono_clock in
    let secure_random = env.stdenv#secure_random in
    let node_name =
      match config.Types.node_name with
      | Some name -> name
      | None ->
          (* NOTE(review): this uses the global [Random] generator. Unless it
             is seeded elsewhere (e.g. [Random.self_init]), every process
             will draw the same fallback name -- confirm. *)
          Printf.sprintf "node-%d" (Random.int 100000)
    in
    let self_id = Types.node_id_of_string node_name in
    let udp_sock =
      Transport.create_udp_socket net ~sw ~addr:config.bind_addr
        ~port:config.bind_port
    in
    let tcp_listener =
      Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
        ~port:config.bind_port ~backlog:10
    in
    let self_addr =
      `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
    in
    let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in
    match
      Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
        ~mono_clock ~secure_random
    with
    | Error `Invalid_key -> Error `Invalid_key
    | Ok protocol -> Ok { protocol; sw }

  (** [start t] forks three daemon fibers on [t.sw], running
      [Protocol.run_protocol], [Protocol.run_udp_receiver] and
      [Protocol.run_tcp_listener] respectively. Each fiber yields
      [`Stop_daemon] if its loop returns. *)
  let start t =
    Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
        Protocol.run_protocol t.protocol;
        `Stop_daemon);
    Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
        Protocol.run_udp_receiver t.protocol;
        `Stop_daemon);
    Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
        Protocol.run_tcp_listener t.protocol;
        `Stop_daemon)

  (** [shutdown t] forwards to [Protocol.shutdown]. *)
  let shutdown t = Protocol.shutdown t.protocol

  (** [local_node t] forwards to [Protocol.local_node]. *)
  let local_node t = Protocol.local_node t.protocol

  (** [members t] is the node description of every member returned by
      [Protocol.members]. *)
  let members t =
    Protocol.members t.protocol |> List.map Membership.Member.node

  (** [member_count t] forwards to [Protocol.member_count]. *)
  let member_count t = Protocol.member_count t.protocol

  (** [events t] forwards to [Protocol.events]. *)
  let events t = Protocol.events t.protocol

  (** [stats t] forwards to [Protocol.stats]. *)
  let stats t = Protocol.stats t.protocol

  (** [add_member t node_info] forwards to [Protocol.add_member]. *)
  let add_member t node_info = Protocol.add_member t.protocol node_info

  (** [remove_member t node_id] forwards to [Protocol.remove_member]. *)
  let remove_member t node_id = Protocol.remove_member t.protocol node_id

  (** [join t ~seed_nodes] registers every seed whose string is accepted by
      [Transport.parse_udp_addr] as a member, using the seed string itself as
      the node id and empty metadata. Seeds that fail to parse are skipped.

      @return [Ok ()] if at least one seed was registered, otherwise
      [Error `No_seeds_reachable] (including when [seed_nodes] is empty). *)
  let join t ~seed_nodes =
    let add_seed seed =
      match Transport.parse_udp_addr seed with
      | Error _ -> false
      | Ok addr ->
          let node_id = Types.node_id_of_string seed in
          let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
          Protocol.add_member t.protocol node;
          true
    in
    (* A fold instead of [List.exists]: [exists] stops at the first seed that
       parses, which would leave every later seed unregistered. [add_seed]
       is on the left of [||] so it runs for every element. *)
    let any_added =
      List.fold_left
        (fun added seed -> add_seed seed || added)
        false seed_nodes
    in
    if any_added then Ok () else Error `No_seeds_reachable

  (** [broadcast t ~topic ~payload] forwards to [Protocol.broadcast]. *)
  let broadcast t ~topic ~payload =
    Protocol.broadcast t.protocol ~topic ~payload

  (** [send t ~target ~topic ~payload] forwards to [Protocol.send_direct]. *)
  let send t ~target ~topic ~payload =
    Protocol.send_direct t.protocol ~target ~topic ~payload

  (** [send_to_addr t ~addr ~topic ~payload] forwards to
      [Protocol.send_to_addr]. *)
  let send_to_addr t ~addr ~topic ~payload =
    Protocol.send_to_addr t.protocol ~addr ~topic ~payload

  (** [on_message t handler] forwards to [Protocol.on_message]. *)
  let on_message t handler = Protocol.on_message t.protocol handler

  (** [is_alive t node_id] is [true] iff [node_id] is found in the protocol's
      membership table and its current snapshot has state [Types.Alive].
      Unknown nodes yield [false]. *)
  let is_alive t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> false
    | Some member ->
        let snap = Membership.Member.snapshot_now member in
        snap.Types.state = Types.Alive

  (** [find_node t node_id] is the node description of the member registered
      under [node_id], or [None] if there is no such member. *)
  let find_node t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> None
    | Some member -> Some (Membership.Member.node member)

  (** [is_healthy t] is [true] iff [stats t] reports a strictly positive
      [nodes_alive] count. *)
  let is_healthy t =
    let s = stats t in
    s.Types.nodes_alive > 0
end

(** Alias of [Types.default_config]. *)
let default_config = Types.default_config

(** Library version string. *)
let version = "0.1.0"