+1
-1
.beads/issues.jsonl
+1
-1
.beads/issues.jsonl
···
1
1
{"id":"swim-294","title":"Implement test generators (test/generators.ml)","description":"Create QCheck generators for property-based testing.\n\n## Generators to implement\n\n### Basic types\n- `gen_node_id : node_id QCheck.Gen.t`\n- `gen_incarnation : incarnation QCheck.Gen.t`\n- `gen_member_state : member_state QCheck.Gen.t`\n\n### Node types\n- `gen_node_info : node_info QCheck.Gen.t`\n - Generate valid addresses\n - Random metadata strings\n\n### Protocol messages\n- `gen_ping : protocol_msg QCheck.Gen.t`\n- `gen_ping_req : protocol_msg QCheck.Gen.t`\n- `gen_ack : protocol_msg QCheck.Gen.t`\n- `gen_alive : protocol_msg QCheck.Gen.t`\n- `gen_suspect : protocol_msg QCheck.Gen.t`\n- `gen_dead : protocol_msg QCheck.Gen.t`\n- `gen_user_msg : protocol_msg QCheck.Gen.t`\n- `gen_protocol_msg : protocol_msg QCheck.Gen.t` (uniform choice)\n\n### Packets\n- `gen_packet : packet QCheck.Gen.t`\n - Valid cluster names\n - Primary + piggyback messages\n\n### Binary data\n- `gen_cstruct : Cstruct.t QCheck.Gen.t`\n - Various sizes\n\n### Arbitrary instances\n- `arb_*` wrappers with shrinkers where useful\n\n## Design constraints\n- Use QCheck.Gen combinators\n- Generate valid data by construction\n- Include edge cases (empty strings, max values)","acceptance_criteria":"- All message types have generators\n- Generators produce valid data\n- Good distribution of test cases","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-08T18:49:22.04090675+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T20:00:13.745057699+01:00","closed_at":"2026-01-08T20:00:13.745057699+01:00","close_reason":"Implemented all QCheck generators for SWIM types","labels":["qcheck","test"],"dependencies":[{"issue_id":"swim-294","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:49:22.044472866+01:00","created_by":"gdiazlo"},{"issue_id":"swim-294","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:26.910584411+01:00","created_by":"gdiazlo"}]}
2
2
{"id":"swim-461","title":"Implement crypto tests (test/test_crypto.ml)","description":"Property-based and unit tests for crypto module.\n\n## Property tests\n\n### Roundtrip\n- `test_crypto_roundtrip` - encrypt then decrypt equals original\n- Test with various data sizes\n\n### Key validation\n- `test_invalid_key_length_rejected`\n- Test 31, 32, 33 byte keys\n\n## Unit tests\n\n### Encryption\n- Test output size = input size + overhead (28 bytes)\n- Test nonce is prepended\n- Test different plaintexts produce different ciphertexts\n\n### Decryption\n- Test successful decryption\n- Test tampered ciphertext fails\n- Test truncated data fails\n- Test wrong key fails\n\n### Key initialization\n- Test valid 32-byte key\n- Test invalid lengths rejected\n\n## Security tests\n- Verify nonces are unique (probabilistic)\n- Verify ciphertext differs from plaintext\n\n## Design constraints\n- Use QCheck for property tests\n- Test all error paths\n- Don't expose key material in errors","acceptance_criteria":"- All property tests pass\n- All unit tests pass\n- Security properties verified\n- Error handling tested","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-08T18:49:51.401236876+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T20:05:13.159541271+01:00","closed_at":"2026-01-08T20:05:13.159541271+01:00","close_reason":"Implemented crypto property and unit tests - all 13 tests passing","labels":["crypto","security","test"],"dependencies":[{"issue_id":"swim-461","depends_on_id":"swim-hc9","type":"blocks","created_at":"2026-01-08T18:49:51.404483911+01:00","created_by":"gdiazlo"},{"issue_id":"swim-461","depends_on_id":"swim-294","type":"blocks","created_at":"2026-01-08T18:49:51.405793127+01:00","created_by":"gdiazlo"},{"issue_id":"swim-461","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:56.45969199+01:00","created_by":"gdiazlo"}]}
3
-
{"id":"swim-6ea","title":"Refactor codec to use Cstruct/Bigstringaf instead of string","description":"Current codec uses string for protocol buffers which causes unnecessary memory copies. Should use Cstruct or Bigstringaf buffers directly for zero-copy encoding/decoding. Key areas: encode_internal_msg, decode_internal_msg, Wire type conversions.","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T21:39:36.33328134+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T21:39:46.831991307+01:00"}
3
+
{"id":"swim-6ea","title":"Refactor codec to use Cstruct/Bigstringaf instead of string","description":"Current codec uses string for protocol buffers which causes unnecessary memory copies. Should use Cstruct or Bigstringaf buffers directly for zero-copy encoding/decoding. Key areas: encode_internal_msg, decode_internal_msg, Wire type conversions.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2026-01-08T21:39:36.33328134+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T21:49:44.036256572+01:00"}
4
4
{"id":"swim-7wx","title":"Make wire protocol compatible with HashiCorp memberlist","notes":"Session progress:\n- Fixed Ping message: node field now correctly set to target (not sender)\n- Added conditional encryption based on config.encryption_enabled\n- Fixed cluster name check (memberlist uses empty string)\n- Verified unencrypted UDP ping/ack works between OCaml and Go\n- All 78 tests pass\n\nRemaining work:\n- Encryption format needs investigation (pkcs7decode error from Go)\n- May need TCP support for full join handshake\n- Buffer refactoring tracked in swim-6ea","status":"in_progress","priority":1,"issue_type":"feature","created_at":"2026-01-08T20:51:59.802585513+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T21:46:09.879921283+01:00"}
5
5
{"id":"swim-90e","title":"Implement transport.ml - Eio UDP/TCP networking","description":"Implement network transport layer using Eio.\n\n## UDP Transport\n\n### Functions\n- `create_udp_socket : Eio.Net.t -\u003e addr:string -\u003e port:int -\u003e Eio.Net.datagram_socket`\n- `send_udp : Eio.Net.datagram_socket -\u003e Eio.Net.Sockaddr.datagram -\u003e Cstruct.t -\u003e unit`\n- `recv_udp : Eio.Net.datagram_socket -\u003e Cstruct.t -\u003e (int * Eio.Net.Sockaddr.datagram)`\n\n## TCP Transport (for large payloads)\n\n### Functions\n- `create_tcp_listener : Eio.Net.t -\u003e addr:string -\u003e port:int -\u003e Eio.Net.listening_socket`\n- `connect_tcp : Eio.Net.t -\u003e addr:Eio.Net.Sockaddr.stream -\u003e timeout:float -\u003e clock:Eio.Time.clock -\u003e (Eio.Net.stream_socket, send_error) result`\n- `send_tcp : Eio.Net.stream_socket -\u003e Cstruct.t -\u003e (unit, send_error) result`\n- `recv_tcp : Eio.Net.stream_socket -\u003e Cstruct.t -\u003e (int, [`Connection_reset]) result`\n\n## Address parsing\n- `parse_addr : string -\u003e (Eio.Net.Sockaddr.datagram, [`Invalid_addr]) result`\n - Parse \"host:port\" format\n\n## Design constraints\n- Use Eio.Net for all I/O\n- No blocking except Eio primitives\n- Proper error handling via Result\n- Support for IPv4 and IPv6","acceptance_criteria":"- UDP send/recv works\n- TCP connect/send/recv works\n- Proper error handling\n- Address parsing robust","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:48:09.296035344+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:39:34.082898832+01:00","closed_at":"2026-01-08T19:39:34.082898832+01:00","close_reason":"Implemented UDP and TCP transport with Eio.Net, plus address parsing (mli skipped due to complex Eio row types)","labels":["core","eio","transport"],"dependencies":[{"issue_id":"swim-90e","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:48:09.299855321+01:00","created_by":"gdiazlo"},{"issue_id":"swim-90e","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:48:15.52111057+01:00","created_by":"gdiazlo"}]}
6
6
{"id":"swim-don","title":"Implement benchmarks (bench/)","description":"Performance benchmarks for critical paths.\n\n## bench/bench_codec.ml\n- `bench_encode_ping` - encoding a Ping message\n- `bench_encode_packet` - full packet with piggyback\n- `bench_decode_packet` - decoding a packet\n- `bench_encoded_size` - size calculation\n\n## bench/bench_crypto.ml\n- `bench_encrypt` - encryption throughput\n- `bench_decrypt` - decryption throughput\n- `bench_key_init` - key initialization\n\n## bench/bench_throughput.ml\n- `bench_broadcast_throughput` - messages/second\n- `bench_probe_cycle` - probe cycle latency\n- `bench_concurrent_probes` - parallel probe handling\n\n## bench/bench_allocations.ml\n- `bench_probe_cycle_allocations` - count allocations per probe\n- `bench_buffer_reuse_rate` - % of buffers reused\n- `bench_message_handling_allocations` - allocations per message\n\n## Performance targets to verify\n- \u003c 5 allocations per probe cycle\n- \u003e 95% buffer reuse rate\n- \u003c 3 seconds failure detection\n- \u003e 10,000 broadcast/sec\n- \u003c 1% CPU idle, \u003c 5% under load\n\n## Design constraints\n- Use core_bench or similar\n- Warm up before measuring\n- Multiple iterations for stability\n- Report with confidence intervals","acceptance_criteria":"- All benchmarks run\n- Performance targets documented\n- Regression detection possible\n- Results reproducible","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-08T18:50:57.818433013+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:50:57.818433013+01:00","labels":["bench","performance"],"dependencies":[{"issue_id":"swim-don","depends_on_id":"swim-zsi","type":"blocks","created_at":"2026-01-08T18:50:57.821397737+01:00","created_by":"gdiazlo"},{"issue_id":"swim-don","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:51:03.066326187+01:00","created_by":"gdiazlo"}]}
+240
-157
lib/codec.ml
+240
-157
lib/codec.ml
···
290
290
Ok { algo; buf }
291
291
| _ -> Error "expected map for compress"
292
292
293
-
let encode_msg (msg : protocol_msg) : string =
294
-
let msg_type, payload =
295
-
match msg with
296
-
| Ping p -> (Ping_msg, encode_ping p)
297
-
| Indirect_ping p -> (Indirect_ping_msg, encode_indirect_ping p)
298
-
| Ack a -> (Ack_resp_msg, encode_ack a)
299
-
| Nack n -> (Nack_resp_msg, encode_nack n)
300
-
| Suspect s -> (Suspect_msg, encode_suspect s)
301
-
| Alive a -> (Alive_msg, encode_alive a)
302
-
| Dead d -> (Dead_msg, encode_dead d)
303
-
| User_data _ -> (User_msg, Msgpck.Nil)
304
-
| Compound _ -> (Compound_msg, Msgpck.Nil)
305
-
| Compressed c -> (Compress_msg, encode_compress c)
306
-
| Err e -> (Err_msg, Msgpck.Map [ (Msgpck.String "Error", Msgpck.String e) ])
307
-
in
308
-
let buf = Buffer.create 256 in
309
-
Buffer.add_char buf (Char.chr (message_type_to_int msg_type));
310
-
(match msg with
311
-
| User_data data -> Buffer.add_string buf data
312
-
| _ -> ignore (Msgpck.StringBuf.write buf payload));
313
-
Buffer.contents buf
293
+
let wire_msg_to_msgpck (msg : protocol_msg) : message_type * Msgpck.t =
294
+
match msg with
295
+
| Ping p -> (Ping_msg, encode_ping p)
296
+
| Indirect_ping p -> (Indirect_ping_msg, encode_indirect_ping p)
297
+
| Ack a -> (Ack_resp_msg, encode_ack a)
298
+
| Nack n -> (Nack_resp_msg, encode_nack n)
299
+
| Suspect s -> (Suspect_msg, encode_suspect s)
300
+
| Alive a -> (Alive_msg, encode_alive a)
301
+
| Dead d -> (Dead_msg, encode_dead d)
302
+
| User_data _ -> (User_msg, Msgpck.Nil)
303
+
| Compound _ -> (Compound_msg, Msgpck.Nil)
304
+
| Compressed c -> (Compress_msg, encode_compress c)
305
+
| Err e -> (Err_msg, Msgpck.Map [ (Msgpck.String "Error", Msgpck.String e) ])
314
306
315
-
let decode_msg (buf : string) : (protocol_msg, Types.decode_error) result =
316
-
if String.length buf < 1 then Error Types.Truncated_message
307
+
let encode_msg_to_cstruct (msg : protocol_msg) ~(buf : Cstruct.t) :
308
+
(int, [ `Buffer_too_small ]) result =
309
+
let msg_type, payload = wire_msg_to_msgpck msg in
310
+
let msg_type_byte = message_type_to_int msg_type in
311
+
match msg with
312
+
| User_data data ->
313
+
let total_len = 1 + String.length data in
314
+
if total_len > Cstruct.length buf then Error `Buffer_too_small
315
+
else begin
316
+
Cstruct.set_uint8 buf 0 msg_type_byte;
317
+
Cstruct.blit_from_string data 0 buf 1 (String.length data);
318
+
Ok total_len
319
+
end
320
+
| _ ->
321
+
let payload_size = Msgpck.size payload in
322
+
let total_len = 1 + payload_size in
323
+
if total_len > Cstruct.length buf then Error `Buffer_too_small
324
+
else begin
325
+
Cstruct.set_uint8 buf 0 msg_type_byte;
326
+
let payload_bytes = Bytes.create payload_size in
327
+
let _ = Msgpck.Bytes.write payload_bytes payload in
328
+
Cstruct.blit_from_bytes payload_bytes 0 buf 1 payload_size;
329
+
Ok total_len
330
+
end
331
+
332
+
let decode_msg_from_cstruct (buf : Cstruct.t) :
333
+
(protocol_msg, Types.decode_error) result =
334
+
if Cstruct.length buf < 1 then Error Types.Truncated_message
317
335
else
318
-
let msg_type_byte = Char.code buf.[0] in
336
+
let msg_type_byte = Cstruct.get_uint8 buf 0 in
319
337
match message_type_of_int msg_type_byte with
320
338
| Error n -> Error (Types.Invalid_tag n)
321
339
| Ok msg_type -> (
322
-
let payload = String.sub buf 1 (String.length buf - 1) in
340
+
let payload_len = Cstruct.length buf - 1 in
323
341
match msg_type with
324
-
| User_msg -> Ok (User_data payload)
342
+
| User_msg ->
343
+
let data = Cstruct.to_string ~off:1 ~len:payload_len buf in
344
+
Ok (User_data data)
325
345
| Compound_msg -> Ok (Compound [])
326
346
| _ -> (
327
-
let _, msgpack = Msgpck.String.read payload in
347
+
let payload_bytes = Cstruct.to_bytes ~off:1 ~len:payload_len buf in
348
+
let _, msgpack = Msgpck.Bytes.read payload_bytes in
328
349
match msg_type with
329
350
| Ping_msg -> (
330
351
match decode_ping msgpack with
···
367
388
| _ -> Ok (Err "unknown error"))
368
389
| _ -> Error (Types.Invalid_tag msg_type_byte)))
369
390
370
-
let make_compound_msg (msgs : string list) : string =
371
-
if List.length msgs > 255 then failwith "too many messages for compound"
372
-
else
373
-
let buf = Buffer.create 1024 in
374
-
Buffer.add_char buf (Char.chr (message_type_to_int Compound_msg));
375
-
Buffer.add_char buf (Char.chr (List.length msgs));
376
-
List.iter
377
-
(fun m ->
378
-
let len = String.length m in
379
-
Buffer.add_char buf (Char.chr ((len lsr 8) land 0xff));
380
-
Buffer.add_char buf (Char.chr (len land 0xff)))
381
-
msgs;
382
-
List.iter (Buffer.add_string buf) msgs;
383
-
Buffer.contents buf
384
-
385
-
let decode_compound_msg (buf : string) :
386
-
(string list * int, Types.decode_error) result =
387
-
if String.length buf < 1 then Error Types.Truncated_message
388
-
else
389
-
let num_parts = Char.code buf.[0] in
390
-
let header_size = 1 + (num_parts * 2) in
391
-
if String.length buf < header_size then Error Types.Truncated_message
392
-
else
393
-
let lengths =
394
-
List.init num_parts (fun i ->
395
-
let hi = Char.code buf.[1 + (i * 2)] in
396
-
let lo = Char.code buf.[2 + (i * 2)] in
397
-
(hi lsl 8) lor lo)
398
-
in
399
-
let rec extract_parts offset remaining_lens acc trunc =
400
-
match remaining_lens with
401
-
| [] -> Ok (List.rev acc, trunc)
402
-
| len :: rest ->
403
-
if offset + len > String.length buf then
404
-
Ok (List.rev acc, List.length remaining_lens)
405
-
else
406
-
let part = String.sub buf offset len in
407
-
extract_parts (offset + len) rest (part :: acc) trunc
408
-
in
409
-
extract_parts header_size lengths [] 0
410
-
411
391
let crc32_table =
412
392
Array.init 256 (fun i ->
413
393
let crc = ref (Int32.of_int i) in
···
418
398
done;
419
399
!crc)
420
400
421
-
let crc32 (data : string) : int32 =
401
+
let crc32_cstruct (buf : Cstruct.t) : int32 =
422
402
let crc = ref 0xFFFFFFFFl in
423
-
String.iter
424
-
(fun c ->
425
-
let byte = Char.code c in
426
-
let idx =
427
-
Int32.to_int
428
-
(Int32.logand (Int32.logxor !crc (Int32.of_int byte)) 0xFFl)
429
-
in
430
-
crc := Int32.logxor (Int32.shift_right_logical !crc 8) crc32_table.(idx))
431
-
data;
403
+
for i = 0 to Cstruct.length buf - 1 do
404
+
let byte = Cstruct.get_uint8 buf i in
405
+
let idx =
406
+
Int32.to_int (Int32.logand (Int32.logxor !crc (Int32.of_int byte)) 0xFFl)
407
+
in
408
+
crc := Int32.logxor (Int32.shift_right_logical !crc 8) crc32_table.(idx)
409
+
done;
432
410
Int32.logxor !crc 0xFFFFFFFFl
433
411
434
-
let add_crc (buf : string) : string =
435
-
let crc = crc32 buf in
436
-
let header = Bytes.create 5 in
437
-
Bytes.set header 0 (Char.chr (message_type_to_int Has_crc_msg));
438
-
Bytes.set header 1
439
-
(Char.chr (Int32.to_int (Int32.shift_right_logical crc 24) land 0xff));
440
-
Bytes.set header 2
441
-
(Char.chr (Int32.to_int (Int32.shift_right_logical crc 16) land 0xff));
442
-
Bytes.set header 3
443
-
(Char.chr (Int32.to_int (Int32.shift_right_logical crc 8) land 0xff));
444
-
Bytes.set header 4 (Char.chr (Int32.to_int crc land 0xff));
445
-
Bytes.to_string header ^ buf
412
+
let add_crc_to_cstruct ~(src : Cstruct.t) ~src_len ~(dst : Cstruct.t) :
413
+
(int, [ `Buffer_too_small ]) result =
414
+
let total_len = 5 + src_len in
415
+
if total_len > Cstruct.length dst then Error `Buffer_too_small
416
+
else begin
417
+
let payload = Cstruct.sub src 0 src_len in
418
+
let crc = crc32_cstruct payload in
419
+
Cstruct.set_uint8 dst 0 (message_type_to_int Has_crc_msg);
420
+
Cstruct.BE.set_uint32 dst 1 crc;
421
+
Cstruct.blit payload 0 dst 5 src_len;
422
+
Ok total_len
423
+
end
446
424
447
-
let verify_and_strip_crc (buf : string) : (string, Types.decode_error) result =
448
-
if String.length buf < 5 then Error Types.Truncated_message
449
-
else if Char.code buf.[0] <> message_type_to_int Has_crc_msg then Ok buf
425
+
let verify_and_strip_crc (buf : Cstruct.t) :
426
+
(Cstruct.t, Types.decode_error) result =
427
+
if Cstruct.length buf < 5 then Error Types.Truncated_message
428
+
else if Cstruct.get_uint8 buf 0 <> message_type_to_int Has_crc_msg then Ok buf
450
429
else
451
-
let expected =
452
-
Int32.logor
453
-
(Int32.logor
454
-
(Int32.shift_left (Int32.of_int (Char.code buf.[1])) 24)
455
-
(Int32.shift_left (Int32.of_int (Char.code buf.[2])) 16))
456
-
(Int32.logor
457
-
(Int32.shift_left (Int32.of_int (Char.code buf.[3])) 8)
458
-
(Int32.of_int (Char.code buf.[4])))
459
-
in
460
-
let payload = String.sub buf 5 (String.length buf - 5) in
461
-
let actual = crc32 payload in
430
+
let expected = Cstruct.BE.get_uint32 buf 1 in
431
+
let payload = Cstruct.shift buf 5 in
432
+
let actual = crc32_cstruct payload in
462
433
if expected = actual then Ok payload else Error Types.Invalid_crc
463
434
464
-
let add_label (label : string) (buf : string) : string =
465
-
if label = "" then buf
435
+
let add_label_to_cstruct ~label ~(src : Cstruct.t) ~src_len ~(dst : Cstruct.t) :
436
+
(int, [ `Buffer_too_small ]) result =
437
+
if label = "" then begin
438
+
if src_len > Cstruct.length dst then Error `Buffer_too_small
439
+
else begin
440
+
Cstruct.blit src 0 dst 0 src_len;
441
+
Ok src_len
442
+
end
443
+
end
466
444
else
467
-
let header = Bytes.create (2 + String.length label) in
468
-
Bytes.set header 0 (Char.chr (message_type_to_int Has_label_msg));
469
-
Bytes.set header 1 (Char.chr (String.length label));
470
-
Bytes.blit_string label 0 header 2 (String.length label);
471
-
Bytes.to_string header ^ buf
445
+
let label_len = String.length label in
446
+
let total_len = 2 + label_len + src_len in
447
+
if total_len > Cstruct.length dst then Error `Buffer_too_small
448
+
else begin
449
+
Cstruct.set_uint8 dst 0 (message_type_to_int Has_label_msg);
450
+
Cstruct.set_uint8 dst 1 label_len;
451
+
Cstruct.blit_from_string label 0 dst 2 label_len;
452
+
Cstruct.blit src 0 dst (2 + label_len) src_len;
453
+
Ok total_len
454
+
end
472
455
473
-
let strip_label (buf : string) : (string * string, Types.decode_error) result =
474
-
if String.length buf < 1 then Error Types.Truncated_message
475
-
else if Char.code buf.[0] <> message_type_to_int Has_label_msg then
456
+
let strip_label (buf : Cstruct.t) :
457
+
(Cstruct.t * string, Types.decode_error) result =
458
+
if Cstruct.length buf < 1 then Error Types.Truncated_message
459
+
else if Cstruct.get_uint8 buf 0 <> message_type_to_int Has_label_msg then
476
460
Ok (buf, "")
477
-
else if String.length buf < 2 then Error Types.Truncated_message
461
+
else if Cstruct.length buf < 2 then Error Types.Truncated_message
462
+
else
463
+
let label_len = Cstruct.get_uint8 buf 1 in
464
+
if Cstruct.length buf < 2 + label_len then Error Types.Truncated_message
465
+
else
466
+
let label = Cstruct.to_string ~off:2 ~len:label_len buf in
467
+
let payload = Cstruct.shift buf (2 + label_len) in
468
+
Ok (payload, label)
469
+
470
+
let encode_compound_to_cstruct ~(msgs : Cstruct.t list) ~(msg_lens : int list)
471
+
~(dst : Cstruct.t) : (int, [ `Buffer_too_small ]) result =
472
+
let num_msgs = List.length msgs in
473
+
if num_msgs > 255 then failwith "too many messages for compound"
474
+
else
475
+
let header_size = 1 + 1 + (num_msgs * 2) in
476
+
let total_payload = List.fold_left ( + ) 0 msg_lens in
477
+
let total_len = header_size + total_payload in
478
+
if total_len > Cstruct.length dst then Error `Buffer_too_small
479
+
else begin
480
+
Cstruct.set_uint8 dst 0 (message_type_to_int Compound_msg);
481
+
Cstruct.set_uint8 dst 1 num_msgs;
482
+
List.iteri
483
+
(fun i len -> Cstruct.BE.set_uint16 dst (2 + (i * 2)) len)
484
+
msg_lens;
485
+
let offset = ref header_size in
486
+
List.iter2
487
+
(fun msg len ->
488
+
Cstruct.blit msg 0 dst !offset len;
489
+
offset := !offset + len)
490
+
msgs msg_lens;
491
+
Ok total_len
492
+
end
493
+
494
+
let decode_compound_from_cstruct (buf : Cstruct.t) :
495
+
(Cstruct.t list * int, Types.decode_error) result =
496
+
if Cstruct.length buf < 1 then Error Types.Truncated_message
478
497
else
479
-
let label_len = Char.code buf.[1] in
480
-
if String.length buf < 2 + label_len then Error Types.Truncated_message
498
+
let num_parts = Cstruct.get_uint8 buf 0 in
499
+
let header_size = 1 + (num_parts * 2) in
500
+
if Cstruct.length buf < header_size then Error Types.Truncated_message
481
501
else
482
-
let label = String.sub buf 2 label_len in
483
-
let payload =
484
-
String.sub buf (2 + label_len) (String.length buf - 2 - label_len)
502
+
let lengths =
503
+
List.init num_parts (fun i -> Cstruct.BE.get_uint16 buf (1 + (i * 2)))
504
+
in
505
+
let rec extract_parts offset remaining_lens acc trunc =
506
+
match remaining_lens with
507
+
| [] -> Ok (List.rev acc, trunc)
508
+
| len :: rest ->
509
+
if offset + len > Cstruct.length buf then
510
+
Ok (List.rev acc, List.length remaining_lens)
511
+
else
512
+
let part = Cstruct.sub buf offset len in
513
+
extract_parts (offset + len) rest (part :: acc) trunc
485
514
in
486
-
Ok (payload, label)
515
+
extract_parts header_size lengths [] 0
487
516
488
-
let encode_internal_msg ~self_name ~self_port (msg : Types.protocol_msg) :
489
-
string =
517
+
let encode_internal_msg_to_cstruct ~self_name ~self_port
518
+
(msg : Types.protocol_msg) ~(buf : Cstruct.t) :
519
+
(int, [ `Buffer_too_small ]) result =
490
520
let wire_msg = Types.msg_to_wire ~self_name ~self_port msg in
491
-
encode_msg wire_msg
521
+
encode_msg_to_cstruct wire_msg ~buf
492
522
493
-
let decode_internal_msg ~default_port (buf : string) :
523
+
let decode_internal_msg_from_cstruct ~default_port (buf : Cstruct.t) :
494
524
(Types.protocol_msg, Types.decode_error) result =
495
-
match decode_msg buf with
525
+
match decode_msg_from_cstruct buf with
496
526
| Error e -> Error e
497
527
| Ok wire_msg -> (
498
528
match Types.msg_of_wire ~default_port wire_msg with
···
503
533
(int, [ `Buffer_too_small ]) result =
504
534
let self_name = packet.cluster in
505
535
let self_port = 7946 in
506
-
let primary_encoded =
507
-
encode_internal_msg ~self_name ~self_port packet.primary
508
-
in
509
536
match packet.piggyback with
510
537
| [] ->
511
-
let total_len = String.length primary_encoded in
512
-
if total_len > Cstruct.length buf then Error `Buffer_too_small
513
-
else begin
514
-
Cstruct.blit_from_string primary_encoded 0 buf 0 total_len;
515
-
Ok total_len
516
-
end
517
-
| piggyback ->
518
-
let piggyback_encoded =
519
-
List.map (encode_internal_msg ~self_name ~self_port) piggyback
538
+
encode_internal_msg_to_cstruct ~self_name ~self_port packet.primary ~buf
539
+
| piggyback -> (
540
+
let encode_one msg =
541
+
let temp_buf = Cstruct.create 2048 in
542
+
match
543
+
encode_internal_msg_to_cstruct ~self_name ~self_port msg ~buf:temp_buf
544
+
with
545
+
| Error _ -> None
546
+
| Ok len -> Some (Cstruct.sub temp_buf 0 len, len)
520
547
in
521
-
let compound = make_compound_msg (primary_encoded :: piggyback_encoded) in
522
-
let total_len = String.length compound in
523
-
if total_len > Cstruct.length buf then Error `Buffer_too_small
524
-
else begin
525
-
Cstruct.blit_from_string compound 0 buf 0 total_len;
526
-
Ok total_len
527
-
end
548
+
let primary_result = encode_one packet.primary in
549
+
let piggyback_results = List.filter_map encode_one piggyback in
550
+
match primary_result with
551
+
| None -> Error `Buffer_too_small
552
+
| Some (primary_cs, primary_len) ->
553
+
let all_msgs = primary_cs :: List.map fst piggyback_results in
554
+
let all_lens = primary_len :: List.map snd piggyback_results in
555
+
encode_compound_to_cstruct ~msgs:all_msgs ~msg_lens:all_lens ~dst:buf)
528
556
529
557
let decode_packet (buf : Cstruct.t) : (Types.packet, Types.decode_error) result
530
558
=
531
-
let str = Cstruct.to_string buf in
532
-
if String.length str < 1 then Error Types.Truncated_message
559
+
if Cstruct.length buf < 1 then Error Types.Truncated_message
533
560
else
534
-
let msg_type = Char.code str.[0] in
561
+
let msg_type = Cstruct.get_uint8 buf 0 in
535
562
if msg_type = message_type_to_int Compound_msg then
536
-
let payload = String.sub str 1 (String.length str - 1) in
537
-
match decode_compound_msg payload with
563
+
let payload = Cstruct.shift buf 1 in
564
+
match decode_compound_from_cstruct payload with
538
565
| Error e -> Error e
539
566
| Ok (parts, _truncated) -> (
540
567
match parts with
541
568
| [] -> Error Types.Truncated_message
542
569
| first :: rest -> (
543
-
match decode_internal_msg ~default_port:7946 first with
570
+
match
571
+
decode_internal_msg_from_cstruct ~default_port:7946 first
572
+
with
544
573
| Error e -> Error e
545
574
| Ok primary ->
546
575
let piggyback =
547
576
List.filter_map
548
577
(fun p ->
549
-
match decode_internal_msg ~default_port:7946 p with
578
+
match
579
+
decode_internal_msg_from_cstruct ~default_port:7946 p
580
+
with
550
581
| Ok m -> Some m
551
582
| Error _ -> None)
552
583
rest
553
584
in
554
585
Ok { Types.cluster = ""; primary; piggyback }))
555
586
else
556
-
match decode_internal_msg ~default_port:7946 str with
587
+
match decode_internal_msg_from_cstruct ~default_port:7946 buf with
557
588
| Error e -> Error e
558
589
| Ok primary -> Ok { Types.cluster = ""; primary; piggyback = [] }
559
590
560
591
let encoded_size (msg : Types.protocol_msg) : int =
561
-
let self_name = "" in
562
-
let self_port = 7946 in
563
-
let encoded = encode_internal_msg ~self_name ~self_port msg in
564
-
String.length encoded + 3
592
+
let wire_msg = Types.msg_to_wire ~self_name:"" ~self_port:7946 msg in
593
+
let _, payload = wire_msg_to_msgpck wire_msg in
594
+
1 + Msgpck.size payload + 3
595
+
596
+
let encode_internal_msg ~self_name ~self_port (msg : Types.protocol_msg) :
597
+
string =
598
+
let buf = Cstruct.create 2048 in
599
+
match encode_internal_msg_to_cstruct ~self_name ~self_port msg ~buf with
600
+
| Error _ -> ""
601
+
| Ok len -> Cstruct.to_string ~off:0 ~len buf
602
+
603
+
(* Backward-compatible string wrappers for tests *)
604
+
605
+
let add_crc (data : string) : string =
606
+
let src = Cstruct.of_string data in
607
+
let dst = Cstruct.create (5 + String.length data) in
608
+
match add_crc_to_cstruct ~src ~src_len:(String.length data) ~dst with
609
+
| Error _ -> data
610
+
| Ok len -> Cstruct.to_string ~off:0 ~len dst
611
+
612
+
let verify_and_strip_crc_string (data : string) :
613
+
(string, Types.decode_error) result =
614
+
let buf = Cstruct.of_string data in
615
+
match verify_and_strip_crc buf with
616
+
| Error e -> Error e
617
+
| Ok cs -> Ok (Cstruct.to_string cs)
618
+
619
+
let add_label (label : string) (data : string) : string =
620
+
let src = Cstruct.of_string data in
621
+
let dst = Cstruct.create (2 + String.length label + String.length data) in
622
+
match add_label_to_cstruct ~label ~src ~src_len:(String.length data) ~dst with
623
+
| Error _ -> data
624
+
| Ok len -> Cstruct.to_string ~off:0 ~len dst
625
+
626
+
let strip_label_string (data : string) :
627
+
(string * string, Types.decode_error) result =
628
+
let buf = Cstruct.of_string data in
629
+
match strip_label buf with
630
+
| Error e -> Error e
631
+
| Ok (cs, label) -> Ok (Cstruct.to_string cs, label)
632
+
633
+
let make_compound_msg (msgs : string list) : string =
634
+
let css = List.map Cstruct.of_string msgs in
635
+
let lens = List.map String.length msgs in
636
+
let total_len = 2 + (List.length msgs * 2) + List.fold_left ( + ) 0 lens in
637
+
let dst = Cstruct.create total_len in
638
+
match encode_compound_to_cstruct ~msgs:css ~msg_lens:lens ~dst with
639
+
| Error _ -> ""
640
+
| Ok len -> Cstruct.to_string ~off:0 ~len dst
641
+
642
+
let decode_compound_msg (data : string) :
643
+
(string list * int, Types.decode_error) result =
644
+
let buf = Cstruct.of_string data in
645
+
match decode_compound_from_cstruct buf with
646
+
| Error e -> Error e
647
+
| Ok (css, trunc) -> Ok (List.map Cstruct.to_string css, trunc)
+3
-3
test/test_codec.ml
+3
-3
test/test_codec.ml
···
120
120
let test_crc_roundtrip () =
121
121
let data = "hello world" in
122
122
let with_crc = add_crc data in
123
-
match verify_and_strip_crc with_crc with
123
+
match verify_and_strip_crc_string with_crc with
124
124
| Ok stripped -> Alcotest.(check string) "stripped" data stripped
125
125
| Error _ -> Alcotest.fail "CRC verification failed"
126
126
···
129
129
let with_crc = add_crc data in
130
130
let corrupted = Bytes.of_string with_crc in
131
131
Bytes.set corrupted 6 '\xFF';
132
-
match verify_and_strip_crc (Bytes.to_string corrupted) with
132
+
match verify_and_strip_crc_string (Bytes.to_string corrupted) with
133
133
| Error Invalid_crc -> ()
134
134
| _ -> Alcotest.fail "expected CRC error"
135
135
···
137
137
let label = "my-label" in
138
138
let data = "payload data" in
139
139
let with_label = add_label label data in
140
-
match strip_label with_label with
140
+
match strip_label_string with_label with
141
141
| Ok (stripped, extracted_label) ->
142
142
Alcotest.(check string) "payload" data stripped;
143
143
Alcotest.(check string) "label" label extracted_label