(* DTN controller and policy language for satellite networks
   at main 304 lines 9.9 kB view raw *)
(*---------------------------------------------------------------------------
   Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
   SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

(** Admin record CBOR schema.

    Uses Cbort codecs for type-safe encoding/decoding. Wire format uses
    integer-keyed maps for compactness (following COSE/CWT conventions). *)

(** Node counters carried by {!Status_report} and {!Response_status}. *)
type status = {
  node_id : Bundle.eid;
  uptime_secs : float;
  bundles_stored : int;
  bundles_forwarded : int;
  bundles_delivered : int;
  bundles_dropped : int;
  active_contacts : int;
}

(** One entry of a contact plan: a link between two named nodes, its time
    window, its rate and its delay. *)
type contact = {
  from_node : string;
  to_node : string;
  start_time : float;
  stop_time : float;
  rate_bps : float;
  owlt_secs : float;
}

(** A contact plan is a list of contacts. *)
type contact_plan = { contacts : contact list }

(** A single configuration change. [value] is optional; what [None] means to
    the receiver is not defined in this module. *)
type config_delta = { key : string; value : string option }

(** Requests that can be sent to a node. *)
type query =
  | Query_status
  | Query_contacts
  | Query_policy
  | Query_bundles of { filter : string option }

(** Answers to a {!query}, plus an error case. *)
type response =
  | Response_status of status
  | Response_contacts of contact_plan
  | Response_policy of { source : string }
  | Response_bundles of { count : int; bundle_ids : string list }
  | Response_error of { code : int; message : string }

(** An admin record: the payload of an admin bundle. *)
type t =
  | Status_report of status
  | Policy_update of { compiled : string; source : string }
  | Contact_update of contact_plan
  | Config_update of config_delta list
  | Query of query
  | Response of response

(** {1 CBOR Schema Definitions}

    All records use integer keys for wire compactness. *)

(** EID codec - delegates to Bundle module's CBOR representation.

    [Bundle.eid_of_cbor] already returns the result expected by [Cbort.conv],
    so it is passed directly rather than re-wrapped. *)
let eid_codec : Bundle.eid Cbort.t =
  Cbort.conv Bundle.eid_of_cbor Bundle.eid_to_cbor Cbort.any

(** Status record codec.

    Wire format (integer-keyed map):
    {v
    {
      1: eid,    // node_id
      2: float,  // uptime_secs
      3: int,    // bundles_stored
      4: int,    // bundles_forwarded
      5: int,    // bundles_delivered
      6: int,    // bundles_dropped
      7: int     // active_contacts
    }
    v} *)
let status_codec : status Cbort.t =
  let open Cbort.Obj_int in
  finish
    (let* node_id = mem 1 (fun s -> s.node_id) eid_codec in
     let* uptime_secs = mem 2 (fun s -> s.uptime_secs) Cbort.float in
     let* bundles_stored = mem 3 (fun s -> s.bundles_stored) Cbort.int in
     let* bundles_forwarded = mem 4 (fun s -> s.bundles_forwarded) Cbort.int in
     let* bundles_delivered = mem 5 (fun s -> s.bundles_delivered) Cbort.int in
     let* bundles_dropped = mem 6 (fun s -> s.bundles_dropped) Cbort.int in
     let* active_contacts = mem 7 (fun s -> s.active_contacts) Cbort.int in
     return
       {
         node_id;
         uptime_secs;
         bundles_stored;
         bundles_forwarded;
         bundles_delivered;
         bundles_dropped;
         active_contacts;
       })

(** Contact record codec.

    Wire format (array for compactness):
    {v [from_node, to_node, start_time, stop_time, rate_bps, owlt_secs] v}

    NOTE(review): the codec is assembled from right-nested [tuple2] pairs.
    Unless [Cbort.tuple2] flattens nested pairs, the bytes on the wire are
    nested two-element arrays rather than the flat six-element array shown
    above - confirm against Cbort before relying on the documented layout.

    The conversion between the nested pairs and the record cannot fail, so it
    uses [map], like the other record codecs in this file. *)
let contact_codec : contact Cbort.t =
  let open Cbort in
  map
    (fun
      (from_node, (to_node, (start_time, (stop_time, (rate_bps, owlt_secs)))))
    -> { from_node; to_node; start_time; stop_time; rate_bps; owlt_secs })
    (fun c ->
      ( c.from_node,
        (c.to_node, (c.start_time, (c.stop_time, (c.rate_bps, c.owlt_secs))))
      ))
    (tuple2 string
       (tuple2 string (tuple2 float (tuple2 float (tuple2 float float)))))

(** Contact plan codec: an array of contacts. *)
let contact_plan_codec : contact_plan Cbort.t =
  Cbort.map
    (fun contacts -> { contacts })
    (fun cp -> cp.contacts)
    (Cbort.array contact_codec)

(** Config delta codec.

    Wire format: [key, value] where value is text or null. *)
let config_delta_codec : config_delta Cbort.t =
  Cbort.map
    (fun (key, value) -> { key; value })
    (fun d -> (d.key, d.value))
    (Cbort.tuple2 Cbort.string (Cbort.nullable Cbort.string))

(** Query codec using tag-based variants.

    Wire format:
    - Tag 0: Query_status (null payload)
    - Tag 1: Query_contacts (null payload)
    - Tag 2: Query_policy (null payload)
    - Tag 3: Query_bundles (filter: text?) *)
let query_codec : query Cbort.t =
  let open Cbort.Variant in
  variant
    [
      case0 0 Query_status (function Query_status -> true | _ -> false);
      case0 1 Query_contacts (function Query_contacts -> true | _ -> false);
      case0 2 Query_policy (function Query_policy -> true | _ -> false);
      case 3
        (Cbort.nullable Cbort.string)
        (fun filter -> Query_bundles { filter })
        (function Query_bundles { filter } -> Some filter | _ -> None);
    ]

(** Response codec using tag-based variants.

    Wire format:
    - Tag 0: Response_status (status)
    - Tag 1: Response_contacts (contact_plan)
    - Tag 2: Response_policy (source: text)
    - Tag 3: Response_bundles (count: int, bundle_ids: text list)
    - Tag 255: Response_error (code: int, message: text) *)
let response_codec : response Cbort.t =
  let open Cbort.Variant in
  variant
    [
      case 0 status_codec
        (fun s -> Response_status s)
        (function Response_status s -> Some s | _ -> None);
      case 1 contact_plan_codec
        (fun cp -> Response_contacts cp)
        (function Response_contacts cp -> Some cp | _ -> None);
      case 2 Cbort.string
        (fun source -> Response_policy { source })
        (function Response_policy { source } -> Some source | _ -> None);
      case 3
        (Cbort.tuple2 Cbort.int (Cbort.array Cbort.string))
        (fun (count, bundle_ids) -> Response_bundles { count; bundle_ids })
        (function
          | Response_bundles { count; bundle_ids } -> Some (count, bundle_ids)
          | _ -> None);
      case 255
        (Cbort.tuple2 Cbort.int Cbort.string)
        (fun (code, message) -> Response_error { code; message })
        (function
          | Response_error { code; message } -> Some (code, message)
          | _ -> None);
    ]

(** Admin record codec using tag-based variants.

    Wire format:
    - Tag 1: Status_report (status)
    - Tag 2: Policy_update (compiled: bytes, source: text)
    - Tag 3: Contact_update (contact_plan)
    - Tag 4: Config_update (config_delta list)
    - Tag 5: Query (query)
    - Tag 6: Response (response) *)
let codec : t Cbort.t =
  let open Cbort.Variant in
  variant
    [
      case 1 status_codec
        (fun s -> Status_report s)
        (function Status_report s -> Some s | _ -> None);
      case 2
        (Cbort.tuple2 Cbort.bytes Cbort.string)
        (fun (compiled, source) -> Policy_update { compiled; source })
        (function
          | Policy_update { compiled; source } -> Some (compiled, source)
          | _ -> None);
      case 3 contact_plan_codec
        (fun cp -> Contact_update cp)
        (function Contact_update cp -> Some cp | _ -> None);
      case 4
        (Cbort.array config_delta_codec)
        (fun deltas -> Config_update deltas)
        (function Config_update deltas -> Some deltas | _ -> None);
      case 5 query_codec
        (fun q -> Query q)
        (function Query q -> Some q | _ -> None);
      case 6 response_codec
        (fun r -> Response r)
        (function Response r -> Some r | _ -> None);
    ]

(** {1 Encoding/Decoding} *)

(** [encode t] serialises the admin record [t] with {!codec}. *)
let encode t = Cbort.encode_string codec t

(** [decode bytes] parses [bytes] with {!codec}. A decoding failure is
    returned as [Error msg], where [msg] is the Cbort error rendered with
    [Cbort.Error.to_string]. *)
let decode bytes =
  Cbort.decode_string codec bytes |> Result.map_error Cbort.Error.to_string

(** {1 Bundle Helpers} *)

(** [make_bundle ~source ~destination ~timestamp record] builds a bundle whose
    payload is [encode record] and whose flags are
    [Bundle.bundle_flags_default] with [is_admin_record] set. [timestamp] is
    passed to [Bundle.v] as the creation timestamp. *)
let make_bundle ~source ~destination ~timestamp record =
  let payload = encode record in
  let flags = { Bundle.bundle_flags_default with is_admin_record = true } in
  Bundle.v ~flags ~source ~destination ~creation_timestamp:timestamp ~payload ()

(** [is_admin_bundle bundle] is the [is_admin_record] flag of the primary
    block of [bundle]. *)
let is_admin_bundle bundle = bundle.Bundle.primary.flags.is_admin_record

(** [extract bundle] decodes the admin record carried by [bundle].

    Returns [Error "not an admin bundle"] when the admin flag is unset,
    [Error "no payload"] when [Bundle.payload] yields [None], and otherwise
    the result of {!decode} on the payload. *)
let extract bundle =
  if not (is_admin_bundle bundle) then Error "not an admin bundle"
  else
    match Bundle.payload bundle with
    | None -> Error "no payload"
    | Some payload -> decode payload

(** {1 Pretty Printers} *)

(** Prints a status record, one counter per line, inside a vertical box. *)
let pp_status ppf s =
  Fmt.pf ppf
    "@[<v>node: %a@,\
     uptime: %.0fs@,\
     stored: %d@,\
     forwarded: %d@,\
     delivered: %d@,\
     dropped: %d@,\
     contacts: %d@]"
    Bundle.pp_eid s.node_id s.uptime_secs s.bundles_stored s.bundles_forwarded
    s.bundles_delivered s.bundles_dropped s.active_contacts

(** Prints a contact on a single line. *)
let pp_contact ppf c =
  Fmt.pf ppf "%s->%s [%.0f-%.0f] @%.0f bps (owlt: %.3fs)" c.from_node c.to_node
    c.start_time c.stop_time c.rate_bps c.owlt_secs

(** Prints the name of a query; for [Query_bundles] the optional filter is
    printed between parentheses. *)
let pp_query ppf = function
  | Query_status -> Fmt.string ppf "status"
  | Query_contacts -> Fmt.string ppf "contacts"
  | Query_policy -> Fmt.string ppf "policy"
  | Query_bundles { filter } ->
      Fmt.pf ppf "bundles(%a)" Fmt.(option string) filter

(** Prints a response, prefixed by its kind. *)
let pp_response ppf = function
  | Response_status s -> Fmt.pf ppf "status: %a" pp_status s
  | Response_contacts cp ->
      Fmt.pf ppf "contacts: %a" Fmt.(list pp_contact) cp.contacts
  | Response_policy { source } -> Fmt.pf ppf "policy: %s" source
  | Response_bundles { count; bundle_ids } ->
      Fmt.pf ppf "bundles: %d [%a]" count
        Fmt.(list ~sep:comma string)
        bundle_ids
  | Response_error { code; message } -> Fmt.pf ppf "error(%d): %s" code message

(** Prints an admin record. Contact and config updates are summarised by
    their element count; a policy update prints its source only. *)
let pp ppf = function
  | Status_report s -> Fmt.pf ppf "status_report(%a)" pp_status s
  | Policy_update { source; _ } -> Fmt.pf ppf "policy_update(%s)" source
  | Contact_update cp ->
      Fmt.pf ppf "contact_update(%d)" (List.length cp.contacts)
  | Config_update deltas -> Fmt.pf ppf "config_update(%d)" (List.length deltas)
  | Query q -> Fmt.pf ppf "query(%a)" pp_query q
  | Response r -> Fmt.pf ppf "response(%a)" pp_response r