Matrix protocol in OCaml, Eio specialised
(** Send queue for serialized message sending and offline support.

    This module provides:
    - Offline message queueing
    - Automatic retry with backoff
    - Transaction ID tracking for deduplication
    - Local echo support
    - Media upload coordination

    Each room has its own queue to serialize sends, preventing race conditions
    and ensuring messages are sent in order. *)
12
(** {1 Queue Request Types} *)
14
(** Kind of request that can be placed on a send queue.

    Every constructor carries its own [txn_id] (transaction ID). *)
type request_kind =
  | Event of {
      event_type : string;
      content : Jsont.json;
      txn_id : string;
    }  (** An event, given by its event type and JSON content. *)
  | MediaUpload of {
      content_type : string;
      data_size : int;
      local_path : string option;
      txn_id : string;
    }
      (** A media upload, described by its content type, size and optional
          local path. NOTE(review): the unit of [data_size] is not shown in
          this interface (presumably bytes) - confirm. *)
  | Reaction of {
      relates_to : Matrix_proto.Id.Event_id.t;
      key : string;
      txn_id : string;
    }  (** A reaction with the given [key], relating to event [relates_to]. *)
  | Redaction of {
      event_id : Matrix_proto.Id.Event_id.t;
      reason : string option;
      txn_id : string;
    }  (** A redaction of [event_id], with an optional [reason]. *)
38
(** State of a queued request. *)
type request_state =
  | Pending  (** Queued and not yet marked as being sent. *)
  | Sending  (** Marked as being sent (see [mark_sending]). *)
  | Sent  (** Marked as successfully sent (see [mark_sent]). *)
  | Failed of string
      (** Marked as failed (see [mark_failed]). NOTE(review): the payload is
          presumably the error message - confirm against the implementation. *)
  | Cancelled  (** Cancelled (see [cancel]). *)
46
(** A queued request together with its bookkeeping metadata. *)
type queued_request = {
  id : int;  (** Identifier of the request. *)
  room_id : Matrix_proto.Id.Room_id.t;  (** Room the request belongs to. *)
  kind : request_kind;  (** What is being sent. *)
  mutable state : request_state;  (** Current state of the request. *)
  created_at : int64;
      (** Creation time. NOTE(review): unit and epoch are not shown in this
          interface - confirm. *)
  mutable retry_count : int;  (** Number of retries recorded so far. *)
  mutable last_error : string option;  (** Most recent error, if any. *)
  mutable depends_on : int option;
      (** NOTE(review): presumably the [id] of a request this one depends on
          (see [add_dependency]) - confirm. *)
  mutable dependents : int list;
      (** NOTE(review): presumably the [id]s of requests depending on this
          one - confirm. *)
}
59
(** Result of sending a request. *)
type send_result =
  | Sent_ok of { event_id : Matrix_proto.Id.Event_id.t option }
      (** The send succeeded; [event_id] is the resulting event ID when one
          is available. *)
  | Send_failed of { error : string; retryable : bool }
      (** The send failed with [error]; [retryable] tells whether a retry may
          be attempted. *)
  | Send_cancelled  (** The send was cancelled. *)
65
(** {1 Room Send Queue} *)
67
(** Send queue for a single room. *)
type room_send_queue = {
  room_id : Matrix_proto.Id.Room_id.t;  (** Room this queue belongs to. *)
  mutable requests : queued_request list;  (** Requests held by the queue. *)
  mutable next_id : int;
      (** NOTE(review): presumably the [id] given to the next enqueued
          request - confirm. *)
  mutable enabled : bool;
      (** Whether this queue is enabled (see [set_room_enabled]). *)
  mutable is_processing : bool;
      (** NOTE(review): presumably set while the queue is being drained -
          confirm against the implementation. *)
  max_retries : int;  (** Retry limit (see [should_retry]). *)
  retry_delay_ms : int;
      (** Retry delay in milliseconds (see [retry_delay]). *)
  mutable on_state_change : (queued_request -> unit) option;
      (** Optional state-change callback (see [on_state_change]). *)
}
79
(** {1 Send Handle} *)
81
(** Handle for a queued request, allowing cancellation and status checks. *)
type send_handle = {
  request_id : int;
      (** NOTE(review): presumably the [id] of the queued request - confirm. *)
  txn_id : string;  (** Transaction ID of the request. *)
  room_id : Matrix_proto.Id.Room_id.t;  (** Room the request belongs to. *)
  queue : room_send_queue;  (** Room queue associated with the handle. *)
}
89
(** {1 Global Send Queue} *)
91
(** Global send queue manager. *)
type t = {
  user_id : Matrix_proto.Id.User_id.t;  (** User this manager belongs to. *)
  mutable room_queues : (string * room_send_queue) list;
      (** Room queues as an association list. NOTE(review): the [string] key
          is presumably the room ID in string form - confirm. *)
  mutable globally_enabled : bool;
      (** Global enable flag (see [set_enabled] and [is_enabled]). *)
  mutable on_error : (queued_request -> string -> unit) option;
      (** Optional global error callback (see [on_error]). *)
}
99
(** {1 Queue Creation} *)
101
val create_room_queue :
  room_id:Matrix_proto.Id.Room_id.t ->
  ?max_retries:int ->
  ?retry_delay_ms:int ->
  unit ->
  room_send_queue
