(** Send queue for serialized message sending and offline support.

    What this module offers:
    - Offline message queueing
    - Automatic retry with backoff
    - Transaction ID tracking for deduplication
    - Local echo support
    - Media upload coordination

    Sends are serialized through one queue per room, which prevents race
    conditions and keeps messages in the order they were sent. *)

(** {1 Queue Request Types} *)

(** The kinds of request that can be queued. Every constructor carries its own
    [txn_id]. *)
type request_kind =
  | Event of {
      event_type : string;
      content : Jsont.json;
      txn_id : string;
    }
  | MediaUpload of {
      content_type : string;
      data_size : int;
      local_path : string option;
      txn_id : string;
    }
  | Reaction of {
      relates_to : Matrix_proto.Id.Event_id.t;
      key : string;
      txn_id : string;
    }
  | Redaction of {
      event_id : Matrix_proto.Id.Event_id.t;
      reason : string option;
      txn_id : string;
    }

(** State of a request. *)
type request_state =
  | Pending
  | Sending
  | Sent
  | Failed of string
  | Cancelled

(** A queued request together with its metadata: the owning room, the request
    [kind], the mutable [state], retry bookkeeping ([retry_count],
    [last_error]) and dependency links ([depends_on], [dependents]). *)
type queued_request = {
  id : int;
  room_id : Matrix_proto.Id.Room_id.t;
  kind : request_kind;
  mutable state : request_state;
  created_at : int64;
  mutable retry_count : int;
  mutable last_error : string option;
  mutable depends_on : int option;
  mutable dependents : int list;
}

(** Outcome of sending a request. *)
type send_result =
  | Sent_ok of { event_id : Matrix_proto.Id.Event_id.t option }
  | Send_failed of { error : string; retryable : bool }
  | Send_cancelled

(** {1 Room Send Queue} *)

(** Send queue belonging to a single room. *)
type room_send_queue = {
  room_id : Matrix_proto.Id.Room_id.t;
  mutable requests : queued_request list;
  mutable next_id : int;
  mutable enabled : bool;
  mutable is_processing : bool;
  max_retries : int;
  retry_delay_ms : int;
  mutable on_state_change : (queued_request -> unit) option;
}

(** {1 Send Handle} *)

(** Handle on a queued request; it is what callers use to cancel the request
    or check its status. *)
type send_handle = {
  request_id : int;
  txn_id : string;
  room_id : Matrix_proto.Id.Room_id.t;
  queue : room_send_queue;
}

(** {1 Global Send Queue} *)

(** Manager for the global send queue; it holds the per-room queues. *)
type t = {
  user_id : Matrix_proto.Id.User_id.t;
  mutable room_queues : (string * room_send_queue) list;
  mutable globally_enabled : bool;
  mutable on_error : (queued_request -> string -> unit) option;
}

(** {1 Queue Creation} *)

val create_room_queue :
  room_id:Matrix_proto.Id.Room_id.t ->
  ?max_retries:int ->
  ?retry_delay_ms:int ->
  unit ->
  room_send_queue
(** [create_room_queue ~room_id ?max_retries ?retry_delay_ms ()] builds a new
    send queue for a room. *)

val create : user_id:Matrix_proto.Id.User_id.t -> t
(** [create ~user_id] builds a new global send queue manager. *)

val get_room_queue : t -> Matrix_proto.Id.Room_id.t -> room_send_queue
(** [get_room_queue t room_id] returns the queue of a room, creating it when
    there is none yet. *)

(** {1 Enqueueing Requests} *)

val generate_txn_id : unit -> string
(** Produce a new transaction ID. *)

val enqueue : room_send_queue -> request_kind -> send_handle
(** [enqueue queue kind] puts a request on [queue] and returns its handle. *)

val send_message :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  event_type:string ->
  content:Jsont.json ->
  send_handle
(** Queue a message event. *)

val send_text :
  t -> room_id:Matrix_proto.Id.Room_id.t -> body:string -> send_handle
(** Queue a text message. *)

val send_reaction :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  relates_to:Matrix_proto.Id.Event_id.t ->
  key:string ->
  send_handle
(** Queue a reaction. *)

val send_redaction :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  event_id:Matrix_proto.Id.Event_id.t ->
  ?reason:string ->
  unit ->
  send_handle
