this repo has no description
(* Re-export the library's sub-modules under this entry-point module. *)
module Types = Types
module Codec = Codec
module Crypto = Crypto
module Buffer_pool = Buffer_pool
module Protocol_pure = Protocol_pure
module Membership = Membership
module Dissemination = Dissemination
module Pending_acks = Pending_acks
module Transport = Transport
module Protocol = Protocol

(** Convenience facade over {!Protocol}: pairs a protocol instance with the
    Eio switch under which its fibers are forked. *)
module Cluster = struct
  type t = { protocol : Protocol.t; sw : Eio.Switch.t }

  (** [create ~sw ~env ~config] opens a UDP socket on
      [config.bind_addr]/[config.bind_port], builds the local node record
      (with empty metadata) and creates the protocol state.

      When [config.node_name] is [None], a name of the form ["node-<n>"]
      with [0 <= n < 100000] is generated.

      @return [Ok t] on success, or [Error `Invalid_key] when
      {!Protocol.create} reports an invalid key. *)
  let create ~sw ~(env : _ Types.env) ~config =
    let net = env.stdenv#net in
    let clock = env.stdenv#clock in
    let mono_clock = env.stdenv#mono_clock in
    let secure_random = env.stdenv#secure_random in

    let node_name =
      match config.Types.node_name with
      | Some name -> name
      | None ->
          (* The global [Random] generator starts from a fixed default seed
             unless [Random.self_init] has been called, which would make
             every unnamed node pick the same name. Use a self-seeded
             state so independent processes get different names. *)
          let rng = Random.State.make_self_init () in
          Printf.sprintf "node-%d" (Random.State.int rng 100000)
    in
    let self_id = Types.node_id_of_string node_name in

    let udp_sock =
      Transport.create_udp_socket net ~sw ~addr:config.bind_addr
        ~port:config.bind_port
    in

    let self_addr =
      `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
    in
    let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in

    match
      Protocol.create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random
    with
    | Error `Invalid_key -> Error `Invalid_key
    | Ok protocol -> Ok { protocol; sw }

  (** [start t] forks two fibers on [t]'s switch: one running
      {!Protocol.run_protocol} and one running {!Protocol.run_udp_receiver}. *)
  let start t =
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol)

  (** [shutdown t] delegates to {!Protocol.shutdown}. *)
  let shutdown t = Protocol.shutdown t.protocol

  (** [local_node t] delegates to {!Protocol.local_node}. *)
  let local_node t = Protocol.local_node t.protocol

  (** [members t] is the node record of every member returned by
      {!Protocol.members}. *)
  let members t = Protocol.members t.protocol |> List.map Membership.Member.node

  (** [member_count t] delegates to {!Protocol.member_count}. *)
  let member_count t = Protocol.member_count t.protocol

  (** [events t] delegates to {!Protocol.events}. *)
  let events t = Protocol.events t.protocol

  (** [stats t] delegates to {!Protocol.stats}. *)
  let stats t = Protocol.stats t.protocol

  (** [add_member t node_info] delegates to {!Protocol.add_member}. *)
  let add_member t node_info = Protocol.add_member t.protocol node_info

  (** [remove_member t node_id] delegates to {!Protocol.remove_member}. *)
  let remove_member t node_id = Protocol.remove_member t.protocol node_id

  (** [join t ~seed_nodes] parses each seed string with
      {!Transport.parse_udp_addr} and registers every seed that parses as a
      member, using the seed string itself as the node id and empty
      metadata. Seeds that fail to parse are skipped.

      Note that no network contact is attempted here: "success" only means
      the address parsed and was handed to {!Protocol.add_member}.

      @return [Ok ()] if at least one seed was added, otherwise
      [Error `No_seeds_reachable] (including when [seed_nodes] is empty). *)
  let join t ~seed_nodes =
    let add_seed seed =
      match Transport.parse_udp_addr seed with
      | Error _ -> false
      | Ok addr ->
          let node_id = Types.node_id_of_string seed in
          let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
          Protocol.add_member t.protocol node;
          true
    in
    (* Fold rather than [List.exists]: [List.exists] stops at the first
       seed that parses, so the remaining seeds would never be added.
       [add_seed seed] is on the left of [||] so it always runs. *)
    let any_success =
      List.fold_left (fun added seed -> add_seed seed || added) false seed_nodes
    in
    if any_success then Ok () else Error `No_seeds_reachable

  (** [broadcast t ~topic ~payload] delegates to {!Protocol.broadcast}. *)
  let broadcast t ~topic ~payload =
    Protocol.broadcast t.protocol ~topic ~payload

  (** [on_message t handler] delegates to {!Protocol.on_message}. *)
  let on_message t handler = Protocol.on_message t.protocol handler

  (** [is_alive t node_id] is [true] iff [node_id] is found in the
      membership table and its current snapshot has state [Types.Alive].
      Unknown nodes yield [false]. *)
  let is_alive t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> false
    | Some member ->
        let snap = Membership.Member.snapshot_now member in
        snap.Types.state = Types.Alive

  (** [find_node t node_id] is the node record for [node_id] if it is present
      in the membership table, or [None] otherwise. *)
  let find_node t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> None
    | Some member -> Some (Membership.Member.node member)

  (** [is_healthy t] is [true] iff the current stats report at least one
      alive node. *)
  let is_healthy t =
    let s = stats t in
    s.Types.nodes_alive > 0
end

(** Alias for {!Types.default_config}. *)
let default_config = Types.default_config

(** Library version string. *)
let version = "0.1.0"