(** [create_room_queue ~room_id ?max_retries ?retry_delay_ms ()] creates a new
    send queue for [room_id].

    NOTE(review): the defaults used when [max_retries] or [retry_delay_ms] are
    omitted are not visible in this interface - see the implementation. *)
109
val create : user_id:Matrix_proto.Id.User_id.t -> t
(** [create ~user_id] creates a new global send queue manager for
    [user_id]. *)
112
val get_room_queue : t -> Matrix_proto.Id.Room_id.t -> room_send_queue
(** [get_room_queue t room_id] returns the send queue for [room_id], creating
    one if needed. *)
115
(** {1 Enqueueing Requests} *)
117
val generate_txn_id : unit -> string
(** [generate_txn_id ()] generates a new transaction ID.

    NOTE(review): the ID format and its uniqueness guarantees are not
    specified by this interface. *)
120
val enqueue : room_send_queue -> request_kind -> send_handle
(** [enqueue queue kind] enqueues a request of the given [kind] on [queue]
    and returns a handle to it. *)
123
val send_message :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  event_type:string ->
  content:Jsont.json ->
  send_handle
(** [send_message t ~room_id ~event_type ~content] enqueues a message event
    of type [event_type] with JSON [content] for [room_id], and returns a
    handle to the queued request. *)
131
val send_text :
  t -> room_id:Matrix_proto.Id.Room_id.t -> body:string -> send_handle
(** [send_text t ~room_id ~body] enqueues a text message with the given
    [body] for [room_id].

    NOTE(review): the event type and content layout used for the message are
    chosen by the implementation and not visible here. *)
134
val send_reaction :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  relates_to:Matrix_proto.Id.Event_id.t ->
  key:string ->
  send_handle
(** [send_reaction t ~room_id ~relates_to ~key] enqueues a reaction with
    [key] to the event [relates_to] in [room_id]. *)
142
val send_redaction :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  event_id:Matrix_proto.Id.Event_id.t ->
  ?reason:string ->
  unit ->
  send_handle
(** [send_redaction t ~room_id ~event_id ?reason ()] enqueues a redaction of
    [event_id] in [room_id], with an optional [reason]. *)
151
(** {1 Dependencies} *)
153
val add_dependency : parent:send_handle -> child:send_handle -> unit
(** [add_dependency ~parent ~child] adds a dependency between the two
    requests.

    NOTE(review): presumably [child] is held back until [parent] has been
    sent (cf. [dependencies_satisfied]) - confirm against the
    implementation. *)
156
(** {1 Request State Management} *)
158
val cancel : send_handle -> bool
(** [cancel handle] cancels the queued request behind [handle]. Returns
    [true] if the request was cancelled.

    NOTE(review): the conditions under which [false] is returned are not
    visible in this interface. *)
161
val abort : send_handle -> bool
(** [abort handle] aborts the request behind [handle]: it is cancelled and
    removed.

    NOTE(review): the meaning of the [bool] result is not documented here;
    presumably [true] on success, as for [cancel] - confirm. *)
164
val get_request : send_handle -> queued_request option
(** [get_request handle] returns the request referred to by [handle], if
    any. *)
167
val is_pending : send_handle -> bool
(** [is_pending handle] is [true] if the request behind [handle] is still
    pending. *)
170
val is_sent : send_handle -> bool
(** [is_sent handle] is [true] if the request behind [handle] was sent. *)
173
(** {1 Queue Processing} *)
175
val next_sendable : room_send_queue -> queued_request option
(** [next_sendable queue] returns the next sendable request from [queue], or
    [None] when there is none.

    NOTE(review): the exact criteria for a request being sendable are defined
    by the implementation. *)
178
val mark_sending : room_send_queue -> queued_request -> unit
(** [mark_sending queue request] marks [request] as being sent. *)
181
val mark_sent : room_send_queue -> queued_request -> unit
(** [mark_sent queue request] marks [request] as successfully sent. *)
184
val mark_failed :
  room_send_queue -> queued_request -> string -> retryable:bool -> unit
(** [mark_failed queue request error ~retryable] marks [request] as failed,
    with optional retry as indicated by [retryable].

    NOTE(review): the unlabelled [string] is presumably the error message -
    confirm against the implementation. *)
187
val cleanup_queue : room_send_queue -> unit
(** [cleanup_queue queue] removes completed, cancelled and failed requests
    from [queue]. *)