(** Queue a redaction. *)

(** {1 Dependencies} *)

val add_dependency : parent:send_handle -> child:send_handle -> unit
(** [add_dependency ~parent ~child] records a dependency between the two
    requests. *)

(** {1 Request State Management} *)

val cancel : send_handle -> bool
(** [cancel handle] cancels a queued request; the result is [true] if it was
    cancelled. *)

val abort : send_handle -> bool
(** [abort handle] aborts a request, i.e. cancels and removes it. *)

val get_request : send_handle -> queued_request option
(** Look up the request a handle refers to. *)

val is_pending : send_handle -> bool
(** Tell whether the request is still pending. *)

val is_sent : send_handle -> bool
(** Tell whether the request was sent. *)

(** {1 Queue Processing} *)

val next_sendable : room_send_queue -> queued_request option
(** The next request of the queue that can be sent, if any. *)

val mark_sending : room_send_queue -> queued_request -> unit
(** Flag a request as being sent. *)

val mark_sent : room_send_queue -> queued_request -> unit
(** Flag a request as successfully sent. *)

val mark_failed :
  room_send_queue -> queued_request -> string -> retryable:bool -> unit
(** Flag a request as failed, with optional retry. *)

val cleanup_queue : room_send_queue -> unit
(** Drop completed, cancelled and failed requests. *)

(** {1 Queue Statistics} *)

val pending_count : room_send_queue -> int
(** Number of pending requests in one room queue. *)

val total_pending : t -> int
(** Number of pending requests over all rooms. *)

val pending_requests : room_send_queue -> queued_request list
(** Every pending request of a room. *)

val failed_requests : room_send_queue -> queued_request list
(** Every failed request of a room. *)

(** {1 Queue Control} *)

val set_room_enabled : room_send_queue -> bool -> unit
(** Switch a room queue on or off. *)

val set_enabled : t -> bool -> unit
(** Switch all queues on or off globally. *)

val is_enabled : t -> bool
(** Tell whether sending is globally enabled. *)

val is_room_enabled : room_send_queue -> bool
(** Tell whether a room queue is enabled. *)

(** {1 Event Callbacks} *)

val on_state_change : room_send_queue -> (queued_request -> unit) -> unit
(** Install the callback for state changes. *)

val on_error : t -> (queued_request -> string -> unit) -> unit
(** Install the global error callback. *)

(** {1 Persistence} *)

(** Serializable form of the queue state. *)
type persisted_request = {
  p_room_id : string;
  p_kind : request_kind;
  p_created_at : int64;
  p_retry_count : int;
  p_depends_on : int option;
}

val request_to_persisted : queued_request -> persisted_request
(** Turn a request into its persistable form. *)

val requests_to_persist : t -> persisted_request list
(** All pending requests, ready for persistence. *)

val restore_requests : t -> persisted_request list -> unit
(** Load requests back from persistence. *)

(** {1 Local Echo} *)

val local_echo_event : queued_request -> (string * Jsont.json * string) option
(** Build a local echo event out of a queued request. The result is
    [(event_type, content, txn_id)] or [None]. *)

val matches_txn_id :
  queued_request -> event_id:Matrix_proto.Id.Event_id.t -> bool
(** Tell whether an event_id matches a transaction ID (used for local echo
    replacement). *)

(** {1 Retry Logic} *)

val retry_delay : room_send_queue -> queued_request -> int
(** Delay before the next retry (exponential backoff). *)

val should_retry : room_send_queue -> queued_request -> bool
(** Tell whether a request should be retried. *)

(** {1 Media Upload Support} *)

val send_media :
  t ->
  room_id:Matrix_proto.Id.Room_id.t ->
  content_type:string ->
  data_size:int ->
  ?local_path:string ->
  event_content:Jsont.json ->
  unit ->
  send_handle * send_handle
(** Queue a media upload request along with the event send that depends on
    it. The result is [(upload_handle, event_handle)]. *)

(** {1 Internal} *)

val txn_id_of_kind : request_kind -> string
(** Transaction ID carried by a request kind. *)

val dependencies_satisfied : room_send_queue -> queued_request -> bool
(** Tell whether the dependencies of a request are satisfied. *)

val update_state : room_send_queue -> queued_request -> request_state -> unit
(** Change the state of a request. *)