DTN controller and policy language for satellite networks
at main 286 lines 9.3 kB view raw
1(*--------------------------------------------------------------------------- 2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. 3 SPDX-License-Identifier: ISC 4 ---------------------------------------------------------------------------*) 5 6module Log = (val Logs.src_log (Logs.Src.create "borealis.engine")) 7 8type config = { 9 local_node : Cgr.Node.t; 10 local_eid : Bundle.eid; 11 admin_eid : Bundle.eid; 12} 13 14let make_config ~node_id = 15 { 16 local_node = Cgr.Node.v (Int64.to_string node_id); 17 local_eid = Bundle.Ipn (node_id, 1L); 18 admin_eid = Bundle.Ipn (node_id, 0L); 19 } 20 21type mutable_stats = { 22 mutable bundles_received : int; 23 mutable bundles_forwarded : int; 24 mutable bundles_delivered : int; 25 mutable bundles_dropped : int; 26 mutable admin_handled : int; 27} 28 29type t = { 30 config : config; 31 store : Store.t; 32 mutable contact_plan : Cgr.Contact_plan.t; 33 mutable policy : Policy.t; 34 mstats : mutable_stats; 35 start_time : float; 36} 37 38let create ~config ~policy ~contact_plan = 39 { 40 config; 41 store = Store.create (); 42 contact_plan; 43 policy; 44 mstats = 45 { 46 bundles_received = 0; 47 bundles_forwarded = 0; 48 bundles_delivered = 0; 49 bundles_dropped = 0; 50 admin_handled = 0; 51 }; 52 start_time = Unix.gettimeofday (); 53 } 54 55let config t = t.config 56let store t = t.store 57let contact_plan t = t.contact_plan 58let set_contact_plan t plan = t.contact_plan <- plan 59let policy t = t.policy 60let set_policy t p = t.policy <- p 61 62type forward_request = { 63 bundle : Bundle.t; 64 next_hop : Bundle.eid; 65 via : Action.cla option; 66} 67 68type process_result = 69 | Forward of forward_request list 70 | Stored 71 | Delivered 72 | Dropped of string 73 | Admin_handled 74 75let is_local t eid = 76 match (eid, t.config.local_eid) with 77 | Bundle.Ipn (n1, _), Bundle.Ipn (n2, _) -> n1 = n2 78 | Bundle.Dtn s1, Bundle.Dtn s2 -> String.equal s1 s2 79 | _ -> eid = t.config.local_eid || eid = t.config.admin_eid 80 81let eid_to_node eid = 82 match eid with 83 | Bundle.Dtn_none -> None 84 | Bundle.Dtn path -> Some (Cgr.Node.v path) 85 | Bundle.Ipn (node, _service) -> Some (Cgr.Node.v (Int64.to_string node)) 86 87let find_route t dest = 88 match eid_to_node dest with 89 | None -> None 90 | Some dst -> 91 Cgr.find_route t.contact_plan ~src:t.config.local_node ~dst 92 ~time:(Unix.gettimeofday ()) 93 94let route_to_next_hop route = 95 match Cgr.Route.hops route with 96 | [] -> None 97 | first_contact :: _ -> 98 let to_node = Cgr.Contact.to_ first_contact in 99 Some (Bundle.Dtn (Cgr.Node.name to_node)) 100 101let rec execute_action t bundle = function 102 | Action.Forward { next_hop; custody = _; via } -> 103 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 104 Forward [ { bundle; next_hop; via } ] 105 | Action.Forward_route -> ( 106 match find_route t bundle.Bundle.primary.destination with 107 | None -> 108 Log.debug (fun m -> m "No route, storing bundle"); 109 Store.store t.store ~bundle ~condition:Action.Until_route 110 ~custody:false; 111 Stored 112 | Some route -> ( 113 match route_to_next_hop route with 114 | None -> Dropped "empty route" 115 | Some next_hop -> 116 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 117 Forward [ { bundle; next_hop; via = None } ])) 118 | Action.Store { until } -> 119 Store.store t.store ~bundle ~condition:until ~custody:false; 120 Stored 121 | Action.Drop { reason } -> 122 t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1; 123 Dropped reason 124 | Action.Accept_custody -> ( 125 (* Accept custody and forward via CGR *) 126 match find_route t bundle.Bundle.primary.destination with 127 | None -> 128 Store.store t.store ~bundle ~condition:Action.Until_route 129 ~custody:true; 130 Stored 131 | Some route -> ( 132 match route_to_next_hop route with 133 | None -> 134 Store.store t.store ~bundle ~condition:Action.Until_route 135 ~custody:true; 136 Stored 137 | Some next_hop -> 138 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 139 Forward [ { bundle; next_hop; via = None } ])) 140 | Action.Refuse_custody { reason } -> Dropped ("custody refused: " ^ reason) 141 | Action.Rate_limit _ -> 142 (* Rate limiting not implemented yet, just forward *) 143 execute_action t bundle Action.Forward_route 144 145let process t bundle ~tenant = 146 t.mstats.bundles_received <- t.mstats.bundles_received + 1; 147 let dest = bundle.Bundle.primary.destination in 148 149 (* Check if this is for us *) 150 if is_local t dest then ( 151 t.mstats.bundles_delivered <- t.mstats.bundles_delivered + 1; 152 if bundle.Bundle.primary.flags.is_admin_record then Admin_handled 153 else Delivered) 154 else 155 (* Evaluate policy *) 156 let ctx : Policy.context = 157 { 158 bundle; 159 current_time = Unix.gettimeofday (); 160 tenant; 161 contact_plan = t.contact_plan; 162 local_node = t.config.local_node; 163 } 164 in 165 match Policy.eval ctx t.policy with 166 | Policy.No_match -> 167 Log.warn (fun m -> m "No policy match, dropping bundle"); 168 t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1; 169 Dropped "no policy match" 170 | Policy.Actions [] -> Dropped "empty action list" 171 | Policy.Actions (action :: _) -> 172 (* Execute first action (could extend to handle multiple) *) 173 execute_action t bundle action 174 175let process_admin t record = 176 t.mstats.admin_handled <- t.mstats.admin_handled + 1; 177 match record with 178 | Admin.Query Admin.Query_status -> 179 let status : Admin.status = 180 { 181 node_id = t.config.local_eid; 182 uptime_secs = Unix.gettimeofday () -. t.start_time; 183 bundles_stored = Store.count t.store; 184 bundles_forwarded = t.mstats.bundles_forwarded; 185 bundles_delivered = t.mstats.bundles_delivered; 186 bundles_dropped = t.mstats.bundles_dropped; 187 active_contacts = 188 List.length 189 (Cgr.Contact_plan.active_at t.contact_plan 190 ~time:(Unix.gettimeofday ())); 191 } 192 in 193 Some (Admin.Response (Admin.Response_status status)) 194 | Admin.Query Admin.Query_contacts -> 195 let contacts = 196 List.map 197 (fun c : Admin.contact -> 198 { 199 from_node = Cgr.Node.name (Cgr.Contact.from c); 200 to_node = Cgr.Node.name (Cgr.Contact.to_ c); 201 start_time = Cgr.Contact.start c; 202 stop_time = Cgr.Contact.stop c; 203 rate_bps = Cgr.Contact.rate c; 204 owlt_secs = Cgr.Contact.owlt c; 205 }) 206 (Cgr.Contact_plan.contacts t.contact_plan) 207 in 208 Some (Admin.Response (Admin.Response_contacts { contacts })) 209 | Admin.Contact_update { contacts } -> 210 let new_contacts = 211 List.map 212 (fun (c : Admin.contact) -> 213 Cgr.Contact.v ~from:(Cgr.Node.v c.from_node) 214 ~to_:(Cgr.Node.v c.to_node) ~start:c.start_time ~stop:c.stop_time 215 ~rate:c.rate_bps ~owlt:c.owlt_secs ()) 216 contacts 217 in 218 t.contact_plan <- Cgr.Contact_plan.of_list new_contacts; 219 Log.info (fun m -> 220 m "Contact plan updated with %d contacts" (List.length contacts)); 221 None 222 | Admin.Policy_update { source; _ } -> 223 Log.info (fun m -> m "Policy update received: %s" source); 224 (* Policy compilation not implemented yet *) 225 None 226 | _ -> None 227 228let on_contact_start t contact = 229 let to_node = Cgr.Contact.to_ contact in 230 let to_eid = Bundle.Dtn (Cgr.Node.name to_node) in 231 let ready = Store.ready_for_contact t.store to_eid in 232 List.map 233 (fun entry -> 234 Store.remove t.store entry.Store.id; 235 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 236 { bundle = entry.bundle; next_hop = to_eid; via = None }) 237 ready 238 239let on_contact_end _t _contact = () 240 241let cleanup_expired t ~current_time = 242 let expired = Store.expired t.store ~current_time in 243 List.iter 244 (fun entry -> 245 Store.remove t.store entry.Store.id; 246 t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1) 247 expired; 248 List.length expired 249 250let check_routes t = 251 let waiting = Store.ready_for_route t.store in 252 List.filter_map 253 (fun entry -> 254 match find_route t entry.Store.bundle.Bundle.primary.destination with 255 | None -> None 256 | Some route -> ( 257 match route_to_next_hop route with 258 | None -> None 259 | Some next_hop -> 260 Store.remove t.store entry.id; 261 t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 262 Some { bundle = entry.bundle; next_hop; via = None })) 263 waiting 264 265type stats = { 266 bundles_received : int; 267 bundles_forwarded : int; 268 bundles_delivered : int; 269 bundles_dropped : int; 270 admin_handled : int; 271} 272 273let stats t = 274 { 275 bundles_received = t.mstats.bundles_received; 276 bundles_forwarded = t.mstats.bundles_forwarded; 277 bundles_delivered = t.mstats.bundles_delivered; 278 bundles_dropped = t.mstats.bundles_dropped; 279 admin_handled = t.mstats.admin_handled; 280 } 281 282let pp_stats ppf s = 283 Fmt.pf ppf 284 "@[<v>received: %d@,forwarded: %d@,delivered: %d@,dropped: %d@,admin: %d@]" 285 s.bundles_received s.bundles_forwarded s.bundles_delivered s.bundles_dropped 286 s.admin_handled