Matrix protocol in OCaml, Eio specialised
at main 287 lines 7.7 kB view raw
(** Send queue for serialized message sending and offline support.

    This module provides:
    - Offline message queueing
    - Automatic retry with backoff
    - Transaction ID tracking for deduplication
    - Local echo support
    - Media upload coordination

    Each room has its own queue to serialize sends, preventing race conditions
    and ensuring messages are sent in order.

    The record types below are exposed concretely, including their [mutable]
    fields. Prefer the functions in this interface over mutating fields
    directly, so that any bookkeeping done by the implementation stays
    consistent. *)

(** {1 Queue Request Types} *)

(** The kind of work a queued request represents. Every constructor carries
    its own [txn_id]; use {!txn_id_of_kind} to read it uniformly. *)
type request_kind =
  | Event of {
      event_type : string;  (** Event type string of the event to send. *)
      content : Jsont.json;  (** JSON content of the event. *)
      txn_id : string;  (** Transaction ID of this request. *)
    }  (** Send an event with the given type and JSON content. *)
  | MediaUpload of {
      content_type : string;  (** Content type of the media. *)
      data_size : int;
          (** Size of the media data (presumably in bytes; the unit is not
              stated by this interface). *)
      local_path : string option;
          (** Optional local path associated with the media, if any. *)
      txn_id : string;  (** Transaction ID of this request. *)
    }  (** Upload a piece of media. See {!send_media}. *)
  | Reaction of {
      relates_to : Matrix_proto.Id.Event_id.t;
          (** Event the reaction relates to. *)
      key : string;  (** Reaction key. *)
      txn_id : string;  (** Transaction ID of this request. *)
    }  (** Send a reaction to an existing event. *)
  | Redaction of {
      event_id : Matrix_proto.Id.Event_id.t;  (** Event to redact. *)
      reason : string option;  (** Optional reason for the redaction. *)
      txn_id : string;  (** Transaction ID of this request. *)
    }  (** Redact an existing event. *)

(** Lifecycle state of a queued request. State changes are made through
    {!mark_sending}, {!mark_sent}, {!mark_failed}, {!cancel} and
    {!update_state}. *)
type request_state =
  | Pending  (** Queued and not yet being sent. *)
  | Sending  (** Currently being sent. *)
  | Sent  (** Successfully sent. *)
  | Failed of string
      (** Sending failed; the payload is presumably the error message. *)
  | Cancelled  (** Cancelled before completion. *)

(** A queued request together with its bookkeeping metadata. *)
type queued_request = {
  id : int;
      (** Identifier of the request (presumably allocated from the
          [next_id] counter of the owning room queue). *)
  room_id : Matrix_proto.Id.Room_id.t;  (** Room the request targets. *)
  kind : request_kind;  (** What is to be sent. *)
  mutable state : request_state;  (** Current lifecycle state. *)
  created_at : int64;
      (** Creation timestamp. The unit and epoch are not stated by this
          interface. *)
  mutable retry_count : int;  (** Number of retries recorded so far. *)
  mutable last_error : string option;
      (** Most recent error recorded for this request, if any. *)
  mutable depends_on : int option;
      (** Presumably the [id] of a request that must complete before this
          one is sent. See {!add_dependency} and
          {!dependencies_satisfied}. *)
  mutable dependents : int list;
      (** Presumably the [id]s of the requests that depend on this one. *)
}

(** Outcome of attempting to send a request. *)
(* NOTE(review): no value in this interface takes or returns [send_result];
   presumably it is produced by the code that drains the queue. Confirm. *)
type send_result =
  | Sent_ok of { event_id : Matrix_proto.Id.Event_id.t option }
      (** The send succeeded; [event_id] is the resulting event ID when one
          is available. *)
  | Send_failed of { error : string; retryable : bool }
      (** The send failed with [error]; [retryable] says whether another
          attempt may be made. *)
  | Send_cancelled  (** The request was cancelled. *)

(** {1 Room Send Queue} *)

(** Room-specific send queue. *)
type room_send_queue = {
  room_id : Matrix_proto.Id.Room_id.t;  (** Room this queue belongs to. *)
  mutable requests : queued_request list;
      (** Requests currently held by the queue. *)
  mutable next_id : int;
      (** Presumably the identifier handed to the next enqueued request. *)
  mutable enabled : bool;
      (** Whether this room queue is enabled. See {!set_room_enabled}. *)
  mutable is_processing : bool;
      (** Presumably set while the queue is being drained. *)
  max_retries : int;  (** Retry limit consulted by {!should_retry}. *)
  retry_delay_ms : int;
      (** Retry delay in milliseconds, the basis for {!retry_delay}. *)
  mutable on_state_change : (queued_request -> unit) option;
      (** Optional callback for request state changes. Set it with the
          function {!val:on_state_change}. *)
}

