(* Matrix protocol in OCaml, Eio specialised.
   File view: branch main, 466 lines, 14 kB. *)
(** Send queue for serialized message sending and offline support.

    This module provides:
    - Offline message queueing
    - Automatic retry with backoff
    - Transaction ID tracking for deduplication
    - Local echo support
    - Media upload coordination

    Each room has its own queue to serialize sends, preventing race conditions
    and ensuring messages are sent in order. *)

(** {1 Queue Request Types} *)

(** Type of queued request *)
type request_kind =
  | Event of {
      event_type : string;
      content : Jsont.json;
      txn_id : string;
    }
  | MediaUpload of {
      content_type : string;
      data_size : int;
      local_path : string option;
      txn_id : string;
    }
  | Reaction of {
      relates_to : Matrix_proto.Id.Event_id.t;
      key : string;
      txn_id : string;
    }
  | Redaction of {
      event_id : Matrix_proto.Id.Event_id.t;
      reason : string option;
      txn_id : string;
    }

(** [txn_id_of_kind kind] is the client transaction ID carried by [kind]. *)
let txn_id_of_kind = function
  | Event { txn_id; _ }
  | MediaUpload { txn_id; _ }
  | Reaction { txn_id; _ }
  | Redaction { txn_id; _ } ->
      txn_id

(** Request state *)
type request_state =
  | Pending  (** Waiting to be sent *)
  | Sending  (** Currently being sent *)
  | Sent  (** Successfully sent *)
  | Failed of string  (** Failed with error message *)
  | Cancelled  (** Cancelled by user *)

(** Queued request with metadata *)
type queued_request = {
  id : int;
  room_id : Matrix_proto.Id.Room_id.t;
  kind : request_kind;
  mutable state : request_state;
  created_at : int64;
  mutable retry_count : int;
  mutable last_error : string option;
  (* For dependency tracking *)
  mutable depends_on : int option;  (** ID of parent request *)
  mutable dependents : int list;  (** IDs of dependent requests *)
}

(** Result of sending a request *)
type send_result =
  | Sent_ok of { event_id : Matrix_proto.Id.Event_id.t option }
  | Send_failed of { error : string; retryable : bool }
  | Send_cancelled

(** {1 Send Handle} *)

(** Handle for a queued request, allowing cancellation and status checks *)
type send_handle = {
  request_id : int;
  txn_id : string;
  room_id : Matrix_proto.Id.Room_id.t;
  queue : room_send_queue;
}

(** Room-specific send queue *)
and room_send_queue = {
  room_id : Matrix_proto.Id.Room_id.t;
  mutable requests : queued_request list;
  mutable next_id : int;
  mutable enabled : bool;
  mutable is_processing : bool;
  (* Configuration *)
  max_retries : int;
  retry_delay_ms : int;
  (* Callbacks *)
  mutable on_state_change : (queued_request -> unit) option;
}

(** Global send queue manager *)
type t = {
  user_id : Matrix_proto.Id.User_id.t;
  mutable room_queues : (string * room_send_queue) list;
  mutable globally_enabled : bool;
  mutable on_error : (queued_request -> string -> unit) option;
}

(** {1 Queue Creation} *)

(** Create a new, empty, enabled room send queue.
    @param max_retries attempts allowed before a request fails for good
      (default 3)
    @param retry_delay_ms base delay in milliseconds used for backoff
      (default 1000) *)
let create_room_queue ~room_id ?(max_retries = 3) ?(retry_delay_ms = 1000) () =
  {
    room_id;
    requests = [];
    next_id = 0;
    enabled = true;
    is_processing = false;
    max_retries;
    retry_delay_ms;
    on_state_change = None;
  }

(** Create a new global send queue manager for [user_id], with no room
    queues, sending globally enabled and no error callback. *)
let create ~user_id =
  { user_id; room_queues = []; globally_enabled = true; on_error = None }

(** [get_room_queue t room_id] returns the queue registered for [room_id],
    creating and registering a default one on first use.

    A newly created queue inherits [t.globally_enabled], so a global
    [set_enabled t false] also holds for rooms first seen afterwards. *)
