(** Send queue for serialized message sending and offline support.

    This module provides:
    - Offline message queueing
    - Automatic retry with backoff
    - Transaction ID tracking for deduplication
    - Local echo support
    - Media upload coordination

    Each room has its own queue to serialize sends, preventing race
    conditions and ensuring messages are sent in order. *)

(** {1 Queue Request Types} *)

(** Type of queued request. Every variant carries its own [txn_id]. *)
type request_kind =
  | Event of {
      event_type : string;
      content : Jsont.json;
      txn_id : string;
    }
  | MediaUpload of {
      content_type : string;
      data_size : int;
      local_path : string option;
      txn_id : string;
    }
  | Reaction of {
      relates_to : Matrix_proto.Id.Event_id.t;
      key : string;
      txn_id : string;
    }
  | Redaction of {
      event_id : Matrix_proto.Id.Event_id.t;
      reason : string option;
      txn_id : string;
    }

(** [txn_id_of_kind kind] returns the transaction ID stored in [kind]. *)
let txn_id_of_kind = function
  | Event { txn_id; _ }
  | MediaUpload { txn_id; _ }
  | Reaction { txn_id; _ }
  | Redaction { txn_id; _ } -> txn_id

(** Request state *)
type request_state =
  | Pending  (** Waiting to be sent *)
  | Sending  (** Currently being sent *)
  | Sent  (** Successfully sent *)
  | Failed of string  (** Failed with error message *)
  | Cancelled  (** Cancelled by user *)

(** Queued request with metadata *)
type queued_request = {
  id : int;  (** Identifier, unique within the owning room queue only *)
  room_id : Matrix_proto.Id.Room_id.t;
  kind : request_kind;
  mutable state : request_state;
  created_at : int64;  (** Creation time, milliseconds since the Unix epoch *)
  mutable retry_count : int;  (** Number of failed attempts so far *)
  mutable last_error : string option;
  (* For dependency tracking *)
  mutable depends_on : int option;  (** ID of parent request *)
  mutable dependents : int list;  (** IDs of dependent requests *)
}

(** Result of sending a request *)
type send_result =
  | Sent_ok of { event_id : Matrix_proto.Id.Event_id.t option }
  | Send_failed of { error : string; retryable : bool }
  | Send_cancelled

(** {1 Send Handle} *)

(** Handle for a queued request, allowing cancellation and status checks *)
type send_handle = {
  request_id : int;
  txn_id : string;
  room_id : Matrix_proto.Id.Room_id.t;
  queue : room_send_queue;
}

(** Room-specific send queue *)
and room_send_queue = {
  room_id : Matrix_proto.Id.Room_id.t;
  mutable requests : queued_request list;  (** Oldest request first *)
  mutable next_id : int;
  mutable enabled : bool;
  mutable is_processing : bool;
      (** Initialised to [false]; no function visible in this file reads or
          updates it. *)
  (* Configuration *)
  max_retries : int;
  retry_delay_ms : int;
  (* Callbacks *)
  mutable on_state_change : (queued_request -> unit) option;
}

(** Global send queue manager *)
type t = {
  user_id : Matrix_proto.Id.User_id.t;
  mutable room_queues : (string * room_send_queue) list;
      (** Association list keyed by the room ID string *)
  mutable globally_enabled : bool;
  mutable on_error : (queued_request -> string -> unit) option;
      (** Stored by {!on_error}; no function visible in this file calls it. *)
}

(** {1 Queue Creation} *)

(** [create_room_queue ~room_id ?max_retries ?retry_delay_ms ()] builds an
    empty, enabled queue for [room_id].

    @param max_retries upper bound on [retry_count] (default [3])
    @param retry_delay_ms base backoff delay in milliseconds (default [1000]) *)
let create_room_queue ~room_id ?(max_retries = 3) ?(retry_delay_ms = 1000) ()
    : room_send_queue =
  {
    room_id;
    requests = [];
    next_id = 0;
    enabled = true;
    is_processing = false;
    max_retries;
    retry_delay_ms;
    on_state_change = None;
  }

(** [create ~user_id] builds a manager with no room queues, globally
    enabled and without an error callback. *)
let create ~user_id =
  { user_id; room_queues = []; globally_enabled = true; on_error = None }

