this repo has no description
1module Types = Types
2module Codec = Codec
3module Crypto = Crypto
4module Lzw = Lzw
5module Buffer_pool = Buffer_pool
6module Protocol_pure = Protocol_pure
7module Membership = Membership
8module Dissemination = Dissemination
9module Pending_acks = Pending_acks
10module Transport = Transport
11module Protocol = Protocol
12
13module Cluster = struct
14 type t = { protocol : Protocol.t; sw : Eio.Switch.t }
15
16 let create ~sw ~(env : _ Types.env) ~config =
17 let net = env.stdenv#net in
18 let clock = env.stdenv#clock in
19 let mono_clock = env.stdenv#mono_clock in
20 let secure_random = env.stdenv#secure_random in
21
22 let node_name =
23 match config.Types.node_name with
24 | Some name -> name
25 | None -> Printf.sprintf "node-%d" (Random.int 100000)
26 in
27 let self_id = Types.node_id_of_string node_name in
28
29 let udp_sock =
30 Transport.create_udp_socket net ~sw ~addr:config.bind_addr
31 ~port:config.bind_port
32 in
33
34 let tcp_listener =
35 Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
36 ~port:config.bind_port ~backlog:10
37 in
38
39 let self_addr =
40 `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
41 in
42 let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in
43
44 match
45 Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
46 ~mono_clock ~secure_random
47 with
48 | Error `Invalid_key -> Error `Invalid_key
49 | Ok protocol -> Ok { protocol; sw }
50
51 let start t =
52 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
53 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol);
54 Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_tcp_listener t.protocol)
55
56 let shutdown t = Protocol.shutdown t.protocol
57 let local_node t = Protocol.local_node t.protocol
58 let members t = Protocol.members t.protocol |> List.map Membership.Member.node
59 let member_count t = Protocol.member_count t.protocol
60 let events t = Protocol.events t.protocol
61 let stats t = Protocol.stats t.protocol
62 let add_member t node_info = Protocol.add_member t.protocol node_info
63 let remove_member t node_id = Protocol.remove_member t.protocol node_id
64
65 let join t ~seed_nodes =
66 let parse_and_try seed =
67 match Transport.parse_udp_addr seed with
68 | Error _ -> false
69 | Ok addr ->
70 let node_id = Types.node_id_of_string seed in
71 let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
72 Protocol.add_member t.protocol node;
73 true
74 in
75 let any_success = List.exists parse_and_try seed_nodes in
76 if any_success then Ok () else Error `No_seeds_reachable
77
78 let broadcast t ~topic ~payload =
79 Protocol.broadcast t.protocol ~topic ~payload
80
81 let on_message t handler = Protocol.on_message t.protocol handler
82
83 let is_alive t node_id =
84 match
85 Membership.find
86 ( Protocol.members t.protocol |> fun _ ->
87 let p = t.protocol in
88 p.Protocol.members )
89 node_id
90 with
91 | None -> false
92 | Some member ->
93 let snap = Membership.Member.snapshot_now member in
94 snap.Types.state = Types.Alive
95
96 let find_node t node_id =
97 match Membership.find t.protocol.Protocol.members node_id with
98 | None -> None
99 | Some member -> Some (Membership.Member.node member)
100
101 let is_healthy t =
102 let s = stats t in
103 s.Types.nodes_alive > 0
104end
105
106let default_config = Types.default_config
107let version = "0.1.0"