DTN controller and policy language for satellite networks
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6module Log = (val Logs.src_log (Logs.Src.create "borealis.engine"))
7
8type config = {
9 local_node : Cgr.Node.t;
10 local_eid : Bundle.eid;
11 admin_eid : Bundle.eid;
12}
13
14let make_config ~node_id =
15 {
16 local_node = Cgr.Node.v (Int64.to_string node_id);
17 local_eid = Bundle.Ipn (node_id, 1L);
18 admin_eid = Bundle.Ipn (node_id, 0L);
19 }
20
21type mutable_stats = {
22 mutable bundles_received : int;
23 mutable bundles_forwarded : int;
24 mutable bundles_delivered : int;
25 mutable bundles_dropped : int;
26 mutable admin_handled : int;
27}
28
29type t = {
30 config : config;
31 store : Store.t;
32 mutable contact_plan : Cgr.Contact_plan.t;
33 mutable policy : Policy.t;
34 mstats : mutable_stats;
35 start_time : float;
36}
37
38let create ~config ~policy ~contact_plan =
39 {
40 config;
41 store = Store.create ();
42 contact_plan;
43 policy;
44 mstats =
45 {
46 bundles_received = 0;
47 bundles_forwarded = 0;
48 bundles_delivered = 0;
49 bundles_dropped = 0;
50 admin_handled = 0;
51 };
52 start_time = Unix.gettimeofday ();
53 }
54
55let config t = t.config
56let store t = t.store
57let contact_plan t = t.contact_plan
58let set_contact_plan t plan = t.contact_plan <- plan
59let policy t = t.policy
60let set_policy t p = t.policy <- p
61
62type forward_request = {
63 bundle : Bundle.t;
64 next_hop : Bundle.eid;
65 via : Action.cla option;
66}
67
68type process_result =
69 | Forward of forward_request list
70 | Stored
71 | Delivered
72 | Dropped of string
73 | Admin_handled
74
75let is_local t eid =
76 match (eid, t.config.local_eid) with
77 | Bundle.Ipn (n1, _), Bundle.Ipn (n2, _) -> n1 = n2
78 | Bundle.Dtn s1, Bundle.Dtn s2 -> String.equal s1 s2
79 | _ -> eid = t.config.local_eid || eid = t.config.admin_eid
80
81let eid_to_node eid =
82 match eid with
83 | Bundle.Dtn_none -> None
84 | Bundle.Dtn path -> Some (Cgr.Node.v path)
85 | Bundle.Ipn (node, _service) -> Some (Cgr.Node.v (Int64.to_string node))
86
87let find_route t dest =
88 match eid_to_node dest with
89 | None -> None
90 | Some dst ->
91 Cgr.find_route t.contact_plan ~src:t.config.local_node ~dst
92 ~time:(Unix.gettimeofday ())
93
94let route_to_next_hop route =
95 match Cgr.Route.hops route with
96 | [] -> None
97 | first_contact :: _ ->
98 let to_node = Cgr.Contact.to_ first_contact in
99 Some (Bundle.Dtn (Cgr.Node.name to_node))
100
101let rec execute_action t bundle = function
102 | Action.Forward { next_hop; custody = _; via } ->
103 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1;
104 Forward [ { bundle; next_hop; via } ]
105 | Action.Forward_route -> (
106 match find_route t bundle.Bundle.primary.destination with
107 | None ->
108 Log.debug (fun m -> m "No route, storing bundle");
109 Store.store t.store ~bundle ~condition:Action.Until_route
110 ~custody:false;
111 Stored
112 | Some route -> (
113 match route_to_next_hop route with
114 | None -> Dropped "empty route"
115 | Some next_hop ->
116 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1;
117 Forward [ { bundle; next_hop; via = None } ]))
118 | Action.Store { until } ->
119 Store.store t.store ~bundle ~condition:until ~custody:false;
120 Stored
121 | Action.Drop { reason } ->
122 t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1;
123 Dropped reason
124 | Action.Accept_custody -> (
125 (* Accept custody and forward via CGR *)
126 match find_route t bundle.Bundle.primary.destination with
127 | None ->
128 Store.store t.store ~bundle ~condition:Action.Until_route
129 ~custody:true;
130 Stored
131 | Some route -> (
132 match route_to_next_hop route with
133 | None ->
134 Store.store t.store ~bundle ~condition:Action.Until_route
135 ~custody:true;
136 Stored
137 | Some next_hop ->
138 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1;
139 Forward [ { bundle; next_hop; via = None } ]))
140 | Action.Refuse_custody { reason } -> Dropped ("custody refused: " ^ reason)
141 | Action.Rate_limit _ ->
142 (* Rate limiting not implemented yet, just forward *)
143 execute_action t bundle Action.Forward_route
144
145let process t bundle ~tenant =
146 t.mstats.bundles_received <- t.mstats.bundles_received + 1;
147 let dest = bundle.Bundle.primary.destination in
148
149 (* Check if this is for us *)
150 if is_local t dest then (
151 t.mstats.bundles_delivered <- t.mstats.bundles_delivered + 1;
152 if bundle.Bundle.primary.flags.is_admin_record then Admin_handled
153 else Delivered)
154 else
155 (* Evaluate policy *)
156 let ctx : Policy.context =
157 {
158 bundle;
159 current_time = Unix.gettimeofday ();
160 tenant;
161 contact_plan = t.contact_plan;
162 local_node = t.config.local_node;
163 }
164 in
165 match Policy.eval ctx t.policy with
166 | Policy.No_match ->
167 Log.warn (fun m -> m "No policy match, dropping bundle");
168 t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1;
169 Dropped "no policy match"
170 | Policy.Actions [] -> Dropped "empty action list"
171 | Policy.Actions (action :: _) ->
172 (* Execute first action (could extend to handle multiple) *)
173 execute_action t bundle action
174
175let process_admin t record =
176 t.mstats.admin_handled <- t.mstats.admin_handled + 1;
177 match record with
178 | Admin.Query Admin.Query_status ->
179 let status : Admin.status =
180 {
181 node_id = t.config.local_eid;
182 uptime_secs = Unix.gettimeofday () -. t.start_time;
183 bundles_stored = Store.count t.store;
184 bundles_forwarded = t.mstats.bundles_forwarded;
185 bundles_delivered = t.mstats.bundles_delivered;
186 bundles_dropped = t.mstats.bundles_dropped;
187 active_contacts =
188 List.length
189 (Cgr.Contact_plan.active_at t.contact_plan
190 ~time:(Unix.gettimeofday ()));
191 }
192 in
193 Some (Admin.Response (Admin.Response_status status))
194 | Admin.Query Admin.Query_contacts ->
195 let contacts =
196 List.map
197 (fun c : Admin.contact ->
198 {
199 from_node = Cgr.Node.name (Cgr.Contact.from c);
200 to_node = Cgr.Node.name (Cgr.Contact.to_ c);
201 start_time = Cgr.Contact.start c;
202 stop_time = Cgr.Contact.stop c;
203 rate_bps = Cgr.Contact.rate c;
204 owlt_secs = Cgr.Contact.owlt c;
205 })
206 (Cgr.Contact_plan.contacts t.contact_plan)
207 in
208 Some (Admin.Response (Admin.Response_contacts { contacts }))
209 | Admin.Contact_update { contacts } ->
210 let new_contacts =
211 List.map
212 (fun (c : Admin.contact) ->
213 Cgr.Contact.v ~from:(Cgr.Node.v c.from_node)
214 ~to_:(Cgr.Node.v c.to_node) ~start:c.start_time ~stop:c.stop_time
215 ~rate:c.rate_bps ~owlt:c.owlt_secs ())
216 contacts
217 in
218 t.contact_plan <- Cgr.Contact_plan.of_list new_contacts;
219 Log.info (fun m ->
220 m "Contact plan updated with %d contacts" (List.length contacts));
221 None
222 | Admin.Policy_update { source; _ } ->
223 Log.info (fun m -> m "Policy update received: %s" source);
224 (* Policy compilation not implemented yet *)
225 None
226 | _ -> None
227
228let on_contact_start t contact =
229 let to_node = Cgr.Contact.to_ contact in
230 let to_eid = Bundle.Dtn (Cgr.Node.name to_node) in
231 let ready = Store.ready_for_contact t.store to_eid in
232 List.map
233 (fun entry ->
234 Store.remove t.store entry.Store.id;
235 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1;
236 { bundle = entry.bundle; next_hop = to_eid; via = None })
237 ready
238
239let on_contact_end _t _contact = ()
240
241let cleanup_expired t ~current_time =
242 let expired = Store.expired t.store ~current_time in
243 List.iter
244 (fun entry ->
245 Store.remove t.store entry.Store.id;
246 t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1)
247 expired;
248 List.length expired
249
250let check_routes t =
251 let waiting = Store.ready_for_route t.store in
252 List.filter_map
253 (fun entry ->
254 match find_route t entry.Store.bundle.Bundle.primary.destination with
255 | None -> None
256 | Some route -> (
257 match route_to_next_hop route with
258 | None -> None
259 | Some next_hop ->
260 Store.remove t.store entry.id;
261 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1;
262 Some { bundle = entry.bundle; next_hop; via = None }))
263 waiting
264
265type stats = {
266 bundles_received : int;
267 bundles_forwarded : int;
268 bundles_delivered : int;
269 bundles_dropped : int;
270 admin_handled : int;
271}
272
273let stats t =
274 {
275 bundles_received = t.mstats.bundles_received;
276 bundles_forwarded = t.mstats.bundles_forwarded;
277 bundles_delivered = t.mstats.bundles_delivered;
278 bundles_dropped = t.mstats.bundles_dropped;
279 admin_handled = t.mstats.admin_handled;
280 }
281
282let pp_stats ppf s =
283 Fmt.pf ppf
284 "@[<v>received: %d@,forwarded: %d@,delivered: %d@,dropped: %d@,admin: %d@]"
285 s.bundles_received s.bundles_forwarded s.bundles_delivered s.bundles_dropped
286 s.admin_handled