open Types

(** Sequential writer over a caller-supplied [Cstruct.t].

    Every [write_*] function stores its value at the current position and
    then advances the position by the number of bytes written. The underlying
    Cstruct accessors raise [Invalid_argument] when a write would fall outside
    the buffer. *)
module Encoder = struct
  type t = { buf : Cstruct.t; mutable pos : int }

  (** [create ~buf] starts writing at offset 0 of [buf]. *)
  let create ~buf = { buf; pos = 0 }

  (** Writes one byte. *)
  let write_byte t v =
    Cstruct.set_uint8 t.buf t.pos v;
    t.pos <- t.pos + 1

  (** Writes a 2-byte big-endian integer. *)
  let write_int16_be t v =
    Cstruct.BE.set_uint16 t.buf t.pos v;
    t.pos <- t.pos + 2

  (** Writes a 4-byte big-endian integer. *)
  let write_int32_be t v =
    Cstruct.BE.set_uint32 t.buf t.pos v;
    t.pos <- t.pos + 4

  (** Writes an 8-byte big-endian integer. *)
  let write_int64_be t v =
    Cstruct.BE.set_uint64 t.buf t.pos v;
    t.pos <- t.pos + 8

  (** Writes [s] as a 2-byte big-endian length prefix followed by its bytes.

      NOTE(review): the length is not range-checked here; a string longer
      than 0xFFFF bytes cannot be described by the 2-byte prefix. *)
  let write_string t s =
    let len = String.length s in
    write_int16_be t len;
    Cstruct.blit_from_string s 0 t.buf t.pos len;
    t.pos <- t.pos + len

  (** Copies the whole of [cs] into the buffer, without a length prefix. *)
  let write_bytes t cs =
    let len = Cstruct.length cs in
    Cstruct.blit cs 0 t.buf t.pos len;
    t.pos <- t.pos + len

  (** View of the bytes written so far (offset 0 up to the position). *)
  let to_cstruct t = Cstruct.sub t.buf 0 t.pos

  (** Moves the position back to 0; the buffer contents are not cleared. *)
  let reset t = t.pos <- 0

  (** Number of bytes between the position and the end of the buffer. *)
  let remaining t = Cstruct.length t.buf - t.pos

  (** Current write position, i.e. the number of bytes written. *)
  let pos t = t.pos
end

(** Sequential reader over a [Cstruct.t]; the mirror image of {!Encoder}.

    Every [read_*] function reads at the current position and then advances
    it. The underlying Cstruct accessors raise [Invalid_argument] when a read
    would fall outside the buffer. *)
module Decoder = struct
  type t = { buf : Cstruct.t; mutable pos : int }

  (** [create buf] starts reading at offset 0 of [buf]. *)
  let create buf = { buf; pos = 0 }

  (** Reads one byte. *)
  let read_byte t =
    let v = Cstruct.get_uint8 t.buf t.pos in
    t.pos <- t.pos + 1;
    v

  (** Reads a 2-byte big-endian integer. *)
  let read_int16_be t =
    let v = Cstruct.BE.get_uint16 t.buf t.pos in
    t.pos <- t.pos + 2;
    v

  (** Reads a 4-byte big-endian integer. *)
  let read_int32_be t =
    let v = Cstruct.BE.get_uint32 t.buf t.pos in
    t.pos <- t.pos + 4;
    v

  (** Reads an 8-byte big-endian integer. *)
  let read_int64_be t =
    let v = Cstruct.BE.get_uint64 t.buf t.pos in
    t.pos <- t.pos + 8;
    v

  (** Reads a 2-byte big-endian length, then that many bytes as a string. *)
  let read_string t =
    let len = read_int16_be t in
    let s = Cstruct.to_string ~off:t.pos ~len t.buf in
    t.pos <- t.pos + len;
    s

  (** Returns a [len]-byte view of the buffer at the current position. *)
  let read_bytes t ~len =
    let cs = Cstruct.sub t.buf t.pos len in
    t.pos <- t.pos + len;
    cs

  (** Number of bytes between the position and the end of the buffer. *)
  let remaining t = Cstruct.length t.buf - t.pos

  (** [true] once the position has reached the end of the buffer. *)
  let is_empty t = t.pos >= Cstruct.length t.buf

  (** Current read position. *)
  let pos t = t.pos
end

(** Four bytes written at the start of every packet. *)
let magic = "SWIM"

(** Version byte written after {!magic}; [decode_packet] rejects any other. *)
let version = 1

(* One-byte tags identifying each message constructor on the wire. *)
let tag_ping = 0x01
let tag_ping_req = 0x02
let tag_ack = 0x03
let tag_alive = 0x04
let tag_suspect = 0x05
let tag_dead = 0x06
let tag_user_msg = 0x07

