Matrix protocol in OCaml, Eio specialised
1(** Send queue for serialized message sending and offline support.
2
3 This module provides:
4 - Offline message queueing
5 - Automatic retry with backoff
6 - Transaction ID tracking for deduplication
7 - Local echo support
8 - Media upload coordination
9
10 Each room has its own queue to serialize sends, preventing race conditions
11 and ensuring messages are sent in order. *)
12
13(** {1 Queue Request Types} *)
14
15(** Type of queued request *)
16type request_kind =
17 | Event of {
18 event_type : string;
19 content : Jsont.json;
20 txn_id : string;
21 }
22 | MediaUpload of {
23 content_type : string;
24 data_size : int;
25 local_path : string option;
26 txn_id : string;
27 }
28 | Reaction of {
29 relates_to : Matrix_proto.Id.Event_id.t;
30 key : string;
31 txn_id : string;
32 }
33 | Redaction of {
34 event_id : Matrix_proto.Id.Event_id.t;
35 reason : string option;
36 txn_id : string;
37 }
38
39(** Get transaction ID from request kind *)
40let txn_id_of_kind = function
41 | Event { txn_id; _ } -> txn_id
42 | MediaUpload { txn_id; _ } -> txn_id
43 | Reaction { txn_id; _ } -> txn_id
44 | Redaction { txn_id; _ } -> txn_id
45
46(** Request state *)
47type request_state =
48 | Pending (** Waiting to be sent *)
49 | Sending (** Currently being sent *)
50 | Sent (** Successfully sent *)
51 | Failed of string (** Failed with error message *)
52 | Cancelled (** Cancelled by user *)
53
54(** Queued request with metadata *)
55type queued_request = {
56 id : int;
57 room_id : Matrix_proto.Id.Room_id.t;
58 kind : request_kind;
59 mutable state : request_state;
60 created_at : int64;
61 mutable retry_count : int;
62 mutable last_error : string option;
63 (* For dependency tracking *)
64 mutable depends_on : int option; (** ID of parent request *)
65 mutable dependents : int list; (** IDs of dependent requests *)
66}
67
68(** Result of sending a request *)
69type send_result =
70 | Sent_ok of { event_id : Matrix_proto.Id.Event_id.t option }
71 | Send_failed of { error : string; retryable : bool }
72 | Send_cancelled
73
74(** {1 Send Handle} *)
75
76(** Handle for a queued request, allowing cancellation and status checks *)
77type send_handle = {
78 request_id : int;
79 txn_id : string;
80 room_id : Matrix_proto.Id.Room_id.t;
81 queue : room_send_queue;
82}
83
84(** Room-specific send queue *)
85and room_send_queue = {
86 room_id : Matrix_proto.Id.Room_id.t;
87 mutable requests : queued_request list;
88 mutable next_id : int;
89 mutable enabled : bool;
90 mutable is_processing : bool;
91 (* Configuration *)
92 max_retries : int;
93 retry_delay_ms : int;
94 (* Callbacks *)
95 mutable on_state_change : (queued_request -> unit) option;
96}
97
98(** Global send queue manager *)
99type t = {
100 user_id : Matrix_proto.Id.User_id.t;
101 mutable room_queues : (string * room_send_queue) list;
102 mutable globally_enabled : bool;
103 mutable on_error : (queued_request -> string -> unit) option;
104}
105
106(** {1 Queue Creation} *)
107
108(** Create a new room send queue *)
109let create_room_queue ~room_id ?(max_retries = 3) ?(retry_delay_ms = 1000) () = {
110 room_id;
111 requests = [];
112 next_id = 0;
113 enabled = true;
114 is_processing = false;
115 max_retries;
116 retry_delay_ms;
117 on_state_change = None;
118}
119
120(** Create a new global send queue manager *)
121let create ~user_id = {
122 user_id;
123 room_queues = [];
124 globally_enabled = true;
125 on_error = None;
126}
127
128(** Get or create a room queue *)
129let get_room_queue t room_id =
130 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in
131 match List.assoc_opt room_id_str t.room_queues with
132 | Some queue -> queue
133 | None ->
134 let queue = create_room_queue ~room_id () in
135 t.room_queues <- (room_id_str, queue) :: t.room_queues;
136 queue
137
138(** {1 Enqueueing Requests} *)
139
140(** Generate a new transaction ID *)
141let generate_txn_id () =
142 let random_bytes = Mirage_crypto_rng.generate 16 in
143 "m" ^ (Base64.encode_string ~pad:false random_bytes)
144
145(** Enqueue a request *)
146let enqueue (queue : room_send_queue) kind =
147 let id = queue.next_id in
148 queue.next_id <- queue.next_id + 1;
149 let request = {
150 id;
151 room_id = queue.room_id;
152 kind;
153 state = Pending;
154 created_at = Int64.of_float (Unix.gettimeofday () *. 1000.0);
155 retry_count = 0;
156 last_error = None;
157 depends_on = None;
158 dependents = [];
159 } in
160 queue.requests <- queue.requests @ [request];
161 let handle = {
162 request_id = id;
163 txn_id = txn_id_of_kind kind;
164 room_id = queue.room_id;
165 queue;
166 } in
167 (Option.iter (fun cb -> cb request) queue.on_state_change);
168 handle
169
170(** Enqueue a message event *)
171let send_message t ~room_id ~event_type ~content =
172 let queue = get_room_queue t room_id in
173 let txn_id = generate_txn_id () in
174 enqueue queue (Event { event_type; content; txn_id })
175
176(** Enqueue a text message *)
177let send_text t ~room_id ~body =
178 let content = Jsont.Object (
179 [(("msgtype", Jsont.Meta.none), Jsont.String ("m.text", Jsont.Meta.none));
180 (("body", Jsont.Meta.none), Jsont.String (body, Jsont.Meta.none))],
181 Jsont.Meta.none
182 ) in
183 send_message t ~room_id ~event_type:"m.room.message" ~content
184
185(** Enqueue a reaction *)
186let send_reaction t ~room_id ~relates_to ~key =
187 let queue = get_room_queue t room_id in
188 let txn_id = generate_txn_id () in
189 enqueue queue (Reaction { relates_to; key; txn_id })
190
191(** Enqueue a redaction *)
192let send_redaction t ~room_id ~event_id ?reason () =
193 let queue = get_room_queue t room_id in
194 let txn_id = generate_txn_id () in
195 enqueue queue (Redaction { event_id; reason; txn_id })
196
197(** {1 Dependencies} *)
198
199(** Add a dependency between requests *)
200let add_dependency ~parent:parent_handle ~child:child_handle =
201 let queue = parent_handle.queue in
202 match
203 List.find_opt (fun r -> r.id = parent_handle.request_id) queue.requests,
204 List.find_opt (fun r -> r.id = child_handle.request_id) queue.requests
205 with
206 | Some parent, Some child ->
207 child.depends_on <- Some parent.id;
208 parent.dependents <- child.id :: parent.dependents
209 | _ -> ()
210
211(** Check if a request's dependencies are satisfied *)
212let dependencies_satisfied queue request =
213 match request.depends_on with
214 | None -> true
215 | Some parent_id ->
216 match List.find_opt (fun r -> r.id = parent_id) queue.requests with
217 | Some parent -> parent.state = Sent
218 | None -> true (* Parent removed, assume satisfied *)
219
220(** {1 Request State Management} *)
221
222(** Update request state *)
223let update_state queue request new_state =
224 request.state <- new_state;
225 Option.iter (fun cb -> cb request) queue.on_state_change
226
227(** Cancel a queued request *)
228let cancel handle =
229 let queue = handle.queue in
230 match List.find_opt (fun r -> r.id = handle.request_id) queue.requests with
231 | Some request when request.state = Pending ->
232 update_state queue request Cancelled;
233 true
234 | _ -> false (* Can't cancel if already sending/sent *)
235
236(** Abort a request (cancel and remove) *)
237let abort handle =
238 if cancel handle then begin
239 let queue = handle.queue in
240 queue.requests <- List.filter (fun r -> r.id <> handle.request_id) queue.requests;
241 true
242 end else
243 false
244
245(** Get request by handle *)
246let get_request handle =
247 List.find_opt (fun r -> r.id = handle.request_id) handle.queue.requests
248
249(** Check if request is still pending *)
250let is_pending handle =
251 match get_request handle with
252 | Some r -> r.state = Pending
253 | None -> false
254
255(** Check if request was sent *)
256let is_sent handle =
257 match get_request handle with
258 | Some r -> r.state = Sent
259 | None -> false
260
261(** {1 Queue Processing} *)
262
263(** Get next sendable request from queue *)
264let next_sendable queue =
265 if not queue.enabled then None
266 else
267 List.find_opt (fun r ->
268 r.state = Pending &&
269 dependencies_satisfied queue r &&
270 r.retry_count < queue.max_retries
271 ) queue.requests
272
273(** Mark request as being sent *)
274let mark_sending queue request =
275 update_state queue request Sending
276
277(** Mark request as successfully sent *)
278let mark_sent queue request =
279 update_state queue request Sent
280
281(** Mark request as failed with optional retry *)
282let mark_failed queue request error ~retryable =
283 request.retry_count <- request.retry_count + 1;
284 request.last_error <- Some error;
285 if retryable && request.retry_count < queue.max_retries then
286 update_state queue request Pending (* Will retry *)
287 else
288 update_state queue request (Failed error)
289
290(** Remove completed/cancelled/failed requests *)
291let cleanup_queue queue =
292 queue.requests <- List.filter (fun r ->
293 match r.state with
294 | Sent | Cancelled | Failed _ -> false
295 | Pending | Sending -> true
296 ) queue.requests
297
298(** {1 Queue Statistics} *)
299
300(** Count of pending requests in a room queue *)
301let pending_count queue =
302 List.length (List.filter (fun r -> r.state = Pending) queue.requests)
303
304(** Count of all pending requests across all rooms *)
305let total_pending t =
306 List.fold_left (fun acc (_, queue) ->
307 acc + pending_count queue
308 ) 0 t.room_queues
309
310(** Get all pending requests for a room *)
311let pending_requests queue =
312 List.filter (fun r -> r.state = Pending) queue.requests
313
314(** Get all failed requests for a room *)
315let failed_requests queue =
316 List.filter (fun r -> match r.state with Failed _ -> true | _ -> false) queue.requests
317
318(** {1 Queue Control} *)
319
320(** Enable/disable a room queue *)
321let set_room_enabled queue enabled =
322 queue.enabled <- enabled
323
324(** Enable/disable all queues globally *)
325let set_enabled t enabled =
326 t.globally_enabled <- enabled;
327 List.iter (fun (_, queue) ->
328 queue.enabled <- enabled
329 ) t.room_queues
330
331(** Check if globally enabled *)
332let is_enabled t = t.globally_enabled
333
334(** Check if a room queue is enabled *)
335let is_room_enabled queue = queue.enabled
336
337(** {1 Event Callbacks} *)
338
339(** Set callback for state changes *)
340let on_state_change queue callback =
341 queue.on_state_change <- Some callback
342
343(** Set global error callback *)
344let on_error t callback =
345 t.on_error <- Some callback
346
347(** {1 Persistence} *)
348
349(** Serializable queue state *)
350type persisted_request = {
351 p_room_id : string;
352 p_kind : request_kind;
353 p_created_at : int64;
354 p_retry_count : int;
355 p_depends_on : int option;
356}
357
358(** Convert request to persistable form *)
359let request_to_persisted (request : queued_request) = {
360 p_room_id = Matrix_proto.Id.Room_id.to_string request.room_id;
361 p_kind = request.kind;
362 p_created_at = request.created_at;
363 p_retry_count = request.retry_count;
364 p_depends_on = request.depends_on;
365}
366
367(** Get all pending requests for persistence *)
368let requests_to_persist t =
369 List.concat_map (fun (_, queue) ->
370 pending_requests queue |> List.map request_to_persisted
371 ) t.room_queues
372
373(** Restore requests from persistence *)
374let restore_requests t persisted_requests =
375 List.iter (fun p ->
376 match Matrix_proto.Id.Room_id.of_string p.p_room_id with
377 | Error _ -> () (* Skip invalid room IDs *)
378 | Ok room_id ->
379 let queue = get_room_queue t room_id in
380 let id = queue.next_id in
381 queue.next_id <- queue.next_id + 1;
382 let request = {
383 id;
384 room_id;
385 kind = p.p_kind;
386 state = Pending;
387 created_at = p.p_created_at;
388 retry_count = p.p_retry_count;
389 last_error = None;
390 depends_on = p.p_depends_on;
391 dependents = [];
392 } in
393 queue.requests <- queue.requests @ [request]
394 ) persisted_requests
395
396(** {1 Local Echo} *)
397
398(** Create a local echo event from a queued request *)
399let local_echo_event request =
400 match request.kind with
401 | Event { event_type; content; txn_id } ->
402 Some (event_type, content, txn_id)
403 | Reaction { relates_to; key; txn_id } ->
404 let event_id = Matrix_proto.Id.Event_id.to_string relates_to in
405 let content = Jsont.Object (
406 [(("m.relates_to", Jsont.Meta.none),
407 Jsont.Object (
408 [(("rel_type", Jsont.Meta.none), Jsont.String ("m.annotation", Jsont.Meta.none));
409 (("event_id", Jsont.Meta.none), Jsont.String (event_id, Jsont.Meta.none));
410 (("key", Jsont.Meta.none), Jsont.String (key, Jsont.Meta.none))],
411 Jsont.Meta.none))],
412 Jsont.Meta.none
413 ) in
414 Some ("m.reaction", content, txn_id)
415 | MediaUpload _ -> None
416 | Redaction _ -> None
417
418(** Check if an event_id matches a transaction ID (for local echo replacement) *)
419let matches_txn_id request ~event_id =
420 (* The event_id might contain the txn_id for local echoes *)
421 let txn = txn_id_of_kind request.kind in
422 String.equal (Matrix_proto.Id.Event_id.to_string event_id) ("$" ^ txn)
423
424(** {1 Retry Logic} *)
425
426(** Calculate delay for next retry (exponential backoff) *)
427let retry_delay queue request =
428 let base_delay = queue.retry_delay_ms in
429 let multiplier = 1 lsl request.retry_count in (* 2^retry_count *)
430 min (base_delay * multiplier) 60000 (* Cap at 60 seconds *)
431
432(** Check if a request should be retried *)
433let should_retry queue request =
434 request.retry_count < queue.max_retries &&
435 match request.state with
436 | Failed _ -> false (* Already marked as terminal failure *)
437 | Pending -> true (* Will be retried *)
438 | _ -> false
439
440(** {1 Media Upload Support} *)
441
442(** Create a media upload request with dependent event send *)
443let send_media t ~room_id ~content_type ~data_size ?local_path ~event_content () =
444 let queue = get_room_queue t room_id in
445
446 (* First, create the upload request *)
447 let upload_txn_id = generate_txn_id () in
448 let upload_handle = enqueue queue (MediaUpload {
449 content_type;
450 data_size;
451 local_path;
452 txn_id = upload_txn_id;
453 }) in
454
455 (* Then create the event request that depends on it *)
456 let event_txn_id = generate_txn_id () in
457 let event_handle = enqueue queue (Event {
458 event_type = "m.room.message";
459 content = event_content;
460 txn_id = event_txn_id;
461 }) in
462
463 (* Set up dependency *)
464 add_dependency ~parent:upload_handle ~child:event_handle;
465
466 (upload_handle, event_handle)