let get_room_queue t room_id =
  let key = Matrix_proto.Id.Room_id.to_string room_id in
  match List.assoc_opt key t.room_queues with
  | Some queue -> queue
  | None ->
      let queue = create_room_queue ~room_id () in
      queue.enabled <- t.globally_enabled;
      t.room_queues <- (key, queue) :: t.room_queues;
      queue

(** {1 Enqueueing Requests} *)

(** Generate a new transaction ID: ["m"] followed by 16 random bytes encoded
    as unpadded URL-safe base64. *)
let generate_txn_id () =
  let random_bytes = Mirage_crypto_rng.generate 16 in
  (* Matrix carries the transaction ID as a URL path segment
     (PUT .../send/{eventType}/{txnId}); the URL-safe alphabet keeps '/' and
     '+' out of it. *)
  "m"
  ^ Base64.encode_string ~pad:false ~alphabet:Base64.uri_safe_alphabet
      random_bytes

(** [enqueue queue kind] appends a new [Pending] request for [kind] to the
    end of [queue] (so [queue.requests] stays in enqueue order), notifies
    [queue.on_state_change] if set, and returns a handle to the request. *)
let enqueue (queue : room_send_queue) kind =
  let id = queue.next_id in
  queue.next_id <- id + 1;
  let request =
    {
      id;
      room_id = queue.room_id;
      kind;
      state = Pending;
      (* Wall-clock time in milliseconds. *)
      created_at = Int64.of_float (Unix.gettimeofday () *. 1000.0);
      retry_count = 0;
      last_error = None;
      depends_on = None;
      dependents = [];
    }
  in
  queue.requests <- queue.requests @ [ request ];
  let handle =
    {
      request_id = id;
      txn_id = txn_id_of_kind kind;
      room_id = queue.room_id;
      queue;
    }
  in
  Option.iter (fun cb -> cb request) queue.on_state_change;
  handle

(** Enqueue an event of [event_type] with [content] in [room_id], under a
    fresh transaction ID. *)
let send_message t ~room_id ~event_type ~content =
  let queue = get_room_queue t room_id in
  let txn_id = generate_txn_id () in
  enqueue queue (Event { event_type; content; txn_id })

(** Enqueue a plain-text [m.room.message] ([msgtype = "m.text"]) with
    [body]. *)
let send_text t ~room_id ~body =
  let content =
    Jsont.Object
      ( [
          ( ("msgtype", Jsont.Meta.none),
            Jsont.String ("m.text", Jsont.Meta.none) );
          (("body", Jsont.Meta.none), Jsont.String (body, Jsont.Meta.none));
        ],
        Jsont.Meta.none )
  in
  send_message t ~room_id ~event_type:"m.room.message" ~content

(** Enqueue a reaction with [key] to the event [relates_to]. *)
let send_reaction t ~room_id ~relates_to ~key =
  let queue = get_room_queue t room_id in
  let txn_id = generate_txn_id () in
  enqueue queue (Reaction { relates_to; key; txn_id })

(** Enqueue a redaction of [event_id], with an optional [reason]. *)
let send_redaction t ~room_id ~event_id ?reason () =
  let queue = get_room_queue t room_id in
  let txn_id = generate_txn_id () in
  enqueue queue (Redaction { event_id; reason; txn_id })

(** {1 Request State Management} *)

(** [find_request queue id] is the request with [id] in [queue], if any. *)
let find_request (queue : room_send_queue) id =
  List.find_opt (fun (r : queued_request) -> r.id = id) queue.requests

(** Update request state and notify [queue.on_state_change], if set. *)
let update_state queue request new_state =
  request.state <- new_state;
  Option.iter (fun cb -> cb request) queue.on_state_change

(** [settle_dependents queue request] propagates a terminal outcome of
    [request] to the requests that transitively depend on it.

    If [request] is [Cancelled], its still-[Pending] dependents become
    [Cancelled]. If it is [Failed error], they become
    [Failed ("Dependency failed: " ^ error)]. In any other state nothing
    happens.

    Without this, a dependent would wait while its parent stays in the queue,
    then be treated as satisfied once [cleanup_queue] removes the parent.
    For example, a media message would be sent although its upload
    failed. *)