(** Renders an IP address with [Eio.Net.Ipaddr.pp]. *)
let ip_to_string ip = Fmt.to_to_string Eio.Net.Ipaddr.pp ip

(** Parses a dotted-quad string into a 4-byte raw address.

    @raise Scanf.Scan_failure (or [Failure] / [End_of_file]) when [s] does not
    match ["%d.%d.%d.%d"]. *)
let parse_ipv4 s =
  Scanf.sscanf s "%d.%d.%d.%d" (fun a b c d ->
      let buf = Bytes.create 4 in
      Bytes.set_uint8 buf 0 a;
      Bytes.set_uint8 buf 1 b;
      Bytes.set_uint8 buf 2 c;
      Bytes.set_uint8 buf 3 d;
      Eio.Net.Ipaddr.of_raw (Bytes.to_string buf))

(** Parses a textual IPv6 address into a 16-byte raw address.

    Accepts eight colon-separated hexadecimal groups, with at most one ["::"]
    standing for a run of zero groups at the start, in the middle or at the
    end (["::1"], ["fe80::1"], ["1::"], ["::"]). A dotted-quad field such as
    the tail of ["::ffff:1.2.3.4"] is accepted and counts as two groups.

    @raise Failure when the address does not describe exactly eight 16-bit
    groups or a group is not valid hexadecimal.
    @raise Scanf.Scan_failure (or [End_of_file]) when a dotted-quad field is
    malformed. *)
let parse_ipv6 s =
  let malformed () = failwith ("invalid IPv6 address: " ^ s) in
  (* Turns one colon-separated field into the 16-bit groups it denotes. *)
  let words_of_field field =
    if String.contains field '.' then
      Scanf.sscanf field "%d.%d.%d.%d%!" (fun a b c d ->
          let octet v = v >= 0 && v <= 255 in
          if octet a && octet b && octet c && octet d then
            [ (a lsl 8) lor b; (c lsl 8) lor d ]
          else malformed ())
    else
      match int_of_string_opt ("0x" ^ field) with
      | Some v when v >= 0 && v <= 0xffff -> [ v ]
      | Some _ | None -> malformed ()
  in
  (* An empty side of "::" contributes no groups at all. *)
  let words_of_part part =
    if part = "" then []
    else String.split_on_char ':' part |> List.concat_map words_of_field
  in
  (* Index of the first "::", if any. Splitting on ':' alone cannot tell a
     compression marker in the middle of the address from a stray empty
     field, so the marker is located explicitly. *)
  let double_colon =
    let n = String.length s in
    let rec find i =
      if i + 1 >= n then None
      else if s.[i] = ':' && s.[i + 1] = ':' then Some i
      else find (i + 1)
    in
    find 0
  in
  let words =
    match double_colon with
    | None -> words_of_part s
    | Some i ->
        let head = words_of_part (String.sub s 0 i) in
        let tail =
          words_of_part (String.sub s (i + 2) (String.length s - i - 2))
        in
        (* "::" expands to however many zero groups are needed to reach 8. *)
        let gap = 8 - List.length head - List.length tail in
        if gap < 0 then malformed ();
        head @ List.init gap (fun _ -> 0) @ tail
  in
  if List.length words <> 8 then malformed ();
  let buf = Bytes.create 16 in
  List.iteri (fun i w -> Bytes.set_uint16_be buf (i * 2) w) words;
  Eio.Net.Ipaddr.of_raw (Bytes.to_string buf)

(** Parses [s] as IPv6 when it contains a [':'], as IPv4 otherwise. *)
let ip_of_string s =
  if String.contains s ':' then parse_ipv6 s else parse_ipv4 s

