(*--------------------------------------------------------------------------- Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. SPDX-License-Identifier: ISC ---------------------------------------------------------------------------*) (** Borealis daemon - network coordination for DTN node. *) module Log = (val Logs.src_log (Logs.Src.create "borealis.daemon")) type peer = { node : Cgr.Node.t; address : Eio.Net.Sockaddr.stream; mutable connection : Tcpcl_adapter.t option; } type t = { engine : Engine.t; peers : (string, peer) Hashtbl.t; mutable running : bool; } let create ~engine = { engine; peers = Hashtbl.create 16; running = false } let add_peer t ~node ~address = let name = Cgr.Node.name node in if not (Hashtbl.mem t.peers name) then ( Hashtbl.add t.peers name { node; address; connection = None }; Log.info (fun m -> m "Added peer %s at %a" name Eio.Net.Sockaddr.pp address)) let node_of_eid = function | Bundle.Dtn_none -> None | Bundle.Dtn path -> Some (Cgr.Node.v path) | Bundle.Ipn (node, _) -> Some (Cgr.Node.v (Int64.to_string node)) let find_peer t eid = match node_of_eid eid with | None -> None | Some node -> let name = Cgr.Node.name node in Hashtbl.find_opt t.peers name let connect_peer t ~sw ~net peer = match peer.connection with | Some conn when Tcpcl_adapter.is_connected conn -> () | _ -> ( let config = Engine.config t.engine in Log.info (fun m -> m "Connecting to %s at %a" (Cgr.Node.name peer.node) Eio.Net.Sockaddr.pp peer.address); try let conn = Tcpcl_adapter.connect ~sw ~net ~local_eid:config.local_eid ~addr:peer.address in if Tcpcl_adapter.is_connected conn then ( peer.connection <- Some conn; Log.info (fun m -> m "Connected to %s" (Cgr.Node.name peer.node))) else Log.warn (fun m -> m "Failed to connect to %s" (Cgr.Node.name peer.node)) with exn -> Log.warn (fun m -> m "Connection to %s failed: %s" (Cgr.Node.name peer.node) (Printexc.to_string exn))) let send_bundle t ~sw ~net bundle next_hop = match find_peer t next_hop with | None -> Log.warn (fun m -> m "No peer for %a" Bundle.pp_eid next_hop); Error "no peer" | Some peer -> ( connect_peer t ~sw ~net peer; match peer.connection with | None -> Error "not connected" | Some conn -> ( match Tcpcl_adapter.send_bundle conn bundle with | Ok () -> Log.debug (fun m -> m "Sent bundle to %a" Bundle.pp_eid next_hop); Ok () | Error e -> Log.warn (fun m -> m "Failed to send to %a: %s" Bundle.pp_eid next_hop e); Error e)) let process_bundle t ~sw ~net bundle = let result = Engine.process t.engine bundle ~tenant:None in match result with | Engine.Forward reqs -> List.iter (fun (req : Engine.forward_request) -> ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) reqs | Engine.Stored -> Log.debug (fun m -> m "Bundle stored") | Engine.Delivered -> Log.info (fun m -> m "Bundle delivered locally") | Engine.Dropped reason -> Log.info (fun m -> m "Bundle dropped: %s" reason) | Engine.Admin_handled -> ( match Admin.extract bundle with | Error _ -> () | Ok record -> ( match Engine.process_admin t.engine record with | None -> () | Some response -> (* Send response back *) let source = bundle.Bundle.primary.source in let dest = (Engine.config t.engine).admin_eid in let timestamp : Bundle.timestamp = { time = Int64.of_float (Unix.gettimeofday () *. 1000.); seq = 0L; } in let resp_bundle = Admin.make_bundle ~source:dest ~destination:source ~timestamp response in ignore (send_bundle t ~sw ~net resp_bundle source))) let handle_connection t ~sw ~net socket = let config = Engine.config t.engine in let conn = Tcpcl_adapter.accept ~sw ~local_eid:config.local_eid socket in if Tcpcl_adapter.is_connected conn then ( Log.info (fun m -> m "Accepted connection from %s" (Option.value ~default:"unknown" (Tcpcl_adapter.peer_eid conn))); (* Receive loop *) let rec loop () = match Tcpcl_adapter.receive conn with | Ok bundle -> process_bundle t ~sw ~net bundle; loop () | Error e -> Log.debug (fun m -> m "Connection closed: %s" e); Tcpcl_adapter.close_connection conn in loop ()) let listen t ~sw ~net ~port = let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in let socket = Eio.Net.listen ~sw ~backlog:10 ~reuse_addr:true net addr in Log.info (fun m -> m "Listening on port %d" port); let rec accept_loop () = let conn, _addr = Eio.Net.accept ~sw socket in Eio.Fiber.fork ~sw (fun () -> handle_connection t ~sw ~net conn); accept_loop () in accept_loop () let check_contacts t ~sw ~net = let plan = Engine.contact_plan t.engine in let now = Unix.gettimeofday () in let active = Cgr.Contact_plan.active_at plan ~time:now in List.iter (fun contact -> let to_node = Cgr.Contact.to_ contact in let name = Cgr.Node.name to_node in match Hashtbl.find_opt t.peers name with | None -> () | Some peer -> (* Connect if contact is active and we're not connected *) connect_peer t ~sw ~net peer; (* Release stored bundles *) let reqs = Engine.on_contact_start t.engine contact in List.iter (fun (req : Engine.forward_request) -> ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) reqs) active let check_routes t ~sw ~net = let reqs = Engine.check_routes t.engine in List.iter (fun (req : Engine.forward_request) -> ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) reqs let periodic_tasks t ~sw ~net ~clock ~interval = let rec loop () = Eio.Time.sleep clock interval; if t.running then ( check_contacts t ~sw ~net; check_routes t ~sw ~net; let now = Eio.Time.now clock in let _ = Engine.cleanup_expired t.engine ~current_time:now in let stats = Engine.stats t.engine in Log.debug (fun m -> m "Stats: %a" Engine.pp_stats stats); loop ()) in loop () let run t ~sw ~net ~clock ~port = t.running <- true; Eio.Fiber.all [ (fun () -> listen t ~sw ~net ~port); (fun () -> periodic_tasks t ~sw ~net ~clock ~interval:5.0); ] let stop t = t.running <- false