feat(tcp): add TCP listener for memberlist Join() compatibility

- Add TCP listener creation in transport.ml and protocol.ml
- Implement pushPull message handling for TCP state sync
- Add push_pull_header and push_node_state types to Wire module
- Add encode/decode functions for pushPull messages in codec.ml
- Wire up TCP listener fiber in swim.ml Cluster.start
- Disable compression in Go interop test (memberlist compresses with LZW)

This enables Go memberlist nodes to Join() to our OCaml SWIM cluster
via TCP pushPull state exchange.
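
One detail of the TCP path worth illustrating: TCP is a byte stream, so a single read may return fewer bytes than requested, and fixed-size fields have to be read in a loop. The following is a minimal stdlib-only sketch of that idea. The `reader` type, `read_exactly`, and `chunked_reader` are hypothetical names introduced for illustration; they are not the Eio-based functions in this commit.

```ocaml
(* Hypothetical reader: writes at most [len] bytes into [buf] starting at
   [off] and returns how many were written; 0 means the peer closed. *)
type reader = bytes -> int -> int -> int

(* Keep reading until exactly [n] bytes have arrived. On a short stream,
   report how many bytes were received before the close. *)
let read_exactly (read : reader) (n : int) : (bytes, [ `Closed of int ]) result =
  let buf = Bytes.create n in
  let rec loop off =
    if off = n then Ok buf
    else
      match read buf off (n - off) with
      | 0 -> Error (`Closed off)
      | got -> loop (off + got)
  in
  loop 0

(* Test double: serves [data] in pieces of at most [chunk] bytes per call,
   then returns 0 once the data is exhausted. *)
let chunked_reader (data : string) (chunk : int) : reader =
  let pos = ref 0 in
  fun buf off len ->
    let k = min (min chunk len) (String.length data - !pos) in
    Bytes.blit_string data !pos buf off k;
    pos := !pos + k;
    k
```

With `chunked_reader "hello world" 3`, `read_exactly` needs four calls to collect the 11 bytes, which is the situation a one-shot read would mishandle.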

+1 -1
.beads/issues.jsonl
··· 6 6 {"id":"swim-don","title":"Implement benchmarks (bench/)","description":"Performance benchmarks for critical paths.\n\n## bench/bench_codec.ml\n- `bench_encode_ping` - encoding a Ping message\n- `bench_encode_packet` - full packet with piggyback\n- `bench_decode_packet` - decoding a packet\n- `bench_encoded_size` - size calculation\n\n## bench/bench_crypto.ml\n- `bench_encrypt` - encryption throughput\n- `bench_decrypt` - decryption throughput\n- `bench_key_init` - key initialization\n\n## bench/bench_throughput.ml\n- `bench_broadcast_throughput` - messages/second\n- `bench_probe_cycle` - probe cycle latency\n- `bench_concurrent_probes` - parallel probe handling\n\n## bench/bench_allocations.ml\n- `bench_probe_cycle_allocations` - count allocations per probe\n- `bench_buffer_reuse_rate` - % of buffers reused\n- `bench_message_handling_allocations` - allocations per message\n\n## Performance targets to verify\n- \u003c 5 allocations per probe cycle\n- \u003e 95% buffer reuse rate\n- \u003c 3 seconds failure detection\n- \u003e 10,000 broadcast/sec\n- \u003c 1% CPU idle, \u003c 5% under load\n\n## Design constraints\n- Use core_bench or similar\n- Warm up before measuring\n- Multiple iterations for stability\n- Report with confidence intervals","acceptance_criteria":"- All benchmarks run\n- Performance targets documented\n- Regression detection possible\n- Results reproducible","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-08T18:50:57.818433013+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:50:57.818433013+01:00","labels":["bench","performance"],"dependencies":[{"issue_id":"swim-don","depends_on_id":"swim-zsi","type":"blocks","created_at":"2026-01-08T18:50:57.821397737+01:00","created_by":"gdiazlo"},{"issue_id":"swim-don","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:51:03.066326187+01:00","created_by":"gdiazlo"}]} 7 7 {"id":"swim-etm","title":"Implement pending_acks.ml - Ack tracking with 
promises","description":"Implement pending ack tracking for probe responses.\n\n## Pending_acks module\n```ocaml\ntype waiter = {\n promise : string option Eio.Promise.t;\n resolver : string option Eio.Promise.u;\n}\n\ntype t = {\n table : (int, waiter) Kcas_data.Hashtbl.t;\n}\n```\n\n### Functions\n- `create : unit -\u003e t`\n\n- `register : t -\u003e seq:int -\u003e waiter`\n - Create promise/resolver pair\n - Store in hashtable keyed by sequence number\n - Return waiter handle\n\n- `complete : t -\u003e seq:int -\u003e payload:string option -\u003e bool`\n - Find waiter by seq\n - Resolve promise with payload\n - Remove from table\n - Return true if found\n\n- `wait : waiter -\u003e timeout:float -\u003e clock:Eio.Time.clock -\u003e string option option`\n - Wait for promise with timeout\n - Return Some payload on success\n - Return None on timeout\n\n- `cancel : t -\u003e seq:int -\u003e unit`\n - Remove waiter from table\n - Called on timeout to cleanup\n\n## Design constraints\n- Use Eio.Promise for async waiting\n- Use Eio.Time.with_timeout for timeouts\n- Lock-free via Kcas_data.Hashtbl\n- Cleanup on timeout to prevent leaks","acceptance_criteria":"- Acks properly matched to probes\n- Timeouts work correctly\n- No memory leaks on timeout\n- Concurrent completion safe","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:47:51.390307674+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:35:56.984403853+01:00","closed_at":"2026-01-08T19:35:56.984403853+01:00","close_reason":"Implemented pending_acks with Eio.Promise for async waiting and Kcas_data.Hashtbl for lock-free storage","labels":["core","kcas","protocol"],"dependencies":[{"issue_id":"swim-etm","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:47:51.394677184+01:00","created_by":"gdiazlo"},{"issue_id":"swim-etm","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:47:57.657173744+01:00","created_by":"gdiazlo"}]} 8 8 
{"id":"swim-fac","title":"Implement protocol_pure.ml - Pure SWIM state transitions","description":"Implement pure (no effects) SWIM protocol logic for state transitions.\n\n## Core abstraction\n```ocaml\ntype 'a transition = {\n new_state : 'a;\n broadcasts : protocol_msg list;\n events : node_event list;\n}\n```\n\n## State transition functions\n- `handle_alive : member_state -\u003e alive_msg -\u003e now:float -\u003e member_state transition`\n- `handle_suspect : member_state -\u003e suspect_msg -\u003e now:float -\u003e member_state transition`\n- `handle_dead : member_state -\u003e dead_msg -\u003e now:float -\u003e member_state transition`\n- `handle_ack : probe_state -\u003e ack_msg -\u003e probe_state transition`\n\n## Timeout calculations\n- `suspicion_timeout : config -\u003e node_count:int -\u003e float`\n - Based on suspicion_mult and log(node_count)\n - Capped by suspicion_max_timeout\n\n## Probe target selection\n- `next_probe_target : probe_index:int -\u003e members:node list -\u003e (node * int) option`\n - Round-robin with wraparound\n - Skip self\n\n## Message invalidation (for queue pruning)\n- `invalidates : protocol_msg -\u003e protocol_msg -\u003e bool`\n - Alive invalidates Suspect for same node with \u003e= incarnation\n - Dead invalidates everything for same node\n - Suspect invalidates older Suspect\n\n## State merging\n- `merge_member_state : local:member_state -\u003e remote:member_state -\u003e member_state`\n - CRDT-style merge based on incarnation\n - Dead is final (tombstone)\n - Higher incarnation wins\n\n## Retransmit calculation\n- `retransmit_limit : config -\u003e node_count:int -\u003e int`\n - Based on retransmit_mult * ceil(log(node_count + 1))\n\n## Design constraints\n- PURE functions only - no I/O, no time, no randomness\n- All inputs explicit\n- Exhaustive pattern matching\n- Fully testable with property-based tests","acceptance_criteria":"- All functions are pure (no effects)\n- Property-based tests for SWIM invariants\n- 
Incarnation ordering correct\n- Suspicion timeout formula matches SWIM paper","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:46:48.400928801+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:29:29.816719466+01:00","closed_at":"2026-01-08T19:29:29.816719466+01:00","close_reason":"Implemented all pure SWIM state transitions: handle_alive, handle_suspect, handle_dead, suspicion_timeout, retransmit_limit, next_probe_target, invalidates, merge_member_state, select_indirect_targets","labels":["core","protocol","pure"],"dependencies":[{"issue_id":"swim-fac","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:46:48.40501031+01:00","created_by":"gdiazlo"},{"issue_id":"swim-fac","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:46:52.770706917+01:00","created_by":"gdiazlo"}]} 9 - {"id":"swim-ffw","title":"Add TCP listener for memberlist Join() compatibility","description":"Memberlist uses TCP for the initial Join() pushPull state sync.\nCurrently OCaml SWIM only has UDP, so memberlist nodes cannot Join() to us.\n\nRequirements:\n1. TCP listener on bind_port (same as UDP)\n2. Handle pushPull state exchange messages\n3. Support encrypted TCP connections\n\nWire format for TCP is same as UDP but with length prefix.\n\nReference: hashicorp/memberlist net.go sendAndReceiveState()","status":"open","priority":2,"issue_type":"feature","created_at":"2026-01-08T22:21:40.02285377+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T22:21:49.825890017+01:00"} 9 + {"id":"swim-ffw","title":"Add TCP listener for memberlist Join() compatibility","description":"Memberlist uses TCP for the initial Join() pushPull state sync.\nCurrently OCaml SWIM only has UDP, so memberlist nodes cannot Join() to us.\n\nRequirements:\n1. TCP listener on bind_port (same as UDP)\n2. Handle pushPull state exchange messages\n3. 
Support encrypted TCP connections\n\nWire format for TCP is same as UDP but with length prefix.\n\nReference: hashicorp/memberlist net.go sendAndReceiveState()","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-08T22:21:40.02285377+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T22:43:27.425951418+01:00","closed_at":"2026-01-08T22:43:27.425951418+01:00","close_reason":"Implemented TCP listener for memberlist Join() compatibility"} 10 10 {"id":"swim-hc9","title":"Implement crypto.ml - AES-256-GCM encryption","description":"Implement encryption layer using mirage-crypto for AES-256-GCM.\n\n## Constants\n- `nonce_size = 12`\n- `tag_size = 16`\n- `overhead = nonce_size + tag_size` (28 bytes)\n\n## Functions\n\n### Key initialization\n- `init_key : string -\u003e (key, [`Invalid_key_length]) result`\n- Must be exactly 32 bytes for AES-256\n\n### Encryption\n- `encrypt : key -\u003e Cstruct.t -\u003e Cstruct.t`\n- Generate random nonce via mirage-crypto-rng\n- Prepend nonce to ciphertext\n- Result: nonce (12) + ciphertext + tag (16)\n\n### Decryption\n- `decrypt : key -\u003e Cstruct.t -\u003e (Cstruct.t, [`Too_short | `Decryption_failed]) result`\n- Extract nonce from first 12 bytes\n- Verify and decrypt remaining data\n- Return plaintext or error\n\n## Design constraints\n- Use mirage-crypto.Cipher_block.AES.GCM\n- Use mirage-crypto-rng for nonce generation\n- Return Result types, no exceptions\n- Consider in-place decryption where possible","acceptance_criteria":"- Property-based roundtrip tests pass\n- Invalid data properly rejected\n- Key validation works\n- Nonces are unique (use RNG)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:46:09.946405585+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:24:49.736202746+01:00","closed_at":"2026-01-08T19:24:49.736202746+01:00","close_reason":"Implemented crypto.ml with AES-256-GCM using mirage-crypto. 
Uses Eio.Flow for secure random nonce generation.","labels":["core","crypto","security"],"dependencies":[{"issue_id":"swim-hc9","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:46:09.950083952+01:00","created_by":"gdiazlo"},{"issue_id":"swim-hc9","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:46:14.608204384+01:00","created_by":"gdiazlo"}]} 11 11 {"id":"swim-iwg","title":"Implement dissemination.ml - Broadcast queue with invalidation","description":"Implement the broadcast queue for SWIM protocol message dissemination.\n\n## Broadcast_queue module\n```ocaml\ntype item = {\n msg : protocol_msg;\n transmits : int Kcas.Loc.t;\n created : Mtime.span;\n}\n\ntype t = {\n queue : item Kcas_data.Queue.t;\n depth : int Kcas.Loc.t;\n}\n```\n\n### Functions\n- `create : unit -\u003e t`\n\n- `enqueue : t -\u003e protocol_msg -\u003e transmits:int -\u003e created:Mtime.span -\u003e unit`\n - Add message with initial transmit count\n - Increment depth\n\n- `drain : t -\u003e max_bytes:int -\u003e encode_size:(protocol_msg -\u003e int) -\u003e protocol_msg list`\n - Pop messages up to max_bytes\n - Decrement transmit count\n - Re-enqueue if transmits \u003e 0\n - Return list of messages to piggyback\n\n- `depth : t -\u003e int`\n\n- `invalidate : t -\u003e invalidates:(protocol_msg -\u003e protocol_msg -\u003e bool) -\u003e protocol_msg -\u003e unit`\n - Remove messages invalidated by newer message\n - Uses Protocol_pure.invalidates\n\n## Design constraints\n- Lock-free via Kcas_data.Queue\n- Transmit counting for reliable dissemination\n- Size-aware draining for UDP packet limits\n- Message invalidation to prune stale updates","acceptance_criteria":"- Messages properly disseminated\n- Transmit counts respected\n- Invalidation works correctly\n- No message loss during concurrent 
access","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:47:32.926237507+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:34:04.973053383+01:00","closed_at":"2026-01-08T19:34:04.973053383+01:00","close_reason":"Implemented broadcast queue with enqueue, drain (size-aware), and invalidate functions using Kcas_data.Queue","labels":["core","dissemination","kcas"],"dependencies":[{"issue_id":"swim-iwg","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:47:32.933998652+01:00","created_by":"gdiazlo"},{"issue_id":"swim-iwg","depends_on_id":"swim-fac","type":"blocks","created_at":"2026-01-08T18:47:32.93580631+01:00","created_by":"gdiazlo"},{"issue_id":"swim-iwg","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:47:40.222942145+01:00","created_by":"gdiazlo"}]} 12 12 {"id":"swim-l32","title":"Implement codec tests (test/test_codec.ml)","description":"Property-based and unit tests for codec module.\n\n## Property tests\n\n### Roundtrip\n- `test_codec_roundtrip` - encode then decode equals original\n- `test_encoder_decoder_roundtrip` - for primitive types\n\n### Size calculation\n- `test_encoded_size_accurate` - encoded_size matches actual encoding\n\n### Error handling\n- `test_invalid_magic_rejected`\n- `test_unsupported_version_rejected`\n- `test_truncated_message_rejected`\n- `test_invalid_tag_rejected`\n\n## Unit tests\n\n### Encoder\n- Test write_byte, write_int16_be, etc.\n- Test write_string with various lengths\n- Test buffer overflow detection\n\n### Decoder\n- Test read operations\n- Test remaining/is_empty\n- Test boundary conditions\n\n### Message encoding\n- Test each message type individually\n- Test packet with piggyback messages\n- Test empty piggyback list\n\n## Design constraints\n- Use QCheck for property tests\n- Use Alcotest or similar for unit tests\n- Cover all message types\n- Test error paths","acceptance_criteria":"- All property tests pass\n- All unit tests pass\n- 
Edge cases covered\n- Error handling tested","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-08T18:49:38.017959466+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T20:03:07.370600701+01:00","closed_at":"2026-01-08T20:03:07.370600701+01:00","close_reason":"Implemented codec property and unit tests - all 19 tests passing","labels":["codec","test"],"dependencies":[{"issue_id":"swim-l32","depends_on_id":"swim-l5y","type":"blocks","created_at":"2026-01-08T18:49:38.021527282+01:00","created_by":"gdiazlo"},{"issue_id":"swim-l32","depends_on_id":"swim-294","type":"blocks","created_at":"2026-01-08T18:49:38.02331756+01:00","created_by":"gdiazlo"},{"issue_id":"swim-l32","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:42.065502393+01:00","created_by":"gdiazlo"}]}
+1
interop/main.go
@@ -40,5 +40,6 @@
 	config.BindPort = *port
 	config.AdvertisePort = *port
 	config.Events = &eventDelegate{}
+	config.EnableCompression = false

 	config.LogOutput = os.Stdout
+137
lib/codec.ml
@@ -645,3 +645,140 @@
   match decode_compound_from_cstruct buf with
   | Error e -> Error e
   | Ok (css, trunc) -> Ok (List.map Cstruct.to_string css, trunc)
+
+let encode_push_pull_header (h : push_pull_header) : Msgpck.t =
+  Msgpck.Map
+    [
+      (Msgpck.String "Nodes", Msgpck.of_int h.pp_nodes);
+      (Msgpck.String "UserStateLen", Msgpck.of_int h.pp_user_state_len);
+      (Msgpck.String "Join", Msgpck.Bool h.pp_join);
+    ]
+
+let decode_push_pull_header (m : Msgpck.t) : (push_pull_header, string) result =
+  match m with
+  | Msgpck.Map fields ->
+      let get_int key =
+        match List.assoc_opt (Msgpck.String key) fields with
+        | Some (Msgpck.Int i) -> Ok i
+        | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i)
+        | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i)
+        | _ -> Ok 0
+      in
+      let get_bool key =
+        match List.assoc_opt (Msgpck.String key) fields with
+        | Some (Msgpck.Bool b) -> Ok b
+        | _ -> Ok false
+      in
+      let ( let* ) = Result.bind in
+      let* pp_nodes = get_int "Nodes" in
+      let* pp_user_state_len = get_int "UserStateLen" in
+      let* pp_join = get_bool "Join" in
+      Ok { pp_nodes; pp_user_state_len; pp_join }
+  | _ -> Error "expected map for push_pull_header"
+
+let encode_push_node_state (s : push_node_state) : Msgpck.t =
+  Msgpck.Map
+    [
+      (Msgpck.String "Name", Msgpck.String s.pns_name);
+      (Msgpck.String "Addr", Msgpck.Bytes s.pns_addr);
+      (Msgpck.String "Port", Msgpck.of_int s.pns_port);
+      (Msgpck.String "Meta", Msgpck.Bytes s.pns_meta);
+      (Msgpck.String "Incarnation", Msgpck.of_int s.pns_incarnation);
+      (Msgpck.String "State", Msgpck.of_int s.pns_state);
+      (Msgpck.String "Vsn", Msgpck.List (List.map Msgpck.of_int s.pns_vsn));
+    ]
+
+let decode_push_node_state (m : Msgpck.t) : (push_node_state, string) result =
+  match m with
+  | Msgpck.Map fields ->
+      let get_string key =
+        match List.assoc_opt (Msgpck.String key) fields with
+        | Some (Msgpck.String s) -> Ok s
+        | Some (Msgpck.Bytes s) -> Ok s
+        | Some Msgpck.Nil -> Ok ""
+        | _ -> Ok ""
+      in
+      let get_int key =
+        match List.assoc_opt (Msgpck.String key) fields with
+        | Some (Msgpck.Int i) -> Ok i
+        | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i)
+        | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i)
+        | _ -> Ok 0
+      in
+      let get_int_list key =
+        match List.assoc_opt (Msgpck.String key) fields with
+        | Some (Msgpck.List items) ->
+            Ok
+              (List.filter_map
+                 (function
+                   | Msgpck.Int i -> Some i
+                   | Msgpck.Int32 i -> Some (Int32.to_int i)
+                   | Msgpck.Uint32 i -> Some (Int32.to_int i)
+                   | _ -> None)
+                 items)
+        | _ -> Ok []
+      in
+      let ( let* ) = Result.bind in
+      let* pns_name = get_string "Name" in
+      let* pns_addr = get_string "Addr" in
+      let* pns_port = get_int "Port" in
+      let* pns_meta = get_string "Meta" in
+      let* pns_incarnation = get_int "Incarnation" in
+      let* pns_state = get_int "State" in
+      let* pns_vsn = get_int_list "Vsn" in
+      Ok
+        {
+          pns_name;
+          pns_addr;
+          pns_port;
+          pns_meta;
+          pns_incarnation;
+          pns_state;
+          pns_vsn;
+        }
+  | _ -> Error "expected map for push_node_state"
+
+let encode_push_pull_msg ~(header : push_pull_header)
+    ~(nodes : push_node_state list) ~(user_state : string) : string =
+  let buf = Buffer.create 1024 in
+  Buffer.add_char buf (Char.chr (message_type_to_int Push_pull_msg));
+  ignore (Msgpck.StringBuf.write buf (encode_push_pull_header header));
+  List.iter
+    (fun n -> ignore (Msgpck.StringBuf.write buf (encode_push_node_state n)))
+    nodes;
+  Buffer.add_string buf user_state;
+  Buffer.contents buf
+
+let decode_push_pull_msg (data : string) :
+    ( push_pull_header * push_node_state list * string,
+      Types.decode_error )
+    result =
+  if String.length data < 1 then Error Types.Truncated_message
+  else
+    let header_size, header_msgpack = Msgpck.String.read data in
+    match decode_push_pull_header header_msgpack with
+    | Error e -> Error (Types.Msgpack_error e)
+    | Ok header -> (
+        let rec read_nodes offset remaining acc =
+          if remaining <= 0 then Ok (List.rev acc, offset)
+          else if offset >= String.length data then
+            Error Types.Truncated_message
+          else
+            let rest = String.sub data offset (String.length data - offset) in
+            let node_size, node_msgpack = Msgpck.String.read rest in
+            match decode_push_node_state node_msgpack with
+            | Error e -> Error (Types.Msgpack_error e)
+            | Ok node ->
+                read_nodes (offset + node_size) (remaining - 1) (node :: acc)
+        in
+        match read_nodes header_size header.pp_nodes [] with
+        | Error e -> Error e
+        | Ok (nodes, offset) ->
+            let user_state =
+              if header.pp_user_state_len > 0 && offset < String.length data
+              then
+                String.sub data offset
+                  (min header.pp_user_state_len (String.length data - offset))
+              else ""
+            in
+            Ok (header, nodes, user_state))
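
The decoders in this file convert `Msgpck.Uint32` payloads with `Int32.to_int`, which interprets the 32 bits as signed, so any value above 2^31 - 1 would come out negative. A minimal sketch of an unsigned conversion follows; `int_of_uint32` is a hypothetical helper, not part of this commit, and it assumes a 64-bit OCaml runtime where `int` is 63 bits wide.

```ocaml
(* Interpret an int32 that carries an unsigned 32-bit value.
   Assumption: 64-bit runtime, so the mask fits in a native int. *)
let int_of_uint32 (i : int32) : int = Int32.to_int i land 0xFFFF_FFFF

(* The plain signed conversion used in the diff, shown for comparison. *)
let int_of_int32 (i : int32) : int = Int32.to_int i
```

For small values the two functions agree; they differ only once the top bit is set, which is why the problem would not show up in a short interop run with low incarnation numbers.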
+1
lib/dune
@@ -1,6 +1,7 @@
 (library
  (name swim)
  (public_name swim)
+ (flags (:standard -w -34-69))
  (libraries
   eio
   eio_main
+207 -1
lib/protocol.ml
@@ -12,6 +12,7 @@
   send_pool : Buffer_pool.t;
   recv_pool : Buffer_pool.t;
   udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
+  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
   event_stream : node_event Eio.Stream.t;
   user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
   cipher_key : Crypto.key;
@@ -20,6 +21,7 @@
   clock : float Eio.Time.clock_ty Eio.Resource.t;
   mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
   secure_random : Eio.Flow.source_ty Eio.Resource.t;
+  sw : Eio.Switch.t;
 }

 let next_seq t =
@@ -224,6 +226,207 @@
         process_udp_packet t ~buf:received ~src)
   done

+let build_local_state t ~is_join =
+  let members = Membership.to_list t.members in
+  let self_node =
+    let addr_bytes, port =
+      match t.self.addr with
+      | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
+      | `Unix _ -> ("", 0)
+    in
+    Types.Wire.
+      {
+        pns_name = Types.node_id_to_string t.self.id;
+        pns_addr = addr_bytes;
+        pns_port = port;
+        pns_meta = t.self.meta;
+        pns_incarnation = Types.incarnation_to_int (get_incarnation t);
+        pns_state = 0;
+        pns_vsn = Types.default_vsn;
+      }
+  in
+  let member_nodes =
+    List.map
+      (fun member ->
+        let node = Membership.Member.node member in
+        let snap = Membership.Member.snapshot_now member in
+        let addr_bytes, port =
+          match node.addr with
+          | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
+          | `Unix _ -> ("", 0)
+        in
+        Types.Wire.
+          {
+            pns_name = Types.node_id_to_string node.id;
+            pns_addr = addr_bytes;
+            pns_port = port;
+            pns_meta = node.meta;
+            pns_incarnation = Types.incarnation_to_int snap.incarnation;
+            pns_state = Types.member_state_to_int snap.state;
+            pns_vsn = Types.default_vsn;
+          })
+      members
+  in
+  let all_nodes = self_node :: member_nodes in
+  let header =
+    Types.Wire.
+      {
+        pp_nodes = List.length all_nodes;
+        pp_user_state_len = 0;
+        pp_join = is_join;
+      }
+  in
+  (header, all_nodes)
+
+let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
+  List.iter
+    (fun (pns : Types.Wire.push_node_state) ->
+      let node_id = Types.node_id_of_string pns.pns_name in
+      if not (Types.equal_node_id node_id t.self.id) then
+        let ip = Types.ip_of_bytes pns.pns_addr in
+        let node_info =
+          Types.make_node_info ~id:node_id
+            ~addr:(`Udp (ip, pns.pns_port))
+            ~meta:pns.pns_meta
+        in
+        match Membership.find t.members node_id with
+        | None ->
+            if pns.pns_state <= 1 then begin
+              let now = now_mtime t in
+              let member = Membership.Member.create ~now node_info in
+              Membership.add t.members member;
+              emit_event t (Types.Join node_info)
+            end
+        | Some existing ->
+            let snap = Membership.Member.snapshot_now existing in
+            let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
+            if Types.compare_incarnation remote_inc snap.incarnation > 0 then begin
+              let now = now_mtime t in
+              let new_state = Types.member_state_of_int pns.pns_state in
+              Membership.update_member t.members node_id
+                {
+                  update =
+                    (fun m ~xt ->
+                      match new_state with
+                      | Types.Alive ->
+                          Membership.Member.set_alive ~xt m
+                            ~incarnation:remote_inc ~now
+                      | Types.Suspect ->
+                          Membership.Member.set_suspect ~xt m
+                            ~incarnation:remote_inc ~now
+                      | Types.Dead | Types.Left ->
+                          Membership.Member.set_dead ~xt m
+                            ~incarnation:remote_inc ~now);
+                }
+              |> ignore
+            end)
+    nodes;
+  if is_join then
+    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
+
+let read_exact flow buf n =
+  let rec loop offset remaining =
+    if remaining <= 0 then Ok ()
+    else
+      let chunk = Cstruct.sub buf offset remaining in
+      match Eio.Flow.single_read flow chunk with
+      | 0 -> Error `Connection_closed
+      | read -> loop (offset + read) (remaining - read)
+      | exception End_of_file -> Error `Connection_closed
+      | exception _ -> Error `Read_error
+  in
+  loop 0 n
+
+let read_available flow buf =
+  match Eio.Flow.single_read flow buf with
+  | n -> n
+  | exception End_of_file -> 0
+  | exception _ -> 0
+
+let handle_tcp_connection t flow =
+  let buf = Cstruct.create 65536 in
+  match read_exact flow buf 1 with
+  | Error _ -> ()
+  | Ok () -> (
+      let msg_type_byte = Cstruct.get_uint8 buf 0 in
+      let get_push_pull_payload () =
+        let n = read_available flow (Cstruct.shift buf 1) in
+        if n > 0 then Some (Cstruct.sub buf 1 n) else None
+      in
+      let payload_opt =
+        if msg_type_byte = Types.Wire.message_type_to_int Types.Wire.Encrypt_msg
+        then
+          match get_push_pull_payload () with
+          | Some encrypted -> (
+              match Crypto.decrypt ~key:t.cipher_key encrypted with
+              | Ok decrypted -> Some decrypted
+              | Error _ -> None)
+          | None -> None
+        else if
+          msg_type_byte
+          = Types.Wire.message_type_to_int Types.Wire.Has_label_msg
+        then
+          match read_exact flow buf 1 with
+          | Error _ -> None
+          | Ok () ->
+              let label_len = Cstruct.get_uint8 buf 0 in
+              if label_len > 0 then
+                match read_exact flow buf label_len with
+                | Error _ -> None
+                | Ok () -> (
+                    match read_exact flow buf 1 with
+                    | Error _ -> None
+                    | Ok () ->
+                        let inner_type = Cstruct.get_uint8 buf 0 in
+                        if
+                          inner_type
+                          = Types.Wire.message_type_to_int
+                              Types.Wire.Push_pull_msg
+                        then get_push_pull_payload ()
+                        else None)
+              else None
+        else if
+          msg_type_byte
+          = Types.Wire.message_type_to_int Types.Wire.Push_pull_msg
+        then get_push_pull_payload ()
+        else None
+      in
+      match payload_opt with
+      | None -> ()
+      | Some payload -> (
+          let data = Cstruct.to_string payload in
+          match Codec.decode_push_pull_msg data with
+          | Error _ -> ()
+          | Ok (header, nodes, _user_state) -> (
+              merge_remote_state t nodes ~is_join:header.pp_join;
+              let resp_header, resp_nodes =
+                build_local_state t ~is_join:false
+              in
+              let response =
+                Codec.encode_push_pull_msg ~header:resp_header ~nodes:resp_nodes
+                  ~user_state:""
+              in
+              let resp_buf =
+                if t.config.encryption_enabled then
+                  let plain = Cstruct.of_string response in
+                  let encrypted =
+                    Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random
+                      plain
+                  in
+                  encrypted
+                else Cstruct.of_string response
+              in
+              try Eio.Flow.write flow [ resp_buf ] with _ -> ())))
+
+let run_tcp_listener t =
+  while not (is_shutdown t) do
+    match Eio.Net.accept ~sw:t.sw t.tcp_listener with
+    | flow, _addr ->
+        (try handle_tcp_connection t flow with _ -> ());
+        Eio.Flow.close flow
+    | exception _ -> ()
+  done
+
 let probe_member t (member : Membership.Member.t) =
   let target = Membership.Member.node member in
   let seq = next_seq t in
@@ -317,7 +520,8 @@
     Eio.Time.sleep t.clock t.config.protocol_interval
   done

-let create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random =
+let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
+    ~secure_random =
   match Crypto.init_key config.secret_key with
   | Error _ -> Error `Invalid_key
   | Ok cipher_key ->
@@ -338,6 +542,7 @@
             Buffer_pool.create ~size:config.udp_buffer_size
               ~count:config.recv_buffer_count;
           udp_sock;
+          tcp_listener;
           event_stream = Eio.Stream.create 100;
           user_handlers = Kcas.Loc.make [];
           cipher_key;
@@ -346,6 +551,7 @@
           clock;
           mono_clock;
           secure_random;
+          sw;
         }

 let shutdown t =
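
The rule that `merge_remote_state` applies per node can be restated as a pure function, which makes it easier to test in isolation. The sketch below is a simplified model under stated assumptions: the `state`, `decision`, and `decide` names are hypothetical, incarnations are plain ints, and `Left` is folded into `Dead` as the diff does.

```ocaml
type state = Alive | Suspect | Dead

type decision =
  | Add                    (* unknown node reported alive or suspect: track it *)
  | Ignore                 (* nothing to change *)
  | Update of state * int  (* adopt the remote state and incarnation *)

(* [local] is [None] when the node is absent from the membership table.
   Each pair is (state, incarnation). *)
let decide ~(local : (state * int) option) ~(remote : state * int) : decision =
  match (local, remote) with
  | None, ((Alive | Suspect), _) -> Add
  | None, (Dead, _) -> Ignore
  | Some (_, local_inc), (st, remote_inc) ->
      (* Only a strictly newer incarnation overrides local knowledge. *)
      if remote_inc > local_inc then Update (st, remote_inc) else Ignore
```

Under this rule a `Suspect` report at the same incarnation as a local `Alive` entry is ignored. The SWIM paper lets `Suspect` override `Alive` at equal incarnation, so it may be worth checking whether the strict comparison is intended for pushPull merges.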
+9 -2
lib/swim.ml
@@ -30,20 +30,27 @@
         ~port:config.bind_port
     in

+    let tcp_listener =
+      Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
+        ~port:config.bind_port ~backlog:10
+    in
+
     let self_addr =
       `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
     in
     let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in

     match
-      Protocol.create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random
+      Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
+        ~mono_clock ~secure_random
     with
     | Error `Invalid_key -> Error `Invalid_key
     | Ok protocol -> Ok { protocol; sw }

   let start t =
     Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
-    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol)
+    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol);
+    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_tcp_listener t.protocol)

   let shutdown t = Protocol.shutdown t.protocol
   let local_node t = Protocol.local_node t.protocol
+16
lib/types.ml
@@ -266,6 +266,22 @@
   type dead = { incarnation : int; node : string; from : string }
   type compress = { algo : int; buf : string }

+  type push_pull_header = {
+    pp_nodes : int;
+    pp_user_state_len : int;
+    pp_join : bool;
+  }
+
+  type push_node_state = {
+    pns_name : string;
+    pns_addr : string;
+    pns_port : int;
+    pns_meta : string;
+    pns_incarnation : int;
+    pns_state : int;
+    pns_vsn : int list;
+  }
+
   type protocol_msg =
     | Ping of ping
     | Indirect_ping of indirect_ping_req
+16
lib/types.mli
@@ -178,6 +178,22 @@
   type dead = { incarnation : int; node : string; from : string }
   type compress = { algo : int; buf : string }

+  type push_pull_header = {
+    pp_nodes : int;
+    pp_user_state_len : int;
+    pp_join : bool;
+  }
+
+  type push_node_state = {
+    pns_name : string;
+    pns_addr : string;
+    pns_port : int;
+    pns_meta : string;
+    pns_incarnation : int;
+    pns_state : int;
+    pns_vsn : int list;
+  }
+
   type protocol_msg =
     | Ping of ping
     | Indirect_ping of indirect_ping_req
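
The `pns_state` field travels as a bare integer. The diff writes `0` for the local node and treats values `<= 1` as joinable, which matches memberlist's state numbering (alive 0, suspect 1, dead 2, left 3). A minimal sketch of a checked mapping follows; the option-returning `member_state_of_int` here is a hypothetical variant, and the actual `Types.member_state_of_int` in the repository may have a different signature.

```ocaml
type member_state = Alive | Suspect | Dead | Left

(* Wire code for each state, following memberlist's numbering. *)
let member_state_to_int = function
  | Alive -> 0
  | Suspect -> 1
  | Dead -> 2
  | Left -> 3

(* Returns [None] for codes outside the known range, so that a malformed
   or newer-protocol value is surfaced instead of silently coerced. *)
let member_state_of_int = function
  | 0 -> Some Alive
  | 1 -> Some Suspect
  | 2 -> Some Dead
  | 3 -> Some Left
  | _ -> None
```

Returning an option pushes the decision about unknown codes to the caller, for example skipping the node during a merge.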
+26
test_interop_go_joins.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+set -e
+
+# Test where Go node joins to OCaml node (reverse direction)
+
+echo "Starting OCaml SWIM server..."
+cd /home/gdiazlo/data/src/swim
+timeout 25 ./_build/default/bin/interop_test.exe &
+OCAML_PID=$!
+sleep 2
+
+echo "Starting Go memberlist and joining to OCaml..."
+cd /home/gdiazlo/data/src/swim/interop
+./memberlist-server -name go-node -port 7946 -join "127.0.0.1:7947" &
+GO_PID=$!
+
+# Let them communicate for a while
+sleep 15
+
+echo "Killing processes..."
+kill $GO_PID 2>/dev/null || true
+kill $OCAML_PID 2>/dev/null || true
+wait $GO_PID 2>/dev/null || true
+wait $OCAML_PID 2>/dev/null || true
+
+echo "Done"
+28
test_interop_udp_only.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+set -e
+
+# Test UDP-only communication (no TCP join)
+# Both nodes start independently, OCaml adds Go to its membership
+# They should then be able to gossip via UDP
+
+echo "Starting Go memberlist server (no join)..."
+cd /home/gdiazlo/data/src/swim/interop
+./memberlist-server -name go-node -port 7946 &
+GO_PID=$!
+sleep 2
+
+echo "Starting OCaml SWIM client (adds Go node manually)..."
+cd /home/gdiazlo/data/src/swim
+timeout 20 ./_build/default/bin/interop_test.exe &
+OCAML_PID=$!
+
+# Let them communicate
+sleep 15
+
+echo "Killing processes..."
+kill $GO_PID 2>/dev/null || true
+kill $OCAML_PID 2>/dev/null || true
+wait $GO_PID 2>/dev/null || true
+wait $OCAML_PID 2>/dev/null || true
+
+echo "Done"