this repo has no description
(** Opaque identifier of a cluster node, wrapping its string name. *)
type node_id = Node_id of string [@@unboxed]

(** [node_id_to_string id] unwraps the string carried by [id]. *)
let node_id_to_string = function Node_id name -> name

(** [node_id_of_string name] wraps [name] as a {!node_id}. *)
let node_id_of_string name = Node_id name

(** [equal_node_id a b] is [true] when both identifiers carry equal strings. *)
let equal_node_id lhs rhs =
  String.equal (node_id_to_string lhs) (node_id_to_string rhs)

(** [compare_node_id a b] orders identifiers by [String.compare] on their
    underlying strings. *)
let compare_node_id lhs rhs =
  String.compare (node_id_to_string lhs) (node_id_to_string rhs)
7
(** Incarnation counter of a node, wrapping an [int]. *)
type incarnation = Incarnation of int [@@unboxed]

(** [incarnation_to_int inc] unwraps the integer carried by [inc]. *)
let incarnation_to_int = function Incarnation n -> n

(** [incarnation_of_int n] wraps [n] as an {!incarnation}. *)
let incarnation_of_int n = Incarnation n

(** The incarnation whose value is [0]. *)
let zero_incarnation = incarnation_of_int 0

(** [compare_incarnation a b] orders incarnations by [Int.compare] on their
    underlying integers. *)
let compare_incarnation lhs rhs =
  Int.compare (incarnation_to_int lhs) (incarnation_to_int rhs)

(** [incr_incarnation inc] is the incarnation one greater than [inc]. *)
let incr_incarnation inc = incarnation_of_int (succ (incarnation_to_int inc))
15
(** Address of a node: an Eio datagram socket address. *)
type addr = Eio.Net.Sockaddr.datagram

(** What is known about a node: its identifier, its address and a metadata
    string. *)
type node_info = {
  id : node_id;
  addr : addr;
  meta : string;
}

(** [make_node_info ~id ~addr ~meta] builds a {!node_info} from its three
    fields. *)
let make_node_info ~id ~addr ~meta : node_info = { id; addr; meta }
20
(** Membership state of a node. *)
type member_state =
  | Alive
  | Suspect
  | Dead
  | Left

(** [member_state_to_string s] is the lowercase name of [s]. *)
let member_state_to_string state =
  match state with
  | Left -> "left"
  | Dead -> "dead"
  | Suspect -> "suspect"
  | Alive -> "alive"

(** [member_state_to_int s] is the integer code of [s]:
    [Alive] = 0, [Suspect] = 1, [Dead] = 2, [Left] = 3. *)
let member_state_to_int state =
  match state with
  | Left -> 3
  | Dead -> 2
  | Suspect -> 1
  | Alive -> 0

(** [member_state_of_int n] is the inverse of {!member_state_to_int} on
    [0..2]; every other integer (including [3]) maps to [Left]. *)
let member_state_of_int code =
  if code = 0 then Alive
  else if code = 1 then Suspect
  else if code = 2 then Dead
  else Left
40
(** Point-in-time view of one cluster member. *)
type member_snapshot = {
  node : node_info;  (** The member itself. *)
  state : member_state;  (** Its membership state. *)
  incarnation : incarnation;  (** Its incarnation counter. *)
  state_change : Mtime.span;
      (** NOTE(review): presumably the monotonic time at which [state] last
          changed; confirm against the code that fills it in. *)
}
47
(** Protocol messages in their internal (typed) form. *)
type protocol_msg =
  | Ping of {
      seq : int;
      target : node_id;
      sender : node_info;
    }  (** Ping with sequence number [seq], addressed to [target]. *)
  | Ping_req of {
      seq : int;
      target : node_id;
      sender : node_info;
    }  (** Ping request concerning [target], issued by [sender]. *)
  | Ack of {
      seq : int;
      responder : node_info;
      payload : string option;
    }  (** Acknowledgement of sequence number [seq], with optional payload. *)
  | Alive of {
      node : node_info;
      incarnation : incarnation;
    }  (** [node] is reported alive at [incarnation]. *)
  | Suspect of {
      node : node_id;
      incarnation : incarnation;
      suspector : node_id;
    }  (** [node] is suspected at [incarnation] by [suspector]. *)
  | Dead of {
      node : node_id;
      incarnation : incarnation;
      declarator : node_id;
    }  (** [node] is declared dead at [incarnation] by [declarator]. *)
  | User_msg of {
      topic : string;
      payload : string;
      origin : node_id;
    }  (** Application payload on [topic], attributed to [origin]. *)