(** [get_room_queue t room_id] returns the queue registered for [room_id],
    creating and registering one with default settings if none exists.

    A freshly created queue inherits [t.globally_enabled], so a queue that is
    first touched while the manager is disabled does not start out enabled. *)
let get_room_queue t room_id =
  let key = Matrix_proto.Id.Room_id.to_string room_id in
  match List.assoc_opt key t.room_queues with
  | Some queue -> queue
  | None ->
    let queue = create_room_queue ~room_id () in
    queue.enabled <- t.globally_enabled;
    t.room_queues <- (key, queue) :: t.room_queues;
    queue

(** {1 Enqueueing Requests} *)

(** [generate_txn_id ()] returns ["m"] followed by 16 random bytes encoded as
    unpadded Base64.

    The URL-safe alphabet is used so the ID never contains ['/'] or ['+'] and
    can be embedded in a URL path segment without escaping. *)
let generate_txn_id () =
  let random_bytes = Mirage_crypto_rng.generate 16 in
  "m"
  ^ Base64.encode_string ~pad:false ~alphabet:Base64.uri_safe_alphabet
      random_bytes

(** [enqueue queue kind] appends a new [Pending] request of the given [kind]
    to the end of [queue], notifies the [on_state_change] callback (if set)
    and returns a handle for the request. *)
let enqueue (queue : room_send_queue) kind =
  let id = queue.next_id in
  queue.next_id <- id + 1;
  let request : queued_request =
    {
      id;
      room_id = queue.room_id;
      kind;
      state = Pending;
      created_at = Int64.of_float (Unix.gettimeofday () *. 1000.0);
      retry_count = 0;
      last_error = None;
      depends_on = None;
      dependents = [];
    }
  in
  (* Append, so that the list stays in send order (oldest first). *)
  queue.requests <- queue.requests @ [ request ];
  let handle =
    {
      request_id = id;
      txn_id = txn_id_of_kind kind;
      room_id = queue.room_id;
      queue;
    }
  in
  Option.iter (fun cb -> cb request) queue.on_state_change;
  handle

(** [send_message t ~room_id ~event_type ~content] enqueues an [Event]
    request with a freshly generated transaction ID in the queue of
    [room_id]. *)
let send_message t ~room_id ~event_type ~content =
  let queue = get_room_queue t room_id in
  let txn_id = generate_txn_id () in
  enqueue queue (Event { event_type; content; txn_id })

(** [send_text t ~room_id ~body] enqueues an ["m.room.message"] event whose
    content is [{"msgtype": "m.text", "body": body}]. *)
let send_text t ~room_id ~body =
  let str s = Jsont.String (s, Jsont.Meta.none) in
  let field name value = ((name, Jsont.Meta.none), value) in
  let content =
    Jsont.Object
      ([ field "msgtype" (str "m.text"); field "body" (str body) ],
       Jsont.Meta.none)
  in
  send_message t ~room_id ~event_type:"m.room.message" ~content

(** [send_reaction t ~room_id ~relates_to ~key] enqueues a [Reaction]
    request with a freshly generated transaction ID. *)
let send_reaction t ~room_id ~relates_to ~key =
  let queue = get_room_queue t room_id in
  let txn_id = generate_txn_id () in
  enqueue queue (Reaction { relates_to; key; txn_id })

(** [send_redaction t ~room_id ~event_id ?reason ()] enqueues a [Redaction]
    request with a freshly generated transaction ID. *)
let send_redaction t ~room_id ~event_id ?reason () =
  let queue = get_room_queue t room_id in
  let txn_id = generate_txn_id () in
  enqueue queue (Redaction { event_id; reason; txn_id })

(** {1 Dependencies} *)

(** [add_dependency ~parent ~child] records that [child] must not be sent
    before [parent] has been sent.

    The call is a no-op when:
    - the two handles belong to different queues (request IDs are only
      unique per queue, so a cross-queue lookup could hit an unrelated
      request);
    - both handles designate the same request (a self-dependency could never
      be satisfied);
    - either request is no longer present in the queue. *)
