Matrix protocol in OCaml, Eio specialised
at main 423 lines 13 kB view raw
1(** Room timeline management and event caching. 2 3 This module provides: 4 - Event storage and retrieval 5 - Timeline pagination (forward and backward) 6 - Room state tracking 7 - Event deduplication *) 8 9(** A linked chunk data structure for efficient timeline operations. 10 Based on the matrix-rust-sdk LinkedChunk pattern. *) 11module LinkedChunk = struct 12 type 'a chunk = { 13 mutable items : 'a list; 14 mutable prev : 'a chunk option; 15 mutable next : 'a chunk option; 16 id : int; 17 max_size : int; 18 } 19 20 type 'a t = { 21 mutable head : 'a chunk option; 22 mutable tail : 'a chunk option; 23 mutable next_id : int; 24 max_chunk_size : int; 25 } 26 27 let create ?(max_chunk_size = 50) () = { 28 head = None; 29 tail = None; 30 next_id = 0; 31 max_chunk_size; 32 } 33 34 let create_chunk t = 35 let chunk = { 36 items = []; 37 prev = None; 38 next = None; 39 id = t.next_id; 40 max_size = t.max_chunk_size; 41 } in 42 t.next_id <- t.next_id + 1; 43 chunk 44 45 (** Push item to the back of the timeline *) 46 let push_back t item = 47 match t.tail with 48 | None -> 49 let chunk = create_chunk t in 50 chunk.items <- [item]; 51 t.head <- Some chunk; 52 t.tail <- Some chunk 53 | Some tail -> 54 if List.length tail.items >= tail.max_size then begin 55 (* Create new chunk *) 56 let chunk = create_chunk t in 57 chunk.items <- [item]; 58 chunk.prev <- Some tail; 59 tail.next <- Some chunk; 60 t.tail <- Some chunk 61 end else 62 tail.items <- tail.items @ [item] 63 64 (** Push item to the front of the timeline *) 65 let push_front t item = 66 match t.head with 67 | None -> 68 let chunk = create_chunk t in 69 chunk.items <- [item]; 70 t.head <- Some chunk; 71 t.tail <- Some chunk 72 | Some head -> 73 if List.length head.items >= head.max_size then begin 74 (* Create new chunk *) 75 let chunk = create_chunk t in 76 chunk.items <- [item]; 77 chunk.next <- Some head; 78 head.prev <- Some chunk; 79 t.head <- Some chunk 80 end else 81 head.items <- item :: head.items 82 83 (** Push items to the front (for back-pagination) *) 84 let push_front_items t items = 85 List.iter (fun item -> push_front t item) (List.rev items) 86 87 (** Iterate over all items from oldest to newest *) 88 let iter f t = 89 let rec iter_chunk = function 90 | None -> () 91 | Some chunk -> 92 List.iter f chunk.items; 93 iter_chunk chunk.next 94 in 95 iter_chunk t.head 96 97 (** Iterate from newest to oldest *) 98 let iter_rev f t = 99 let rec iter_chunk = function 100 | None -> () 101 | Some chunk -> 102 List.iter f (List.rev chunk.items); 103 iter_chunk chunk.prev 104 in 105 iter_chunk t.tail 106 107 (** Get last N items *) 108 let last_n t n = 109 let result = ref [] in 110 let count = ref 0 in 111 iter_rev (fun item -> 112 if !count < n then begin 113 result := item :: !result; 114 incr count 115 end 116 ) t; 117 !result 118 119 (** Total number of items *) 120 let length t = 121 let count = ref 0 in 122 iter (fun _ -> incr count) t; 123 !count 124 125 (** Find an item by predicate *) 126 let find_opt pred t = 127 let result = ref None in 128 let rec search_chunk = function 129 | None -> () 130 | Some chunk -> 131 (match List.find_opt pred chunk.items with 132 | Some item -> result := Some item 133 | None -> search_chunk chunk.next) 134 in 135 search_chunk t.head; 136 !result 137 138 (** Clear all items *) 139 let clear t = 140 t.head <- None; 141 t.tail <- None 142end 143 144(** Timeline event wrapper with metadata *) 145type event_item = { 146 event : Jsont.json; (* Raw event JSON *) 147 event_id : Matrix_proto.Id.Event_id.t; 148 sender : Matrix_proto.Id.User_id.t; 149 origin_server_ts : int64; 150 event_type : string; (* e.g., "m.room.message" *) 151 (* Local metadata *) 152 local_echo : bool; (* true if this is a local echo not yet confirmed *) 153 decrypted : bool; (* true if this was decrypted from E2EE *) 154} 155 156(** Room state entry *) 157type state_entry = { 158 event_type : string; 159 state_key : string; 160 content : Jsont.json; 161 sender : Matrix_proto.Id.User_id.t; 162 event_id : Matrix_proto.Id.Event_id.t option; 163} 164 165(** Room timeline with state tracking *) 166type t = { 167 room_id : Matrix_proto.Id.Room_id.t; 168 (* Timeline events *) 169 events : event_item LinkedChunk.t; 170 (* Room state: (event_type, state_key) -> state_entry *) 171 mutable state : ((string * string) * state_entry) list; 172 (* Pagination tokens *) 173 mutable prev_batch : string option; 174 mutable next_batch : string option; 175 (* Event ID index for deduplication *) 176 mutable event_ids : string list; 177 (* Maximum events to keep in memory *) 178 max_events : int; 179 (* Room summary *) 180 mutable name : string option; 181 mutable topic : string option; 182 mutable avatar_url : string option; 183 mutable canonical_alias : Matrix_proto.Id.Room_alias.t option; 184 mutable joined_member_count : int; 185 mutable invited_member_count : int; 186 mutable is_encrypted : bool; 187 mutable is_direct : bool; 188 mutable notification_count : int; 189 mutable highlight_count : int; 190} 191 192(** Create a new timeline for a room *) 193let create ~room_id ?(max_events = 1000) () = { 194 room_id; 195 events = LinkedChunk.create (); 196 state = []; 197 prev_batch = None; 198 next_batch = None; 199 event_ids = []; 200 max_events; 201 name = None; 202 topic = None; 203 avatar_url = None; 204 canonical_alias = None; 205 joined_member_count = 0; 206 invited_member_count = 0; 207 is_encrypted = false; 208 is_direct = false; 209 notification_count = 0; 210 highlight_count = 0; 211} 212 213(** Check if an event is already in the timeline *) 214let has_event t event_id = 215 let id_str = Matrix_proto.Id.Event_id.to_string event_id in 216 List.mem id_str t.event_ids 217 218(** Add an event to the timeline *) 219let add_event t ~event ~event_id ~sender ~origin_server_ts ~event_type ?(local_echo = false) ?(decrypted = false) () = 220 let id_str = Matrix_proto.Id.Event_id.to_string event_id in 221 if not (List.mem id_str t.event_ids) then begin 222 let item = { event; event_id; sender; origin_server_ts; event_type; local_echo; decrypted } in 223 LinkedChunk.push_back t.events item; 224 t.event_ids <- id_str :: t.event_ids; 225 (* Trim if over limit *) 226 if LinkedChunk.length t.events > t.max_events then begin 227 (* TODO: Remove oldest events *) 228 () 229 end 230 end 231 232(** Add events from back-pagination (older events) *) 233let add_events_back t events = 234 List.iter (fun (event, event_id, sender, origin_server_ts, event_type, decrypted) -> 235 let id_str = Matrix_proto.Id.Event_id.to_string event_id in 236 if not (List.mem id_str t.event_ids) then begin 237 let item = { event; event_id; sender; origin_server_ts; event_type; local_echo = false; decrypted } in 238 LinkedChunk.push_front t.events item; 239 t.event_ids <- id_str :: t.event_ids 240 end 241 ) events 242 243(** Helper to get a string field from JSON object *) 244let get_json_string_field content field = 245 match content with 246 | Jsont.Object (fields, _meta) -> 247 (* name is (string * Meta.t) and mem is (name * json) *) 248 let find_field name = 249 List.find_opt (fun ((n, _meta), _v) -> String.equal n name) fields 250 in 251 (match find_field field with 252 | Some (_, Jsont.String (s, _)) -> Some s 253 | _ -> None) 254 | _ -> None 255 256(** Update room state from a state event *) 257let update_state t ~event_type ~state_key ~content ~sender ?event_id () = 258 let key = (event_type, state_key) in 259 let entry = { event_type; state_key; content; sender; event_id } in 260 t.state <- (key, entry) :: List.filter (fun (k, _) -> k <> key) t.state; 261 (* Update summary fields based on state *) 262 match event_type with 263 | "m.room.name" -> 264 t.name <- get_json_string_field content "name" 265 | "m.room.topic" -> 266 t.topic <- get_json_string_field content "topic" 267 | "m.room.avatar" -> 268 t.avatar_url <- get_json_string_field content "url" 269 | "m.room.canonical_alias" -> 270 (match get_json_string_field content "alias" with 271 | Some alias -> 272 (match Matrix_proto.Id.Room_alias.of_string alias with 273 | Ok a -> t.canonical_alias <- Some a 274 | Error _ -> ()) 275 | None -> ()) 276 | "m.room.encryption" -> 277 t.is_encrypted <- true 278 | _ -> () 279 280(** Get state for a specific event type and state key *) 281let get_state t ~event_type ~state_key = 282 List.assoc_opt (event_type, state_key) t.state 283 284(** Get all state for an event type *) 285let get_state_by_type t ~event_type = 286 List.filter_map (fun ((et, _sk), entry) -> 287 if et = event_type then Some entry else None 288 ) t.state 289 290(** Get room members from state *) 291let get_members t = 292 get_state_by_type t ~event_type:"m.room.member" 293 294(** Get the last N events *) 295let get_last_events t n = 296 LinkedChunk.last_n t.events n 297 298(** Get all events *) 299let get_all_events t = 300 let result = ref [] in 301 LinkedChunk.iter (fun item -> result := item :: !result) t.events; 302 List.rev !result 303 304(** Find an event by ID *) 305let find_event t event_id = 306 let target = Matrix_proto.Id.Event_id.to_string event_id in 307 LinkedChunk.find_opt (fun (item : event_item) -> 308 String.equal (Matrix_proto.Id.Event_id.to_string item.event_id) target 309 ) t.events 310 311(** Get room display name (computed from state) *) 312let display_name t = 313 match t.name with 314 | Some name -> Some name 315 | None -> 316 match t.canonical_alias with 317 | Some alias -> Some (Matrix_proto.Id.Room_alias.to_string alias) 318 | None -> None (* Would compute from heroes in a full implementation *) 319 320(** Set pagination token for back-pagination *) 321let set_prev_batch t token = 322 t.prev_batch <- Some token 323 324(** Set pagination token for forward sync *) 325let set_next_batch t token = 326 t.next_batch <- Some token 327 328(** Update from sync response *) 329let update_from_sync t ~joined_count ~invited_count ~notification_count ~highlight_count = 330 t.joined_member_count <- joined_count; 331 t.invited_member_count <- invited_count; 332 t.notification_count <- notification_count; 333 t.highlight_count <- highlight_count 334 335(** Clear timeline (but keep state) *) 336let clear_timeline t = 337 LinkedChunk.clear t.events; 338 t.event_ids <- []; 339 t.prev_batch <- None 340 341(** Replace local echo with confirmed event *) 342let confirm_local_echo t ~local_event_id ~confirmed_event_id = 343 let local_id = Matrix_proto.Id.Event_id.to_string local_event_id in 344 let confirmed_id = Matrix_proto.Id.Event_id.to_string confirmed_event_id in 345 (* Find and update the local echo *) 346 let found = ref false in 347 LinkedChunk.iter (fun (item : event_item) -> 348 if String.equal (Matrix_proto.Id.Event_id.to_string item.event_id) local_id then begin 349 (* Can't mutate in LinkedChunk, so we just track dedup *) 350 found := true 351 end 352 ) t.events; 353 if !found then begin 354 t.event_ids <- List.filter (fun id -> id <> local_id) t.event_ids; 355 t.event_ids <- confirmed_id :: t.event_ids 356 end 357 358(* Alias for the timeline create function before Cache module shadows it *) 359let create_timeline = create 360 361(** Room timeline cache - manages timelines for multiple rooms *) 362module Cache = struct 363 type cache = { 364 mutable rooms : (string * t) list; 365 max_rooms : int; 366 } 367 368 let create ?(max_rooms = 100) () = { 369 rooms = []; 370 max_rooms; 371 } 372 373 let get_or_create cache room_id = 374 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in 375 match List.assoc_opt room_id_str cache.rooms with 376 | Some timeline -> timeline 377 | None -> 378 let timeline = create_timeline ~room_id () in 379 cache.rooms <- (room_id_str, timeline) :: cache.rooms; 380 (* LRU eviction if needed *) 381 if List.length cache.rooms > cache.max_rooms then 382 cache.rooms <- List.rev (List.tl (List.rev cache.rooms)); 383 timeline 384 385 let get cache room_id = 386 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in 387 List.assoc_opt room_id_str cache.rooms 388 389 let remove cache room_id = 390 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in 391 cache.rooms <- List.filter (fun (id, _) -> id <> room_id_str) cache.rooms 392 393 let all_room_ids cache = 394 List.filter_map (fun (id_str, _) -> 395 match Matrix_proto.Id.Room_id.of_string id_str with 396 | Ok id -> Some id 397 | Error _ -> None 398 ) cache.rooms 399end 400 401(** Pagination helper for fetching older messages *) 402let paginate_back client t ~limit = 403 match t.prev_batch with 404 | None -> Ok [] (* No more messages *) 405 | Some from -> 406 match Messages.get_messages client ~room_id:t.room_id ~from ~dir:Messages.Backward ~limit () with 407 | Error e -> Error e 408 | Ok response -> 409 (* Update prev_batch for next pagination *) 410 t.prev_batch <- response.Messages.end_; 411 (* Add events to timeline *) 412 let events = List.filter_map (fun (raw_event : Matrix_proto.Event.Raw_event.t) -> 413 (* Extract fields from the Raw_event structure *) 414 match raw_event.event_id with 415 | Some event_id -> 416 let event_type = Matrix_proto.Event.Event_type.to_string raw_event.type_ in 417 let ts = raw_event.origin_server_ts in (* Timestamp.t is int64 *) 418 (* tuple: (event, event_id, sender, ts, event_type, decrypted) *) 419 Some (raw_event.content, event_id, raw_event.sender, ts, event_type, false) 420 | None -> None 421 ) response.Messages.chunk in 422 add_events_back t events; 423 Ok events