60
(** One packet: a primary message plus piggybacked messages, tagged with a
    cluster name. *)
type packet = {
  cluster : string;  (** Cluster name carried by the packet. *)
  primary : protocol_msg;  (** The main message of the packet. *)
  piggyback : protocol_msg list;  (** Additional messages sent alongside. *)
}
66
(** Reasons decoding can fail. *)
type decode_error =
  | Invalid_magic
  | Unsupported_version of int  (** Carries the offending version number. *)
  | Truncated_message
  | Invalid_tag of int  (** Carries the offending tag value. *)
  | Decryption_failed
  | Msgpack_error of string  (** Carries the underlying error text. *)
  | Invalid_crc

(** [decode_error_to_string e] renders [e] as a short human-readable
    message. *)
let decode_error_to_string err =
  match err with
  | Invalid_crc -> "invalid CRC checksum"
  | Msgpack_error detail -> "msgpack error: " ^ detail
  | Decryption_failed -> "decryption failed"
  | Invalid_tag tag -> "invalid tag: " ^ string_of_int tag
  | Truncated_message -> "truncated message"
  | Unsupported_version version ->
      "unsupported version: " ^ string_of_int version
  | Invalid_magic -> "invalid magic bytes"
84
(** Reasons sending can fail. *)
type send_error =
  | Node_unreachable
  | Timeout
  | Connection_reset

(** [send_error_to_string e] renders [e] as a short human-readable message. *)
let send_error_to_string err =
  match err with
  | Connection_reset -> "connection reset"
  | Timeout -> "timeout"
  | Node_unreachable -> "node unreachable"
91
(** Membership events, each carrying the node it concerns. *)
type node_event =
  | Join of node_info
  | Leave of node_info
  | Update of node_info
  | Suspect_event of node_info
  | Alive_event of node_info
98
(** Tunable settings.

    NOTE(review): the float-valued intervals and timeouts are presumably
    expressed in seconds; the unit is not stated here, so confirm against the
    code that consumes them. *)
type config = {
  (* Local endpoint and identity. *)
  bind_addr : string;
  bind_port : int;
  node_name : string option;
  (* Timing and fan-out parameters. *)
  protocol_interval : float;
  probe_timeout : float;
  indirect_checks : int;
  suspicion_mult : int;
  suspicion_max_timeout : float;
  retransmit_mult : int;
  (* Transport sizing. *)
  udp_buffer_size : int;
  tcp_timeout : float;
  send_buffer_count : int;
  recv_buffer_count : int;
  (* Cluster identity and security. *)
  secret_key : string;
  cluster_name : string;
  label : string;
  encryption_enabled : bool;
  gossip_verify_incoming : bool;
  gossip_verify_outgoing : bool;
}

(** Baseline configuration: binds to ["0.0.0.0"] port [7946], leaves
    encryption disabled and uses a 16-byte all-zero [secret_key]. *)
let default_config : config =
  {
    (* Local endpoint and identity. *)
    bind_addr = "0.0.0.0";
    bind_port = 7946;
    node_name = None;
    (* Timing and fan-out parameters. *)
    protocol_interval = 1.0;
    probe_timeout = 0.5;
    indirect_checks = 3;
    suspicion_mult = 4;
    suspicion_max_timeout = 60.0;
    retransmit_mult = 4;
    (* Transport sizing. *)
    udp_buffer_size = 1400;
    tcp_timeout = 10.0;
    send_buffer_count = 16;
    recv_buffer_count = 16;
    (* Cluster identity and security. *)
    secret_key = String.make 16 '\000';
    cluster_name = "default";
    label = "";
    encryption_enabled = false;
    gossip_verify_incoming = true;
    gossip_verify_outgoing = true;
  }
143
(** Runtime environment: an Eio environment object together with a switch.

    The type parameter is constrained to an object exposing at least
    [clock], [mono_clock], [net] and [secure_random]. *)
