(* This repository has no description. *)
(* Re-export the library's submodules under this top-level module. *)
module Types = Types
module Codec = Codec
module Crypto = Crypto
module Buffer_pool = Buffer_pool
module Protocol_pure = Protocol_pure
module Membership = Membership
module Dissemination = Dissemination
module Pending_acks = Pending_acks
module Transport = Transport
module Protocol = Protocol

(** Convenience facade over {!Protocol}: builds the sockets and the local
    node description from a configuration, and forwards the common
    operations to the underlying protocol instance. *)
module Cluster = struct
  (** A cluster handle: the protocol instance plus the switch under which
      its fibers are forked by {!start}. *)
  type t = { protocol : Protocol.t; sw : Eio.Switch.t }

  (** [create ~sw ~env ~config] opens a UDP socket and a TCP listener on
      [config.bind_addr]:[config.bind_port], builds the local node info and
      creates the protocol instance.

      The node name is [config.node_name] when set, otherwise a generated
      ["node-<n>"] name with [0 <= n < 100000].

      Returns [Error `Invalid_key] when [Protocol.create] reports it.
      NOTE(review): [Eio.Net.Ipaddr.of_raw] is applied to [config.bind_addr]
      unchecked; confirm the config is validated before reaching here. *)
  let create ~sw ~(env : _ Types.env) ~config =
    let net = env.stdenv#net in
    let clock = env.stdenv#clock in
    let mono_clock = env.stdenv#mono_clock in
    let secure_random = env.stdenv#secure_random in

    let node_name =
      match config.Types.node_name with
      | Some name -> name
      | None ->
          (* Use a self-seeded generator: the global [Random] state has a
             fixed default seed unless the program calls [Random.self_init],
             which would give unnamed nodes in different processes the same
             fallback name (and therefore the same node id). *)
          let rng = Random.State.make_self_init () in
          Printf.sprintf "node-%d" (Random.State.int rng 100000)
    in
    let self_id = Types.node_id_of_string node_name in

    let udp_sock =
      Transport.create_udp_socket net ~sw ~addr:config.bind_addr
        ~port:config.bind_port
    in

    let tcp_listener =
      Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
        ~port:config.bind_port ~backlog:10
    in

    let self_addr =
      `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
    in
    let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in

    (* The error case is re-wrapped explicitly (rather than [Result.map]) so
       the inferred error row of [create] stays exactly as before. *)
    match
      Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
        ~mono_clock ~secure_random
    with
    | Error `Invalid_key -> Error `Invalid_key
    | Ok protocol -> Ok { protocol; sw }

  (** [start t] forks three fibers on [t.sw]: the protocol loop, the UDP
      receiver and the TCP listener. Returns immediately after forking. *)
  let start t =
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol);
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_tcp_listener t.protocol)

  (** Forwards to [Protocol.shutdown]. *)
  let shutdown t = Protocol.shutdown t.protocol

  (** Forwards to [Protocol.local_node]. *)
  let local_node t = Protocol.local_node t.protocol

  (** The node info of every member returned by [Protocol.members]. *)
  let members t = Protocol.members t.protocol |> List.map Membership.Member.node

  (** Forwards to [Protocol.member_count]. *)
  let member_count t = Protocol.member_count t.protocol

  (** Forwards to [Protocol.events]. *)
  let events t = Protocol.events t.protocol

  (** Forwards to [Protocol.stats]. *)
  let stats t = Protocol.stats t.protocol

  (** Forwards to [Protocol.add_member]. *)
  let add_member t node_info = Protocol.add_member t.protocol node_info

  (** Forwards to [Protocol.remove_member]. *)
  let remove_member t node_id = Protocol.remove_member t.protocol node_id

  (** [join t ~seed_nodes] registers every seed whose string parses as a UDP
      address (via [Transport.parse_udp_addr]) as a member, using the seed
      string itself as the node id. Seeds that fail to parse are skipped.

      Returns [Ok ()] if at least one seed was registered, and
      [Error `No_seeds_reachable] otherwise (including for an empty list).
      As far as this function shows, no seed is actually contacted here;
      "reachable" means "parsed and handed to [Protocol.add_member]". *)
  let join t ~seed_nodes =
    let parse_and_try seed =
      match Transport.parse_udp_addr seed with
      | Error _ -> false
      | Ok addr ->
          let node_id = Types.node_id_of_string seed in
          let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
          Protocol.add_member t.protocol node;
          true
    in
    (* Deliberately not [List.exists]: that short-circuits on the first
       success and would leave all later seeds unregistered. [parse_and_try]
       is on the left of [||] so it runs for every seed. *)
    let any_success =
      List.fold_left
        (fun added seed -> parse_and_try seed || added)
        false seed_nodes
    in
    if any_success then Ok () else Error `No_seeds_reachable

  (** Forwards to [Protocol.broadcast]. *)
  let broadcast t ~topic ~payload =
    Protocol.broadcast t.protocol ~topic ~payload

  (** Forwards to [Protocol.on_message]. *)
  let on_message t handler = Protocol.on_message t.protocol handler

  (** [is_alive t node_id] is [true] iff [node_id] is a known member whose
      current snapshot state is [Types.Alive]; unknown nodes yield [false]. *)
  let is_alive t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> false
    | Some member ->
        let snap = Membership.Member.snapshot_now member in
        snap.Types.state = Types.Alive

  (** [find_node t node_id] is the node info of member [node_id], or [None]
      if the membership table has no such member. *)
  let find_node t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> None
    | Some member -> Some (Membership.Member.node member)

  (** [is_healthy t] is [true] iff the current stats report at least one
      alive node. *)
  let is_healthy t =
    let s = stats t in
    s.Types.nodes_alive > 0
end

(** Alias for [Types.default_config]. *)
let default_config = Types.default_config

(** Library version string. *)
let version = "0.1.0"