let settle_dependents queue request =
  let child_state =
    match request.state with
    | Cancelled -> Some Cancelled
    | Failed error -> Some (Failed ("Dependency failed: " ^ error))
    | Pending | Sending | Sent -> None
  in
  match child_state with
  | None -> ()
  | Some state ->
      let rec visit parent =
        List.iter
          (fun child_id ->
            match find_request queue child_id with
            | Some child when child.state = Pending ->
                (match state with
                | Failed error -> child.last_error <- Some error
                | Pending | Sending | Sent | Cancelled -> ());
                update_state queue child state;
                (* Only [Pending] children are visited, so even a dependency
                   cycle terminates. *)
                visit child
            | Some _ | None -> ())
          parent.dependents
      in
      visit request

(** Cancel a queued request. Only a [Pending] request can be cancelled; the
    requests depending on it are cancelled as well.
    Returns [true] if the request was cancelled, [false] if it is no longer
    in the queue or is not [Pending]. *)
let cancel handle =
  let queue = handle.queue in
  match find_request queue handle.request_id with
  | Some request when request.state = Pending ->
      update_state queue request Cancelled;
      settle_dependents queue request;
      true
  | Some _ | None -> false (* Can't cancel if already sending/sent *)

(** Abort a request: cancel it (as {!cancel}) and remove it from its queue.
    Returns [false], leaving the queue untouched, if it could not be
    cancelled. *)
let abort handle =
  if cancel handle then begin
    let queue = handle.queue in
    queue.requests <-
      List.filter (fun r -> r.id <> handle.request_id) queue.requests;
    true
  end
  else false

(** Get the request behind [handle], if it is still in its queue. *)
let get_request handle = find_request handle.queue handle.request_id

(** Check if request is still pending ([false] once removed). *)
let is_pending handle =
  match get_request handle with Some r -> r.state = Pending | None -> false

(** Check if request was sent ([false] once removed from the queue). *)
let is_sent handle =
  match get_request handle with Some r -> r.state = Sent | None -> false

(** {1 Dependencies} *)

(** [add_dependency ~parent ~child] makes [child] wait until [parent] has
    been sent.

    The link is made only when both handles belong to the same room queue
    (request IDs are per-queue, so a handle from another queue could alias an
    unrelated request), refer to different requests (a self-dependency could
    never be satisfied), and both requests are still in the queue. A request
    has at most one parent, so linking replaces any previous parent. If
    [parent] is already cancelled or failed, [child] is settled at once. *)
let add_dependency ~parent:parent_handle ~child:child_handle =
  let queue = parent_handle.queue in
  (* Physical equality: handles of the same room share one queue record. *)
  let same_queue = child_handle.queue == queue in
  let distinct = parent_handle.request_id <> child_handle.request_id in
  if same_queue && distinct then begin
    match
      ( find_request queue parent_handle.request_id,
        find_request queue child_handle.request_id )
    with
    | Some parent, Some child ->
        (* Detach from a previous parent so its outcome no longer cascades
           to this child. *)
        (match child.depends_on with
        | Some old_id when old_id <> parent.id ->
            Option.iter
              (fun old ->
                old.dependents <-
                  List.filter (fun id -> id <> child.id) old.dependents)
              (find_request queue old_id)
        | Some _ | None -> ());
        child.depends_on <- Some parent.id;
        if not (List.mem child.id parent.dependents) then
          parent.dependents <- child.id :: parent.dependents;
        settle_dependents queue parent
    | _ -> ()
  end

(** [dependencies_satisfied queue request] holds when [request] has no
    parent, when its parent is [Sent], or when the parent is no longer in
    [queue]. Parents that were cancelled or failed have already settled their
    dependents, so a removed parent is assumed to have been sent. *)
let dependencies_satisfied queue request =
  match request.depends_on with
  | None -> true
  | Some parent_id -> (
      match find_request queue parent_id with
      | Some parent -> parent.state = Sent
      | None -> true (* Parent removed, assume satisfied *))

(** {1 Queue Processing} *)

(** The oldest [Pending] request of an enabled [queue] whose dependencies
    are satisfied and whose retry budget is not exhausted, if any. *)
let next_sendable queue =
  if not queue.enabled then None
  else
    List.find_opt
      (fun r ->
        r.state = Pending
        && dependencies_satisfied queue r
        && r.retry_count < queue.max_retries)
      queue.requests

(** Mark request as being sent *)
let mark_sending queue request = update_state queue request Sending

(** Mark request as successfully sent *)
let mark_sent queue request = update_state queue request Sent

(** Record a failed attempt with [error]. The request goes back to
    [Pending] if [retryable] and retries remain; otherwise it becomes
    [Failed error] and its dependents fail with it. *)
let mark_failed queue request error ~retryable =
  request.retry_count <- request.retry_count + 1;
  request.last_error <- Some error;
  if retryable && request.retry_count < queue.max_retries then
    update_state queue request Pending (* Will retry *)
  else begin
    update_state queue request (Failed error);
    settle_dependents queue request
  end

(** Remove completed/cancelled/failed requests *)
let cleanup_queue queue =
  queue.requests <-
    List.filter
      (fun r ->
        match r.state with
        | Sent | Cancelled | Failed _ -> false
        | Pending | Sending -> true)
      queue.requests

(** {1 Queue Statistics} *)

(** Count of pending requests in a room queue *)
let pending_count queue =
  List.fold_left
    (fun n r -> if r.state = Pending then n + 1 else n)
    0 queue.requests

(** Count of all pending requests across all rooms *)
let total_pending t =
  List.fold_left (fun acc (_, queue) -> acc + pending_count queue) 0
    t.room_queues

(** Get all pending requests for a room, in queue order *)
let pending_requests queue =
  List.filter (fun r -> r.state = Pending) queue.requests

(** Get all failed requests for a room, in queue order *)
let failed_requests queue =
  List.filter
    (fun r -> match r.state with Failed _ -> true | _ -> false)
    queue.requests

(** {1 Queue Control} *)

(** Enable/disable a room queue *)
let set_room_enabled queue enabled = queue.enabled <- enabled

(** Enable/disable all queues globally. This overwrites every existing room
    queue's own flag, and queues created later start with this setting. *)
let set_enabled t enabled =
  t.globally_enabled <- enabled;
  List.iter (fun (_, queue) -> queue.enabled <- enabled) t.room_queues

(** Check if globally enabled *)
let is_enabled t = t.globally_enabled

(** Check if a room queue is enabled *)
let is_room_enabled queue = queue.enabled

(** {1 Event Callbacks} *)

(** Set callback for state changes (replaces any previous one) *)
let on_state_change queue callback = queue.on_state_change <- Some callback

(** Set global error callback (replaces any previous one).
    NOTE(review): nothing in this module invokes [t.on_error]; presumably
    the component driving the sends calls it — confirm. *)
let on_error t callback = t.on_error <- Some callback

(** {1 Persistence} *)

(** Serializable queue state *)
type persisted_request = {
  p_room_id : string;
  p_kind : request_kind;
  p_created_at : int64;
  p_retry_count : int;
  p_depends_on : int option;
}

(** Convert request to persistable form *)
let request_to_persisted (request : queued_request) =
  {
    p_room_id = Matrix_proto.Id.Room_id.to_string request.room_id;
    p_kind = request.kind;
    p_created_at = request.created_at;
    p_retry_count = request.retry_count;
    p_depends_on = request.depends_on;
  }

(** Persistable form of every [Pending] request across all rooms. Requests
    that are [Sending] at the time of the call are not included. *)
let requests_to_persist t =
  List.concat_map
    (fun (_, queue) -> pending_requests queue |> List.map request_to_persisted)
    t.room_queues

(** Restore requests from persistence: each one is appended as [Pending] to
    its room's queue; entries with an unparsable room ID are skipped.

    NOTE(review): [p_depends_on] holds an ID from the queue that produced it,
    but restored requests receive fresh IDs from [queue.next_id], and the
    parent's [dependents] list is not rebuilt. The restored link may
    therefore point at the wrong request or at none. Fixing this needs the
    original ID in [persisted_request]. *)
let restore_requests t persisted_requests =
  List.iter
    (fun p ->
      match Matrix_proto.Id.Room_id.of_string p.p_room_id with
      | Error _ -> () (* Skip invalid room IDs *)
      | Ok room_id ->
          let queue = get_room_queue t room_id in
          let id = queue.next_id in
          queue.next_id <- id + 1;
          let request =
            {
              id;
              room_id;
              kind = p.p_kind;
              state = Pending;
              created_at = p.p_created_at;
              retry_count = p.p_retry_count;
              last_error = None;
              depends_on = p.p_depends_on;
              dependents = [];
            }
          in
          queue.requests <- queue.requests @ [ request ])
    persisted_requests

(** {1 Local Echo} *)

(** [local_echo_event request] is [Some (event_type, content, txn_id)] to
    display locally for an [Event] or [Reaction] request, and [None] for
    media uploads and redactions. *)
let local_echo_event request =
  match request.kind with
  | Event { event_type; content; txn_id } -> Some (event_type, content, txn_id)
  | Reaction { relates_to; key; txn_id } ->
      (* A reaction echoes as an [m.reaction] event whose [m.relates_to] is an
         [m.annotation] of [relates_to] carrying [key]. *)
      let str s = Jsont.String (s, Jsont.Meta.none) in
      let member name value = ((name, Jsont.Meta.none), value) in
      let event_id = Matrix_proto.Id.Event_id.to_string relates_to in
      let relation =
        Jsont.Object
          ( [
              member "rel_type" (str "m.annotation");
              member "event_id" (str event_id);
              member "key" (str key);
            ],
            Jsont.Meta.none )
      in
      let content =
        Jsont.Object ([ member "m.relates_to" relation ], Jsont.Meta.none)
      in
      Some ("m.reaction", content, txn_id)
  | MediaUpload _ -> None
  | Redaction _ -> None

(** [matches_txn_id request ~event_id] holds when [event_id] is exactly
    ["$"] followed by the request's transaction ID, the form assumed here
    for local-echo event IDs (used to replace a local echo). *)
let matches_txn_id request ~event_id =
  let txn = txn_id_of_kind request.kind in
  String.equal (Matrix_proto.Id.Event_id.to_string event_id) ("$" ^ txn)

(** {1 Retry Logic} *)

(** [retry_delay queue request] is the backoff, in milliseconds, before the
    next attempt: [queue.retry_delay_ms * 2^request.retry_count], capped at
    60 seconds. A negative base delay is treated as 0.

    The doubling stops as soon as the cap is reached. Large retry counts
    (e.g. restored from persistence) therefore cannot overflow [int], as
    [base * (1 lsl retry_count)] would. *)
let retry_delay queue request =
  let cap_ms = 60_000 in
  let rec grow delay remaining =
    if remaining <= 0 || delay = 0 || delay >= cap_ms then delay
    else grow (delay * 2) (remaining - 1)
  in
  min cap_ms (grow (max 0 queue.retry_delay_ms) request.retry_count)

(** Check if a request should be retried: it is back in [Pending] and its
    retry budget is not exhausted. [Failed] requests are terminal. *)
let should_retry queue request =
  request.retry_count < queue.max_retries
  &&
  match request.state with
  | Pending -> true (* Will be retried *)
  | Failed _ | Sending | Sent | Cancelled -> false

(** {1 Media Upload Support} *)

(** [send_media t ~room_id ~content_type ~data_size ?local_path ?event_type
    ~event_content ()] enqueues a media upload, then an event (of
    [event_type], default ["m.room.message"]) that depends on it. The event
    is held until the upload is [Sent]. Returns
    [(upload_handle, event_handle)].

    NOTE(review): [event_content] is fixed at enqueue time; presumably the
    sending layer fills in the uploaded content URI before sending the
    event — confirm. *)
let send_media t ~room_id ~content_type ~data_size ?local_path
    ?(event_type = "m.room.message") ~event_content () =
  let queue = get_room_queue t room_id in
  (* First, create the upload request *)
  let upload_handle =
    enqueue queue
      (MediaUpload
         { content_type; data_size; local_path; txn_id = generate_txn_id () })
  in
  (* Then create the event request that depends on it *)
  let event_handle =
    enqueue queue
      (Event { event_type; content = event_content; txn_id = generate_txn_id () })
  in
  add_dependency ~parent:upload_handle ~child:event_handle;
  (upload_handle, event_handle)