type 'a env = {
  stdenv : 'a;  (** The environment object providing the capabilities. *)
  sw : Eio.Switch.t;  (** The switch stored alongside it. *)
}
  constraint
    'a =
    < clock : _ Eio.Time.clock
    ; mono_clock : _ Eio.Time.Mono.t
    ; net : _ Eio.Net.t
    ; secure_random : _ Eio.Flow.source
    ; .. >
155
(** Counters describing membership, traffic and buffer usage. *)
type stats = {
  (* Member counts by state. *)
  nodes_alive : int;
  nodes_suspect : int;
  nodes_dead : int;
  (* Message counters. *)
  msgs_sent : int;
  msgs_received : int;
  msgs_dropped : int;
  (* Queue and buffer gauges. *)
  queue_depth : int;
  buffers_available : int;
  buffers_total : int;
}

(** A {!stats} value with every counter set to zero. *)
let empty_stats : stats =
  {
    (* Member counts by state. *)
    nodes_alive = 0;
    nodes_suspect = 0;
    nodes_dead = 0;
    (* Message counters. *)
    msgs_sent = 0;
    msgs_received = 0;
    msgs_dropped = 0;
    (* Queue and buffer gauges. *)
    queue_depth = 0;
    buffers_available = 0;
    buffers_total = 0;
  }
180
(** Wire-level representation of messages: numeric message-type tags and the
    flat records that are serialised.

    NOTE(review): the tag numbering looks like the HashiCorp memberlist
    message-type table; confirm before relying on interoperability. *)