let add_dependency ~parent:parent_handle ~child:child_handle =
  let queue = parent_handle.queue in
  let find_by_id id = List.find_opt (fun r -> r.id = id) queue.requests in
  (* Queues are mutable records: identity, not structure, is what matters. *)
  let same_queue = child_handle.queue == queue in
  let same_request = child_handle.request_id = parent_handle.request_id in
  if same_queue && not same_request then begin
    match
      (find_by_id parent_handle.request_id, find_by_id child_handle.request_id)
    with
    | Some parent, Some child ->
      child.depends_on <- Some parent.id;
      (* Avoid duplicate entries when the same link is added twice. *)
      if not (List.mem child.id parent.dependents) then
        parent.dependents <- child.id :: parent.dependents
    | None, _ | _, None -> ()
  end

(** [dependencies_satisfied queue request] is [true] when [request] has no
    parent, when its parent is in state [Sent], or when its parent is no
    longer present in [queue].

    NOTE(review): because a missing parent counts as satisfied, removing a
    failed or cancelled parent (e.g. via [cleanup_queue] or [abort]) releases
    its dependents. Confirm that this is the intended behaviour. *)
let dependencies_satisfied queue request =
  match request.depends_on with
  | None -> true
  | Some parent_id ->
    (match List.find_opt (fun r -> r.id = parent_id) queue.requests with
     | Some { state = Sent; _ } -> true
     | Some { state = Pending | Sending | Failed _ | Cancelled; _ } -> false
     | None -> true (* Parent removed, assume satisfied *))

(** {1 Request State Management} *)

