this repo has no description
1open Types
2
(** Sequential writer over a caller-supplied [Cstruct.t].

    Each [write_*] function stores its value at the current position and then
    advances the position by the number of bytes written. Bounds are checked
    by the underlying [Cstruct] accessors, which raise [Invalid_argument] when
    a write would run past the end of the buffer. *)
module Encoder = struct
  type t = { buf : Cstruct.t; mutable pos : int }

  (** [create ~buf] starts writing at offset 0 of [buf]. *)
  let create ~buf = { buf; pos = 0 }

  (** Write a single byte. *)
  let write_byte t v =
    Cstruct.set_uint8 t.buf t.pos v;
    t.pos <- t.pos + 1

  (** Write a 16-bit big-endian integer. *)
  let write_int16_be t v =
    Cstruct.BE.set_uint16 t.buf t.pos v;
    t.pos <- t.pos + 2

  (** Write a 32-bit big-endian integer. *)
  let write_int32_be t v =
    Cstruct.BE.set_uint32 t.buf t.pos v;
    t.pos <- t.pos + 4

  (** Write a 64-bit big-endian integer. *)
  let write_int64_be t v =
    Cstruct.BE.set_uint64 t.buf t.pos v;
    t.pos <- t.pos + 8

  (** Write [s] preceded by its length as a 16-bit big-endian integer.

      @raise Invalid_argument
        if [s] is longer than 65535 bytes. The 16-bit prefix cannot represent
        such a length, so the string is rejected before anything is written
        rather than producing a stream that cannot be decoded. *)
  let write_string t s =
    let len = String.length s in
    if len > 0xFFFF then
      invalid_arg "Encoder.write_string: string longer than 65535 bytes";
    write_int16_be t len;
    Cstruct.blit_from_string s 0 t.buf t.pos len;
    t.pos <- t.pos + len

  (** Copy the whole of [cs] into the buffer, with no length prefix. *)
  let write_bytes t cs =
    let len = Cstruct.length cs in
    Cstruct.blit cs 0 t.buf t.pos len;
    t.pos <- t.pos + len

  (** View of the bytes written so far (from offset 0 to the position). *)
  let to_cstruct t = Cstruct.sub t.buf 0 t.pos

  (** Move the position back to offset 0. *)
  let reset t = t.pos <- 0

  (** Number of bytes between the position and the end of the buffer. *)
  let remaining t = Cstruct.length t.buf - t.pos

  (** Current write position, i.e. the number of bytes written. *)
  let pos t = t.pos
end
40
(** Sequential reader over a [Cstruct.t].

    Each [read_*] function fetches a value at the current position and only
    then advances the position past it, so a failed read leaves the position
    where it was before that read. *)
module Decoder = struct
  type t = { buf : Cstruct.t; mutable pos : int }

  (** [create buf] starts reading at offset 0 of [buf]. *)
  let create buf = { buf; pos = 0 }

  (** Read a single byte. *)
  let read_byte ({ buf; pos } as t) =
    let byte = Cstruct.get_uint8 buf pos in
    t.pos <- pos + 1;
    byte

  (** Read a 16-bit big-endian integer. *)
  let read_int16_be ({ buf; pos } as t) =
    let word = Cstruct.BE.get_uint16 buf pos in
    t.pos <- pos + 2;
    word

  (** Read a 32-bit big-endian integer. *)
  let read_int32_be ({ buf; pos } as t) =
    let word = Cstruct.BE.get_uint32 buf pos in
    t.pos <- pos + 4;
    word

  (** Read a 64-bit big-endian integer. *)
  let read_int64_be ({ buf; pos } as t) =
    let word = Cstruct.BE.get_uint64 buf pos in
    t.pos <- pos + 8;
    word

  (** Read a string stored as a 16-bit big-endian length followed by that
      many bytes. *)
  let read_string t =
    let len = read_int16_be t in
    (* The position now points just past the length prefix. *)
    let start = t.pos in
    let text = Cstruct.to_string ~off:start ~len t.buf in
    t.pos <- start + len;
    text

  (** Return a view of the next [len] bytes and skip past them. *)
  let read_bytes t ~len =
    let start = t.pos in
    let view = Cstruct.sub t.buf start len in
    t.pos <- start + len;
    view

  (** Number of bytes between the position and the end of the buffer. *)
  let remaining t = Cstruct.length t.buf - t.pos

  (** [true] once the position has reached the end of the buffer. *)
  let is_empty t = remaining t <= 0

  (** Current read position. *)
  let pos t = t.pos