(** {1 Send Handle} *)

(** Handle for a queued request, allowing cancellation and status checks. *)
type send_handle = {
  request_id : int;  (** [id] of the request this handle refers to. *)
  txn_id : string;  (** Transaction ID of the request. *)
  room_id : Matrix_proto.Id.Room_id.t;  (** Room the request targets. *)
  queue : room_send_queue;  (** Queue that holds the request. *)
}

(** {1 Global Send Queue} *)

(** Global send queue manager. *)
type t = {
  user_id : Matrix_proto.Id.User_id.t;  (** User the queues belong to. *)
  mutable room_queues : (string * room_send_queue) list;
      (** Association list of room queues. The [string] key is presumably
          the string form of the room ID. *)
  mutable globally_enabled : bool;
      (** Global enable flag. See {!set_enabled} and {!is_enabled}. *)
  mutable on_error : (queued_request -> string -> unit) option;
      (** Optional global error callback. Set it with the function
          {!val:on_error}. *)
}

(** {1 Queue Creation} *)

val create_room_queue :
  room_id:Matrix_proto.Id.Room_id.t ->
  ?max_retries:int ->
  ?retry_delay_ms:int ->
  unit ->
  room_send_queue
(** [create_room_queue ~room_id ?max_retries ?retry_delay_ms ()] creates a
    new send queue for [room_id].

    When [max_retries] or [retry_delay_ms] is omitted, the default is chosen
    by the implementation. *)

val create : user_id:Matrix_proto.Id.User_id.t -> t
(** [create ~user_id] creates a new global send queue manager for
    [user_id]. *)

val get_room_queue : t -> Matrix_proto.Id.Room_id.t -> room_send_queue
(** [get_room_queue t room_id] returns the queue for [room_id], creating it
    if it does not exist yet. *)

(** {1 Enqueueing Requests} *)

val generate_txn_id : unit -> string
(** [generate_txn_id ()] generates a new transaction ID. *)

val enqueue : room_send_queue -> request_kind -> send_handle
(** [enqueue queue kind] enqueues a request of the given [kind] on [queue]
    and returns a handle to it. *)

val send_message :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  event_type:string ->
  content:Jsont.json ->
  send_handle
(** [send_message t ~room_id ~event_type ~content] enqueues a message event
    for [room_id] and returns a handle to the queued request. *)

val send_text : t -> room_id:Matrix_proto.Id.Room_id.t -> body:string -> send_handle
(** [send_text t ~room_id ~body] enqueues a text message with the given
    [body] for [room_id]. *)

val send_reaction :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  relates_to:Matrix_proto.Id.Event_id.t ->
  key:string ->
  send_handle
(** [send_reaction t ~room_id ~relates_to ~key] enqueues a reaction [key] to
    the event [relates_to]. *)

val send_redaction :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  event_id:Matrix_proto.Id.Event_id.t ->
  ?reason:string ->
  unit ->
  send_handle
(** [send_redaction t ~room_id ~event_id ?reason ()] enqueues a redaction of
    [event_id], with an optional [reason]. *)

(** {1 Dependencies} *)

val add_dependency : parent:send_handle -> child:send_handle -> unit
(** [add_dependency ~parent ~child] adds a dependency between the two
    requests. Presumably [child] is not sent until [parent] has completed;
    see {!dependencies_satisfied}. *)

(** {1 Request State Management} *)

val cancel : send_handle -> bool
(** [cancel handle] cancels a queued request. Returns [true] if the request
    was cancelled. *)

val abort : send_handle -> bool
(** [abort handle] aborts a request, that is, cancels it and removes it. *)

val get_request : send_handle -> queued_request option
(** [get_request handle] returns the request that [handle] refers to, or
    [None] if it cannot be found. *)

val is_pending : send_handle -> bool
(** [is_pending handle] checks whether the request is still pending. *)

val is_sent : send_handle -> bool
(** [is_sent handle] checks whether the request was sent. *)

(** {1 Queue Processing} *)

val next_sendable : room_send_queue -> queued_request option
(** [next_sendable queue] returns the next sendable request from [queue], or
    [None] if there is none. *)

val mark_sending : room_send_queue -> queued_request -> unit
(** [mark_sending queue request] marks [request] as being sent. *)

val mark_sent : room_send_queue -> queued_request -> unit
(** [mark_sent queue request] marks [request] as successfully sent. *)

