this repo has no description
1module Types = Types
2module Codec = Codec
3module Crypto = Crypto
4module Buffer_pool = Buffer_pool
5module Protocol_pure = Protocol_pure
6module Membership = Membership
7module Dissemination = Dissemination
8module Pending_acks = Pending_acks
9module Transport = Transport
10module Protocol = Protocol
11
12module Cluster = struct
13 type t = { protocol : Protocol.t; sw : Eio.Switch.t }
14
15 let create ~sw ~(env : _ Types.env) ~config =
16 let net = env.stdenv#net in
17 let clock = env.stdenv#clock in
18 let mono_clock = env.stdenv#mono_clock in
19 let secure_random = env.stdenv#secure_random in
20
21 let node_name =
22 match config.Types.node_name with
23 | Some name -> name
24 | None -> Printf.sprintf "node-%d" (Random.int 100000)
25 in
26 let self_id = Types.node_id_of_string node_name in
27
28 let udp_sock =
29 Transport.create_udp_socket net ~sw ~addr:config.bind_addr
30 ~port:config.bind_port
31 in
32
33 let self_addr =
34 `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
35 in
36 let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in
37
38 match
39 Protocol.create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random
40 with
41 | Error `Invalid_key -> Error `Invalid_key
42 | Ok protocol -> Ok { protocol; sw }
43
44 let start t =
45 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
46 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol)
47
48 let shutdown t = Protocol.shutdown t.protocol
49 let local_node t = Protocol.local_node t.protocol
50 let members t = Protocol.members t.protocol |> List.map Membership.Member.node
51 let member_count t = Protocol.member_count t.protocol
52 let events t = Protocol.events t.protocol
53 let stats t = Protocol.stats t.protocol
54 let add_member t node_info = Protocol.add_member t.protocol node_info
55 let remove_member t node_id = Protocol.remove_member t.protocol node_id
56
57 let join t ~seed_nodes =
58 let parse_and_try seed =
59 match Transport.parse_udp_addr seed with
60 | Error _ -> false
61 | Ok addr ->
62 let node_id = Types.node_id_of_string seed in
63 let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
64 Protocol.add_member t.protocol node;
65 true
66 in
67 let any_success = List.exists parse_and_try seed_nodes in
68 if any_success then Ok () else Error `No_seeds_reachable
69
70 let broadcast t ~topic ~payload =
71 Protocol.broadcast t.protocol ~topic ~payload
72
73 let on_message t handler = Protocol.on_message t.protocol handler
74
75 let is_alive t node_id =
76 match
77 Membership.find
78 ( Protocol.members t.protocol |> fun _ ->
79 let p = t.protocol in
80 p.Protocol.members )
81 node_id
82 with
83 | None -> false
84 | Some member ->
85 let snap = Membership.Member.snapshot_now member in
86 snap.Types.state = Types.Alive
87
88 let find_node t node_id =
89 match Membership.find t.protocol.Protocol.members node_id with
90 | None -> None
91 | Some member -> Some (Membership.Member.node member)
92
93 let is_healthy t =
94 let s = stats t in
95 s.Types.nodes_alive > 0
96end
97
98let default_config = Types.default_config
99let version = "0.1.0"