this repo has no description
1module Types = Types 2module Codec = Codec 3module Crypto = Crypto 4module Lzw = Lzw 5module Buffer_pool = Buffer_pool 6module Protocol_pure = Protocol_pure 7module Membership = Membership 8module Dissemination = Dissemination 9module Pending_acks = Pending_acks 10module Transport = Transport 11module Protocol = Protocol 12 13module Cluster = struct 14 type t = { protocol : Protocol.t; sw : Eio.Switch.t } 15 16 let create ~sw ~(env : _ Types.env) ~config = 17 let net = env.stdenv#net in 18 let clock = env.stdenv#clock in 19 let mono_clock = env.stdenv#mono_clock in 20 let secure_random = env.stdenv#secure_random in 21 22 let node_name = 23 match config.Types.node_name with 24 | Some name -> name 25 | None -> Printf.sprintf "node-%d" (Random.int 100000) 26 in 27 let self_id = Types.node_id_of_string node_name in 28 29 let udp_sock = 30 Transport.create_udp_socket net ~sw ~addr:config.bind_addr 31 ~port:config.bind_port 32 in 33 34 let tcp_listener = 35 Transport.create_tcp_listener net ~sw ~addr:config.bind_addr 36 ~port:config.bind_port ~backlog:10 37 in 38 39 let self_addr = 40 `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port) 41 in 42 let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in 43 44 match 45 Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock 46 ~mono_clock ~secure_random 47 with 48 | Error `Invalid_key -> Error `Invalid_key 49 | Ok protocol -> Ok { protocol; sw } 50 51 let start t = 52 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol); 53 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol); 54 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_tcp_listener t.protocol) 55 56 let shutdown t = Protocol.shutdown t.protocol 57 let local_node t = Protocol.local_node t.protocol 58 let members t = Protocol.members t.protocol |> List.map Membership.Member.node 59 let member_count t = Protocol.member_count t.protocol 60 let events t = Protocol.events t.protocol 61 let stats t = Protocol.stats t.protocol 62 let add_member t node_info = Protocol.add_member t.protocol node_info 63 let remove_member t node_id = Protocol.remove_member t.protocol node_id 64 65 let join t ~seed_nodes = 66 let parse_and_try seed = 67 match Transport.parse_udp_addr seed with 68 | Error _ -> false 69 | Ok addr -> 70 let node_id = Types.node_id_of_string seed in 71 let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in 72 Protocol.add_member t.protocol node; 73 true 74 in 75 let any_success = List.exists parse_and_try seed_nodes in 76 if any_success then Ok () else Error `No_seeds_reachable 77 78 let broadcast t ~topic ~payload = 79 Protocol.broadcast t.protocol ~topic ~payload 80 81 let on_message t handler = Protocol.on_message t.protocol handler 82 83 let is_alive t node_id = 84 match 85 Membership.find 86 ( Protocol.members t.protocol |> fun _ -> 87 let p = t.protocol in 88 p.Protocol.members ) 89 node_id 90 with 91 | None -> false 92 | Some member -> 93 let snap = Membership.Member.snapshot_now member in 94 snap.Types.state = Types.Alive 95 96 let find_node t node_id = 97 match Membership.find t.protocol.Protocol.members node_id with 98 | None -> None 99 | Some member -> Some (Membership.Member.node member) 100 101 let is_healthy t = 102 let s = stats t in 103 s.Types.nodes_alive > 0 104end 105 106let default_config = Types.default_config 107let version = "0.1.0"