DTN controller and policy language for satellite networks
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** Borealis daemon - network coordination for DTN node. *)
7
(* Logging functions bound to a dedicated log source named "borealis.daemon";
   every [Log.info], [Log.warn] and [Log.debug] call in this file reports
   through that source. *)
module Log = (val Logs.src_log (Logs.Src.create "borealis.daemon"))
9
(** A remote node the daemon may exchange bundles with. *)
type peer = {
  node : Cgr.Node.t;
      (** Identity of the peer; its [Cgr.Node.name] is the key under which the
          peer is stored in the daemon's peer table. *)
  address : Eio.Net.Sockaddr.stream;
      (** Stream address used when dialling the peer. *)
  mutable connection : Tcpcl_adapter.t option;
      (** Connection last stored by [connect_peer], if any. It may no longer
          be live: [connect_peer] re-checks it with
          [Tcpcl_adapter.is_connected] before reusing it. *)
}
15
(** Mutable state of the daemon. *)
type t = {
  engine : Engine.t;
      (** Engine consulted for configuration, bundle processing, the contact
          plan and statistics. *)
  peers : (string, peer) Hashtbl.t;
      (** Known peers, keyed by [Cgr.Node.name] of the peer's node. *)
  mutable running : bool;
      (** Set to [true] by [run] and to [false] by [stop]; polled by
          [periodic_tasks] after each sleep. *)
}
21
(** [create ~engine] builds a daemon around [engine] with an empty peer table
    (initial capacity 16) and the [running] flag cleared. *)
let create ~engine =
  let peers = Hashtbl.create 16 in
  { engine; peers; running = false }
23
(** [add_peer t ~node ~address] registers [node], reachable at [address], in
    the peer table under [Cgr.Node.name node], with no connection yet.

    If a peer with the same name is already registered the call is a silent
    no-op: the existing entry (and its address) is kept and nothing is
    logged. *)
let add_peer t ~node ~address =
  let name = Cgr.Node.name node in
  match Hashtbl.find_opt t.peers name with
  | Some _ -> ()
  | None ->
      let entry = { node; address; connection = None } in
      Hashtbl.add t.peers name entry;
      Log.info (fun m ->
          m "Added peer %s at %a" name Eio.Net.Sockaddr.pp address)
29
(** [node_of_eid eid] maps a bundle endpoint ID to the contact-graph node it
    designates:
    - [Dtn_none] designates no node ([None]);
    - [Dtn path] designates the node built from [path];
    - [Ipn (n, _)] designates the node built from the decimal rendering of
      [n]; the second component is ignored. *)
let node_of_eid eid =
  let node_named name = Some (Cgr.Node.v name) in
  match eid with
  | Bundle.Dtn_none -> None
  | Bundle.Dtn path -> node_named path
  | Bundle.Ipn (number, _) -> node_named (Int64.to_string number)
34
(** [find_peer t eid] returns the registered peer whose node name matches the
    node designated by [eid], or [None] when [eid] designates no node or no
    such peer is registered. *)
let find_peer t eid =
  Option.bind (node_of_eid eid) (fun node ->
      Hashtbl.find_opt t.peers (Cgr.Node.name node))
41
(** [connect_peer t ~sw ~net peer] makes a best-effort attempt to ensure
    [peer] has a live connection.

    - If [peer.connection] holds a connection for which
      [Tcpcl_adapter.is_connected] is true, nothing happens.
    - Otherwise any stale connection is forgotten and a new one is dialled to
      [peer.address], using the engine's configured local EID. On success the
      connection is stored in [peer.connection]; on failure
      [peer.connection] is left as [None] and a warning is logged.

    Failures never escape as exceptions, with one exception:
    [Eio.Cancel.Cancelled] is always re-raised so that cancellation of the
    enclosing fiber or switch is not swallowed. Callers should inspect
    [peer.connection] afterwards to learn whether the peer is reachable. *)
let connect_peer t ~sw ~net peer =
  match peer.connection with
  | Some conn when Tcpcl_adapter.is_connected conn -> ()
  | Some _ | None -> (
      (* Drop a dead connection up front so that, if the reconnect below
         fails, callers see [None] instead of trying to send on it. *)
      peer.connection <- None;
      let config = Engine.config t.engine in
      Log.info (fun m ->
          m "Connecting to %s at %a" (Cgr.Node.name peer.node)
            Eio.Net.Sockaddr.pp peer.address);
      try
        let conn =
          Tcpcl_adapter.connect ~sw ~net ~local_eid:config.local_eid
            ~addr:peer.address
        in
        if Tcpcl_adapter.is_connected conn then (
          peer.connection <- Some conn;
          Log.info (fun m -> m "Connected to %s" (Cgr.Node.name peer.node)))
        else
          Log.warn (fun m ->
              m "Failed to connect to %s" (Cgr.Node.name peer.node))
      with
      (* Cancellation must propagate: swallowing it would keep a cancelled
         fiber running and break structured concurrency. *)
      | Eio.Cancel.Cancelled _ as cancelled -> raise cancelled
      | exn ->
          Log.warn (fun m ->
              m "Connection to %s failed: %s" (Cgr.Node.name peer.node)
                (Printexc.to_string exn)))
65
(** [send_bundle t ~sw ~net bundle next_hop] transmits [bundle] to the peer
    designated by the endpoint ID [next_hop], connecting first if needed.

    Returns [Ok ()] once the adapter reports the bundle sent, and otherwise:
    - [Error "no peer"] when [next_hop] matches no registered peer;
    - [Error "not connected"] when the peer has no connection after the
      connection attempt;
    - [Error e] with the adapter's own message when the send itself fails.

    Missing peers and send failures are logged as warnings; successful sends
    are logged at debug level. *)
let send_bundle t ~sw ~net bundle next_hop =
  (* Push the bundle over an established connection and log the outcome. *)
  let transmit conn =
    let outcome = Tcpcl_adapter.send_bundle conn bundle in
    (match outcome with
    | Ok () ->
        Log.debug (fun m -> m "Sent bundle to %a" Bundle.pp_eid next_hop)
    | Error e ->
        Log.warn (fun m ->
            m "Failed to send to %a: %s" Bundle.pp_eid next_hop e));
    outcome
  in
  match find_peer t next_hop with
  | Some peer -> (
      connect_peer t ~sw ~net peer;
      match peer.connection with
      | Some conn -> transmit conn
      | None -> Error "not connected")
  | None ->
      Log.warn (fun m -> m "No peer for %a" Bundle.pp_eid next_hop);
      Error "no peer"
84
(** [process_bundle t ~sw ~net bundle] hands [bundle] to the engine (with no
    tenant) and acts on its verdict:
    - [Forward reqs]: each request is sent to its next hop; send results are
      discarded ([send_bundle] already logs failures);
    - [Stored], [Delivered], [Dropped reason]: only logged;
    - [Admin_handled]: the administrative record is extracted from [bundle]
      and given to [Engine.process_admin]; if that yields a response, a reply
      bundle from the configured admin EID to the original source is built
      and sent, using the original source as next hop.

    NOTE(review): an [Error] from [Admin.extract] is dropped without logging.
    NOTE(review): the reply timestamp is Unix wall-clock milliseconds with a
    fixed sequence number of 0 -- confirm this matches the epoch and
    uniqueness expected by [Bundle.timestamp]. *)
let process_bundle t ~sw ~net bundle =
  (* Forward one request, deliberately ignoring the send result. *)
  let forward (req : Engine.forward_request) =
    ignore (send_bundle t ~sw ~net req.bundle req.next_hop)
  in
  (* Let the engine handle an admin record and send back its response, if
     it produces one. *)
  let answer_admin record =
    match Engine.process_admin t.engine record with
    | None -> ()
    | Some response ->
        let requester = bundle.Bundle.primary.source in
        let admin_eid = (Engine.config t.engine).admin_eid in
        let timestamp : Bundle.timestamp =
          { time = Int64.of_float (Unix.gettimeofday () *. 1000.); seq = 0L }
        in
        let reply =
          Admin.make_bundle ~source:admin_eid ~destination:requester
            ~timestamp response
        in
        ignore (send_bundle t ~sw ~net reply requester)
  in
  match Engine.process t.engine bundle ~tenant:None with
  | Engine.Forward reqs -> List.iter forward reqs
  | Engine.Stored -> Log.debug (fun m -> m "Bundle stored")
  | Engine.Delivered -> Log.info (fun m -> m "Bundle delivered locally")
  | Engine.Dropped reason ->
      Log.info (fun m -> m "Bundle dropped: %s" reason)
  | Engine.Admin_handled -> (
      match Admin.extract bundle with
      | Ok record -> answer_admin record
      | Error _ -> ())
117
(** [handle_connection t ~sw ~net socket] serves one inbound connection.

    The adapter's accept step is run on [socket] with the engine's configured
    local EID. If the resulting connection is not connected the function
    returns immediately. Otherwise it logs the peer EID (or ["unknown"]) and
    processes received bundles one by one until [Tcpcl_adapter.receive]
    returns an error, at which point the connection is closed. *)
let handle_connection t ~sw ~net socket =
  let local_eid = (Engine.config t.engine).local_eid in
  let conn = Tcpcl_adapter.accept ~sw ~local_eid socket in
  (* Drain the connection; any receive error ends the session. *)
  let rec pump () =
    match Tcpcl_adapter.receive conn with
    | Error reason ->
        Log.debug (fun m -> m "Connection closed: %s" reason);
        Tcpcl_adapter.close_connection conn
    | Ok bundle ->
        process_bundle t ~sw ~net bundle;
        pump ()
  in
  if Tcpcl_adapter.is_connected conn then begin
    Log.info (fun m ->
        m "Accepted connection from %s"
          (Tcpcl_adapter.peer_eid conn |> Option.value ~default:"unknown"));
    pump ()
  end
136
(** [listen t ~sw ~net ~port] opens a TCP listening socket on the IPv4
    loopback address at [port] (backlog 10, address reuse enabled) and then
    accepts connections forever, serving each one in its own fiber with
    [handle_connection]. This function does not return normally and does not
    consult [t.running].

    An exception raised while serving one connection is logged as a warning
    and confined to that connection; it no longer fails [sw] and with it the
    listener and every other fiber. The accepted socket is closed once its
    handler finishes, whether normally or with an error.

    NOTE(review): binding to loopback means only local processes can
    connect -- confirm this is intended for a daemon that talks to remote
    peers. *)
let listen t ~sw ~net ~port =
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in
  let socket = Eio.Net.listen ~sw ~backlog:10 ~reuse_addr:true net addr in
  Log.info (fun m -> m "Listening on port %d" port);
  (* Called by [accept_fork] for handler failures other than cancellation. *)
  let on_error exn =
    Log.warn (fun m ->
        m "Connection handler failed: %s" (Printexc.to_string exn))
  in
  let rec accept_loop () =
    Eio.Net.accept_fork ~sw socket ~on_error (fun conn _addr ->
        handle_connection t ~sw ~net conn);
    accept_loop ()
  in
  accept_loop ()
147
(** [check_contacts t ~sw ~net] looks up the contacts of the engine's contact
    plan that are active at the current wall-clock time
    ([Unix.gettimeofday]). For each one whose destination node is a
    registered peer, it attempts to connect to that peer and then sends
    every forward request returned by [Engine.on_contact_start]. Contacts
    towards unknown nodes are skipped. Send results are discarded. *)
let check_contacts t ~sw ~net =
  let plan = Engine.contact_plan t.engine in
  let now = Unix.gettimeofday () in
  (* Send one released bundle, ignoring the outcome. *)
  let release (req : Engine.forward_request) =
    ignore (send_bundle t ~sw ~net req.bundle req.next_hop)
  in
  let service contact =
    let peer_name = Cgr.Node.name (Cgr.Contact.to_ contact) in
    Hashtbl.find_opt t.peers peer_name
    |> Option.iter (fun peer ->
           (* Connect if the contact is active and we are not connected. *)
           connect_peer t ~sw ~net peer;
           (* Release the bundles stored for this contact. *)
           List.iter release (Engine.on_contact_start t.engine contact))
  in
  List.iter service (Cgr.Contact_plan.active_at plan ~time:now)
168
(** [check_routes t ~sw ~net] asks the engine for the forward requests
    produced by [Engine.check_routes] and sends each bundle to its next hop.
    Send results are discarded. *)
let check_routes t ~sw ~net =
  let dispatch (req : Engine.forward_request) =
    ignore (send_bundle t ~sw ~net req.bundle req.next_hop)
  in
  Engine.check_routes t.engine |> List.iter dispatch
175
(** [periodic_tasks t ~sw ~net ~clock ~interval] runs the daemon's
    housekeeping every [interval] seconds as measured by [clock].

    Each round sleeps first, then -- provided [t.running] is still set --
    checks contacts, checks routes, asks the engine to clean up expired
    entries as of the clock's current time, and logs the engine statistics
    at debug level. The function returns the first time it wakes up and
    finds [t.running] cleared. *)
let periodic_tasks t ~sw ~net ~clock ~interval =
  (* One round of housekeeping. *)
  let tick () =
    check_contacts t ~sw ~net;
    check_routes t ~sw ~net;
    let now = Eio.Time.now clock in
    ignore (Engine.cleanup_expired t.engine ~current_time:now);
    (* Statistics are gathered even when debug logging is disabled. *)
    let stats = Engine.stats t.engine in
    Log.debug (fun m -> m "Stats: %a" Engine.pp_stats stats)
  in
  let rec wait_then_run () =
    Eio.Time.sleep clock interval;
    if t.running then begin
      tick ();
      wait_then_run ()
    end
  in
  wait_then_run ()
189
(** [run t ~sw ~net ~clock ~port] marks the daemon as running and then runs
    two fibers concurrently: the listener on [port] and the periodic
    housekeeping loop with a fixed 5-second interval. It returns only when
    both fibers have finished. *)
let run t ~sw ~net ~clock ~port =
  t.running <- true;
  let serve () = listen t ~sw ~net ~port in
  let housekeeping () = periodic_tasks t ~sw ~net ~clock ~interval:5.0 in
  Eio.Fiber.both serve housekeeping
197
(** [stop t] clears the [running] flag. The housekeeping loop in
    [periodic_tasks] exits the next time it wakes up and sees the flag
    cleared; the accept loop in [listen] does not consult the flag. *)
let stop (t : t) : unit = t.running <- false