end
81
(* Packet header: [encode_packet] writes [magic] followed by one [version]
   byte at the start of every packet, and [decode_packet] rejects input whose
   header does not match them. *)
let magic = "SWIM"
let version = 1

(* One-byte tags that [encode_msg] writes in front of each message body and
   that [decode_msg] dispatches on. *)
let tag_ping = 0x01
let tag_ping_req = 0x02
let tag_ack = 0x03
let tag_alive = 0x04
let tag_suspect = 0x05
let tag_dead = 0x06
let tag_user_msg = 0x07
(** Render an IP address as text using [Eio.Net.Ipaddr.pp]. *)
let ip_to_string ip = Fmt.str "%a" Eio.Net.Ipaddr.pp ip
92
(** Parse a dotted-quad IPv4 address such as [192.0.2.1].

    Characters after the fourth number are not examined.

    @raise Failure
      if an octet is outside [0..255]. Without this check [Bytes.set_uint8]
      would keep only the low 8 bits and yield a different address.
    @raise Scanf.Scan_failure if [s] does not match the dotted-quad format.
    @raise End_of_file if [s] ends before four numbers have been read. *)
let parse_ipv4 s =
  Scanf.sscanf s "%d.%d.%d.%d" (fun a b c d ->
      let octets = [ a; b; c; d ] in
      if List.exists (fun octet -> octet < 0 || octet > 255) octets then
        failwith ("parse_ipv4: octet out of range in " ^ s);
      let raw = Bytes.create 4 in
      List.iteri (Bytes.set_uint8 raw) octets;
      Eio.Net.Ipaddr.of_raw (Bytes.to_string raw))
101
(** Parse a textual IPv6 address into its 16-byte form.

    Accepts the full eight-group form, a single [::] zero-compression at the
    start, in the middle or at the end of the address, and a dotted-quad tail
    standing for the last two groups (as in [::ffff:192.0.2.1]).

    @raise Failure
      if [s] is malformed: a group that is not 1 to 4 hex digits, more than
      one [::], a wrong number of groups, or an invalid dotted-quad tail. *)
let parse_ipv6 s =
  let fail () = failwith ("parse_ipv6: malformed IPv6 address: " ^ s) in
  let is_hex_digit = function
    | '0' .. '9' | 'a' .. 'f' | 'A' .. 'F' -> true
    | _ -> false
  in
  (* One colon-separated group: 1 to 4 hex digits, i.e. a 16-bit value. *)
  let hextet g =
    let n = String.length g in
    if n < 1 || n > 4 || not (String.for_all is_hex_digit g) then fail ()
    else int_of_string ("0x" ^ g)
  in
  (* Dotted-quad tail, converted to the two 16-bit groups it stands for. *)
  let dotted_quad g =
    try
      Scanf.sscanf g "%d.%d.%d.%d%!" (fun a b c d ->
          let ok octet = octet >= 0 && octet <= 255 in
          if ok a && ok b && ok c && ok d then
            [ (a lsl 8) lor b; (c lsl 8) lor d ]
          else fail ())
    with Scanf.Scan_failure _ | Failure _ | End_of_file -> fail ()
  in
  let split part = if part = "" then [] else String.split_on_char ':' part in
  (* Offset of the first "::" in [s], if there is one. *)
  let double_colon =
    let last = String.length s - 2 in
    let rec find i =
      if i > last then None
      else if s.[i] = ':' && s.[i + 1] = ':' then Some i
      else find (i + 1)
    in
    find 0
  in
  (* Groups written before and after the compression marker. A second "::"
     leaves an empty group in [tail], which [hextet] rejects. *)
  let head, tail =
    match double_colon with
    | None -> (split s, [])
    | Some i ->
        ( split (String.sub s 0 i),
          split (String.sub s (i + 2) (String.length s - i - 2)) )
  in
  let compressed = Option.is_some double_colon in
  (* Reject oversized input before converting any group. *)
  if List.length head + List.length tail > 8 then fail ();
  (* Only the very last group of the address may be a dotted quad. *)
  let rec words ~allow_v4 = function
    | [] -> []
    | [ g ] when allow_v4 && String.contains g '.' -> dotted_quad g
    | g :: rest -> hextet g :: words ~allow_v4 rest
  in
  let head_words = words ~allow_v4:(not compressed) head in
  let tail_words = words ~allow_v4:true tail in
  let n_head = List.length head_words in
  (* Number of all-zero groups that "::" stands for. *)
  let zeros = 8 - n_head - List.length tail_words in
  if (compressed && zeros < 1) || ((not compressed) && zeros <> 0) then fail ();
  (* Zero-filled so that the compressed groups need no explicit writes. *)
  let raw = Bytes.make 16 '\000' in
  let put i w = Bytes.set_uint16_be raw (2 * i) w in
  List.iteri put head_words;
  List.iteri (fun i w -> put (n_head + zeros + i) w) tail_words;
  Eio.Net.Ipaddr.of_raw (Bytes.to_string raw)
