this repo has no description
1open Types 2 3module Encoder = struct 4 type t = { buf : Cstruct.t; mutable pos : int } 5 6 let create ~buf = { buf; pos = 0 } 7 8 let write_byte t v = 9 Cstruct.set_uint8 t.buf t.pos v; 10 t.pos <- t.pos + 1 11 12 let write_int16_be t v = 13 Cstruct.BE.set_uint16 t.buf t.pos v; 14 t.pos <- t.pos + 2 15 16 let write_int32_be t v = 17 Cstruct.BE.set_uint32 t.buf t.pos v; 18 t.pos <- t.pos + 4 19 20 let write_int64_be t v = 21 Cstruct.BE.set_uint64 t.buf t.pos v; 22 t.pos <- t.pos + 8 23 24 let write_string t s = 25 let len = String.length s in 26 write_int16_be t len; 27 Cstruct.blit_from_string s 0 t.buf t.pos len; 28 t.pos <- t.pos + len 29 30 let write_bytes t cs = 31 let len = Cstruct.length cs in 32 Cstruct.blit cs 0 t.buf t.pos len; 33 t.pos <- t.pos + len 34 35 let to_cstruct t = Cstruct.sub t.buf 0 t.pos 36 let reset t = t.pos <- 0 37 let remaining t = Cstruct.length t.buf - t.pos 38 let pos t = t.pos 39end 40 41module Decoder = struct 42 type t = { buf : Cstruct.t; mutable pos : int } 43 44 let create buf = { buf; pos = 0 } 45 46 let read_byte t = 47 let v = Cstruct.get_uint8 t.buf t.pos in 48 t.pos <- t.pos + 1; 49 v 50 51 let read_int16_be t = 52 let v = Cstruct.BE.get_uint16 t.buf t.pos in 53 t.pos <- t.pos + 2; 54 v 55 56 let read_int32_be t = 57 let v = Cstruct.BE.get_uint32 t.buf t.pos in 58 t.pos <- t.pos + 4; 59 v 60 61 let read_int64_be t = 62 let v = Cstruct.BE.get_uint64 t.buf t.pos in 63 t.pos <- t.pos + 8; 64 v 65 66 let read_string t = 67 let len = read_int16_be t in 68 let s = Cstruct.to_string ~off:t.pos ~len t.buf in 69 t.pos <- t.pos + len; 70 s 71 72 let read_bytes t ~len = 73 let cs = Cstruct.sub t.buf t.pos len in 74 t.pos <- t.pos + len; 75 cs 76 77 let remaining t = Cstruct.length t.buf - t.pos 78 let is_empty t = t.pos >= Cstruct.length t.buf 79 let pos t = t.pos 80end 81 82let magic = "SWIM" 83let version = 1 84let tag_ping = 0x01 85let tag_ping_req = 0x02 86let tag_ack = 0x03 87let tag_alive = 0x04 88let tag_suspect = 0x05 89let tag_dead = 0x06 90let tag_user_msg = 0x07 91let ip_to_string ip = Fmt.to_to_string Eio.Net.Ipaddr.pp ip 92 93let parse_ipv4 s = 94 Scanf.sscanf s "%d.%d.%d.%d" (fun a b c d -> 95 let buf = Bytes.create 4 in 96 Bytes.set_uint8 buf 0 a; 97 Bytes.set_uint8 buf 1 b; 98 Bytes.set_uint8 buf 2 c; 99 Bytes.set_uint8 buf 3 d; 100 Eio.Net.Ipaddr.of_raw (Bytes.to_string buf)) 101 102let parse_ipv6 s = 103 let parts = String.split_on_char ':' s in 104 let buf = Bytes.create 16 in 105 let rec fill idx = function 106 | [] -> () 107 | "" :: rest when List.exists (( = ) "") rest -> 108 let tail_len = List.length (List.filter (( <> ) "") rest) in 109 let zeros = 8 - idx - tail_len in 110 for i = 0 to (zeros * 2) - 1 do 111 Bytes.set_uint8 buf ((idx * 2) + i) 0 112 done; 113 fill (idx + zeros) rest 114 | "" :: rest -> fill idx rest 115 | h :: rest -> 116 let v = int_of_string ("0x" ^ h) in 117 Bytes.set_uint8 buf (idx * 2) (v lsr 8); 118 Bytes.set_uint8 buf ((idx * 2) + 1) (v land 0xff); 119 fill (idx + 1) rest 120 in 121 fill 0 parts; 122 Eio.Net.Ipaddr.of_raw (Bytes.to_string buf) 123 124let ip_of_string s = 125 if String.contains s ':' then parse_ipv6 s else parse_ipv4 s 126 127let encode_addr enc (addr : Eio.Net.Sockaddr.datagram) = 128 match addr with 129 | `Udp (ip, port) -> 130 Encoder.write_string enc (ip_to_string ip); 131 Encoder.write_int16_be enc port 132 | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol" 133 134let decode_addr dec : Eio.Net.Sockaddr.datagram = 135 let ip_str = Decoder.read_string dec in 136 let port = Decoder.read_int16_be dec in 137 `Udp (ip_of_string ip_str, port) 138 139let encode_node_id enc (node_id : node_id) = 140 Encoder.write_string enc (node_id_to_string node_id) 141 142let decode_node_id dec : node_id = node_id_of_string (Decoder.read_string dec) 143 144let encode_node enc (node : node_info) = 145 encode_node_id enc node.id; 146 encode_addr enc node.addr; 147 Encoder.write_string enc node.meta 148 149let decode_node dec : node_info = 150 let id = decode_node_id dec in 151 let addr = decode_addr dec in 152 let meta = Decoder.read_string dec in 153 { id; addr; meta } 154 155let encode_incarnation enc (inc : incarnation) = 156 Encoder.write_int32_be enc (Int32.of_int (incarnation_to_int inc)) 157 158let decode_incarnation dec : incarnation = 159 incarnation_of_int (Int32.to_int (Decoder.read_int32_be dec)) 160 161let encode_option encode_elem enc = function 162 | None -> Encoder.write_byte enc 0 163 | Some v -> 164 Encoder.write_byte enc 1; 165 encode_elem enc v 166 167let decode_option decode_elem dec = 168 match Decoder.read_byte dec with 0 -> None | _ -> Some (decode_elem dec) 169 170let encode_msg enc msg = 171 match msg with 172 | Ping { seq; sender } -> 173 Encoder.write_byte enc tag_ping; 174 Encoder.write_int32_be enc (Int32.of_int seq); 175 encode_node enc sender 176 | Ping_req { seq; target; sender } -> 177 Encoder.write_byte enc tag_ping_req; 178 Encoder.write_int32_be enc (Int32.of_int seq); 179 encode_node_id enc target; 180 encode_node enc sender 181 | Ack { seq; responder; payload } -> 182 Encoder.write_byte enc tag_ack; 183 Encoder.write_int32_be enc (Int32.of_int seq); 184 encode_node enc responder; 185 encode_option Encoder.write_string enc payload 186 | Alive { node; incarnation } -> 187 Encoder.write_byte enc tag_alive; 188 encode_node enc node; 189 encode_incarnation enc incarnation 190 | Suspect { node; incarnation; suspector } -> 191 Encoder.write_byte enc tag_suspect; 192 encode_node_id enc node; 193 encode_incarnation enc incarnation; 194 encode_node_id enc suspector 195 | Dead { node; incarnation; declarator } -> 196 Encoder.write_byte enc tag_dead; 197 encode_node_id enc node; 198 encode_incarnation enc incarnation; 199 encode_node_id enc declarator 200 | User_msg { topic; payload; origin } -> 201 Encoder.write_byte enc tag_user_msg; 202 Encoder.write_string enc topic; 203 Encoder.write_string enc payload; 204 encode_node_id enc origin 205 206let decode_msg dec : (protocol_msg, decode_error) result = 207 let tag = Decoder.read_byte dec in 208 match tag with 209 | t when t = tag_ping -> 210 let seq = Int32.to_int (Decoder.read_int32_be dec) in 211 let sender = decode_node dec in 212 Ok (Ping { seq; sender }) 213 | t when t = tag_ping_req -> 214 let seq = Int32.to_int (Decoder.read_int32_be dec) in 215 let target = decode_node_id dec in 216 let sender = decode_node dec in 217 Ok (Ping_req { seq; target; sender }) 218 | t when t = tag_ack -> 219 let seq = Int32.to_int (Decoder.read_int32_be dec) in 220 let responder = decode_node dec in 221 let payload = decode_option Decoder.read_string dec in 222 Ok (Ack { seq; responder; payload }) 223 | t when t = tag_alive -> 224 let node = decode_node dec in 225 let incarnation = decode_incarnation dec in 226 Ok (Alive { node; incarnation }) 227 | t when t = tag_suspect -> 228 let node = decode_node_id dec in 229 let incarnation = decode_incarnation dec in 230 let suspector = decode_node_id dec in 231 Ok (Suspect { node; incarnation; suspector }) 232 | t when t = tag_dead -> 233 let node = decode_node_id dec in 234 let incarnation = decode_incarnation dec in 235 let declarator = decode_node_id dec in 236 Ok (Dead { node; incarnation; declarator }) 237 | t when t = tag_user_msg -> 238 let topic = Decoder.read_string dec in 239 let payload = Decoder.read_string dec in 240 let origin = decode_node_id dec in 241 Ok (User_msg { topic; payload; origin }) 242 | t -> Error (Invalid_tag t) 243 244let encode_packet packet ~buf = 245 let enc = Encoder.create ~buf in 246 Encoder.write_bytes enc (Cstruct.of_string magic); 247 Encoder.write_byte enc version; 248 Encoder.write_string enc packet.cluster; 249 let msg_count = 1 + List.length packet.piggyback in 250 Encoder.write_int16_be enc msg_count; 251 encode_msg enc packet.primary; 252 List.iter (encode_msg enc) packet.piggyback; 253 if Encoder.remaining enc < 0 then Error `Buffer_too_small 254 else Ok (Encoder.pos enc) 255 256let decode_packet buf : (packet, decode_error) result = 257 let dec = Decoder.create buf in 258 let magic_bytes = Decoder.read_bytes dec ~len:4 in 259 if Cstruct.to_string magic_bytes <> magic then Error Invalid_magic 260 else 261 let ver = Decoder.read_byte dec in 262 if ver <> version then Error (Unsupported_version ver) 263 else 264 let cluster = Decoder.read_string dec in 265 let msg_count = Decoder.read_int16_be dec in 266 let rec decode_msgs acc remaining = 267 if remaining = 0 then Ok (List.rev acc) 268 else 269 match decode_msg dec with 270 | Error e -> Error e 271 | Ok msg -> decode_msgs (msg :: acc) (remaining - 1) 272 in 273 match decode_msgs [] msg_count with 274 | Error e -> Error e 275 | Ok [] -> Error Truncated_message 276 | Ok (primary :: piggyback) -> Ok { cluster; primary; piggyback } 277 278let node_id_size node_id = 2 + String.length (node_id_to_string node_id) 279 280let addr_size (addr : Eio.Net.Sockaddr.datagram) = 281 match addr with 282 | `Udp (ip, _) -> 283 let ip_str = ip_to_string ip in 284 2 + String.length ip_str + 2 285 | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol" 286 287let node_size (node : node_info) = 288 node_id_size node.id + addr_size node.addr + 2 + String.length node.meta 289 290let option_size f = function None -> 1 | Some v -> 1 + f v 291 292let encoded_size msg = 293 match msg with 294 | Ping { sender; _ } -> 1 + 4 + node_size sender 295 | Ping_req { target; sender; _ } -> 296 1 + 4 + node_id_size target + node_size sender 297 | Ack { responder; payload; _ } -> 298 1 + 4 + node_size responder 299 + option_size (fun s -> 2 + String.length s) payload 300 | Alive { node; _ } -> 1 + node_size node + 4 301 | Suspect { node; suspector; _ } -> 302 1 + node_id_size node + 4 + node_id_size suspector 303 | Dead { node; declarator; _ } -> 304 1 + node_id_size node + 4 + node_id_size declarator 305 | User_msg { topic; payload; origin } -> 306 1 + 2 + String.length topic + 2 + String.length payload 307 + node_id_size origin