module Wire = struct
  (** Message-type tag. *)
  type message_type =
    | Ping_msg
    | Indirect_ping_msg
    | Ack_resp_msg
    | Suspect_msg
    | Alive_msg
    | Dead_msg
    | Push_pull_msg
    | Compound_msg
    | User_msg
    | Compress_msg
    | Encrypt_msg
    | Nack_resp_msg
    | Has_crc_msg
    | Err_msg
    | Has_label_msg

  (** [message_type_to_int t] is the numeric tag of [t]. Tags run from [0] to
      [13] in declaration order, except [Has_label_msg] which is [244]. *)
  let message_type_to_int kind =
    match kind with
    | Ping_msg -> 0
    | Indirect_ping_msg -> 1
    | Ack_resp_msg -> 2
    | Suspect_msg -> 3
    | Alive_msg -> 4
    | Dead_msg -> 5
    | Push_pull_msg -> 6
    | Compound_msg -> 7
    | User_msg -> 8
    | Compress_msg -> 9
    | Encrypt_msg -> 10
    | Nack_resp_msg -> 11
    | Has_crc_msg -> 12
    | Err_msg -> 13
    | Has_label_msg -> 244

  (** [message_type_of_int n] is [Ok t] when [n] is the tag of [t], and
      [Error n] for any other integer. *)
  let message_type_of_int =
    (* Every constructor, so the decoder is derived from the encoder above. *)
    let known =
      [
        Ping_msg;
        Indirect_ping_msg;
        Ack_resp_msg;
        Suspect_msg;
        Alive_msg;
        Dead_msg;
        Push_pull_msg;
        Compound_msg;
        User_msg;
        Compress_msg;
        Encrypt_msg;
        Nack_resp_msg;
        Has_crc_msg;
        Err_msg;
        Has_label_msg;
      ]
    in
    fun code ->
      match
        List.find_opt (fun kind -> message_type_to_int kind = code) known
      with
      | Some kind -> Ok kind
      | None -> Error code

  (* The record types below share field names ([seq_no], [node],
     [incarnation], ...). Their declaration order is significant for field
     disambiguation and must not be changed. *)

  (** Body of a ping. *)
  type ping = {
    seq_no : int;
    node : string;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  (** Body of an indirect ping request. *)
  type indirect_ping_req = {
    seq_no : int;
    target : string;
    port : int;
    node : string;
    nack : bool;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  (** Body of an acknowledgement. *)
  type ack_resp = {
    seq_no : int;
    payload : string;
  }

  (** Body of a negative acknowledgement. *)
  type nack_resp = { seq_no : int }

  (** Body of a suspect announcement. *)
  type suspect = {
    incarnation : int;
    node : string;
    from : string;
  }

  (** Body of an alive announcement. *)
  type alive = {
    incarnation : int;
    node : string;
    addr : string;
    port : int;
    meta : string;
    vsn : int list;
  }

  (** Body of a dead announcement. *)
  type dead = {
    incarnation : int;
    node : string;
    from : string;
  }

  (** Body of a compressed message: algorithm code and compressed bytes. *)
  type compress = {
    algo : int;
    buf : string;
  }

  (** A wire message together with its body. *)
  type protocol_msg =
    | Ping of ping
    | Indirect_ping of indirect_ping_req
    | Ack of ack_resp
    | Nack of nack_resp
    | Suspect of suspect
    | Alive of alive
    | Dead of dead
    | User_data of string
    | Compound of string list
    | Compressed of compress
    | Err of string
end
282
(** [ip_to_bytes ip] returns the raw bytes of [ip]: 4 bytes for an IPv4
    address, 16 bytes for an IPv6 address.

    [Eio.Net.Ipaddr.t] is a [private string] that already holds exactly these
    bytes, so a coercion is all that is needed. Going through the textual
    form and re-parsing it is fragile: an IPv6 address with ["::"] in the
    middle (e.g. ["fe80::1"]) splits into a single empty component, which is
    easy to skip without zero-filling, leaving the remaining groups at the
    wrong offsets. *)
let ip_to_bytes ip = (ip : _ Eio.Net.Ipaddr.t :> string)
314
(** [ip_of_bytes raw] interprets [raw] as a raw IP address: 4 bytes or 16
    bytes, passed to [Eio.Net.Ipaddr.of_raw].

    @raise Failure if [raw] is neither 4 nor 16 bytes long. *)
let ip_of_bytes raw =
  match String.length raw with
  | 4 | 16 -> Eio.Net.Ipaddr.of_raw raw
  | _ -> failwith "invalid IP address length"
320
(** Version vector placed in the [vsn] field of outgoing alive messages.

    NOTE(review): presumably the min/max/current protocol and delegate
    versions as in memberlist; confirm against the protocol being targeted. *)
let default_vsn = [ 1; 5; 5; 0; 0; 0 ]

(** [node_info_to_wire info ~source_node] flattens [info] into
    [(raw_ip_bytes, port, meta, source_node)]; [source_node] is passed
    through unchanged.

    @raise Failure if [info.addr] is a Unix socket address. *)
let node_info_to_wire (info : node_info) ~source_node :
    string * int * string * string =
  let ({ addr; meta; _ } : node_info) = info in
  match addr with
  | `Unix _ -> failwith "Unix sockets not supported"
  | `Udp (ip, port) -> (ip_to_bytes ip, port, meta, source_node)

(** [node_info_of_wire ~name ~addr ~port ~meta] rebuilds a {!node_info} from
    wire fields, where [addr] holds raw IP bytes.

    @raise Failure if [addr] is neither 4 nor 16 bytes long. *)
let node_info_of_wire ~name ~addr ~port ~meta : node_info =
  let endpoint = `Udp (ip_of_bytes addr, port) in
  make_node_info ~id:(node_id_of_string name) ~addr:endpoint ~meta
332
(** [msg_to_wire ~self_name ~self_port msg] converts an internal message to
    its wire form.

    - [self_name] is written to the [source_node] field of pings and indirect
      pings.
    - [self_port] is written to the [port] field of indirect pings.
    - An [Ack] without payload is sent with an empty payload string; the
      responder is not transmitted.
    - For [User_msg] only the payload is transmitted; topic and origin are
      dropped.

    NOTE(review): the [target] address field of an indirect ping is filled
    with the {e sender}'s IP bytes, since [Ping_req] carries no address for
    the target node; confirm this is what peers expect.

    @raise Failure if a sender or node address is a Unix socket address. *)
let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg =
  (* Raw IP bytes and port of a node; raises on non-UDP addresses. *)
  let endpoint_of (info : node_info) =
    let raw_ip, udp_port, _, _ = node_info_to_wire info ~source_node:"" in
    (raw_ip, udp_port)
  in
  match msg with
  | Ping { seq; target; sender } ->
      let source_addr, source_port = endpoint_of sender in
      Wire.Ping
        {
          seq_no = seq;
          node = node_id_to_string target;
          source_addr;
          source_port;
          source_node = self_name;
        }
  | Ping_req { seq; target; sender } ->
      let source_addr, source_port = endpoint_of sender in
      Wire.Indirect_ping
        {
          seq_no = seq;
          (* Same bytes as [source_addr]: both derive from the sender's IP. *)
          target = source_addr;
          port = self_port;
          node = node_id_to_string target;
          nack = true;
          source_addr;
          source_port;
          source_node = self_name;
        }
  | Ack { seq; payload; _ } ->
      let body = match payload with Some data -> data | None -> "" in
      Wire.Ack { seq_no = seq; payload = body }
  | Alive { node; incarnation } ->
      let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
      Wire.Alive
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node.id;
          addr;
          port;
          meta;
          vsn = default_vsn;
        }
  | Suspect { node; incarnation; suspector } ->
      Wire.Suspect
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string suspector;
        }
  | Dead { node; incarnation; declarator } ->
      Wire.Dead
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string declarator;
        }
  | User_msg { payload; _ } -> Wire.User_data payload
389
(** [msg_of_wire ~default_port wmsg] converts a wire message to its internal
    form, or [None] when there is no internal counterpart or the message
    cannot be interpreted.

    - [default_port] replaces a non-positive [source_port] in pings and
      indirect pings.
    - An empty address field is read as the all-zero IPv4 address.
    - An address field that is neither empty, 4 nor 16 bytes long makes the
      whole message undecodable: the result is [None] rather than an escaping
      [Failure], since these bytes come straight off the wire.
    - The sender of a ping is named by [source_node], falling back to [node]
      when [source_node] is empty; the sender of an indirect ping is named by
      [source_node] as is.
    - [Ack] carries a placeholder responder (empty id, zero address, port 0);
      an empty payload becomes [None].
    - [User_data] becomes a [User_msg] with empty topic and origin.
    - [Nack], [Compound], [Compressed] and [Err] yield [None]. *)
let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option =
  let unspecified_ip = Eio.Net.Ipaddr.of_raw "\000\000\000\000" in
  (* Raw wire bytes -> IP address; [None] on a malformed length. *)
  let decode_ip raw =
    if String.length raw = 0 then Some unspecified_ip
    else
      match ip_of_bytes raw with
      | ip -> Some ip
      | exception Failure _ -> None
  in
  let port_or_default port = if port > 0 then port else default_port in
  (* Node description with no metadata, as used for senders and responders. *)
  let bare_node ~name ip port : node_info =
    { id = node_id_of_string name; addr = `Udp (ip, port); meta = "" }
  in
  match wmsg with
  | Wire.Ping { seq_no; node; source_addr; source_port; source_node } ->
      let name = if source_node <> "" then source_node else node in
      Option.map
        (fun ip : protocol_msg ->
          let sender = bare_node ~name ip (port_or_default source_port) in
          Ping { seq = seq_no; target = node_id_of_string node; sender })
        (decode_ip source_addr)
  | Wire.Indirect_ping
      { seq_no; node; source_addr; source_port; source_node; _ } ->
      (* The wire-level target address, port and nack flag have no place in
         the internal [Ping_req] and are ignored. *)
      Option.map
        (fun ip : protocol_msg ->
          let sender =
            bare_node ~name:source_node ip (port_or_default source_port)
          in
          Ping_req { seq = seq_no; target = node_id_of_string node; sender })
        (decode_ip source_addr)
  | Wire.Ack { seq_no; payload } ->
      let responder = bare_node ~name:"" unspecified_ip 0 in
      let payload = if payload = "" then None else Some payload in
      Some (Ack { seq = seq_no; responder; payload })
  | Wire.Alive { incarnation; node; addr; port; meta; _ } ->
      Option.map
        (fun ip : protocol_msg ->
          let info : node_info =
            { id = node_id_of_string node; addr = `Udp (ip, port); meta }
          in
          Alive { node = info; incarnation = incarnation_of_int incarnation })
        (decode_ip addr)
  | Wire.Suspect { incarnation; node; from } ->
      Some
        (Suspect
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             suspector = node_id_of_string from;
           })
  | Wire.Dead { incarnation; node; from } ->
      Some
        (Dead
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             declarator = node_id_of_string from;
           })
  | Wire.User_data payload ->
      Some (User_msg { topic = ""; payload; origin = node_id_of_string "" })
  | Wire.Nack _ -> None
  | Wire.Compound _ -> None
  | Wire.Compressed _ -> None
  | Wire.Err _ -> None