Matrix protocol in OCaml, Eio specialised
1(** Room timeline management and event caching.
2
3 This module provides:
4 - Event storage and retrieval
5 - Timeline pagination (forward and backward)
6 - Room state tracking
7 - Event deduplication *)
8
9(** A linked chunk data structure for efficient timeline operations.
10 Based on the matrix-rust-sdk LinkedChunk pattern. *)
11module LinkedChunk = struct
12 type 'a chunk = {
13 mutable items : 'a list;
14 mutable prev : 'a chunk option;
15 mutable next : 'a chunk option;
16 id : int;
17 max_size : int;
18 }
19
20 type 'a t = {
21 mutable head : 'a chunk option;
22 mutable tail : 'a chunk option;
23 mutable next_id : int;
24 max_chunk_size : int;
25 }
26
27 let create ?(max_chunk_size = 50) () = {
28 head = None;
29 tail = None;
30 next_id = 0;
31 max_chunk_size;
32 }
33
34 let create_chunk t =
35 let chunk = {
36 items = [];
37 prev = None;
38 next = None;
39 id = t.next_id;
40 max_size = t.max_chunk_size;
41 } in
42 t.next_id <- t.next_id + 1;
43 chunk
44
45 (** Push item to the back of the timeline *)
46 let push_back t item =
47 match t.tail with
48 | None ->
49 let chunk = create_chunk t in
50 chunk.items <- [item];
51 t.head <- Some chunk;
52 t.tail <- Some chunk
53 | Some tail ->
54 if List.length tail.items >= tail.max_size then begin
55 (* Create new chunk *)
56 let chunk = create_chunk t in
57 chunk.items <- [item];
58 chunk.prev <- Some tail;
59 tail.next <- Some chunk;
60 t.tail <- Some chunk
61 end else
62 tail.items <- tail.items @ [item]
63
64 (** Push item to the front of the timeline *)
65 let push_front t item =
66 match t.head with
67 | None ->
68 let chunk = create_chunk t in
69 chunk.items <- [item];
70 t.head <- Some chunk;
71 t.tail <- Some chunk
72 | Some head ->
73 if List.length head.items >= head.max_size then begin
74 (* Create new chunk *)
75 let chunk = create_chunk t in
76 chunk.items <- [item];
77 chunk.next <- Some head;
78 head.prev <- Some chunk;
79 t.head <- Some chunk
80 end else
81 head.items <- item :: head.items
82
83 (** Push items to the front (for back-pagination) *)
84 let push_front_items t items =
85 List.iter (fun item -> push_front t item) (List.rev items)
86
87 (** Iterate over all items from oldest to newest *)
88 let iter f t =
89 let rec iter_chunk = function
90 | None -> ()
91 | Some chunk ->
92 List.iter f chunk.items;
93 iter_chunk chunk.next
94 in
95 iter_chunk t.head
96
97 (** Iterate from newest to oldest *)
98 let iter_rev f t =
99 let rec iter_chunk = function
100 | None -> ()
101 | Some chunk ->
102 List.iter f (List.rev chunk.items);
103 iter_chunk chunk.prev
104 in
105 iter_chunk t.tail
106
107 (** Get last N items *)
108 let last_n t n =
109 let result = ref [] in
110 let count = ref 0 in
111 iter_rev (fun item ->
112 if !count < n then begin
113 result := item :: !result;
114 incr count
115 end
116 ) t;
117 !result
118
119 (** Total number of items *)
120 let length t =
121 let count = ref 0 in
122 iter (fun _ -> incr count) t;
123 !count
124
125 (** Find an item by predicate *)
126 let find_opt pred t =
127 let result = ref None in
128 let rec search_chunk = function
129 | None -> ()
130 | Some chunk ->
131 (match List.find_opt pred chunk.items with
132 | Some item -> result := Some item
133 | None -> search_chunk chunk.next)
134 in
135 search_chunk t.head;
136 !result
137
138 (** Clear all items *)
139 let clear t =
140 t.head <- None;
141 t.tail <- None
142end
143
144(** Timeline event wrapper with metadata *)
145type event_item = {
146 event : Jsont.json; (* Raw event JSON *)
147 event_id : Matrix_proto.Id.Event_id.t;
148 sender : Matrix_proto.Id.User_id.t;
149 origin_server_ts : int64;
150 event_type : string; (* e.g., "m.room.message" *)
151 (* Local metadata *)
152 local_echo : bool; (* true if this is a local echo not yet confirmed *)
153 decrypted : bool; (* true if this was decrypted from E2EE *)
154}
155
156(** Room state entry *)
157type state_entry = {
158 event_type : string;
159 state_key : string;
160 content : Jsont.json;
161 sender : Matrix_proto.Id.User_id.t;
162 event_id : Matrix_proto.Id.Event_id.t option;
163}
164
165(** Room timeline with state tracking *)
166type t = {
167 room_id : Matrix_proto.Id.Room_id.t;
168 (* Timeline events *)
169 events : event_item LinkedChunk.t;
170 (* Room state: (event_type, state_key) -> state_entry *)
171 mutable state : ((string * string) * state_entry) list;
172 (* Pagination tokens *)
173 mutable prev_batch : string option;
174 mutable next_batch : string option;
175 (* Event ID index for deduplication *)
176 mutable event_ids : string list;
177 (* Maximum events to keep in memory *)
178 max_events : int;
179 (* Room summary *)
180 mutable name : string option;
181 mutable topic : string option;
182 mutable avatar_url : string option;
183 mutable canonical_alias : Matrix_proto.Id.Room_alias.t option;
184 mutable joined_member_count : int;
185 mutable invited_member_count : int;
186 mutable is_encrypted : bool;
187 mutable is_direct : bool;
188 mutable notification_count : int;
189 mutable highlight_count : int;
190}
191
192(** Create a new timeline for a room *)
193let create ~room_id ?(max_events = 1000) () = {
194 room_id;
195 events = LinkedChunk.create ();
196 state = [];
197 prev_batch = None;
198 next_batch = None;
199 event_ids = [];
200 max_events;
201 name = None;
202 topic = None;
203 avatar_url = None;
204 canonical_alias = None;
205 joined_member_count = 0;
206 invited_member_count = 0;
207 is_encrypted = false;
208 is_direct = false;
209 notification_count = 0;
210 highlight_count = 0;
211}
212
213(** Check if an event is already in the timeline *)
214let has_event t event_id =
215 let id_str = Matrix_proto.Id.Event_id.to_string event_id in
216 List.mem id_str t.event_ids
217
218(** Add an event to the timeline *)
219let add_event t ~event ~event_id ~sender ~origin_server_ts ~event_type ?(local_echo = false) ?(decrypted = false) () =
220 let id_str = Matrix_proto.Id.Event_id.to_string event_id in
221 if not (List.mem id_str t.event_ids) then begin
222 let item = { event; event_id; sender; origin_server_ts; event_type; local_echo; decrypted } in
223 LinkedChunk.push_back t.events item;
224 t.event_ids <- id_str :: t.event_ids;
225 (* Trim if over limit *)
226 if LinkedChunk.length t.events > t.max_events then begin
227 (* TODO: Remove oldest events *)
228 ()
229 end
230 end
231
232(** Add events from back-pagination (older events) *)
233let add_events_back t events =
234 List.iter (fun (event, event_id, sender, origin_server_ts, event_type, decrypted) ->
235 let id_str = Matrix_proto.Id.Event_id.to_string event_id in
236 if not (List.mem id_str t.event_ids) then begin
237 let item = { event; event_id; sender; origin_server_ts; event_type; local_echo = false; decrypted } in
238 LinkedChunk.push_front t.events item;
239 t.event_ids <- id_str :: t.event_ids
240 end
241 ) events
242
243(** Helper to get a string field from JSON object *)
244let get_json_string_field content field =
245 match content with
246 | Jsont.Object (fields, _meta) ->
247 (* name is (string * Meta.t) and mem is (name * json) *)
248 let find_field name =
249 List.find_opt (fun ((n, _meta), _v) -> String.equal n name) fields
250 in
251 (match find_field field with
252 | Some (_, Jsont.String (s, _)) -> Some s
253 | _ -> None)
254 | _ -> None
255
256(** Update room state from a state event *)
257let update_state t ~event_type ~state_key ~content ~sender ?event_id () =
258 let key = (event_type, state_key) in
259 let entry = { event_type; state_key; content; sender; event_id } in
260 t.state <- (key, entry) :: List.filter (fun (k, _) -> k <> key) t.state;
261 (* Update summary fields based on state *)
262 match event_type with
263 | "m.room.name" ->
264 t.name <- get_json_string_field content "name"
265 | "m.room.topic" ->
266 t.topic <- get_json_string_field content "topic"
267 | "m.room.avatar" ->
268 t.avatar_url <- get_json_string_field content "url"
269 | "m.room.canonical_alias" ->
270 (match get_json_string_field content "alias" with
271 | Some alias ->
272 (match Matrix_proto.Id.Room_alias.of_string alias with
273 | Ok a -> t.canonical_alias <- Some a
274 | Error _ -> ())
275 | None -> ())
276 | "m.room.encryption" ->
277 t.is_encrypted <- true
278 | _ -> ()
279
280(** Get state for a specific event type and state key *)
281let get_state t ~event_type ~state_key =
282 List.assoc_opt (event_type, state_key) t.state
283
284(** Get all state for an event type *)
285let get_state_by_type t ~event_type =
286 List.filter_map (fun ((et, _sk), entry) ->
287 if et = event_type then Some entry else None
288 ) t.state
289
290(** Get room members from state *)
291let get_members t =
292 get_state_by_type t ~event_type:"m.room.member"
293
294(** Get the last N events *)
295let get_last_events t n =
296 LinkedChunk.last_n t.events n
297
298(** Get all events *)
299let get_all_events t =
300 let result = ref [] in
301 LinkedChunk.iter (fun item -> result := item :: !result) t.events;
302 List.rev !result
303
304(** Find an event by ID *)
305let find_event t event_id =
306 let target = Matrix_proto.Id.Event_id.to_string event_id in
307 LinkedChunk.find_opt (fun (item : event_item) ->
308 String.equal (Matrix_proto.Id.Event_id.to_string item.event_id) target
309 ) t.events
310
311(** Get room display name (computed from state) *)
312let display_name t =
313 match t.name with
314 | Some name -> Some name
315 | None ->
316 match t.canonical_alias with
317 | Some alias -> Some (Matrix_proto.Id.Room_alias.to_string alias)
318 | None -> None (* Would compute from heroes in a full implementation *)
319
320(** Set pagination token for back-pagination *)
321let set_prev_batch t token =
322 t.prev_batch <- Some token
323
324(** Set pagination token for forward sync *)
325let set_next_batch t token =
326 t.next_batch <- Some token
327
328(** Update from sync response *)
329let update_from_sync t ~joined_count ~invited_count ~notification_count ~highlight_count =
330 t.joined_member_count <- joined_count;
331 t.invited_member_count <- invited_count;
332 t.notification_count <- notification_count;
333 t.highlight_count <- highlight_count
334
335(** Clear timeline (but keep state) *)
336let clear_timeline t =
337 LinkedChunk.clear t.events;
338 t.event_ids <- [];
339 t.prev_batch <- None
340
341(** Replace local echo with confirmed event *)
342let confirm_local_echo t ~local_event_id ~confirmed_event_id =
343 let local_id = Matrix_proto.Id.Event_id.to_string local_event_id in
344 let confirmed_id = Matrix_proto.Id.Event_id.to_string confirmed_event_id in
345 (* Find and update the local echo *)
346 let found = ref false in
347 LinkedChunk.iter (fun (item : event_item) ->
348 if String.equal (Matrix_proto.Id.Event_id.to_string item.event_id) local_id then begin
349 (* Can't mutate in LinkedChunk, so we just track dedup *)
350 found := true
351 end
352 ) t.events;
353 if !found then begin
354 t.event_ids <- List.filter (fun id -> id <> local_id) t.event_ids;
355 t.event_ids <- confirmed_id :: t.event_ids
356 end
357
358(* Alias for the timeline create function before Cache module shadows it *)
359let create_timeline = create
360
361(** Room timeline cache - manages timelines for multiple rooms *)
362module Cache = struct
363 type cache = {
364 mutable rooms : (string * t) list;
365 max_rooms : int;
366 }
367
368 let create ?(max_rooms = 100) () = {
369 rooms = [];
370 max_rooms;
371 }
372
373 let get_or_create cache room_id =
374 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in
375 match List.assoc_opt room_id_str cache.rooms with
376 | Some timeline -> timeline
377 | None ->
378 let timeline = create_timeline ~room_id () in
379 cache.rooms <- (room_id_str, timeline) :: cache.rooms;
380 (* LRU eviction if needed *)
381 if List.length cache.rooms > cache.max_rooms then
382 cache.rooms <- List.rev (List.tl (List.rev cache.rooms));
383 timeline
384
385 let get cache room_id =
386 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in
387 List.assoc_opt room_id_str cache.rooms
388
389 let remove cache room_id =
390 let room_id_str = Matrix_proto.Id.Room_id.to_string room_id in
391 cache.rooms <- List.filter (fun (id, _) -> id <> room_id_str) cache.rooms
392
393 let all_room_ids cache =
394 List.filter_map (fun (id_str, _) ->
395 match Matrix_proto.Id.Room_id.of_string id_str with
396 | Ok id -> Some id
397 | Error _ -> None
398 ) cache.rooms
399end
400
401(** Pagination helper for fetching older messages *)
402let paginate_back client t ~limit =
403 match t.prev_batch with
404 | None -> Ok [] (* No more messages *)
405 | Some from ->
406 match Messages.get_messages client ~room_id:t.room_id ~from ~dir:Messages.Backward ~limit () with
407 | Error e -> Error e
408 | Ok response ->
409 (* Update prev_batch for next pagination *)
410 t.prev_batch <- response.Messages.end_;
411 (* Add events to timeline *)
412 let events = List.filter_map (fun (raw_event : Matrix_proto.Event.Raw_event.t) ->
413 (* Extract fields from the Raw_event structure *)
414 match raw_event.event_id with
415 | Some event_id ->
416 let event_type = Matrix_proto.Event.Event_type.to_string raw_event.type_ in
417 let ts = raw_event.origin_server_ts in (* Timestamp.t is int64 *)
418 (* tuple: (event, event_id, sender, ts, event_type, decrypted) *)
419 Some (raw_event.content, event_id, raw_event.sender, ts, event_type, false)
420 | None -> None
421 ) response.Messages.chunk in
422 add_events_back t events;
423 Ok events