DTN controller and policy language for satellite networks
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** Admin record CBOR schema.
7
8 Uses Cbort codecs for type-safe encoding/decoding. Wire format uses
9 integer-keyed maps for compactness (following COSE/CWT conventions). *)
10
11type status = {
12 node_id : Bundle.eid;
13 uptime_secs : float;
14 bundles_stored : int;
15 bundles_forwarded : int;
16 bundles_delivered : int;
17 bundles_dropped : int;
18 active_contacts : int;
19}
20
21type contact = {
22 from_node : string;
23 to_node : string;
24 start_time : float;
25 stop_time : float;
26 rate_bps : float;
27 owlt_secs : float;
28}
29
30type contact_plan = { contacts : contact list }
31type config_delta = { key : string; value : string option }
32
33type query =
34 | Query_status
35 | Query_contacts
36 | Query_policy
37 | Query_bundles of { filter : string option }
38
39type response =
40 | Response_status of status
41 | Response_contacts of contact_plan
42 | Response_policy of { source : string }
43 | Response_bundles of { count : int; bundle_ids : string list }
44 | Response_error of { code : int; message : string }
45
46type t =
47 | Status_report of status
48 | Policy_update of { compiled : string; source : string }
49 | Contact_update of contact_plan
50 | Config_update of config_delta list
51 | Query of query
52 | Response of response
53
54(** {1 CBOR Schema Definitions}
55
56 All records use integer keys for wire compactness. *)
57
58(** EID codec - delegates to Bundle module's CBOR representation. *)
59let eid_codec : Bundle.eid Cbort.t =
60 Cbort.conv
61 (fun cbor ->
62 match Bundle.eid_of_cbor cbor with
63 | Ok eid -> Ok eid
64 | Error msg -> Error msg)
65 Bundle.eid_to_cbor Cbort.any
66
67(** Status record codec.
68
69 Wire format (integer-keyed map):
70 {v
71 {
72 1: eid, // node_id
73 2: float, // uptime_secs
74 3: int, // bundles_stored
75 4: int, // bundles_forwarded
76 5: int, // bundles_delivered
77 6: int, // bundles_dropped
78 7: int // active_contacts
79 }
80 v} *)
81let status_codec : status Cbort.t =
82 let open Cbort.Obj_int in
83 finish
84 (let* node_id = mem 1 (fun s -> s.node_id) eid_codec in
85 let* uptime_secs = mem 2 (fun s -> s.uptime_secs) Cbort.float in
86 let* bundles_stored = mem 3 (fun s -> s.bundles_stored) Cbort.int in
87 let* bundles_forwarded = mem 4 (fun s -> s.bundles_forwarded) Cbort.int in
88 let* bundles_delivered = mem 5 (fun s -> s.bundles_delivered) Cbort.int in
89 let* bundles_dropped = mem 6 (fun s -> s.bundles_dropped) Cbort.int in
90 let* active_contacts = mem 7 (fun s -> s.active_contacts) Cbort.int in
91 return
92 {
93 node_id;
94 uptime_secs;
95 bundles_stored;
96 bundles_forwarded;
97 bundles_delivered;
98 bundles_dropped;
99 active_contacts;
100 })
101
102(** Contact record codec.
103
104 Wire format (array for compactness):
105 {v [from_node, to_node, start_time, stop_time, rate_bps, owlt_secs] v} *)
106let contact_codec : contact Cbort.t =
107 let open Cbort in
108 let tuple6 a b c d e f =
109 conv
110 (fun (a, (b, (c, (d, (e, f))))) ->
111 Ok
112 {
113 from_node = a;
114 to_node = b;
115 start_time = c;
116 stop_time = d;
117 rate_bps = e;
118 owlt_secs = f;
119 })
120 (fun r ->
121 ( r.from_node,
122 (r.to_node, (r.start_time, (r.stop_time, (r.rate_bps, r.owlt_secs))))
123 ))
124 (tuple2 a (tuple2 b (tuple2 c (tuple2 d (tuple2 e f)))))
125 in
126 tuple6 string string float float float float
127
128(** Contact plan codec. *)
129let contact_plan_codec : contact_plan Cbort.t =
130 Cbort.map
131 (fun contacts -> { contacts })
132 (fun cp -> cp.contacts)
133 (Cbort.array contact_codec)
134
135(** Config delta codec.
136
137 Wire format: [key, value] where value is text or null. *)
138let config_delta_codec : config_delta Cbort.t =
139 Cbort.map
140 (fun (key, value) -> { key; value })
141 (fun d -> (d.key, d.value))
142 (Cbort.tuple2 Cbort.string (Cbort.nullable Cbort.string))
143
144(** Query codec using tag-based variants.
145
146 Wire format:
147 - Tag 0: Query_status (null payload)
148 - Tag 1: Query_contacts (null payload)
149 - Tag 2: Query_policy (null payload)
150 - Tag 3: Query_bundles (filter: text?) *)
151let query_codec : query Cbort.t =
152 let open Cbort.Variant in
153 variant
154 [
155 case0 0 Query_status (function Query_status -> true | _ -> false);
156 case0 1 Query_contacts (function Query_contacts -> true | _ -> false);
157 case0 2 Query_policy (function Query_policy -> true | _ -> false);
158 case 3
159 (Cbort.nullable Cbort.string)
160 (fun filter -> Query_bundles { filter })
161 (function Query_bundles { filter } -> Some filter | _ -> None);
162 ]
163
164(** Response codec using tag-based variants.
165
166 Wire format:
167 - Tag 0: Response_status (status)
168 - Tag 1: Response_contacts (contact_plan)
169 - Tag 2: Response_policy (source: text)
170 - Tag 3: Response_bundles (count: int, bundle_ids: text list)
171 - Tag 255: Response_error (code: int, message: text) *)
172let response_codec : response Cbort.t =
173 let open Cbort.Variant in
174 variant
175 [
176 case 0 status_codec
177 (fun s -> Response_status s)
178 (function Response_status s -> Some s | _ -> None);
179 case 1 contact_plan_codec
180 (fun cp -> Response_contacts cp)
181 (function Response_contacts cp -> Some cp | _ -> None);
182 case 2 Cbort.string
183 (fun source -> Response_policy { source })
184 (function Response_policy { source } -> Some source | _ -> None);
185 case 3
186 (Cbort.tuple2 Cbort.int (Cbort.array Cbort.string))
187 (fun (count, bundle_ids) -> Response_bundles { count; bundle_ids })
188 (function
189 | Response_bundles { count; bundle_ids } -> Some (count, bundle_ids)
190 | _ -> None);
191 case 255
192 (Cbort.tuple2 Cbort.int Cbort.string)
193 (fun (code, message) -> Response_error { code; message })
194 (function
195 | Response_error { code; message } -> Some (code, message) | _ -> None);
196 ]
197
198(** Admin record codec using tag-based variants.
199
200 Wire format:
201 - Tag 1: Status_report (status)
202 - Tag 2: Policy_update (compiled: bytes, source: text)
203 - Tag 3: Contact_update (contact_plan)
204 - Tag 4: Config_update (config_delta list)
205 - Tag 5: Query (query)
206 - Tag 6: Response (response) *)
207let codec : t Cbort.t =
208 let open Cbort.Variant in
209 variant
210 [
211 case 1 status_codec
212 (fun s -> Status_report s)
213 (function Status_report s -> Some s | _ -> None);
214 case 2
215 (Cbort.tuple2 Cbort.bytes Cbort.string)
216 (fun (compiled, source) -> Policy_update { compiled; source })
217 (function
218 | Policy_update { compiled; source } -> Some (compiled, source)
219 | _ -> None);
220 case 3 contact_plan_codec
221 (fun cp -> Contact_update cp)
222 (function Contact_update cp -> Some cp | _ -> None);
223 case 4
224 (Cbort.array config_delta_codec)
225 (fun deltas -> Config_update deltas)
226 (function Config_update deltas -> Some deltas | _ -> None);
227 case 5 query_codec
228 (fun q -> Query q)
229 (function Query q -> Some q | _ -> None);
230 case 6 response_codec
231 (fun r -> Response r)
232 (function Response r -> Some r | _ -> None);
233 ]
234
235(** {1 Encoding/Decoding} *)
236
237let encode t = Cbort.encode_string codec t
238
239let decode bytes =
240 match Cbort.decode_string codec bytes with
241 | Ok v -> Ok v
242 | Error e -> Error (Cbort.Error.to_string e)
243
244(** {1 Bundle Helpers} *)
245
246let make_bundle ~source ~destination ~timestamp record =
247 let payload = encode record in
248 let flags = { Bundle.bundle_flags_default with is_admin_record = true } in
249 Bundle.v ~flags ~source ~destination ~creation_timestamp:timestamp ~payload ()
250
251let extract bundle =
252 if not bundle.Bundle.primary.flags.is_admin_record then
253 Error "not an admin bundle"
254 else
255 match Bundle.payload bundle with
256 | None -> Error "no payload"
257 | Some payload -> decode payload
258
259let is_admin_bundle bundle = bundle.Bundle.primary.flags.is_admin_record
260
261(** {1 Pretty Printers} *)
262
263let pp_status ppf s =
264 Fmt.pf ppf
265 "@[<v>node: %a@,\
266 uptime: %.0fs@,\
267 stored: %d@,\
268 forwarded: %d@,\
269 delivered: %d@,\
270 dropped: %d@,\
271 contacts: %d@]"
272 Bundle.pp_eid s.node_id s.uptime_secs s.bundles_stored s.bundles_forwarded
273 s.bundles_delivered s.bundles_dropped s.active_contacts
274
275let pp_contact ppf c =
276 Fmt.pf ppf "%s->%s [%.0f-%.0f] @%.0f bps (owlt: %.3fs)" c.from_node c.to_node
277 c.start_time c.stop_time c.rate_bps c.owlt_secs
278
279let pp_query ppf = function
280 | Query_status -> Fmt.string ppf "status"
281 | Query_contacts -> Fmt.string ppf "contacts"
282 | Query_policy -> Fmt.string ppf "policy"
283 | Query_bundles { filter } ->
284 Fmt.pf ppf "bundles(%a)" Fmt.(option string) filter
285
286let pp_response ppf = function
287 | Response_status s -> Fmt.pf ppf "status: %a" pp_status s
288 | Response_contacts cp ->
289 Fmt.pf ppf "contacts: %a" Fmt.(list pp_contact) cp.contacts
290 | Response_policy { source } -> Fmt.pf ppf "policy: %s" source
291 | Response_bundles { count; bundle_ids } ->
292 Fmt.pf ppf "bundles: %d [%a]" count
293 Fmt.(list ~sep:comma string)
294 bundle_ids
295 | Response_error { code; message } -> Fmt.pf ppf "error(%d): %s" code message
296
297let pp ppf = function
298 | Status_report s -> Fmt.pf ppf "status_report(%a)" pp_status s
299 | Policy_update { source; _ } -> Fmt.pf ppf "policy_update(%s)" source
300 | Contact_update cp ->
301 Fmt.pf ppf "contact_update(%d)" (List.length cp.contacts)
302 | Config_update deltas -> Fmt.pf ppf "config_update(%d)" (List.length deltas)
303 | Query q -> Fmt.pf ppf "query(%a)" pp_query q
304 | Response r -> Fmt.pf ppf "response(%a)" pp_response r