123
(** Parse a textual IP address. Any string containing a colon is handed to
    [parse_ipv6]; everything else goes to [parse_ipv4]. *)
let ip_of_string s =
  match String.index_opt s ':' with
  | Some _ -> parse_ipv6 s
  | None -> parse_ipv4 s
126
(** Write a UDP address as the textual IP (length-prefixed string) followed
    by the port as a 16-bit big-endian integer.

    @raise Failure for [`Unix] addresses, which have no wire encoding here. *)
let encode_addr enc (addr : Eio.Net.Sockaddr.datagram) =
  match addr with
  | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol"
  | `Udp (ip, port) ->
      let host = ip_to_string ip in
      Encoder.write_string enc host;
      Encoder.write_int16_be enc port
133
(** Read an address written by [encode_addr]: a length-prefixed textual IP
    followed by a 16-bit port. Both fields are consumed from [dec] before the
    IP text is parsed. *)
let decode_addr dec : Eio.Net.Sockaddr.datagram =
  let host = Decoder.read_string dec in
  let port = Decoder.read_int16_be dec in
  let ip = ip_of_string host in
  `Udp (ip, port)
138
(** Write a node id as a length-prefixed string, using its
    [node_id_to_string] form. *)
let encode_node_id enc (node_id : node_id) =
  node_id |> node_id_to_string |> Encoder.write_string enc
141
(** Read a length-prefixed string and convert it with [node_id_of_string]. *)
let decode_node_id dec : node_id =
  dec |> Decoder.read_string |> node_id_of_string
143
(** Write a node record as its id, its address and its metadata string, in
    that order. *)
let encode_node enc ({ id; addr; meta } : node_info) =
  encode_node_id enc id;
  encode_addr enc addr;
  Encoder.write_string enc meta
148
(** Read a node record in the order [encode_node] writes it: id, address,
    then metadata. *)
let decode_node dec : node_info =
  (* The id and address are bound first so that the metadata is read last. *)
  let id = decode_node_id dec in
  let addr = decode_addr dec in
  { id; addr; meta = Decoder.read_string dec }
154
(** Write an incarnation number as a 32-bit big-endian integer. *)
let encode_incarnation enc (inc : incarnation) =
  inc |> incarnation_to_int |> Int32.of_int |> Encoder.write_int32_be enc
157
(** Read a 32-bit big-endian integer and convert it with
    [incarnation_of_int]. *)
let decode_incarnation dec : incarnation =
  dec |> Decoder.read_int32_be |> Int32.to_int |> incarnation_of_int
160
(** Write an optional value: a presence byte (0 for [None], 1 for [Some])
    followed, when present, by the value written with [encode_elem]. *)
let encode_option encode_elem enc opt =
  match opt with
  | Some value ->
      Encoder.write_byte enc 1;
      encode_elem enc value
  | None -> Encoder.write_byte enc 0
166
(** Read an optional value: a presence byte of 0 means [None]; any other
    byte means a value follows and is read with [decode_elem]. *)
let decode_option decode_elem dec =
  if Decoder.read_byte dec = 0 then None else Some (decode_elem dec)
169
(** Write one protocol message: its one-byte tag followed by the fields of
    that message kind. Sequence numbers are written as 32-bit big-endian
    integers. *)
let encode_msg enc msg =
  let put_tag = Encoder.write_byte enc in
  let put_seq n = Encoder.write_int32_be enc (Int32.of_int n) in
  let put_id = encode_node_id enc in
  let put_node = encode_node enc in
  let put_string = Encoder.write_string enc in
  match msg with
  | Ping { seq; sender } ->
      put_tag tag_ping;
      put_seq seq;
      put_node sender
  | Ping_req { seq; target; sender } ->
      put_tag tag_ping_req;
      put_seq seq;
      put_id target;
      put_node sender
  | Ack { seq; responder; payload } ->
      put_tag tag_ack;
      put_seq seq;
      put_node responder;
      encode_option Encoder.write_string enc payload
  | Alive { node; incarnation } ->
      put_tag tag_alive;
      put_node node;
      encode_incarnation enc incarnation
  | Suspect { node; incarnation; suspector } ->
      put_tag tag_suspect;
      put_id node;
      encode_incarnation enc incarnation;
      put_id suspector
  | Dead { node; incarnation; declarator } ->
      put_tag tag_dead;
      put_id node;
      encode_incarnation enc incarnation;
      put_id declarator
  | User_msg { topic; payload; origin } ->
      put_tag tag_user_msg;
      put_string topic;
      put_string payload;
      put_id origin
205
(** Read one protocol message: a one-byte tag followed by the fields of the
    message kind that tag selects.

    Returns [Error (Invalid_tag t)] for an unknown tag. Returns
    [Error Truncated_message] when decoding raises [Invalid_argument], which
    is what the buffer accessors do when the input ends before the message is
    complete; an [Invalid_argument] raised by any other helper called while
    decoding is reported the same way. Other exceptions (for example from
    address parsing) are not caught here. *)
let decode_msg dec : (protocol_msg, decode_error) result =
  let read_seq () = Int32.to_int (Decoder.read_int32_be dec) in
  let decode_tagged tag : (protocol_msg, decode_error) result =
    match tag with
    | t when t = tag_ping ->
        let seq = read_seq () in
        let sender = decode_node dec in
        Ok (Ping { seq; sender })
    | t when t = tag_ping_req ->
        let seq = read_seq () in
        let target = decode_node_id dec in
        let sender = decode_node dec in
        Ok (Ping_req { seq; target; sender })
    | t when t = tag_ack ->
        let seq = read_seq () in
        let responder = decode_node dec in
        let payload = decode_option Decoder.read_string dec in
        Ok (Ack { seq; responder; payload })
    | t when t = tag_alive ->
        let node = decode_node dec in
        let incarnation = decode_incarnation dec in
        Ok (Alive { node; incarnation })
    | t when t = tag_suspect ->
        let node = decode_node_id dec in
        let incarnation = decode_incarnation dec in
        let suspector = decode_node_id dec in
        Ok (Suspect { node; incarnation; suspector })
    | t when t = tag_dead ->
        let node = decode_node_id dec in
        let incarnation = decode_incarnation dec in
        let declarator = decode_node_id dec in
        Ok (Dead { node; incarnation; declarator })
    | t when t = tag_user_msg ->
        let topic = Decoder.read_string dec in
        let payload = Decoder.read_string dec in
        let origin = decode_node_id dec in
        Ok (User_msg { topic; payload; origin })
    | t -> Error (Invalid_tag t)
  in
  (* The tag read itself sits inside the handler, so an empty buffer is also
     reported as a truncated message. *)
  match decode_tagged (Decoder.read_byte dec) with
  | decoded -> decoded
  | exception Invalid_argument _ -> Error Truncated_message
243
(** Encode [packet] into [buf] and return the number of bytes written.

    Layout: the magic bytes, one version byte, the cluster name as a
    length-prefixed string, a 16-bit message count, then the primary message
    followed by the piggybacked messages.

    Returns [Error `Buffer_too_small] when a write raises [Invalid_argument],
    which is how the buffer accessors signal a write past the end of [buf].
    The position never moves beyond the end of the buffer, so checking the
    remaining space after the fact cannot detect overflow. An
    [Invalid_argument] raised by any other helper called while encoding is
    reported the same way.

    @raise Failure if a message contains a [`Unix] address. *)
let encode_packet packet ~buf =
  let enc = Encoder.create ~buf in
  let write_all () =
    Encoder.write_bytes enc (Cstruct.of_string magic);
    Encoder.write_byte enc version;
    Encoder.write_string enc packet.cluster;
    (* The count covers the primary message plus every piggybacked one. *)
    Encoder.write_int16_be enc (1 + List.length packet.piggyback);
    encode_msg enc packet.primary;
    List.iter (encode_msg enc) packet.piggyback
  in
  match write_all () with
  | () -> Ok (Encoder.pos enc)
  | exception Invalid_argument _ -> Error `Buffer_too_small
255
(** Decode a packet from [buf].

    Checks the magic bytes and the version byte, then reads the cluster name,
    the 16-bit message count and that many messages. The first message
    becomes [primary] and the rest [piggyback].

    Errors:
    - [Invalid_magic] if the first four bytes are not the magic string;
    - [Unsupported_version v] if the version byte differs from [version];
    - any error returned by [decode_msg];
    - [Truncated_message] if the message count is zero, or if decoding raises
      [Invalid_argument], which is what the buffer accessors do when [buf]
      ends early. An [Invalid_argument] raised by any other helper called
      while decoding is reported the same way.

    Other exceptions (for example from address parsing) are not caught. *)
let decode_packet buf : (packet, decode_error) result =
  let dec = Decoder.create buf in
  let decode () : (packet, decode_error) result =
    let magic_bytes = Decoder.read_bytes dec ~len:4 in
    if Cstruct.to_string magic_bytes <> magic then Error Invalid_magic
    else
      let ver = Decoder.read_byte dec in
      if ver <> version then Error (Unsupported_version ver)
      else
        let cluster = Decoder.read_string dec in
        let msg_count = Decoder.read_int16_be dec in
        (* Decode [remaining] messages, stopping at the first error. *)
        let rec decode_msgs acc remaining =
          if remaining = 0 then Ok (List.rev acc)
          else
            match decode_msg dec with
            | Error e -> Error e
            | Ok msg -> decode_msgs (msg :: acc) (remaining - 1)
        in
        match decode_msgs [] msg_count with
        | Error e -> Error e
        | Ok [] -> Error Truncated_message
        | Ok (primary :: piggyback) -> Ok { cluster; primary; piggyback }
  in
  (* Input arrives from the network, so running off the end of the buffer is
     an expected failure and is reported through the result type. *)
  match decode () with
  | decoded -> decoded
  | exception Invalid_argument _ -> Error Truncated_message
277
(** Encoded size of a node id: a 2-byte length prefix plus the bytes of its
    [node_id_to_string] form. *)
let node_id_size node_id =
  let text = node_id_to_string node_id in
  String.length text + 2
279
(** Encoded size of a UDP address: a 2-byte length prefix, the textual IP,
    and a 2-byte port.

    @raise Failure for [`Unix] addresses. *)
let addr_size (addr : Eio.Net.Sockaddr.datagram) =
  match addr with
  | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol"
  | `Udp (ip, _) ->
      let text_len = String.length (ip_to_string ip) in
      2 + text_len + 2
286
(** Encoded size of a node record: its id, its address, and its metadata as
    a length-prefixed string. *)
let node_size ({ id; addr; meta } : node_info) =
  let meta_size = 2 + String.length meta in
  node_id_size id + addr_size addr + meta_size
289
(** Encoded size of an optional value: one presence byte, plus [f v] when
    the value is [Some v]. *)
let option_size f opt =
  match opt with
  | Some v -> 1 + f v
  | None -> 1
291
(** Number of bytes [encode_msg] writes for [msg]: one tag byte plus the
    size of each field of that message kind. *)
let encoded_size msg =
  (* Fixed-width pieces: the tag byte and a 32-bit integer field. *)
  let tag = 1 and u32 = 4 in
  (* A string costs its 2-byte length prefix plus its bytes. *)
  let str s = 2 + String.length s in
  match msg with
  | Ping { sender; _ } -> tag + u32 + node_size sender
  | Ping_req { target; sender; _ } ->
      tag + u32 + node_id_size target + node_size sender
  | Ack { responder; payload; _ } ->
      tag + u32 + node_size responder + option_size str payload
  | Alive { node; _ } -> tag + node_size node + u32
  | Suspect { node; suspector; _ } ->
      tag + node_id_size node + u32 + node_id_size suspector
  | Dead { node; declarator; _ } ->
      tag + node_id_size node + u32 + node_id_size declarator
  | User_msg { topic; payload; origin } ->
      tag + str topic + str payload + node_id_size origin