(** [update_state queue request new_state] stores [new_state] in [request]
    and then invokes the queue's [on_state_change] callback, if any. *)
let update_state queue request new_state =
  request.state <- new_state;
  Option.iter (fun cb -> cb request) queue.on_state_change

(** [cancel handle] marks the request as [Cancelled] and returns [true] if it
    is still [Pending]; otherwise (already sending, finished, or no longer in
    the queue) it returns [false] and changes nothing. *)
let cancel handle =
  let queue = handle.queue in
  match List.find_opt (fun r -> r.id = handle.request_id) queue.requests with
  | Some ({ state = Pending; _ } as request) ->
    update_state queue request Cancelled;
    true
  | Some { state = Sending | Sent | Failed _ | Cancelled; _ } | None ->
    false (* Can't cancel if already sending/sent *)

(** [abort handle] cancels the request and, if the cancellation succeeded,
    also removes it from its queue. Returns whether the request was
    cancelled. *)
let abort handle =
  let cancelled = cancel handle in
  if cancelled then begin
    let queue = handle.queue in
    queue.requests <-
      List.filter (fun r -> r.id <> handle.request_id) queue.requests
  end;
  cancelled

(** [get_request handle] looks the request up in the handle's queue; [None]
    if it has been removed. *)
let get_request handle =
  List.find_opt (fun r -> r.id = handle.request_id) handle.queue.requests

(** [is_pending handle] is [true] iff the request is still in the queue and
    in state [Pending]. *)
let is_pending handle =
  match get_request handle with
  | Some { state = Pending; _ } -> true
  | Some { state = Sending | Sent | Failed _ | Cancelled; _ } | None -> false

(** [is_sent handle] is [true] iff the request is still in the queue and in
    state [Sent]. *)
let is_sent handle =
  match get_request handle with
  | Some { state = Sent; _ } -> true
  | Some { state = Pending | Sending | Failed _ | Cancelled; _ } | None ->
    false

(** {1 Queue Processing} *)

(** [next_sendable queue] returns the oldest request that is [Pending], has
    its dependency satisfied and has [retry_count < max_retries]. Returns
    [None] when the queue is disabled or no such request exists. *)
let next_sendable queue =
  if not queue.enabled then None
  else
    let sendable r =
      r.state = Pending
      && dependencies_satisfied queue r
      && r.retry_count < queue.max_retries
    in
    List.find_opt sendable queue.requests

(** Mark request as being sent *)
let mark_sending queue request = update_state queue request Sending

(** Mark request as successfully sent *)
let mark_sent queue request = update_state queue request Sent

(** [mark_failed queue request error ~retryable] records a failed attempt:
    [retry_count] is incremented and [last_error] is set. The request goes
    back to [Pending] when the failure is [retryable] and the retry budget is
    not exhausted; otherwise it becomes [Failed error]. *)
let mark_failed queue request error ~retryable =
  request.retry_count <- request.retry_count + 1;
  request.last_error <- Some error;
  let next_state =
    if retryable && request.retry_count < queue.max_retries then
      Pending (* Will retry *)
    else Failed error
  in
  update_state queue request next_state

(** [cleanup_queue queue] drops every request in a terminal state ([Sent],
    [Cancelled] or [Failed]); [Pending] and [Sending] requests are kept. *)
let cleanup_queue queue =
  let still_active r =
    match r.state with
    | Pending | Sending -> true
    | Sent | Cancelled | Failed _ -> false
  in
  queue.requests <- List.filter still_active queue.requests

(** {1 Queue Statistics} *)

(** [pending_count queue] is the number of [Pending] requests in [queue]. *)
let pending_count queue =
  (* Count in a single pass instead of materialising a filtered list. *)
  List.fold_left
    (fun count r ->
      match r.state with
      | Pending -> count + 1
      | Sending | Sent | Failed _ | Cancelled -> count)
    0 queue.requests

(** [total_pending t] sums [pending_count] over all room queues of [t]. *)
let total_pending t =
  List.fold_left
    (fun acc (_, queue) -> acc + pending_count queue)
    0 t.room_queues

(** [pending_requests queue] lists the [Pending] requests in queue order. *)
let pending_requests queue =
  List.filter (fun r -> r.state = Pending) queue.requests

(** [failed_requests queue] lists the requests in state [Failed _], in queue
    order. *)
let failed_requests queue =
  List.filter
    (fun r ->
      match r.state with
      | Failed _ -> true
      | Pending | Sending | Sent | Cancelled -> false)
    queue.requests

(** {1 Queue Control} *)

(** Enable/disable a room queue *)
let set_room_enabled queue enabled = queue.enabled <- enabled

(** [set_enabled t enabled] sets the global flag and applies the same value
    to every room queue currently registered in [t]. *)
let set_enabled t enabled =
  t.globally_enabled <- enabled;
  List.iter (fun (_, queue) -> queue.enabled <- enabled) t.room_queues

(** Check if globally enabled *)
let is_enabled t = t.globally_enabled

(** Check if a room queue is enabled *)
let is_room_enabled queue = queue.enabled

(** {1 Event Callbacks} *)

(** [on_state_change queue callback] installs [callback], replacing any
    previously installed state-change callback. *)
let on_state_change queue callback = queue.on_state_change <- Some callback

(** [on_error t callback] installs [callback], replacing any previously
    installed error callback. *)
let on_error t callback = t.on_error <- Some callback

(** {1 Persistence} *)

(** Serializable queue state *)
type persisted_request = {
  p_room_id : string;
  p_kind : request_kind;
  p_created_at : int64;
  p_retry_count : int;
  p_depends_on : int option;
}

(** [request_to_persisted request] copies the room ID (as a string), kind,
    creation time, retry count and parent ID of [request]. The request's own
    [id], [state], [last_error] and [dependents] are not persisted. *)
let request_to_persisted (request : queued_request) =
  {
    p_room_id = Matrix_proto.Id.Room_id.to_string request.room_id;
    p_kind = request.kind;
    p_created_at = request.created_at;
    p_retry_count = request.retry_count;
    p_depends_on = request.depends_on;
  }

(** [requests_to_persist t] collects the persistable form of every [Pending]
    request of every room queue. *)
let requests_to_persist t =
  List.concat_map
    (fun (_, queue) -> pending_requests queue |> List.map request_to_persisted)
    t.room_queues

(** [restore_requests t persisted_requests] re-creates each persisted request
    as a [Pending] request at the end of its room's queue. Entries whose room
    ID does not parse are skipped. No state-change callback is invoked.

    NOTE(review): every restored request receives a fresh [id], but
    [depends_on] is copied verbatim and therefore still refers to the ID the
    parent had before persistence. After a restore it may point to no
    request, to an unrelated request, or to the request itself. Fixing this
    requires persisting the original request ID, which changes
    [persisted_request]; the [dependents] lists are not rebuilt either. *)
let restore_requests t persisted_requests =
  let restore_one p =
    match Matrix_proto.Id.Room_id.of_string p.p_room_id with
    | Error _ -> () (* Skip invalid room IDs *)
    | Ok room_id ->
      let queue = get_room_queue t room_id in
      let id = queue.next_id in
      queue.next_id <- id + 1;
      let request : queued_request =
        {
          id;
          room_id;
          kind = p.p_kind;
          state = Pending;
          created_at = p.p_created_at;
          retry_count = p.p_retry_count;
          last_error = None;
          depends_on = p.p_depends_on;
          dependents = [];
        }
      in
      queue.requests <- queue.requests @ [ request ]
  in
  List.iter restore_one persisted_requests

(** {1 Local Echo} *)

(** [local_echo_event request] returns [Some (event_type, content, txn_id)]
    for [Event] and [Reaction] requests, and [None] for [MediaUpload] and
    [Redaction] requests.

    For a reaction the event type is ["m.reaction"] and the content is an
    ["m.relates_to"] object with [rel_type = "m.annotation"], the target
    event ID and the reaction key. *)
let local_echo_event request =
  match request.kind with
  | Event { event_type; content; txn_id } -> Some (event_type, content, txn_id)
  | Reaction { relates_to; key; txn_id } ->
    let str s = Jsont.String (s, Jsont.Meta.none) in
    let field name value = ((name, Jsont.Meta.none), value) in
    let event_id = Matrix_proto.Id.Event_id.to_string relates_to in
    let relation =
      Jsont.Object
        ([
           field "rel_type" (str "m.annotation");
           field "event_id" (str event_id);
           field "key" (str key);
         ],
         Jsont.Meta.none)
    in
    let content =
      Jsont.Object ([ field "m.relates_to" relation ], Jsont.Meta.none)
    in
    Some ("m.reaction", content, txn_id)
  | MediaUpload _ -> None
  | Redaction _ -> None

(** [matches_txn_id request ~event_id] is [true] iff the string form of
    [event_id] equals ["$"] followed by the request's transaction ID (for
    local echo replacement). *)
let matches_txn_id request ~event_id =
  (* The event_id might contain the txn_id for local echoes *)
  let txn = txn_id_of_kind request.kind in
  String.equal (Matrix_proto.Id.Event_id.to_string event_id) ("$" ^ txn)

(** {1 Retry Logic} *)

(** [retry_delay queue request] is the exponential backoff delay in
    milliseconds: [retry_delay_ms * 2^retry_count], capped at 60 seconds.

    The computation cannot overflow: the exponent is clamped to [0..16]
    (since [2^16] already exceeds the cap for any base delay of at least
    1 ms) and the cap is checked by division before multiplying. A negative
    base delay is treated as [0]. *)
let retry_delay queue request =
  let cap_ms = 60000 in
  let base_delay = max 0 queue.retry_delay_ms in
  let shift = max 0 (min request.retry_count 16) in
  let multiplier = 1 lsl shift in
  (* base_delay * multiplier > cap_ms  <=>  base_delay > cap_ms / multiplier *)
  if base_delay > cap_ms / multiplier then cap_ms
  else base_delay * multiplier

(** [should_retry queue request] is [true] iff [request] is [Pending] and its
    [retry_count] is still below the queue's [max_retries]. *)
let should_retry queue request =
  request.retry_count < queue.max_retries
  &&
  match request.state with
  | Pending -> true (* Will be retried *)
  | Failed _ -> false (* Already marked as terminal failure *)
  | Sending | Sent | Cancelled -> false

(** {1 Media Upload Support} *)

(** [send_media t ~room_id ~content_type ~data_size ?local_path
    ~event_content ()] enqueues a [MediaUpload] request followed by an
    ["m.room.message"] event carrying [event_content], and makes the event
    depend on the upload.

    @return [(upload_handle, event_handle)] *)
let send_media t ~room_id ~content_type ~data_size ?local_path ~event_content
    () =
  let queue = get_room_queue t room_id in
  (* First, create the upload request *)
  let upload_txn_id = generate_txn_id () in
  let upload_handle =
    enqueue queue
      (MediaUpload
         { content_type; data_size; local_path; txn_id = upload_txn_id })
  in
  (* Then create the event request that depends on it *)
  let event_txn_id = generate_txn_id () in
  let event_handle =
    enqueue queue
      (Event
         {
           event_type = "m.room.message";
           content = event_content;
           txn_id = event_txn_id;
         })
  in
  (* Set up dependency *)
  add_dependency ~parent:upload_handle ~child:event_handle;
  (upload_handle, event_handle)