this repo has no description
1module Types = Types
2module Codec = Codec
3module Crypto = Crypto
4module Buffer_pool = Buffer_pool
5module Protocol_pure = Protocol_pure
6module Membership = Membership
7module Dissemination = Dissemination
8module Pending_acks = Pending_acks
9module Transport = Transport
10module Protocol = Protocol
11
12module Cluster = struct
13 type t = { protocol : Protocol.t; sw : Eio.Switch.t }
14
15 let create ~sw ~(env : _ Types.env) ~config =
16 let net = env.stdenv#net in
17 let clock = env.stdenv#clock in
18 let mono_clock = env.stdenv#mono_clock in
19 let secure_random = env.stdenv#secure_random in
20
21 let node_name =
22 match config.Types.node_name with
23 | Some name -> name
24 | None -> Printf.sprintf "node-%d" (Random.int 100000)
25 in
26 let self_id = Types.node_id_of_string node_name in
27
28 let udp_sock =
29 Transport.create_udp_socket net ~sw ~addr:config.bind_addr
30 ~port:config.bind_port
31 in
32
33 let tcp_listener =
34 Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
35 ~port:config.bind_port ~backlog:10
36 in
37
38 let self_addr =
39 `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
40 in
41 let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in
42
43 match
44 Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
45 ~mono_clock ~secure_random
46 with
47 | Error `Invalid_key -> Error `Invalid_key
48 | Ok protocol -> Ok { protocol; sw }
49
50 let start t =
51 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
52 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol);
53 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_tcp_listener t.protocol)
54
55 let shutdown t = Protocol.shutdown t.protocol
56 let local_node t = Protocol.local_node t.protocol
57 let members t = Protocol.members t.protocol |> List.map Membership.Member.node
58 let member_count t = Protocol.member_count t.protocol
59 let events t = Protocol.events t.protocol
60 let stats t = Protocol.stats t.protocol
61 let add_member t node_info = Protocol.add_member t.protocol node_info
62 let remove_member t node_id = Protocol.remove_member t.protocol node_id
63
64 let join t ~seed_nodes =
65 let parse_and_try seed =
66 match Transport.parse_udp_addr seed with
67 | Error _ -> false
68 | Ok addr ->
69 let node_id = Types.node_id_of_string seed in
70 let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
71 Protocol.add_member t.protocol node;
72 true
73 in
74 let any_success = List.exists parse_and_try seed_nodes in
75 if any_success then Ok () else Error `No_seeds_reachable
76
77 let broadcast t ~topic ~payload =
78 Protocol.broadcast t.protocol ~topic ~payload
79
80 let on_message t handler = Protocol.on_message t.protocol handler
81
82 let is_alive t node_id =
83 match
84 Membership.find
85 ( Protocol.members t.protocol |> fun _ ->
86 let p = t.protocol in
87 p.Protocol.members )
88 node_id
89 with
90 | None -> false
91 | Some member ->
92 let snap = Membership.Member.snapshot_now member in
93 snap.Types.state = Types.Alive
94
95 let find_node t node_id =
96 match Membership.find t.protocol.Protocol.members node_id with
97 | None -> None
98 | Some member -> Some (Membership.Member.node member)
99
100 let is_healthy t =
101 let s = stats t in
102 s.Types.nodes_alive > 0
103end
104
105let default_config = Types.default_config
106let version = "0.1.0"