this repo has no description
1type node_id = Node_id of string [@@unboxed]
2
3let node_id_to_string (Node_id s) = s
4let node_id_of_string s = Node_id s
5let equal_node_id (Node_id a) (Node_id b) = String.equal a b
6let compare_node_id (Node_id a) (Node_id b) = String.compare a b
7
8type incarnation = Incarnation of int [@@unboxed]
9
10let incarnation_to_int (Incarnation i) = i
11let incarnation_of_int i = Incarnation i
12let zero_incarnation = Incarnation 0
13let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b
14let incr_incarnation (Incarnation i) = Incarnation (i + 1)
15
16type addr = Eio.Net.Sockaddr.datagram
17type node_info = { id : node_id; addr : addr; meta : string }
18
19let make_node_info ~id ~addr ~meta = { id; addr; meta }
20
21type member_state = Alive | Suspect | Dead | Left
22
23let member_state_to_string = function
24 | Alive -> "alive"
25 | Suspect -> "suspect"
26 | Dead -> "dead"
27 | Left -> "left"
28
29let member_state_to_int = function
30 | Alive -> 0
31 | Suspect -> 1
32 | Dead -> 2
33 | Left -> 3
34
35let member_state_of_int = function
36 | 0 -> Alive
37 | 1 -> Suspect
38 | 2 -> Dead
39 | _ -> Left
40
41type member_snapshot = {
42 node : node_info;
43 state : member_state;
44 incarnation : incarnation;
45 state_change : Mtime.span;
46}
47
48type protocol_msg =
49 | Ping of { seq : int; target : node_id; sender : node_info }
50 | Ping_req of { seq : int; target : node_id; sender : node_info }
51 | Ack of { seq : int; responder : node_info; payload : string option }
52 | Alive of { node : node_info; incarnation : incarnation }
53 | Suspect of {
54 node : node_id;
55 incarnation : incarnation;
56 suspector : node_id;
57 }
58 | Dead of { node : node_id; incarnation : incarnation; declarator : node_id }
59 | User_msg of { topic : string; payload : string; origin : node_id }
60
61type packet = {
62 cluster : string;
63 primary : protocol_msg;
64 piggyback : protocol_msg list;
65}
66
67type decode_error =
68 | Invalid_magic
69 | Unsupported_version of int
70 | Truncated_message
71 | Invalid_tag of int
72 | Decryption_failed
73 | Msgpack_error of string
74 | Invalid_crc
75
76let decode_error_to_string = function
77 | Invalid_magic -> "invalid magic bytes"
78 | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v
79 | Truncated_message -> "truncated message"
80 | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t
81 | Decryption_failed -> "decryption failed"
82 | Msgpack_error s -> Printf.sprintf "msgpack error: %s" s
83 | Invalid_crc -> "invalid CRC checksum"
84
85type send_error = Node_unreachable | Timeout | Connection_reset
86
87let send_error_to_string = function
88 | Node_unreachable -> "node unreachable"
89 | Timeout -> "timeout"
90 | Connection_reset -> "connection reset"
91
92type node_event =
93 | Join of node_info
94 | Leave of node_info
95 | Update of node_info
96 | Suspect_event of node_info
97 | Alive_event of node_info
98
99type config = {
100 bind_addr : string;
101 bind_port : int;
102 node_name : string option;
103 protocol_interval : float;
104 probe_timeout : float;
105 indirect_checks : int;
106 suspicion_mult : int;
107 suspicion_max_timeout : float;
108 retransmit_mult : int;
109 udp_buffer_size : int;
110 tcp_timeout : float;
111 send_buffer_count : int;
112 recv_buffer_count : int;
113 secret_key : string;
114 cluster_name : string;
115 label : string;
116 encryption_enabled : bool;
117 gossip_verify_incoming : bool;
118 gossip_verify_outgoing : bool;
119}
120
121let default_config =
122 {
123 bind_addr = "0.0.0.0";
124 bind_port = 7946;
125 node_name = None;
126 protocol_interval = 1.0;
127 probe_timeout = 0.5;
128 indirect_checks = 3;
129 suspicion_mult = 4;
130 suspicion_max_timeout = 60.0;
131 retransmit_mult = 4;
132 udp_buffer_size = 1400;
133 tcp_timeout = 10.0;
134 send_buffer_count = 16;
135 recv_buffer_count = 16;
136 secret_key = String.make 16 '\x00';
137 cluster_name = "default";
138 label = "";
139 encryption_enabled = false;
140 gossip_verify_incoming = true;
141 gossip_verify_outgoing = true;
142 }
143
144type 'a env = {
145 stdenv : 'a;
146 sw : Eio.Switch.t;
147}
148 constraint
149 'a =
150 < clock : _ Eio.Time.clock
151 ; mono_clock : _ Eio.Time.Mono.t
152 ; net : _ Eio.Net.t
153 ; secure_random : _ Eio.Flow.source
154 ; .. >
155
156type stats = {
157 nodes_alive : int;
158 nodes_suspect : int;
159 nodes_dead : int;
160 msgs_sent : int;
161 msgs_received : int;
162 msgs_dropped : int;
163 queue_depth : int;
164 buffers_available : int;
165 buffers_total : int;
166}
167
168let empty_stats =
169 {
170 nodes_alive = 0;
171 nodes_suspect = 0;
172 nodes_dead = 0;
173 msgs_sent = 0;
174 msgs_received = 0;
175 msgs_dropped = 0;
176 queue_depth = 0;
177 buffers_available = 0;
178 buffers_total = 0;
179 }
180
181module Wire = struct
182 type message_type =
183 | Ping_msg
184 | Indirect_ping_msg
185 | Ack_resp_msg
186 | Suspect_msg
187 | Alive_msg
188 | Dead_msg
189 | Push_pull_msg
190 | Compound_msg
191 | User_msg
192 | Compress_msg
193 | Encrypt_msg
194 | Nack_resp_msg
195 | Has_crc_msg
196 | Err_msg
197 | Has_label_msg
198
199 let message_type_to_int = function
200 | Ping_msg -> 0
201 | Indirect_ping_msg -> 1
202 | Ack_resp_msg -> 2
203 | Suspect_msg -> 3
204 | Alive_msg -> 4
205 | Dead_msg -> 5
206 | Push_pull_msg -> 6
207 | Compound_msg -> 7
208 | User_msg -> 8
209 | Compress_msg -> 9
210 | Encrypt_msg -> 10
211 | Nack_resp_msg -> 11
212 | Has_crc_msg -> 12
213 | Err_msg -> 13
214 | Has_label_msg -> 244
215
216 let message_type_of_int = function
217 | 0 -> Ok Ping_msg
218 | 1 -> Ok Indirect_ping_msg
219 | 2 -> Ok Ack_resp_msg
220 | 3 -> Ok Suspect_msg
221 | 4 -> Ok Alive_msg
222 | 5 -> Ok Dead_msg
223 | 6 -> Ok Push_pull_msg
224 | 7 -> Ok Compound_msg
225 | 8 -> Ok User_msg
226 | 9 -> Ok Compress_msg
227 | 10 -> Ok Encrypt_msg
228 | 11 -> Ok Nack_resp_msg
229 | 12 -> Ok Has_crc_msg
230 | 13 -> Ok Err_msg
231 | 244 -> Ok Has_label_msg
232 | n -> Error n
233
234 type ping = {
235 seq_no : int;
236 node : string;
237 source_addr : string;
238 source_port : int;
239 source_node : string;
240 }
241
242 type indirect_ping_req = {
243 seq_no : int;
244 target : string;
245 port : int;
246 node : string;
247 nack : bool;
248 source_addr : string;
249 source_port : int;
250 source_node : string;
251 }
252
253 type ack_resp = { seq_no : int; payload : string }
254 type nack_resp = { seq_no : int }
255 type suspect = { incarnation : int; node : string; from : string }
256
257 type alive = {
258 incarnation : int;
259 node : string;
260 addr : string;
261 port : int;
262 meta : string;
263 vsn : int list;
264 }
265
266 type dead = { incarnation : int; node : string; from : string }
267 type compress = { algo : int; buf : string }
268
269 type push_pull_header = {
270 pp_nodes : int;
271 pp_user_state_len : int;
272 pp_join : bool;
273 }
274
275 type push_node_state = {
276 pns_name : string;
277 pns_addr : string;
278 pns_port : int;
279 pns_meta : string;
280 pns_incarnation : int;
281 pns_state : int;
282 pns_vsn : int list;
283 }
284
285 type protocol_msg =
286 | Ping of ping
287 | Indirect_ping of indirect_ping_req
288 | Ack of ack_resp
289 | Nack of nack_resp
290 | Suspect of suspect
291 | Alive of alive
292 | Dead of dead
293 | User_data of string
294 | Compound of string list
295 | Compressed of compress
296 | Err of string
297end
298
299let ip_to_bytes ip =
300 let s = Fmt.to_to_string Eio.Net.Ipaddr.pp ip in
301 if String.contains s ':' then (
302 let parts = String.split_on_char ':' s in
303 let buf = Bytes.create 16 in
304 let rec fill idx = function
305 | [] -> ()
306 | "" :: rest when List.exists (( = ) "") rest ->
307 let tail_len = List.length (List.filter (( <> ) "") rest) in
308 let zeros = 8 - idx - tail_len in
309 for i = 0 to (zeros * 2) - 1 do
310 Bytes.set_uint8 buf ((idx * 2) + i) 0
311 done;
312 fill (idx + zeros) rest
313 | "" :: rest -> fill idx rest
314 | h :: rest ->
315 let v = int_of_string ("0x" ^ h) in
316 Bytes.set_uint8 buf (idx * 2) (v lsr 8);
317 Bytes.set_uint8 buf ((idx * 2) + 1) (v land 0xff);
318 fill (idx + 1) rest
319 in
320 fill 0 parts;
321 Bytes.to_string buf)
322 else
323 Scanf.sscanf s "%d.%d.%d.%d" (fun a b c d ->
324 let buf = Bytes.create 4 in
325 Bytes.set_uint8 buf 0 a;
326 Bytes.set_uint8 buf 1 b;
327 Bytes.set_uint8 buf 2 c;
328 Bytes.set_uint8 buf 3 d;
329 Bytes.to_string buf)
330
331let ip_of_bytes s =
332 let len = String.length s in
333 if len = 4 then Eio.Net.Ipaddr.of_raw s
334 else if len = 16 then Eio.Net.Ipaddr.of_raw s
335 else failwith "invalid IP address length"
336
337let default_vsn = [ 1; 5; 5; 0; 0; 0 ]
338
339let node_info_to_wire (info : node_info) ~source_node :
340 string * int * string * string =
341 match info.addr with
342 | `Udp (ip, port) -> (ip_to_bytes ip, port, info.meta, source_node)
343 | `Unix _ -> failwith "Unix sockets not supported"
344
345let node_info_of_wire ~name ~addr ~port ~meta : node_info =
346 let ip = ip_of_bytes addr in
347 { id = node_id_of_string name; addr = `Udp (ip, port); meta }
348
349let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg =
350 match msg with
351 | Ping { seq; target; sender } ->
352 let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
353 Wire.Ping
354 {
355 seq_no = seq;
356 node = node_id_to_string target;
357 source_addr = addr;
358 source_port = port;
359 source_node = self_name;
360 }
361 | Ping_req { seq; target; sender } ->
362 let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
363 let target_addr =
364 match sender.addr with `Udp (ip, _) -> ip_to_bytes ip | `Unix _ -> ""
365 in
366 Wire.Indirect_ping
367 {
368 seq_no = seq;
369 target = target_addr;
370 port = self_port;
371 node = node_id_to_string target;
372 nack = true;
373 source_addr = addr;
374 source_port = port;
375 source_node = self_name;
376 }
377 | Ack { seq; responder = _; payload } ->
378 Wire.Ack { seq_no = seq; payload = Option.value payload ~default:"" }
379 | Alive { node; incarnation } ->
380 let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
381 Wire.Alive
382 {
383 incarnation = incarnation_to_int incarnation;
384 node = node_id_to_string node.id;
385 addr;
386 port;
387 meta;
388 vsn = default_vsn;
389 }
390 | Suspect { node; incarnation; suspector } ->
391 Wire.Suspect
392 {
393 incarnation = incarnation_to_int incarnation;
394 node = node_id_to_string node;
395 from = node_id_to_string suspector;
396 }
397 | Dead { node; incarnation; declarator } ->
398 Wire.Dead
399 {
400 incarnation = incarnation_to_int incarnation;
401 node = node_id_to_string node;
402 from = node_id_to_string declarator;
403 }
404 | User_msg { topic; payload; origin } ->
405 let origin_str = node_id_to_string origin in
406 let encoded =
407 Printf.sprintf "%d:%s%d:%s%s" (String.length topic) topic
408 (String.length origin_str) origin_str payload
409 in
410 Wire.User_data encoded
411
412let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option =
413 match wmsg with
414 | Wire.Ping { seq_no; node; source_addr; source_port; source_node } ->
415 let port = if source_port > 0 then source_port else default_port in
416 let ip =
417 if String.length source_addr > 0 then ip_of_bytes source_addr
418 else Eio.Net.Ipaddr.of_raw "\000\000\000\000"
419 in
420 let sender =
421 {
422 id =
423 node_id_of_string (if source_node <> "" then source_node else node);
424 addr = `Udp (ip, port);
425 meta = "";
426 }
427 in
428 Some (Ping { seq = seq_no; target = node_id_of_string node; sender })
429 | Wire.Indirect_ping
430 { seq_no; target; port; node; source_addr; source_port; source_node; _ }
431 ->
432 let src_port = if source_port > 0 then source_port else default_port in
433 let ip =
434 if String.length source_addr > 0 then ip_of_bytes source_addr
435 else Eio.Net.Ipaddr.of_raw "\000\000\000\000"
436 in
437 let sender =
438 {
439 id = node_id_of_string (if source_node <> "" then source_node else "");
440 addr = `Udp (ip, src_port);
441 meta = "";
442 }
443 in
444 let _ = target in
445 let _ = port in
446 Some (Ping_req { seq = seq_no; target = node_id_of_string node; sender })
447 | Wire.Ack { seq_no; payload } ->
448 let responder =
449 {
450 id = node_id_of_string "";
451 addr = `Udp (Eio.Net.Ipaddr.of_raw "\000\000\000\000", 0);
452 meta = "";
453 }
454 in
455 let payload = if payload = "" then None else Some payload in
456 Some (Ack { seq = seq_no; responder; payload })
457 | Wire.Alive { incarnation; node; addr; port; meta; _ } ->
458 let ip =
459 if String.length addr > 0 then ip_of_bytes addr
460 else Eio.Net.Ipaddr.of_raw "\000\000\000\000"
461 in
462 let node_info =
463 { id = node_id_of_string node; addr = `Udp (ip, port); meta }
464 in
465 Some
466 (Alive
467 { node = node_info; incarnation = incarnation_of_int incarnation })
468 | Wire.Suspect { incarnation; node; from } ->
469 Some
470 (Suspect
471 {
472 node = node_id_of_string node;
473 incarnation = incarnation_of_int incarnation;
474 suspector = node_id_of_string from;
475 })
476 | Wire.Dead { incarnation; node; from } ->
477 Some
478 (Dead
479 {
480 node = node_id_of_string node;
481 incarnation = incarnation_of_int incarnation;
482 declarator = node_id_of_string from;
483 })
484 | Wire.User_data encoded -> (
485 let parse_length s start =
486 let rec find_colon i =
487 if i >= String.length s then None
488 else if s.[i] = ':' then Some i
489 else find_colon (i + 1)
490 in
491 match find_colon start with
492 | None -> None
493 | Some colon_pos -> (
494 let len_str = String.sub s start (colon_pos - start) in
495 match int_of_string_opt len_str with
496 | None -> None
497 | Some len -> Some (len, colon_pos + 1))
498 in
499 match parse_length encoded 0 with
500 | None ->
501 Some
502 (User_msg
503 { topic = ""; payload = encoded; origin = node_id_of_string "" })
504 | Some (topic_len, topic_start) -> (
505 if topic_start + topic_len > String.length encoded then
506 Some
507 (User_msg
508 {
509 topic = "";
510 payload = encoded;
511 origin = node_id_of_string "";
512 })
513 else
514 let topic = String.sub encoded topic_start topic_len in
515 let origin_start = topic_start + topic_len in
516 match parse_length encoded origin_start with
517 | None ->
518 Some
519 (User_msg
520 { topic; payload = ""; origin = node_id_of_string "" })
521 | Some (origin_len, payload_start) ->
522 if payload_start + origin_len > String.length encoded then
523 Some
524 (User_msg
525 { topic; payload = ""; origin = node_id_of_string "" })
526 else
527 let origin = String.sub encoded payload_start origin_len in
528 let data_start = payload_start + origin_len in
529 let payload =
530 String.sub encoded data_start
531 (String.length encoded - data_start)
532 in
533 Some
534 (User_msg
535 { topic; payload; origin = node_id_of_string origin })))
536 | Wire.Nack _ -> None
537 | Wire.Compound _ -> None
538 | Wire.Compressed _ -> None
539 | Wire.Err _ -> None