(*--------------------------------------------------------------------------- Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. SPDX-License-Identifier: ISC ---------------------------------------------------------------------------*) module Log = (val Logs.src_log (Logs.Src.create "borealis.engine")) type config = { local_node : Cgr.Node.t; local_eid : Bundle.eid; admin_eid : Bundle.eid; } let make_config ~node_id = { local_node = Cgr.Node.v (Int64.to_string node_id); local_eid = Bundle.Ipn (node_id, 1L); admin_eid = Bundle.Ipn (node_id, 0L); } type mutable_stats = { mutable bundles_received : int; mutable bundles_forwarded : int; mutable bundles_delivered : int; mutable bundles_dropped : int; mutable admin_handled : int; } type t = { config : config; store : Store.t; mutable contact_plan : Cgr.Contact_plan.t; mutable policy : Policy.t; mstats : mutable_stats; start_time : float; } let create ~config ~policy ~contact_plan = { config; store = Store.create (); contact_plan; policy; mstats = { bundles_received = 0; bundles_forwarded = 0; bundles_delivered = 0; bundles_dropped = 0; admin_handled = 0; }; start_time = Unix.gettimeofday (); } let config t = t.config let store t = t.store let contact_plan t = t.contact_plan let set_contact_plan t plan = t.contact_plan <- plan let policy t = t.policy let set_policy t p = t.policy <- p type forward_request = { bundle : Bundle.t; next_hop : Bundle.eid; via : Action.cla option; } type process_result = | Forward of forward_request list | Stored | Delivered | Dropped of string | Admin_handled let is_local t eid = match (eid, t.config.local_eid) with | Bundle.Ipn (n1, _), Bundle.Ipn (n2, _) -> n1 = n2 | Bundle.Dtn s1, Bundle.Dtn s2 -> String.equal s1 s2 | _ -> eid = t.config.local_eid || eid = t.config.admin_eid let eid_to_node eid = match eid with | Bundle.Dtn_none -> None | Bundle.Dtn path -> Some (Cgr.Node.v path) | Bundle.Ipn (node, _service) -> Some (Cgr.Node.v (Int64.to_string node)) let find_route t dest = match eid_to_node dest with | None -> None | Some dst -> Cgr.find_route t.contact_plan ~src:t.config.local_node ~dst ~time:(Unix.gettimeofday ()) let route_to_next_hop route = match Cgr.Route.hops route with | [] -> None | first_contact :: _ -> let to_node = Cgr.Contact.to_ first_contact in Some (Bundle.Dtn (Cgr.Node.name to_node)) let rec execute_action t bundle = function | Action.Forward { next_hop; custody = _; via } -> t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; Forward [ { bundle; next_hop; via } ] | Action.Forward_route -> ( match find_route t bundle.Bundle.primary.destination with | None -> Log.debug (fun m -> m "No route, storing bundle"); Store.store t.store ~bundle ~condition:Action.Until_route ~custody:false; Stored | Some route -> ( match route_to_next_hop route with | None -> Dropped "empty route" | Some next_hop -> t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; Forward [ { bundle; next_hop; via = None } ])) | Action.Store { until } -> Store.store t.store ~bundle ~condition:until ~custody:false; Stored | Action.Drop { reason } -> t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1; Dropped reason | Action.Accept_custody -> ( (* Accept custody and forward via CGR *) match find_route t bundle.Bundle.primary.destination with | None -> Store.store t.store ~bundle ~condition:Action.Until_route ~custody:true; Stored | Some route -> ( match route_to_next_hop route with | None -> Store.store t.store ~bundle ~condition:Action.Until_route ~custody:true; Stored | Some next_hop -> t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; Forward [ { bundle; next_hop; via = None } ])) | Action.Refuse_custody { reason } -> Dropped ("custody refused: " ^ reason) | Action.Rate_limit _ -> (* Rate limiting not implemented yet, just forward *) execute_action t bundle Action.Forward_route let process t bundle ~tenant = t.mstats.bundles_received <- t.mstats.bundles_received + 1; let dest = bundle.Bundle.primary.destination in (* Check if this is for us *) if is_local t dest then ( t.mstats.bundles_delivered <- t.mstats.bundles_delivered + 1; if bundle.Bundle.primary.flags.is_admin_record then Admin_handled else Delivered) else (* Evaluate policy *) let ctx : Policy.context = { bundle; current_time = Unix.gettimeofday (); tenant; contact_plan = t.contact_plan; local_node = t.config.local_node; } in match Policy.eval ctx t.policy with | Policy.No_match -> Log.warn (fun m -> m "No policy match, dropping bundle"); t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1; Dropped "no policy match" | Policy.Actions [] -> Dropped "empty action list" | Policy.Actions (action :: _) -> (* Execute first action (could extend to handle multiple) *) execute_action t bundle action let process_admin t record = t.mstats.admin_handled <- t.mstats.admin_handled + 1; match record with | Admin.Query Admin.Query_status -> let status : Admin.status = { node_id = t.config.local_eid; uptime_secs = Unix.gettimeofday () -. t.start_time; bundles_stored = Store.count t.store; bundles_forwarded = t.mstats.bundles_forwarded; bundles_delivered = t.mstats.bundles_delivered; bundles_dropped = t.mstats.bundles_dropped; active_contacts = List.length (Cgr.Contact_plan.active_at t.contact_plan ~time:(Unix.gettimeofday ())); } in Some (Admin.Response (Admin.Response_status status)) | Admin.Query Admin.Query_contacts -> let contacts = List.map (fun c : Admin.contact -> { from_node = Cgr.Node.name (Cgr.Contact.from c); to_node = Cgr.Node.name (Cgr.Contact.to_ c); start_time = Cgr.Contact.start c; stop_time = Cgr.Contact.stop c; rate_bps = Cgr.Contact.rate c; owlt_secs = Cgr.Contact.owlt c; }) (Cgr.Contact_plan.contacts t.contact_plan) in Some (Admin.Response (Admin.Response_contacts { contacts })) | Admin.Contact_update { contacts } -> let new_contacts = List.map (fun (c : Admin.contact) -> Cgr.Contact.v ~from:(Cgr.Node.v c.from_node) ~to_:(Cgr.Node.v c.to_node) ~start:c.start_time ~stop:c.stop_time ~rate:c.rate_bps ~owlt:c.owlt_secs ()) contacts in t.contact_plan <- Cgr.Contact_plan.of_list new_contacts; Log.info (fun m -> m "Contact plan updated with %d contacts" (List.length contacts)); None | Admin.Policy_update { source; _ } -> Log.info (fun m -> m "Policy update received: %s" source); (* Policy compilation not implemented yet *) None | _ -> None let on_contact_start t contact = let to_node = Cgr.Contact.to_ contact in let to_eid = Bundle.Dtn (Cgr.Node.name to_node) in let ready = Store.ready_for_contact t.store to_eid in List.map (fun entry -> Store.remove t.store entry.Store.id; t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; { bundle = entry.bundle; next_hop = to_eid; via = None }) ready let on_contact_end _t _contact = () let cleanup_expired t ~current_time = let expired = Store.expired t.store ~current_time in List.iter (fun entry -> Store.remove t.store entry.Store.id; t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1) expired; List.length expired let check_routes t = let waiting = Store.ready_for_route t.store in List.filter_map (fun entry -> match find_route t entry.Store.bundle.Bundle.primary.destination with | None -> None | Some route -> ( match route_to_next_hop route with | None -> None | Some next_hop -> Store.remove t.store entry.id; t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; Some { bundle = entry.bundle; next_hop; via = None })) waiting type stats = { bundles_received : int; bundles_forwarded : int; bundles_delivered : int; bundles_dropped : int; admin_handled : int; } let stats t = { bundles_received = t.mstats.bundles_received; bundles_forwarded = t.mstats.bundles_forwarded; bundles_delivered = t.mstats.bundles_delivered; bundles_dropped = t.mstats.bundles_dropped; admin_handled = t.mstats.admin_handled; } let pp_stats ppf s = Fmt.pf ppf "@[received: %d@,forwarded: %d@,delivered: %d@,dropped: %d@,admin: %d@]" s.bundles_received s.bundles_forwarded s.bundles_delivered s.bundles_dropped s.admin_handled