(** Writes a UDP address as its textual IP (length-prefixed string) followed
    by the port as a 2-byte big-endian integer.

    @raise Failure for [`Unix] addresses. *)
let encode_addr enc (addr : Eio.Net.Sockaddr.datagram) =
  match addr with
  | `Udp (ip, port) ->
      Encoder.write_string enc (ip_to_string ip);
      Encoder.write_int16_be enc port
  | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol"

(** Reads an address written by {!encode_addr}; always yields [`Udp]. *)
let decode_addr dec : Eio.Net.Sockaddr.datagram =
  let ip_str = Decoder.read_string dec in
  let port = Decoder.read_int16_be dec in
  `Udp (ip_of_string ip_str, port)

(** Writes a node id as the length-prefixed string [node_id_to_string]. *)
let encode_node_id enc (node_id : node_id) =
  Encoder.write_string enc (node_id_to_string node_id)

(** Reads a length-prefixed string and converts it with [node_id_of_string]. *)
let decode_node_id dec : node_id = node_id_of_string (Decoder.read_string dec)

(** Writes a node as id, address, then metadata string, in that order. *)
let encode_node enc (node : node_info) =
  encode_node_id enc node.id;
  encode_addr enc node.addr;
  Encoder.write_string enc node.meta

(** Reads a node written by {!encode_node}. *)
let decode_node dec : node_info =
  let id = decode_node_id dec in
  let addr = decode_addr dec in
  let meta = Decoder.read_string dec in
  { id; addr; meta }

(** Writes an incarnation number as a 4-byte big-endian integer. *)
let encode_incarnation enc (inc : incarnation) =
  Encoder.write_int32_be enc (Int32.of_int (incarnation_to_int inc))

(** Reads an incarnation number written by {!encode_incarnation}. *)
let decode_incarnation dec : incarnation =
  incarnation_of_int (Int32.to_int (Decoder.read_int32_be dec))

(** Writes an option as a presence byte ([0] for [None], [1] for [Some])
    followed, when present, by the value encoded with [encode_elem]. *)
let encode_option encode_elem enc = function
  | None -> Encoder.write_byte enc 0
  | Some v ->
      Encoder.write_byte enc 1;
      encode_elem enc v

(** Reads an option written by {!encode_option}. Any non-zero presence byte
    is treated as [Some]. *)
let decode_option decode_elem dec =
  match Decoder.read_byte dec with 0 -> None | _ -> Some (decode_elem dec)

(** Writes one protocol message: its tag byte followed by its fields.
    Sequence numbers are written as 4-byte big-endian integers. *)
let encode_msg enc msg =
  match msg with
  | Ping { seq; sender } ->
      Encoder.write_byte enc tag_ping;
      Encoder.write_int32_be enc (Int32.of_int seq);
      encode_node enc sender
  | Ping_req { seq; target; sender } ->
      Encoder.write_byte enc tag_ping_req;
      Encoder.write_int32_be enc (Int32.of_int seq);
      encode_node_id enc target;
      encode_node enc sender
  | Ack { seq; responder; payload } ->
      Encoder.write_byte enc tag_ack;
      Encoder.write_int32_be enc (Int32.of_int seq);
      encode_node enc responder;
      encode_option Encoder.write_string enc payload
  | Alive { node; incarnation } ->
      Encoder.write_byte enc tag_alive;
      encode_node enc node;
      encode_incarnation enc incarnation
  | Suspect { node; incarnation; suspector } ->
      Encoder.write_byte enc tag_suspect;
      encode_node_id enc node;
      encode_incarnation enc incarnation;
      encode_node_id enc suspector
  | Dead { node; incarnation; declarator } ->
      Encoder.write_byte enc tag_dead;
      encode_node_id enc node;
      encode_incarnation enc incarnation;
      encode_node_id enc declarator
  | User_msg { topic; payload; origin } ->
      Encoder.write_byte enc tag_user_msg;
      Encoder.write_string enc topic;
      Encoder.write_string enc payload;
      encode_node_id enc origin

(** Reads one protocol message written by {!encode_msg}.

    Returns [Error (Invalid_tag t)] for an unknown tag byte and
    [Error Truncated_message] when the input ends before the message is
    complete.

    Address-parsing failures (see {!parse_ipv4} and {!parse_ipv6}) still
    propagate as exceptions. *)
let decode_msg dec : (protocol_msg, decode_error) result =
  let read_seq () = Int32.to_int (Decoder.read_int32_be dec) in
  let decode_body = function
    | t when t = tag_ping ->
        let seq = read_seq () in
        let sender = decode_node dec in
        Ok (Ping { seq; sender })
    | t when t = tag_ping_req ->
        let seq = read_seq () in
        let target = decode_node_id dec in
        let sender = decode_node dec in
        Ok (Ping_req { seq; target; sender })
    | t when t = tag_ack ->
        let seq = read_seq () in
        let responder = decode_node dec in
        let payload = decode_option Decoder.read_string dec in
        Ok (Ack { seq; responder; payload })
    | t when t = tag_alive ->
        let node = decode_node dec in
        let incarnation = decode_incarnation dec in
        Ok (Alive { node; incarnation })
    | t when t = tag_suspect ->
        let node = decode_node_id dec in
        let incarnation = decode_incarnation dec in
        let suspector = decode_node_id dec in
        Ok (Suspect { node; incarnation; suspector })
    | t when t = tag_dead ->
        let node = decode_node_id dec in
        let incarnation = decode_incarnation dec in
        let declarator = decode_node_id dec in
        Ok (Dead { node; incarnation; declarator })
    | t when t = tag_user_msg ->
        let topic = Decoder.read_string dec in
        let payload = Decoder.read_string dec in
        let origin = decode_node_id dec in
        Ok (User_msg { topic; payload; origin })
    | t -> Error (Invalid_tag t)
  in
  (* Reads past the end of the buffer surface as [Invalid_argument] from
     Cstruct; report them through the result type instead of raising.
     NOTE(review): an [Invalid_argument] raised by a conversion helper from
     [Types] would also be reported as [Truncated_message] — confirm. *)
  try decode_body (Decoder.read_byte dec)
  with Invalid_argument _ -> Error Truncated_message

(** Writes [packet] into [buf]: {!magic}, {!version}, the cluster name, a
    2-byte message count, the primary message, then the piggybacked ones.

    Returns [Ok n], where [n] is the number of bytes written, or
    [Error `Buffer_too_small] when [buf] cannot hold the whole packet ([buf]
    may then have been partially written).

    @raise Failure when a message carries a [`Unix] address. *)
let encode_packet packet ~buf =
  let enc = Encoder.create ~buf in
  (* The Cstruct writes raise [Invalid_argument] as soon as one would run
     past the end of [buf], so overflow is detected by catching that rather
     than by inspecting [Encoder.remaining] afterwards (it can never go
     negative). *)
  try
    Encoder.write_bytes enc (Cstruct.of_string magic);
    Encoder.write_byte enc version;
    Encoder.write_string enc packet.cluster;
    let msg_count = 1 + List.length packet.piggyback in
    Encoder.write_int16_be enc msg_count;
    encode_msg enc packet.primary;
    List.iter (encode_msg enc) packet.piggyback;
    Ok (Encoder.pos enc)
  with Invalid_argument _ -> Error `Buffer_too_small

(** Reads a packet written by {!encode_packet}.

    Returns [Error Invalid_magic] or [Error (Unsupported_version v)] for a
    bad header, the error of the first message that fails to decode, and
    [Error Truncated_message] when the message count is zero or the input
    ends early.

    Address-parsing failures (see {!parse_ipv4} and {!parse_ipv6}) still
    propagate as exceptions. *)
let decode_packet buf : (packet, decode_error) result =
  let dec = Decoder.create buf in
  let rec decode_msgs acc remaining =
    if remaining = 0 then Ok (List.rev acc)
    else
      match decode_msg dec with
      | Error e -> Error e
      | Ok msg -> decode_msgs (msg :: acc) (remaining - 1)
  in
  let decode_body () =
    let magic_bytes = Decoder.read_bytes dec ~len:4 in
    if Cstruct.to_string magic_bytes <> magic then Error Invalid_magic
    else
      let ver = Decoder.read_byte dec in
      if ver <> version then Error (Unsupported_version ver)
      else
        let cluster = Decoder.read_string dec in
        let msg_count = Decoder.read_int16_be dec in
        match decode_msgs [] msg_count with
        | Error e -> Error e
        | Ok [] -> Error Truncated_message
        | Ok (primary :: piggyback) -> Ok { cluster; primary; piggyback }
  in
  (* A header cut short makes the Cstruct reads raise [Invalid_argument];
     turn that into a decode error like any other malformed input. *)
  try decode_body () with Invalid_argument _ -> Error Truncated_message

(** Encoded size of a node id: 2-byte length prefix plus the string. *)
let node_id_size node_id = 2 + String.length (node_id_to_string node_id)

(** Encoded size of an address: length-prefixed textual IP plus 2-byte port.

    @raise Failure for [`Unix] addresses. *)
let addr_size (addr : Eio.Net.Sockaddr.datagram) =
  match addr with
  | `Udp (ip, _) ->
      let ip_str = ip_to_string ip in
      2 + String.length ip_str + 2
  | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol"

(** Encoded size of a node: id, address, and length-prefixed metadata. *)
let node_size (node : node_info) =
  node_id_size node.id + addr_size node.addr + 2 + String.length node.meta

(** Encoded size of an option: the presence byte plus [f v] when present. *)
let option_size f = function None -> 1 | Some v -> 1 + f v

(** Number of bytes {!encode_msg} writes for [msg]: 1 for the tag, 4 for each
    sequence or incarnation number, plus the sizes of the remaining fields. *)
let encoded_size msg =
  match msg with
  | Ping { sender; _ } -> 1 + 4 + node_size sender
  | Ping_req { target; sender; _ } ->
      1 + 4 + node_id_size target + node_size sender
  | Ack { responder; payload; _ } ->
      1 + 4 + node_size responder
      + option_size (fun s -> 2 + String.length s) payload
  | Alive { node; _ } -> 1 + node_size node + 4
  | Suspect { node; suspector; _ } ->
      1 + node_id_size node + 4 + node_id_size suspector
  | Dead { node; declarator; _ } ->
      1 + node_id_size node + 4 + node_id_size declarator
  | User_msg { topic; payload; origin } ->
      1 + 2 + String.length topic + 2 + String.length payload
      + node_id_size origin