DTN controller and policy language for satellite networks
at main 198 lines 6.8 kB view raw
1(*--------------------------------------------------------------------------- 2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. 3 SPDX-License-Identifier: ISC 4 ---------------------------------------------------------------------------*) 5 6(** Borealis daemon - network coordination for DTN node. *) 7 8module Log = (val Logs.src_log (Logs.Src.create "borealis.daemon")) 9 10type peer = { 11 node : Cgr.Node.t; 12 address : Eio.Net.Sockaddr.stream; 13 mutable connection : Tcpcl_adapter.t option; 14} 15 16type t = { 17 engine : Engine.t; 18 peers : (string, peer) Hashtbl.t; 19 mutable running : bool; 20} 21 22let create ~engine = { engine; peers = Hashtbl.create 16; running = false } 23 24let add_peer t ~node ~address = 25 let name = Cgr.Node.name node in 26 if not (Hashtbl.mem t.peers name) then ( 27 Hashtbl.add t.peers name { node; address; connection = None }; 28 Log.info (fun m -> m "Added peer %s at %a" name Eio.Net.Sockaddr.pp address)) 29 30let node_of_eid = function 31 | Bundle.Dtn_none -> None 32 | Bundle.Dtn path -> Some (Cgr.Node.v path) 33 | Bundle.Ipn (node, _) -> Some (Cgr.Node.v (Int64.to_string node)) 34 35let find_peer t eid = 36 match node_of_eid eid with 37 | None -> None 38 | Some node -> 39 let name = Cgr.Node.name node in 40 Hashtbl.find_opt t.peers name 41 42let connect_peer t ~sw ~net peer = 43 match peer.connection with 44 | Some conn when Tcpcl_adapter.is_connected conn -> () 45 | _ -> ( 46 let config = Engine.config t.engine in 47 Log.info (fun m -> 48 m "Connecting to %s at %a" (Cgr.Node.name peer.node) 49 Eio.Net.Sockaddr.pp peer.address); 50 try 51 let conn = 52 Tcpcl_adapter.connect ~sw ~net ~local_eid:config.local_eid 53 ~addr:peer.address 54 in 55 if Tcpcl_adapter.is_connected conn then ( 56 peer.connection <- Some conn; 57 Log.info (fun m -> m "Connected to %s" (Cgr.Node.name peer.node))) 58 else 59 Log.warn (fun m -> 60 m "Failed to connect to %s" (Cgr.Node.name peer.node)) 61 with exn -> 62 Log.warn (fun m -> 63 m "Connection to %s failed: %s" (Cgr.Node.name peer.node) 64 (Printexc.to_string exn))) 65 66let send_bundle t ~sw ~net bundle next_hop = 67 match find_peer t next_hop with 68 | None -> 69 Log.warn (fun m -> m "No peer for %a" Bundle.pp_eid next_hop); 70 Error "no peer" 71 | Some peer -> ( 72 connect_peer t ~sw ~net peer; 73 match peer.connection with 74 | None -> Error "not connected" 75 | Some conn -> ( 76 match Tcpcl_adapter.send_bundle conn bundle with 77 | Ok () -> 78 Log.debug (fun m -> m "Sent bundle to %a" Bundle.pp_eid next_hop); 79 Ok () 80 | Error e -> 81 Log.warn (fun m -> 82 m "Failed to send to %a: %s" Bundle.pp_eid next_hop e); 83 Error e)) 84 85let process_bundle t ~sw ~net bundle = 86 let result = Engine.process t.engine bundle ~tenant:None in 87 match result with 88 | Engine.Forward reqs -> 89 List.iter 90 (fun (req : Engine.forward_request) -> 91 ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) 92 reqs 93 | Engine.Stored -> Log.debug (fun m -> m "Bundle stored") 94 | Engine.Delivered -> Log.info (fun m -> m "Bundle delivered locally") 95 | Engine.Dropped reason -> Log.info (fun m -> m "Bundle dropped: %s" reason) 96 | Engine.Admin_handled -> ( 97 match Admin.extract bundle with 98 | Error _ -> () 99 | Ok record -> ( 100 match Engine.process_admin t.engine record with 101 | None -> () 102 | Some response -> 103 (* Send response back *) 104 let source = bundle.Bundle.primary.source in 105 let dest = (Engine.config t.engine).admin_eid in 106 let timestamp : Bundle.timestamp = 107 { 108 time = Int64.of_float (Unix.gettimeofday () *. 1000.); 109 seq = 0L; 110 } 111 in 112 let resp_bundle = 113 Admin.make_bundle ~source:dest ~destination:source ~timestamp 114 response 115 in 116 ignore (send_bundle t ~sw ~net resp_bundle source))) 117 118let handle_connection t ~sw ~net socket = 119 let config = Engine.config t.engine in 120 let conn = Tcpcl_adapter.accept ~sw ~local_eid:config.local_eid socket in 121 if Tcpcl_adapter.is_connected conn then ( 122 Log.info (fun m -> 123 m "Accepted connection from %s" 124 (Option.value ~default:"unknown" (Tcpcl_adapter.peer_eid conn))); 125 (* Receive loop *) 126 let rec loop () = 127 match Tcpcl_adapter.receive conn with 128 | Ok bundle -> 129 process_bundle t ~sw ~net bundle; 130 loop () 131 | Error e -> 132 Log.debug (fun m -> m "Connection closed: %s" e); 133 Tcpcl_adapter.close_connection conn 134 in 135 loop ()) 136 137let listen t ~sw ~net ~port = 138 let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in 139 let socket = Eio.Net.listen ~sw ~backlog:10 ~reuse_addr:true net addr in 140 Log.info (fun m -> m "Listening on port %d" port); 141 let rec accept_loop () = 142 let conn, _addr = Eio.Net.accept ~sw socket in 143 Eio.Fiber.fork ~sw (fun () -> handle_connection t ~sw ~net conn); 144 accept_loop () 145 in 146 accept_loop () 147 148let check_contacts t ~sw ~net = 149 let plan = Engine.contact_plan t.engine in 150 let now = Unix.gettimeofday () in 151 let active = Cgr.Contact_plan.active_at plan ~time:now in 152 List.iter 153 (fun contact -> 154 let to_node = Cgr.Contact.to_ contact in 155 let name = Cgr.Node.name to_node in 156 match Hashtbl.find_opt t.peers name with 157 | None -> () 158 | Some peer -> 159 (* Connect if contact is active and we're not connected *) 160 connect_peer t ~sw ~net peer; 161 (* Release stored bundles *) 162 let reqs = Engine.on_contact_start t.engine contact in 163 List.iter 164 (fun (req : Engine.forward_request) -> 165 ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) 166 reqs) 167 active 168 169let check_routes t ~sw ~net = 170 let reqs = Engine.check_routes t.engine in 171 List.iter 172 (fun (req : Engine.forward_request) -> 173 ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) 174 reqs 175 176let periodic_tasks t ~sw ~net ~clock ~interval = 177 let rec loop () = 178 Eio.Time.sleep clock interval; 179 if t.running then ( 180 check_contacts t ~sw ~net; 181 check_routes t ~sw ~net; 182 let now = Eio.Time.now clock in 183 let _ = Engine.cleanup_expired t.engine ~current_time:now in 184 let stats = Engine.stats t.engine in 185 Log.debug (fun m -> m "Stats: %a" Engine.pp_stats stats); 186 loop ()) 187 in 188 loop () 189 190let run t ~sw ~net ~clock ~port = 191 t.running <- true; 192 Eio.Fiber.all 193 [ 194 (fun () -> listen t ~sw ~net ~port); 195 (fun () -> periodic_tasks t ~sw ~net ~clock ~interval:5.0); 196 ] 197 198let stop t = t.running <- false