Compare changes

-40
AGENTS.md
··· 1 - # Agent Instructions 2 - 3 - This project uses **bd** (beads) for issue tracking. Run `bd onboard` to get started. 4 - 5 - ## Quick Reference 6 - 7 - ```bash 8 - bd ready # Find available work 9 - bd show <id> # View issue details 10 - bd update <id> --status in_progress # Claim work 11 - bd close <id> # Complete work 12 - bd sync # Sync with git 13 - ``` 14 - 15 - ## Landing the Plane (Session Completion) 16 - 17 - **When ending a work session**, you MUST complete ALL steps below. Work is NOT complete until `git push` succeeds. 18 - 19 - **MANDATORY WORKFLOW:** 20 - 21 - 1. **File issues for remaining work** - Create issues for anything that needs follow-up 22 - 2. **Run quality gates** (if code changed) - Tests, linters, builds 23 - 3. **Update issue status** - Close finished work, update in-progress items 24 - 4. **PUSH TO REMOTE** - This is MANDATORY: 25 - ```bash 26 - git pull --rebase 27 - bd sync 28 - git push 29 - git status # MUST show "up to date with origin" 30 - ``` 31 - 5. **Clean up** - Clear stashes, prune remote branches 32 - 6. **Verify** - All changes committed AND pushed 33 - 7. **Hand off** - Provide context for next session 34 - 35 - **CRITICAL RULES:** 36 - - Work is NOT complete until `git push` succeeds 37 - - NEVER stop before pushing - that leaves work stranded locally 38 - - NEVER say "ready to push when you are" - YOU must push 39 - - If push fails, resolve and retry until it succeeds 40 -
+177
README.md
··· 1 + # swim 2 + 3 + An OCaml 5 implementation of the SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol for cluster membership and failure detection. 4 + 5 + ## Overview 6 + 7 + This library provides: 8 + 9 + - **Membership Management**: Automatic discovery and tracking of cluster nodes 10 + - **Failure Detection**: Identifies unreachable nodes using periodic probes and indirect checks 11 + - **Gossip Protocol**: Propagates state changes (Alive/Suspect/Dead) across the cluster 12 + - **Messaging**: Cluster-wide broadcast (gossip-based) and direct point-to-point UDP messaging 13 + - **Encryption**: Optional AES-256-GCM encryption for all network traffic 14 + 15 + Built on [Eio](https://github.com/ocaml-multicore/eio) for effect-based concurrency and [Kcas](https://github.com/ocaml-multicore/kcas) for lock-free shared state. 16 + 17 + ## Requirements 18 + 19 + - OCaml >= 5.1 20 + - Dune >= 3.20 21 + 22 + ## Installation 23 + 24 + ```bash 25 + opam install . 
26 + ``` 27 + 28 + Or add to your dune-project: 29 + 30 + ``` 31 + (depends (swim (>= 0.1.0))) 32 + ``` 33 + 34 + ## Usage 35 + 36 + ### Basic Example 37 + 38 + ```ocaml 39 + open Swim.Types 40 + 41 + let config = { 42 + default_config with 43 + bind_port = 7946; 44 + node_name = Some "node-1"; 45 + secret_key = "your-32-byte-secret-key-here!!!"; (* 32 bytes for AES-256 *) 46 + encryption_enabled = true; 47 + } 48 + 49 + let () = 50 + Eio_main.run @@ fun env -> 51 + Eio.Switch.run @@ fun sw -> 52 + let env_wrap = { stdenv = env; sw } in 53 + match Swim.Cluster.create ~sw ~env:env_wrap ~config with 54 + | Error `Invalid_key -> failwith "Invalid secret key" 55 + | Ok cluster -> 56 + Swim.Cluster.start cluster; 57 + 58 + (* Join an existing cluster *) 59 + let seed_nodes = ["192.168.1.10:7946"] in 60 + (match Swim.Cluster.join cluster ~seed_nodes with 61 + | Ok () -> Printf.printf "Joined cluster\n" 62 + | Error `No_seeds_reachable -> Printf.printf "Failed to join\n"); 63 + 64 + (* Send a broadcast message to all nodes *) 65 + Swim.Cluster.broadcast cluster ~topic:"config" ~payload:"v2"; 66 + 67 + (* Send a direct message to a specific node *) 68 + let target = node_id_of_string "node-2" in 69 + Swim.Cluster.send cluster ~target ~topic:"ping" ~payload:"hello"; 70 + 71 + (* Handle incoming messages *) 72 + Swim.Cluster.on_message cluster (fun sender topic payload -> 73 + Printf.printf "From %s: [%s] %s\n" 74 + (node_id_to_string sender.id) topic payload); 75 + 76 + (* Listen for membership events *) 77 + Eio.Fiber.fork ~sw (fun () -> 78 + let stream = Swim.Cluster.events cluster in 79 + while true do 80 + match Eio.Stream.take stream with 81 + | Join node -> Printf.printf "Joined: %s\n" (node_id_to_string node.id) 82 + | Leave node -> Printf.printf "Left: %s\n" (node_id_to_string node.id) 83 + | Suspect_event node -> Printf.printf "Suspect: %s\n" (node_id_to_string node.id) 84 + | Alive_event node -> Printf.printf "Alive: %s\n" (node_id_to_string node.id) 85 + | Update 
_ -> () 86 + done); 87 + 88 + Eio.Fiber.await_cancel () 89 + ``` 90 + 91 + ### Configuration Options 92 + 93 + | Field | Default | Description | 94 + |-------|---------|-------------| 95 + | `bind_addr` | "0.0.0.0" | Interface to bind listeners | 96 + | `bind_port` | 7946 | Port for SWIM protocol | 97 + | `protocol_interval` | 1.0 | Seconds between probe rounds | 98 + | `probe_timeout` | 0.5 | Seconds to wait for Ack | 99 + | `indirect_checks` | 3 | Peers to ask for indirect probes | 100 + | `secret_key` | (zeros) | 32-byte key for AES-256-GCM | 101 + | `encryption_enabled` | false | Enable encryption | 102 + 103 + ## Interoperability Testing 104 + 105 + The library includes interoperability tests with HashiCorp's [memberlist](https://github.com/hashicorp/memberlist) (Go). This verifies protocol compatibility with the reference implementation. 106 + 107 + ### Prerequisites 108 + 109 + - Go >= 1.19 110 + - OCaml environment with dune 111 + 112 + ### Running Interop Tests 113 + 114 + The interop test suite starts a Go memberlist node and an OCaml node, then verifies they can discover each other and exchange messages. 115 + 116 + ```bash 117 + # Build the OCaml project 118 + dune build 119 + 120 + # Build the Go memberlist server 121 + cd interop && go build -o memberlist-server main.go && cd .. 122 + 123 + # Run the interop test 124 + bash test/scripts/test_interop.sh 125 + 126 + # Run with encryption enabled 127 + bash test/scripts/test_interop_encrypted.sh 128 + ``` 129 + 130 + ### Manual Interop Testing 131 + 132 + Start the Go node: 133 + 134 + ```bash 135 + cd interop 136 + go run main.go -name go-node -bind 127.0.0.1 -port 7946 137 + ``` 138 + 139 + In another terminal, start the OCaml node: 140 + 141 + ```bash 142 + dune exec swim-interop-test 143 + ``` 144 + 145 + The OCaml node will connect to the Go node and print membership statistics for 30 seconds. 
146 + 147 + ### Available Test Scripts 148 + 149 + | Script | Description | 150 + |--------|-------------| 151 + | `test/scripts/test_interop.sh` | Basic interop test | 152 + | `test/scripts/test_interop_encrypted.sh` | Interop with AES encryption | 153 + | `test/scripts/test_interop_udp_only.sh` | UDP-only communication test | 154 + | `test/scripts/test_interop_go_joins.sh` | Go node joining OCaml cluster | 155 + 156 + ### Debug Utilities 157 + 158 + ```bash 159 + # Test packet encoding/decoding 160 + dune exec swim-debug-codec 161 + 162 + # Receive and display incoming SWIM packets 163 + dune exec swim-debug-recv 164 + 165 + # Send manual ping to a target node 166 + dune exec swim-debug-ping 167 + ``` 168 + 169 + ## Running Tests 170 + 171 + ```bash 172 + dune runtest 173 + ``` 174 + 175 + ## License 176 + 177 + ISC License. See [LICENSE](LICENSE) for details.
+151
docs/usage.md
··· 1 + # SWIM Protocol Library - Usage Guide 2 + 3 + This library provides a production-ready implementation of the SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol in OCaml 5. It handles cluster membership, failure detection, and messaging. 4 + 5 + ## Key Features 6 + 7 + - **Membership**: Automatic discovery and failure detection. 8 + - **Gossip**: Efficient state propagation (Alive/Suspect/Dead). 9 + - **Messaging**: 10 + - **Broadcast**: Eventual consistency (gossip-based) for cluster-wide updates. 11 + - **Direct Send**: High-throughput point-to-point UDP messaging. 12 + - **Security**: AES-256-GCM encryption. 13 + - **Zero-Copy**: Optimized buffer management for high performance. 14 + 15 + ## Getting Started 16 + 17 + ### 1. Define Configuration 18 + 19 + Start with `default_config` and customize as needed. 20 + 21 + ```ocaml 22 + open Swim.Types 23 + 24 + let config = { 25 + default_config with 26 + bind_port = 7946; 27 + node_name = Some "node-1"; 28 + secret_key = "your-32-byte-secret-key-must-be-32-bytes"; (* 32 bytes for AES-256 *) 29 + encryption_enabled = true; 30 + } 31 + ``` 32 + 33 + ### 2. Create and Start a Cluster Node 34 + 35 + Use `Cluster.create` within an Eio switch. 36 + 37 + ```ocaml 38 + module Cluster = Swim.Cluster 39 + 40 + let () = 41 + Eio_main.run @@ fun env -> 42 + Eio.Switch.run @@ fun sw -> 43 + 44 + (* Create environment wrapper *) 45 + let env_wrap = { stdenv = env; sw } in 46 + 47 + match Cluster.create ~sw ~env:env_wrap ~config with 48 + | Error `Invalid_key -> failwith "Invalid secret key" 49 + | Ok cluster -> 50 + (* Start background daemons (protocol loop, UDP receiver, TCP listener) *) 51 + Cluster.start cluster; 52 + 53 + Printf.printf "Node started!\n%!"; 54 + 55 + (* Keep running *) 56 + Eio.Fiber.await_cancel () 57 + ``` 58 + 59 + ### 3. Joining a Cluster 60 + 61 + To join an existing cluster, you need the address of at least one seed node. 
62 + 63 + ```ocaml 64 + let seed_nodes = ["192.168.1.10:7946"] in 65 + match Cluster.join cluster ~seed_nodes with 66 + | Ok () -> Printf.printf "Joined cluster successfully\n" 67 + | Error `No_seeds_reachable -> Printf.printf "Failed to join cluster\n" 68 + ``` 69 + 70 + ## Messaging 71 + 72 + ### Broadcast (Gossip) 73 + Use `broadcast` to send data to **all** nodes. This uses the gossip protocol (piggybacking on membership messages). It is bandwidth-efficient but has higher latency and is eventually consistent. 74 + 75 + **Best for:** Configuration updates, low-frequency state sync. 76 + 77 + ```ocaml 78 + Cluster.broadcast cluster 79 + ~topic:"config-update" 80 + ~payload:"{\"version\": 2}" 81 + ``` 82 + 83 + ### Direct Send (Point-to-Point) 84 + Use `send` to send a message directly to a specific node via UDP. This is high-throughput and low-latency. 85 + 86 + **Best for:** RPC, high-volume data transfer, direct coordination. 87 + 88 + ```ocaml 89 + (* Send by Node ID *) 90 + let target_node_id = node_id_of_string "node-2" in 91 + Cluster.send cluster 92 + ~target:target_node_id 93 + ~topic:"ping" 94 + ~payload:"pong" 95 + 96 + (* Send by Address (if Node ID unknown) *) 97 + let addr = `Udp (Eio.Net.Ipaddr.of_raw "\192\168\001\010", 7946) in 98 + Cluster.send_to_addr cluster 99 + ~addr 100 + ~topic:"alert" 101 + ~payload:"alert-data" 102 + ``` 103 + 104 + ### Handling Messages 105 + Register a callback to handle incoming messages (both broadcast and direct). 106 + 107 + ```ocaml 108 + Cluster.on_message cluster (fun sender topic payload -> 109 + Printf.printf "Received '%s' from %s: %s\n" 110 + topic 111 + (node_id_to_string sender.id) 112 + payload 113 + ) 114 + ``` 115 + 116 + ## Membership Events 117 + 118 + Listen for node lifecycle events. 
119 + 120 + ```ocaml 121 + Eio.Fiber.fork ~sw (fun () -> 122 + let stream = Cluster.events cluster in 123 + while true do 124 + match Eio.Stream.take stream with 125 + | Join node -> Printf.printf "Node joined: %s\n" (node_id_to_string node.id) 126 + | Leave node -> Printf.printf "Node left: %s\n" (node_id_to_string node.id) 127 + | Suspect_event node -> Printf.printf "Node suspected: %s\n" (node_id_to_string node.id) 128 + | Alive_event node -> Printf.printf "Node alive again: %s\n" (node_id_to_string node.id) 129 + | Update _ -> () 130 + done 131 + ) 132 + ``` 133 + 134 + ## Configuration Options 135 + 136 + | Field | Default | Description | 137 + |-------|---------|-------------| 138 + | `bind_addr` | "0.0.0.0" | Interface to bind UDP/TCP listeners. | 139 + | `bind_port` | 7946 | Port for SWIM protocol. | 140 + | `protocol_interval` | 1.0 | Seconds between probe rounds. Lower = faster failure detection, higher bandwidth. | 141 + | `probe_timeout` | 0.5 | Seconds to wait for Ack. | 142 + | `indirect_checks` | 3 | Number of peers to ask for indirect probes. | 143 + | `udp_buffer_size` | 1400 | Max UDP packet size (MTU). | 144 + | `secret_key` | (zeros) | 32-byte key for AES-256-GCM. | 145 + | `max_gossip_queue_depth` | 5000 | Max items in broadcast queue before dropping oldest (prevents leaks). | 146 + 147 + ## Performance Tips 148 + 149 + 1. **Buffer Pool**: The library uses zero-copy buffer pools. Ensure `send_buffer_count` and `recv_buffer_count` are sufficient for your load (default 16). 150 + 2. **Gossip Limit**: If broadcasting aggressively, `max_gossip_queue_depth` protects memory but may drop messages. Use `Direct Send` for high volume. 151 + 3. **Eio**: Run within an Eio domain/switch. The library is designed for OCaml 5 multicore.
+19 -15
dune-project
··· 1 1 (lang dune 3.20) 2 2 3 3 (name swim) 4 + (version 0.1.0) 4 5 5 6 (generate_opam_files true) 6 7 7 8 (source 8 - (github gdiazlo/swim)) 9 + (uri git+https://tangled.org/gdiazlo.tngl.sh/swim)) 9 10 10 - (authors "Guillermo Diaz-Romero <guillermo.diaz@gmail.com>") 11 + (authors "Gabriel Diaz") 11 12 12 - (maintainers "Guillermo Diaz-Romero <guillermo.diaz@gmail.com>") 13 + (maintainers "Gabriel Diaz") 13 14 14 - (license MIT) 15 + (license ISC) 15 16 16 - (documentation https://github.com/gdiazlo/swim) 17 + (homepage https://tangled.org/gdiazlo.tngl.sh/swim) 18 + (bug_reports https://tangled.org/gdiazlo.tngl.sh/swim/issues) 19 + (documentation https://tangled.org/gdiazlo.tngl.sh/swim) 17 20 18 21 (package 19 22 (name swim) ··· 23 26 (depends 24 27 (ocaml (>= 5.1)) 25 28 (dune (>= 3.20)) 26 - (eio (>= 1.0)) 27 - (eio_main (>= 1.0)) 29 + (eio (>= 1.3)) 28 30 (kcas (>= 0.7)) 29 31 (kcas_data (>= 0.7)) 30 - (mirage-crypto (>= 1.0)) 31 - (mirage-crypto-rng (>= 1.0)) 32 - (cstruct (>= 6.0)) 33 - (mtime (>= 2.0)) 32 + (mirage-crypto (>= 2.0)) 33 + (mirage-crypto-rng (>= 2.0)) 34 + (cstruct (>= 6.2)) 35 + (mtime (>= 2.1)) 34 36 (msgpck (>= 1.7)) 35 - (qcheck (>= 0.21)) 36 - (qcheck-alcotest (>= 0.21)) 37 - (alcotest (>= 1.7)) 38 - (logs (>= 0.7))) 37 + (logs (>= 0.10)) 38 + (fmt (>= 0.11)) 39 + (eio_main (and (>= 1.3) :with-test)) 40 + (qcheck (and (>= 0.21) :with-test)) 41 + (qcheck-alcotest (and (>= 0.21) :with-test)) 42 + (alcotest (and (>= 1.7) :with-test))) 39 43 (tags 40 44 (swim cluster membership gossip "failure detection" ocaml5 eio)))
+8 -63
lib/buffer_pool.ml
@@ -1,74 +1,19 @@
-(** Lock-free buffer pool using Kcas and Eio.
-
-    Provides pre-allocated buffers for zero-copy I/O operations. Uses
-    Kcas_data.Queue for lock-free buffer storage and Eio.Semaphore for blocking
-    acquire when pool is exhausted. *)
-
-type t = {
-  buffers : Cstruct.t Kcas_data.Queue.t;
-  buf_size : int;
-  total : int;
-  semaphore : Eio.Semaphore.t;
-}
+type t = { pool : Cstruct.t Eio.Stream.t; buf_size : int; capacity : int }

 let create ~size ~count =
-  let buffers = Kcas_data.Queue.create () in
+  let pool = Eio.Stream.create count in
   for _ = 1 to count do
-    Kcas.Xt.commit
-      {
-        tx =
-          (fun ~xt -> Kcas_data.Queue.Xt.add ~xt (Cstruct.create size) buffers);
-      }
+    Eio.Stream.add pool (Cstruct.create size)
   done;
-  {
-    buffers;
-    buf_size = size;
-    total = count;
-    semaphore = Eio.Semaphore.make count;
-  }
+  { pool; buf_size = size; capacity = count }

-let acquire t =
-  Eio.Semaphore.acquire t.semaphore;
-  let buf_opt =
-    Kcas.Xt.commit
-      { tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
-  in
-  match buf_opt with
-  | Some buf ->
-      Cstruct.memset buf 0;
-      buf
-  | None ->
-      (* Should not happen if semaphore is properly synchronized,
-         but handle gracefully by allocating a new buffer *)
-      Cstruct.create t.buf_size
-
-let try_acquire t =
-  (* Check if semaphore has available permits without blocking *)
-  if Eio.Semaphore.get_value t.semaphore > 0 then begin
-    (* Race condition possible here - another fiber might acquire between
-       get_value and acquire. In that case, acquire will block briefly.
-       For truly non-blocking behavior, we'd need atomic CAS on semaphore. *)
-    Eio.Semaphore.acquire t.semaphore;
-    let buf_opt =
-      Kcas.Xt.commit
-        { tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
-    in
-    match buf_opt with
-    | Some buf ->
-        Cstruct.memset buf 0;
-        Some buf
-    | None -> Some (Cstruct.create t.buf_size)
-  end
-  else None
-
-let release t buf =
-  Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.add ~xt buf t.buffers) };
-  Eio.Semaphore.release t.semaphore
+let acquire t = Eio.Stream.take t.pool
+let release t buf = Eio.Stream.add t.pool buf

 let with_buffer t f =
   let buf = acquire t in
   Fun.protect ~finally:(fun () -> release t buf) (fun () -> f buf)

-let available t = Eio.Semaphore.get_value t.semaphore
-let total t = t.total
+let available t = Eio.Stream.length t.pool
+let total t = t.capacity
 let size t = t.buf_size
-1
lib/buffer_pool.mli
@@ -2,7 +2,6 @@

 val create : size:int -> count:int -> t
 val acquire : t -> Cstruct.t
-val try_acquire : t -> Cstruct.t option
 val release : t -> Cstruct.t -> unit
 val with_buffer : t -> (Cstruct.t -> 'a) -> 'a
 val available : t -> int
+30 -16
lib/codec.ml
@@ -537,22 +537,36 @@
   | [] ->
       encode_internal_msg_to_cstruct ~self_name ~self_port packet.primary ~buf
   | piggyback -> (
-      let encode_one msg =
-        let temp_buf = Cstruct.create 2048 in
-        match
-          encode_internal_msg_to_cstruct ~self_name ~self_port msg ~buf:temp_buf
-        with
-        | Error _ -> None
-        | Ok len -> Some (Cstruct.sub temp_buf 0 len, len)
-      in
-      let primary_result = encode_one packet.primary in
-      let piggyback_results = List.filter_map encode_one piggyback in
-      match primary_result with
-      | None -> Error `Buffer_too_small
-      | Some (primary_cs, primary_len) ->
-          let all_msgs = primary_cs :: List.map fst piggyback_results in
-          let all_lens = primary_len :: List.map snd piggyback_results in
-          encode_compound_to_cstruct ~msgs:all_msgs ~msg_lens:all_lens ~dst:buf)
+      let msgs = packet.primary :: piggyback in
+      let num_msgs = List.length msgs in
+      if num_msgs > 255 then failwith "too many messages for compound"
+      else
+        let header_size = 1 + 1 + (num_msgs * 2) in
+        if header_size > Cstruct.length buf then Error `Buffer_too_small
+        else
+          let rec encode_msgs i msgs current_offset =
+            match msgs with
+            | [] -> Ok current_offset
+            | msg :: rest -> (
+                if current_offset >= Cstruct.length buf then
+                  Error `Buffer_too_small
+                else
+                  let slice = Cstruct.shift buf current_offset in
+                  match
+                    encode_internal_msg_to_cstruct ~self_name ~self_port msg
+                      ~buf:slice
+                  with
+                  | Error _ -> Error `Buffer_too_small
+                  | Ok len ->
+                      Cstruct.BE.set_uint16 buf (2 + (i * 2)) len;
+                      encode_msgs (i + 1) rest (current_offset + len))
+          in
+          match encode_msgs 0 msgs header_size with
+          | Ok final_offset ->
+              Cstruct.set_uint8 buf 0 (message_type_to_int Compound_msg);
+              Cstruct.set_uint8 buf 1 num_msgs;
+              Ok final_offset
+          | Error e -> Error e)

 let decode_packet (buf : Cstruct.t) : (Types.packet, Types.decode_error) result
     =
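The new compound encoder in `lib/codec.ml` writes each message directly into the destination buffer after a header of the form `[type][count][len_0 .. len_n-1]`, instead of encoding into temporary 2048-byte buffers first. The layout can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: `compound_tag`, `encode_compound` and `decode_compound` are hypothetical names, messages are plain strings rather than `Types` values, and `Bytes` stands in for `Cstruct`.

```ocaml
(* Placeholder tag byte; the real value comes from message_type_to_int. *)
let compound_tag = 7

(* Write [type][count][uint16 BE lengths...][messages...] into [buf].
   Returns the total number of bytes written. *)
let encode_compound (msgs : string list) (buf : Bytes.t) :
    (int, [ `Buffer_too_small | `Too_many ]) result =
  let n = List.length msgs in
  if n > 255 then Error `Too_many
  else
    let header_size = 2 + (n * 2) in
    let rec go i offset = function
      | [] -> Ok offset
      | m :: rest ->
          let len = String.length m in
          if len > 0xFFFF || offset + len > Bytes.length buf then
            Error `Buffer_too_small
          else begin
            (* Length slot i lives right after the two fixed header bytes. *)
            Bytes.set_uint16_be buf (2 + (i * 2)) len;
            Bytes.blit_string m 0 buf offset len;
            go (i + 1) (offset + len) rest
          end
    in
    if header_size > Bytes.length buf then Error `Buffer_too_small
    else
      match go 0 header_size msgs with
      | Error _ as e -> e
      | Ok total ->
          Bytes.set_uint8 buf 0 compound_tag;
          Bytes.set_uint8 buf 1 n;
          Ok total

(* Inverse of the sketch above; assumes [buf] holds a well-formed packet. *)
let decode_compound (buf : Bytes.t) : string list =
  let n = Bytes.get_uint8 buf 1 in
  let rec go i offset acc =
    if i = n then List.rev acc
    else
      let len = Bytes.get_uint16_be buf (2 + (i * 2)) in
      go (i + 1) (offset + len) (Bytes.sub_string buf offset len :: acc)
  in
  go 0 (2 + (n * 2)) []
```

Unlike the diff, which raises with `failwith` when there are more than 255 messages, the sketch returns an error value for that case; that is a choice made here for illustration, not something the change does.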
+5 -3
lib/dissemination.ml
@@ -10,14 +10,16 @@

 let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 }

-let enqueue t msg ~transmits ~created =
+let enqueue t msg ~transmits ~created ~limit =
   let item = { msg; transmits = Kcas.Loc.make transmits; created } in
   Kcas.Xt.commit
     {
       tx =
         (fun ~xt ->
-          Kcas_data.Queue.Xt.add ~xt item t.queue;
-          Kcas.Xt.modify ~xt t.depth succ);
+          let d = Kcas.Xt.get ~xt t.depth in
+          if d >= limit then ignore (Kcas_data.Queue.Xt.take_opt ~xt t.queue)
+          else Kcas.Xt.set ~xt t.depth (d + 1);
+          Kcas_data.Queue.Xt.add ~xt item t.queue);
     }

 let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) }
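The `~limit` argument turns the gossip queue into a bounded queue that drops its oldest entry when full: at the limit, one item is taken from the head before the new one is added, so the depth never exceeds `limit`. A minimal single-threaded sketch of that policy, using `Stdlib.Queue` in place of `Kcas_data.Queue` (the `bounded` type and its functions are hypothetical names for illustration; the real code performs both steps inside one Kcas transaction and tracks depth in a separate `Kcas.Loc`):

```ocaml
(* Bounded FIFO that evicts the oldest element once [limit] is reached. *)
type 'a bounded = { items : 'a Queue.t; limit : int }

let create ~limit = { items = Queue.create (); limit }

let enqueue t x =
  (* At capacity: drop the head so the length stays at [limit] after the add. *)
  if Queue.length t.items >= t.limit then ignore (Queue.take_opt t.items);
  Queue.add x t.items

let to_list t = List.of_seq (Queue.to_seq t.items)
```

With a limit of 3, enqueueing five items in order leaves only the last three in the queue, which matches the "dropping oldest" behaviour described for `max_gossip_queue_depth` in `docs/usage.md`.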
+4 -1
lib/dissemination.mli
@@ -9,7 +9,10 @@
 type t

 val create : unit -> t
-val enqueue : t -> protocol_msg -> transmits:int -> created:Mtime.span -> unit
+
+val enqueue :
+  t -> protocol_msg -> transmits:int -> created:Mtime.span -> limit:int -> unit
+
 val depth : t -> int

 val drain :
+4 -5
lib/dune
··· 2 2 (name swim) 3 3 (public_name swim) 4 4 (flags (:standard -w -34-69)) 5 - (libraries 6 - eio 7 - eio_main 8 - kcas 9 - kcas_data 5 + (libraries 6 + eio 7 + kcas 8 + kcas_data 10 9 mirage-crypto 11 10 mirage-crypto-rng 12 11 cstruct
+2 -1
lib/protocol.ml
@@ -94,7 +94,8 @@
     Protocol_pure.retransmit_limit t.config
       ~node_count:(Membership.count t.members)
   in
-  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t);
+  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t)
+    ~limit:t.config.max_gossip_queue_depth;
   Dissemination.invalidate t.broadcast_queue
     ~invalidates:Protocol_pure.invalidates msg

+14 -2
lib/types.ml
@@ -116,6 +116,7 @@
   encryption_enabled : bool;
   gossip_verify_incoming : bool;
   gossip_verify_outgoing : bool;
+  max_gossip_queue_depth : int;
 }

 let default_config =
@@ -139,6 +140,7 @@
     encryption_enabled = false;
     gossip_verify_incoming = true;
     gossip_verify_outgoing = true;
+    max_gossip_queue_depth = 5000;
   }

 type 'a env = {
@@ -403,9 +405,19 @@
         }
   | User_msg { topic; payload; origin } ->
       let origin_str = node_id_to_string origin in
+      let topic_len = String.length topic in
+      let origin_len = String.length origin_str in
       let encoded =
-        Printf.sprintf "%d:%s%d:%s%s" (String.length topic) topic
-          (String.length origin_str) origin_str payload
+        String.concat ""
+          [
+            string_of_int topic_len;
+            ":";
+            topic;
+            string_of_int origin_len;
+            ":";
+            origin_str;
+            payload;
+          ]
       in
       Wire.User_data encoded

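The `User_msg` hunk keeps the same wire format, `<topic_len>:<topic><origin_len>:<origin><payload>`, and only swaps `Printf.sprintf` for `String.concat`. Because both fields carry a decimal length prefix, a topic or origin may itself contain `:` without confusing a reader. The sketch below shows the format and one possible inverse; `encode_user_msg` and `decode_user_msg` are hypothetical helpers written for this illustration and are not taken from the library's decoder.

```ocaml
(* Same layout as the diff: "<len>:<topic><len>:<origin><payload>". *)
let encode_user_msg ~topic ~origin ~payload =
  String.concat ""
    [
      string_of_int (String.length topic); ":"; topic;
      string_of_int (String.length origin); ":"; origin;
      payload;
    ]

(* Hypothetical inverse: returns (topic, origin, payload) or None if malformed. *)
let decode_user_msg (s : string) : (string * string * string) option =
  (* Read one "<len>:<bytes>" field starting at [pos]; return it and the next position. *)
  let read_field pos =
    match String.index_from_opt s pos ':' with
    | None -> None
    | Some colon -> (
        match int_of_string_opt (String.sub s pos (colon - pos)) with
        | Some len when len >= 0 && colon + 1 + len <= String.length s ->
            Some (String.sub s (colon + 1) len, colon + 1 + len)
        | _ -> None)
  in
  match read_field 0 with
  | None -> None
  | Some (topic, pos) -> (
      match read_field pos with
      | None -> None
      | Some (origin, pos) ->
          (* Everything after the second field is the payload. *)
          Some (topic, origin, String.sub s pos (String.length s - pos)))
```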
+1
lib/types.mli
@@ -92,6 +92,7 @@
   encryption_enabled : bool;
   gossip_verify_incoming : bool;
   gossip_verify_outgoing : bool;
+  max_gossip_queue_depth : int;
 }

 val default_config : config
+19 -17
swim.opam
··· 1 1 # This file is generated by dune, edit dune-project instead 2 2 opam-version: "2.0" 3 + version: "0.1.0" 3 4 synopsis: 4 5 "SWIM protocol library for cluster membership and failure detection" 5 6 description: 6 7 "Production-ready SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol library in OCaml 5 for cluster membership, failure detection, and lightweight pub/sub messaging. Features lock-free coordination via kcas, zero-copy buffer management, and AES-256-GCM encryption." 7 - maintainer: ["Guillermo Diaz-Romero <guillermo.diaz@gmail.com>"] 8 - authors: ["Guillermo Diaz-Romero <guillermo.diaz@gmail.com>"] 9 - license: "MIT" 8 + maintainer: ["Gabriel Diaz"] 9 + authors: ["Gabriel Diaz"] 10 + license: "ISC" 10 11 tags: [ 11 12 "swim" "cluster" "membership" "gossip" "failure detection" "ocaml5" "eio" 12 13 ] 13 - homepage: "https://github.com/gdiazlo/swim" 14 - doc: "https://github.com/gdiazlo/swim" 15 - bug-reports: "https://github.com/gdiazlo/swim/issues" 14 + homepage: "https://tangled.org/gdiazlo.tngl.sh/swim" 15 + doc: "https://tangled.org/gdiazlo.tngl.sh/swim" 16 + bug-reports: "https://tangled.org/gdiazlo.tngl.sh/swim/issues" 16 17 depends: [ 17 18 "ocaml" {>= "5.1"} 18 19 "dune" {>= "3.20" & >= "3.20"} 19 - "eio" {>= "1.0"} 20 - "eio_main" {>= "1.0"} 20 + "eio" {>= "1.3"} 21 21 "kcas" {>= "0.7"} 22 22 "kcas_data" {>= "0.7"} 23 - "mirage-crypto" {>= "1.0"} 24 - "mirage-crypto-rng" {>= "1.0"} 25 - "cstruct" {>= "6.0"} 26 - "mtime" {>= "2.0"} 23 + "mirage-crypto" {>= "2.0"} 24 + "mirage-crypto-rng" {>= "2.0"} 25 + "cstruct" {>= "6.2"} 26 + "mtime" {>= "2.1"} 27 27 "msgpck" {>= "1.7"} 28 - "qcheck" {>= "0.21"} 29 - "qcheck-alcotest" {>= "0.21"} 30 - "alcotest" {>= "1.7"} 31 - "logs" {>= "0.7"} 28 + "logs" {>= "0.10"} 29 + "fmt" {>= "0.11"} 30 + "eio_main" {>= "1.3" & with-test} 31 + "qcheck" {>= "0.21" & with-test} 32 + "qcheck-alcotest" {>= "0.21" & with-test} 33 + "alcotest" {>= "1.7" & with-test} 32 34 "odoc" {with-doc} 
33 35 ] 34 36 build: [ ··· 45 47 "@doc" {with-doc} 46 48 ] 47 49 ] 48 - dev-repo: "git+https://github.com/gdiazlo/swim.git" 50 + dev-repo: "git+https://tangled.org/gdiazlo.tngl.sh/swim" 49 51 x-maintenance-intent: ["(latest)"]
+3 -1
test/generators.ml
··· 198 198 and+ label = oneof [ return ""; gen_topic ] 199 199 and+ encryption_enabled = bool 200 200 and+ gossip_verify_incoming = bool 201 - and+ gossip_verify_outgoing = bool in 201 + and+ gossip_verify_outgoing = bool 202 + and+ max_gossip_queue_depth = int_range 10 10000 in 202 203 { 203 204 bind_addr; 204 205 bind_port; ··· 219 220 encryption_enabled; 220 221 gossip_verify_incoming; 221 222 gossip_verify_outgoing; 223 + max_gossip_queue_depth; 222 224 } 223 225 224 226 let gen_decode_error : decode_error QCheck.Gen.t =
+21
test/scripts/test_interop.sh
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 5 + REPO_ROOT="$SCRIPT_DIR/../.." 6 + 7 + echo "Starting Go memberlist server WITHOUT encryption..." 8 + cd "$REPO_ROOT/interop" 9 + ./memberlist-server -name go-node -port 7946 & 10 + GO_PID=$! 11 + sleep 2 12 + 13 + echo "Starting OCaml SWIM client..." 14 + cd "$REPO_ROOT" 15 + timeout 25 ./_build/default/bin/interop_test.exe || true 16 + 17 + echo "Killing Go server..." 18 + kill $GO_PID 2>/dev/null || true 19 + wait $GO_PID 2>/dev/null || true 20 + 21 + echo "Done"
+24
test/scripts/test_interop_encrypted.sh
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 5 + REPO_ROOT="$SCRIPT_DIR/../.." 6 + 7 + # Test key: 16 bytes (0x00-0x0f) in hex 8 + TEST_KEY="000102030405060708090a0b0c0d0e0f" 9 + 10 + echo "Starting Go memberlist server WITH encryption..." 11 + cd "$REPO_ROOT/interop" 12 + ./memberlist-server -name go-node -port 7946 -key "$TEST_KEY" & 13 + GO_PID=$! 14 + sleep 2 15 + 16 + echo "Starting OCaml SWIM client WITH encryption..." 17 + cd "$REPO_ROOT" 18 + timeout 25 ./_build/default/bin/interop_test.exe --encrypt || true 19 + 20 + echo "Killing Go server..." 21 + kill $GO_PID 2>/dev/null || true 22 + wait $GO_PID 2>/dev/null || true 23 + 24 + echo "Done"
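The encrypted interop script passes its key to the Go server as a hex string, and its comment states that the key is 16 bytes, whereas the README describes `secret_key` as a 32-byte key for AES-256-GCM. When moving keys between the two sides it may help to decode the hex form and check the resulting length explicitly. The helper below is a hypothetical stdlib-only sketch (`bytes_of_hex` is not part of this change), and whether the OCaml library accepts a 16-byte key is not something the diff shows.

```ocaml
(* Decode a hex string into raw bytes; None on odd length or a non-hex digit. *)
let bytes_of_hex (hex : string) : string option =
  let digit c =
    match c with
    | '0' .. '9' -> Some (Char.code c - Char.code '0')
    | 'a' .. 'f' -> Some (Char.code c - Char.code 'a' + 10)
    | 'A' .. 'F' -> Some (Char.code c - Char.code 'A' + 10)
    | _ -> None
  in
  let n = String.length hex in
  if n mod 2 <> 0 then None
  else
    let out = Bytes.create (n / 2) in
    let rec go i =
      if i = n / 2 then Some (Bytes.to_string out)
      else
        match (digit hex.[2 * i], digit hex.[(2 * i) + 1]) with
        | Some hi, Some lo ->
            (* Two hex digits make one byte: high nibble first. *)
            Bytes.set out i (Char.chr ((hi lsl 4) lor lo));
            go (i + 1)
        | _ -> None
    in
    go 0
```

Applied to the script's `TEST_KEY`, the decoded string has length 16, so a caller can compare that against the key size it expects before building a config.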
+29
test/scripts/test_interop_go_joins.sh
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 5 + REPO_ROOT="$SCRIPT_DIR/../.." 6 + 7 + # Test where Go node joins to OCaml node (reverse direction) 8 + 9 + echo "Starting OCaml SWIM server..." 10 + cd "$REPO_ROOT" 11 + timeout 25 ./_build/default/bin/interop_test.exe & 12 + OCAML_PID=$! 13 + sleep 2 14 + 15 + echo "Starting Go memberlist and joining to OCaml..." 16 + cd "$REPO_ROOT/interop" 17 + ./memberlist-server -name go-node -port 7946 -join "127.0.0.1:7947" & 18 + GO_PID=$! 19 + 20 + # Let them communicate for a while 21 + sleep 15 22 + 23 + echo "Killing processes..." 24 + kill $GO_PID 2>/dev/null || true 25 + kill $OCAML_PID 2>/dev/null || true 26 + wait $GO_PID 2>/dev/null || true 27 + wait $OCAML_PID 2>/dev/null || true 28 + 29 + echo "Done"
+31
test/scripts/test_interop_udp_only.sh
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 5 + REPO_ROOT="$SCRIPT_DIR/../.." 6 + 7 + # Test UDP-only communication (no TCP join) 8 + # Both nodes start independently, OCaml adds Go to its membership 9 + # They should then be able to gossip via UDP 10 + 11 + echo "Starting Go memberlist server (no join)..." 12 + cd "$REPO_ROOT/interop" 13 + ./memberlist-server -name go-node -port 7946 & 14 + GO_PID=$! 15 + sleep 2 16 + 17 + echo "Starting OCaml SWIM client (adds Go node manually)..." 18 + cd "$REPO_ROOT" 19 + timeout 20 ./_build/default/bin/interop_test.exe & 20 + OCAML_PID=$! 21 + 22 + # Let them communicate 23 + sleep 15 24 + 25 + echo "Killing processes..." 26 + kill $GO_PID 2>/dev/null || true 27 + kill $OCAML_PID 2>/dev/null || true 28 + wait $GO_PID 2>/dev/null || true 29 + wait $OCAML_PID 2>/dev/null || true 30 + 31 + echo "Done"
-18
test_interop.sh
··· 1 - #!/bin/bash 2 - set -e 3 - 4 - echo "Starting Go memberlist server WITHOUT encryption..." 5 - cd /home/gdiazlo/data/src/swim/interop 6 - ./memberlist-server -name go-node -port 7946 & 7 - GO_PID=$! 8 - sleep 2 9 - 10 - echo "Starting OCaml SWIM client..." 11 - cd /home/gdiazlo/data/src/swim 12 - timeout 25 ./_build/default/bin/interop_test.exe || true 13 - 14 - echo "Killing Go server..." 15 - kill $GO_PID 2>/dev/null || true 16 - wait $GO_PID 2>/dev/null || true 17 - 18 - echo "Done"
-21
test_interop_encrypted.sh
··· 1 - #!/bin/bash 2 - set -e 3 - 4 - # Test key: 16 bytes (0x00-0x0f) in hex 5 - TEST_KEY="000102030405060708090a0b0c0d0e0f" 6 - 7 - echo "Starting Go memberlist server WITH encryption..." 8 - cd /home/gdiazlo/data/src/swim/interop 9 - ./memberlist-server -name go-node -port 7946 -key "$TEST_KEY" & 10 - GO_PID=$! 11 - sleep 2 12 - 13 - echo "Starting OCaml SWIM client WITH encryption..." 14 - cd /home/gdiazlo/data/src/swim 15 - timeout 25 ./_build/default/bin/interop_test.exe --encrypt || true 16 - 17 - echo "Killing Go server..." 18 - kill $GO_PID 2>/dev/null || true 19 - wait $GO_PID 2>/dev/null || true 20 - 21 - echo "Done"
-26
test_interop_go_joins.sh
··· 1 - #!/bin/bash 2 - set -e 3 - 4 - # Test where Go node joins to OCaml node (reverse direction) 5 - 6 - echo "Starting OCaml SWIM server..." 7 - cd /home/gdiazlo/data/src/swim 8 - timeout 25 ./_build/default/bin/interop_test.exe & 9 - OCAML_PID=$! 10 - sleep 2 11 - 12 - echo "Starting Go memberlist and joining to OCaml..." 13 - cd /home/gdiazlo/data/src/swim/interop 14 - ./memberlist-server -name go-node -port 7946 -join "127.0.0.1:7947" & 15 - GO_PID=$! 16 - 17 - # Let them communicate for a while 18 - sleep 15 19 - 20 - echo "Killing processes..." 21 - kill $GO_PID 2>/dev/null || true 22 - kill $OCAML_PID 2>/dev/null || true 23 - wait $GO_PID 2>/dev/null || true 24 - wait $OCAML_PID 2>/dev/null || true 25 - 26 - echo "Done"
-28
test_interop_udp_only.sh
··· 1 - #!/bin/bash 2 - set -e 3 - 4 - # Test UDP-only communication (no TCP join) 5 - # Both nodes start independently, OCaml adds Go to its membership 6 - # They should then be able to gossip via UDP 7 - 8 - echo "Starting Go memberlist server (no join)..." 9 - cd /home/gdiazlo/data/src/swim/interop 10 - ./memberlist-server -name go-node -port 7946 & 11 - GO_PID=$! 12 - sleep 2 13 - 14 - echo "Starting OCaml SWIM client (adds Go node manually)..." 15 - cd /home/gdiazlo/data/src/swim 16 - timeout 20 ./_build/default/bin/interop_test.exe & 17 - OCAML_PID=$! 18 - 19 - # Let them communicate 20 - sleep 15 21 - 22 - echo "Killing processes..." 23 - kill $GO_PID 2>/dev/null || true 24 - kill $OCAML_PID 2>/dev/null || true 25 - wait $GO_PID 2>/dev/null || true 26 - wait $OCAML_PID 2>/dev/null || true 27 - 28 - echo "Done"