val mark_failed : room_send_queue -> queued_request -> string -> retryable:bool -> unit
(** [mark_failed queue request error ~retryable] marks [request] as failed
    with [error]. [retryable] indicates whether the request may be
    retried. *)

val cleanup_queue : room_send_queue -> unit
(** [cleanup_queue queue] removes completed, cancelled and failed requests
    from [queue]. *)

(** {1 Queue Statistics} *)

val pending_count : room_send_queue -> int
(** [pending_count queue] is the number of pending requests in a room
    queue. *)

val total_pending : t -> int
(** [total_pending t] is the number of pending requests across all rooms. *)

val pending_requests : room_send_queue -> queued_request list
(** [pending_requests queue] returns all pending requests for a room. *)

val failed_requests : room_send_queue -> queued_request list
(** [failed_requests queue] returns all failed requests for a room. *)

(** {1 Queue Control} *)

val set_room_enabled : room_send_queue -> bool -> unit
(** [set_room_enabled queue enabled] enables or disables a room queue. *)

val set_enabled : t -> bool -> unit
(** [set_enabled t enabled] enables or disables all queues globally. *)

val is_enabled : t -> bool
(** [is_enabled t] checks whether sending is globally enabled. *)

val is_room_enabled : room_send_queue -> bool
(** [is_room_enabled queue] checks whether a room queue is enabled. *)

(** {1 Event Callbacks} *)

val on_state_change : room_send_queue -> (queued_request -> unit) -> unit
(** [on_state_change queue callback] sets the callback for request state
    changes on [queue]. *)

val on_error : t -> (queued_request -> string -> unit) -> unit
(** [on_error t callback] sets the global error callback. The callback
    receives the request and a string (presumably the error message). *)

(** {1 Persistence} *)

(** Serializable queue state. *)
(* NOTE(review): [p_depends_on] is kept, but the request's own [id] is not
   part of the persisted form. Confirm how {!restore_requests} re-links
   dependencies after a restore. *)
type persisted_request = {
  p_room_id : string;  (** Room ID, as a string. *)
  p_kind : request_kind;  (** Kind of the persisted request. *)
  p_created_at : int64;  (** Creation timestamp of the request. *)
  p_retry_count : int;  (** Retry count of the request. *)
  p_depends_on : int option;  (** Dependency of the request, if any. *)
}

val request_to_persisted : queued_request -> persisted_request
(** [request_to_persisted request] converts [request] to its persistable
    form. *)

val requests_to_persist : t -> persisted_request list
(** [requests_to_persist t] returns all pending requests in persistable
    form. *)

val restore_requests : t -> persisted_request list -> unit
(** [restore_requests t persisted] restores requests from persistence into
    [t]. *)

(** {1 Local Echo} *)

val local_echo_event : queued_request -> (string * Jsont.json * string) option
(** [local_echo_event request] creates a local echo event from a queued
    request. Returns [Some (event_type, content, txn_id)], or [None] when no
    local echo is produced for the request. *)

val matches_txn_id : queued_request -> event_id:Matrix_proto.Id.Event_id.t -> bool
(** [matches_txn_id request ~event_id] checks whether [event_id] matches the
    transaction ID of [request] (for local echo replacement). *)
(* NOTE(review): the comparison is against an event ID although the name
   refers to a transaction ID. Confirm the matching rule in the
   implementation. *)

(** {1 Retry Logic} *)

val retry_delay : room_send_queue -> queued_request -> int
(** [retry_delay queue request] calculates the delay before the next retry
    of [request], using exponential backoff. The result is presumably in
    milliseconds, like [retry_delay_ms]. *)

val should_retry : room_send_queue -> queued_request -> bool
(** [should_retry queue request] checks whether [request] should be
    retried. *)

(** {1 Media Upload Support} *)

val send_media :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  content_type:string ->
  data_size:int ->
  ?local_path:string ->
  event_content:Jsont.json ->
  unit ->
  send_handle * send_handle
(** [send_media t ~room_id ~content_type ~data_size ?local_path
    ~event_content ()] creates a media upload request together with a
    dependent event send. Returns [(upload_handle, event_handle)]. *)

(** {1 Internal} *)

val txn_id_of_kind : request_kind -> string
(** [txn_id_of_kind kind] returns the transaction ID carried by [kind]. *)

val dependencies_satisfied : room_send_queue -> queued_request -> bool
(** [dependencies_satisfied queue request] checks whether the dependencies
    of [request] are satisfied. *)

val update_state : room_send_queue -> queued_request -> request_state -> unit
(** [update_state queue request state] updates the state of [request] to
    [state]. *)