(*---------------------------------------------------------------------------
   Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
   SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

(** Admin record CBOR schema.

    Uses Cbort codecs for type-safe encoding/decoding. Wire format uses
    integer-keyed maps for compactness (following COSE/CWT conventions). *)

type status = {
  node_id : Bundle.eid;
  uptime_secs : float;
  bundles_stored : int;
  bundles_forwarded : int;
  bundles_delivered : int;
  bundles_dropped : int;
  active_contacts : int;
}
(** Status snapshot of a node: its endpoint id, uptime and bundle/contact
    counters. Serialised by {!status_codec}. *)

type contact = {
  from_node : string;
  to_node : string;
  start_time : float;
  stop_time : float;
  rate_bps : float;
  owlt_secs : float;
}
(** A single contact between two nodes. Serialised by {!contact_codec}. *)

type contact_plan = { contacts : contact list }
(** A contact plan is a list of contacts. *)

type config_delta = { key : string; value : string option }
(** One configuration change. [value] is optional; it is encoded as text or
    null on the wire (see {!config_delta_codec}). *)

(** Queries that can be carried by an admin record. *)
type query =
  | Query_status
  | Query_contacts
  | Query_policy
  | Query_bundles of { filter : string option }

(** Responses that can be carried by an admin record. *)
type response =
  | Response_status of status
  | Response_contacts of contact_plan
  | Response_policy of { source : string }
  | Response_bundles of { count : int; bundle_ids : string list }
  | Response_error of { code : int; message : string }

(** An admin record. *)
type t =
  | Status_report of status
  | Policy_update of { compiled : string; source : string }
  | Contact_update of contact_plan
  | Config_update of config_delta list
  | Query of query
  | Response of response

(** {1 CBOR Schema Definitions}

    All records use integer keys for wire compactness. *)

(** EID codec - delegates to Bundle module's CBOR representation.

    Decoding goes through [Bundle.eid_of_cbor] (whose error is propagated
    unchanged) and encoding through [Bundle.eid_to_cbor], on top of
    [Cbort.any]. *)
let eid_codec : Bundle.eid Cbort.t =
  Cbort.conv Bundle.eid_of_cbor Bundle.eid_to_cbor Cbort.any

(** Status record codec.

    Wire format (integer-keyed map):
    {v
    {
      1: eid,    // node_id
      2: float,  // uptime_secs
      3: int,    // bundles_stored
      4: int,    // bundles_forwarded
      5: int,    // bundles_delivered
      6: int,    // bundles_dropped
      7: int     // active_contacts
    }
    v} *)
let status_codec : status Cbort.t =
  let open Cbort.Obj_int in
  finish
    (let* node_id = mem 1 (fun s -> s.node_id) eid_codec in
     let* uptime_secs = mem 2 (fun s -> s.uptime_secs) Cbort.float in
     let* bundles_stored = mem 3 (fun s -> s.bundles_stored) Cbort.int in
     let* bundles_forwarded = mem 4 (fun s -> s.bundles_forwarded) Cbort.int in
     let* bundles_delivered = mem 5 (fun s -> s.bundles_delivered) Cbort.int in
     let* bundles_dropped = mem 6 (fun s -> s.bundles_dropped) Cbort.int in
     let* active_contacts = mem 7 (fun s -> s.active_contacts) Cbort.int in
     return
       {
         node_id;
         uptime_secs;
         bundles_stored;
         bundles_forwarded;
         bundles_delivered;
         bundles_dropped;
         active_contacts;
       })

(** Contact record codec.

    Wire format (array for compactness):
    {v [from_node, to_node, start_time, stop_time, rate_bps, owlt_secs] v}

    NOTE(review): the codec is assembled from right-nested [tuple2] pairs;
    whether that serialises as the flat array shown above or as nested pairs
    depends on [Cbort.tuple2] - confirm against the Cbort documentation. *)
let contact_codec : contact Cbort.t =
  let open Cbort in
  (* Decoding a well-formed tuple cannot fail, so a total [map] is used, in
     line with the other record codecs of this module. *)
  let of_wire
      (from_node, (to_node, (start_time, (stop_time, (rate_bps, owlt_secs)))))
      =
    { from_node; to_node; start_time; stop_time; rate_bps; owlt_secs }
  in
  let to_wire c =
    ( c.from_node,
      (c.to_node, (c.start_time, (c.stop_time, (c.rate_bps, c.owlt_secs)))) )
  in
  map of_wire to_wire
    (tuple2 string
       (tuple2 string (tuple2 float (tuple2 float (tuple2 float float)))))

(** Contact plan codec: an array of contacts (see {!contact_codec}). *)
let contact_plan_codec : contact_plan Cbort.t =
  Cbort.map
    (fun contacts -> { contacts })
    (fun cp -> cp.contacts)
    (Cbort.array contact_codec)

(** Config delta codec.

    Wire format: [key, value] where value is text or null. *)
let config_delta_codec : config_delta Cbort.t =
  Cbort.map
    (fun (key, value) -> { key; value })
    (fun d -> (d.key, d.value))
    (Cbort.tuple2 Cbort.string (Cbort.nullable Cbort.string))

(** Query codec using tag-based variants.

    Wire format:
    - Tag 0: Query_status (null payload)
    - Tag 1: Query_contacts (null payload)
    - Tag 2: Query_policy (null payload)
    - Tag 3: Query_bundles (filter: text?) *)
let query_codec : query Cbort.t =
  let open Cbort.Variant in
  variant
    [
      case0 0 Query_status (function Query_status -> true | _ -> false);
      case0 1 Query_contacts (function Query_contacts -> true | _ -> false);
      case0 2 Query_policy (function Query_policy -> true | _ -> false);
      case 3
        (Cbort.nullable Cbort.string)
        (fun filter -> Query_bundles { filter })
        (function Query_bundles { filter } -> Some filter | _ -> None);
    ]

(** Response codec using tag-based variants.

    Wire format:
    - Tag 0: Response_status (status)
    - Tag 1: Response_contacts (contact_plan)
    - Tag 2: Response_policy (source: text)
    - Tag 3: Response_bundles (count: int, bundle_ids: text list)
    - Tag 255: Response_error (code: int, message: text) *)
let response_codec : response Cbort.t =
  let open Cbort.Variant in
  variant
    [
      case 0 status_codec
        (fun s -> Response_status s)
        (function Response_status s -> Some s | _ -> None);
      case 1 contact_plan_codec
        (fun cp -> Response_contacts cp)
        (function Response_contacts cp -> Some cp | _ -> None);
      case 2 Cbort.string
        (fun source -> Response_policy { source })
        (function Response_policy { source } -> Some source | _ -> None);
      case 3
        (Cbort.tuple2 Cbort.int (Cbort.array Cbort.string))
        (fun (count, bundle_ids) -> Response_bundles { count; bundle_ids })
        (function
          | Response_bundles { count; bundle_ids } -> Some (count, bundle_ids)
          | _ -> None);
      case 255
        (Cbort.tuple2 Cbort.int Cbort.string)
        (fun (code, message) -> Response_error { code; message })
        (function
          | Response_error { code; message } -> Some (code, message)
          | _ -> None);
    ]

(** Admin record codec using tag-based variants.

    Wire format:
    - Tag 1: Status_report (status)
    - Tag 2: Policy_update (compiled: bytes, source: text)
    - Tag 3: Contact_update (contact_plan)
    - Tag 4: Config_update (config_delta list)
    - Tag 5: Query (query)
    - Tag 6: Response (response) *)
let codec : t Cbort.t =
  let open Cbort.Variant in
  variant
    [
      case 1 status_codec
        (fun s -> Status_report s)
        (function Status_report s -> Some s | _ -> None);
      case 2
        (Cbort.tuple2 Cbort.bytes Cbort.string)
        (fun (compiled, source) -> Policy_update { compiled; source })
        (function
          | Policy_update { compiled; source } -> Some (compiled, source)
          | _ -> None);
      case 3 contact_plan_codec
        (fun cp -> Contact_update cp)
        (function Contact_update cp -> Some cp | _ -> None);
      case 4
        (Cbort.array config_delta_codec)
        (fun deltas -> Config_update deltas)
        (function Config_update deltas -> Some deltas | _ -> None);
      case 5 query_codec
        (fun q -> Query q)
        (function Query q -> Some q | _ -> None);
      case 6 response_codec
        (fun r -> Response r)
        (function Response r -> Some r | _ -> None);
    ]

(** {1 Encoding/Decoding} *)

(** [encode t] serialises the admin record [t] to a string using {!codec}. *)
let encode t = Cbort.encode_string codec t

(** [decode bytes] parses [bytes] with {!codec}. On failure the Cbort error
    is rendered to a string with [Cbort.Error.to_string]. *)
let decode bytes =
  Cbort.decode_string codec bytes |> Result.map_error Cbort.Error.to_string

(** {1 Bundle Helpers} *)

(** [make_bundle ~source ~destination ~timestamp record] encodes [record] and
    builds a bundle with [Bundle.v], using the encoding as payload,
    [timestamp] as creation timestamp, and [Bundle.bundle_flags_default] with
    [is_admin_record] set to [true]. *)
let make_bundle ~source ~destination ~timestamp record =
  let payload = encode record in
  let flags = { Bundle.bundle_flags_default with is_admin_record = true } in
  Bundle.v ~flags ~source ~destination ~creation_timestamp:timestamp ~payload
    ()

(** [extract bundle] decodes the admin record carried by [bundle].

    Returns [Error "not an admin bundle"] when the [is_admin_record] flag of
    the primary block is not set, [Error "no payload"] when [Bundle.payload]
    yields [None], and the result of {!decode} on the payload otherwise. *)
let extract bundle =
  if not bundle.Bundle.primary.flags.is_admin_record then
    Error "not an admin bundle"
  else
    match Bundle.payload bundle with
    | None -> Error "no payload"
    | Some payload -> decode payload

(** [is_admin_bundle bundle] is the [is_admin_record] flag of the primary
    block of [bundle]. *)
let is_admin_bundle bundle = bundle.Bundle.primary.flags.is_admin_record

(** {1 Pretty Printers} *)

(** [pp_status] prints one field per line.

    The box is explicitly vertical: in a default box the [@,] cut hints print
    nothing as long as the text fits within the margin, which would glue the
    fields together. *)
let pp_status ppf s =
  Fmt.pf ppf
    "@[<v>node: %a@,\
     uptime: %.0fs@,\
     stored: %d@,\
     forwarded: %d@,\
     delivered: %d@,\
     dropped: %d@,\
     contacts: %d@]"
    Bundle.pp_eid s.node_id s.uptime_secs s.bundles_stored s.bundles_forwarded
    s.bundles_delivered s.bundles_dropped s.active_contacts

(** [pp_contact] prints a contact on a single line. *)
let pp_contact ppf c =
  Fmt.pf ppf "%s->%s [%.0f-%.0f] @%.0f bps (owlt: %.3fs)" c.from_node c.to_node
    c.start_time c.stop_time c.rate_bps c.owlt_secs

(** [pp_query] prints the query name, followed by the optional filter for
    [Query_bundles]. *)
let pp_query ppf = function
  | Query_status -> Fmt.string ppf "status"
  | Query_contacts -> Fmt.string ppf "contacts"
  | Query_policy -> Fmt.string ppf "policy"
  | Query_bundles { filter } ->
      Fmt.pf ppf "bundles(%a)" Fmt.(option string) filter

(** [pp_response] prints a response. Contacts are printed one per line: the
    list is wrapped in a vertical box so that the list separator actually
    breaks the line instead of concatenating the entries. *)
let pp_response ppf = function
  | Response_status s -> Fmt.pf ppf "status: %a" pp_status s
  | Response_contacts cp ->
      Fmt.pf ppf "contacts: @[<v>%a@]" Fmt.(list pp_contact) cp.contacts
  | Response_policy { source } -> Fmt.pf ppf "policy: %s" source
  | Response_bundles { count; bundle_ids } ->
      Fmt.pf ppf "bundles: %d [%a]" count
        Fmt.(list ~sep:comma string)
        bundle_ids
  | Response_error { code; message } ->
      Fmt.pf ppf "error(%d): %s" code message

(** [pp] prints a short summary of an admin record. Contact and config
    updates are summarised by their number of entries. *)
let pp ppf = function
  | Status_report s -> Fmt.pf ppf "status_report(%a)" pp_status s
  | Policy_update { source; _ } -> Fmt.pf ppf "policy_update(%s)" source
  | Contact_update cp ->
      Fmt.pf ppf "contact_update(%d)" (List.length cp.contacts)
  | Config_update deltas ->
      Fmt.pf ppf "config_update(%d)" (List.length deltas)
  | Query q -> Fmt.pf ppf "query(%a)" pp_query q
  | Response r -> Fmt.pf ppf "response(%a)" pp_response r