190
(** {1 Queue Statistics} *)
192
val pending_count : room_send_queue -> int
(** [pending_count queue] is the number of pending requests in [queue]. *)
195
val total_pending : t -> int
(** [total_pending t] is the number of pending requests across all rooms
    managed by [t]. *)
198
val pending_requests : room_send_queue -> queued_request list
(** [pending_requests queue] returns all pending requests of [queue]. *)
201
val failed_requests : room_send_queue -> queued_request list
(** [failed_requests queue] returns all failed requests of [queue]. *)
204
(** {1 Queue Control} *)
206
val set_room_enabled : room_send_queue -> bool -> unit
(** [set_room_enabled queue enabled] enables ([true]) or disables ([false])
    the room queue [queue]. *)
209
val set_enabled : t -> bool -> unit
(** [set_enabled t enabled] enables ([true]) or disables ([false]) all queues
    globally. *)
212
val is_enabled : t -> bool
(** [is_enabled t] is [true] if sending is globally enabled for [t]. *)
215
val is_room_enabled : room_send_queue -> bool
(** [is_room_enabled queue] is [true] if the room queue [queue] is
    enabled. *)
218
(** {1 Event Callbacks} *)
220
val on_state_change : room_send_queue -> (queued_request -> unit) -> unit
(** [on_state_change queue f] sets [f] as the state-change callback of
    [queue]. The callback receives the request concerned. *)
223
val on_error : t -> (queued_request -> string -> unit) -> unit
(** [on_error t f] sets [f] as the global error callback of [t].

    NOTE(review): the [string] passed to [f] is presumably the error
    message - confirm against the implementation. *)
226
(** {1 Persistence} *)
228
(** Serializable form of a queued request.

    NOTE(review): this record has no field for the request [id] or [state],
    so how [p_depends_on] is re-linked by [restore_requests] is not visible
    in this interface - confirm against the implementation. *)
type persisted_request = {
  p_room_id : string;  (** Room ID as a string. *)
  p_kind : request_kind;  (** Kind of the request. *)
  p_created_at : int64;  (** Creation time of the request. *)
  p_retry_count : int;  (** Retry count of the request. *)
  p_depends_on : int option;  (** Dependency of the request, if any. *)
}
237
val request_to_persisted : queued_request -> persisted_request
(** [request_to_persisted request] converts [request] to its persistable
    form. *)
240
val requests_to_persist : t -> persisted_request list
(** [requests_to_persist t] returns all pending requests of [t] in
    persistable form. *)
243
val restore_requests : t -> persisted_request list -> unit
(** [restore_requests t persisted] restores the given persisted requests
    into [t]. *)
246
(** {1 Local Echo} *)
248
val local_echo_event : queued_request -> (string * Jsont.json * string) option
(** [local_echo_event request] creates a local echo event from a queued
    request. Returns [Some (event_type, content, txn_id)], or [None].

    NOTE(review): which request kinds yield [None] is decided by the
    implementation and not visible here. *)
252
val matches_txn_id :
  queued_request -> event_id:Matrix_proto.Id.Event_id.t -> bool
(** [matches_txn_id request ~event_id] checks whether [event_id] matches the
    transaction ID of [request] (used for local echo replacement).

    NOTE(review): how an event ID is compared against a transaction ID is not
    visible in this interface - confirm against the implementation. *)
255
(** {1 Retry Logic} *)
257
val retry_delay : room_send_queue -> queued_request -> int
(** [retry_delay queue request] calculates the delay before the next retry
    of [request], using exponential backoff.

    NOTE(review): the result is presumably in milliseconds, like the
    [retry_delay_ms] field of [queue] - confirm. *)
260
val should_retry : room_send_queue -> queued_request -> bool
(** [should_retry queue request] is [true] if [request] should be retried.

    NOTE(review): presumably based on [retry_count] and [max_retries] -
    confirm against the implementation. *)
263
(** {1 Media Upload Support} *)
265
val send_media :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  content_type:string ->
  data_size:int ->
  ?local_path:string ->
  event_content:Jsont.json ->
  unit ->
  send_handle * send_handle
(** [send_media t ~room_id ~content_type ~data_size ?local_path
    ~event_content ()] creates a media upload request together with a
    dependent event send. Returns [(upload_handle, event_handle)]. *)
277
(** {1 Internal} *)
279
val txn_id_of_kind : request_kind -> string
(** [txn_id_of_kind kind] returns the transaction ID carried by [kind]. *)
282
val dependencies_satisfied : room_send_queue -> queued_request -> bool
(** [dependencies_satisfied queue request] is [true] if the dependencies of
    [request] are satisfied.

    NOTE(review): what counts as satisfied is defined by the
    implementation. *)
285
val update_state : room_send_queue -> queued_request -> request_state -> unit
(** [update_state queue request state] updates the state of [request] to
    [state].

    NOTE(review): presumably also invokes the state-change callback of
    [queue] - confirm against the implementation. *)