Matrix protocol in OCaml, Eio specialised
at main 500 lines 18 kB view raw
(** Sliding Sync (MSC3575) - Efficient sync protocol.

    Sliding sync is a more efficient alternative to the traditional /sync
    endpoint, designed for clients with many rooms. *)

(** Room subscription mode *)
type room_subscription = {
  required_state : (string * string) list; (* event_type, state_key pairs *)
  timeline_limit : int option;
  include_old_rooms : bool option;
}

(** Codec for a JSON array of two-element string arrays, exposed as a list of
    (event_type, state_key) pairs. Inner arrays that do not have exactly two
    elements are dropped when decoding. *)
let state_pair_list_jsont =
  let decode pairs =
    List.filter_map
      (function
        | [ et; sk ] -> Some (et, sk)
        | _ -> None)
      pairs
  in
  let encode pairs = List.map (fun (et, sk) -> [ et; sk ]) pairs in
  Jsont.list (Jsont.list Jsont.string) |> Jsont.map ~dec:decode ~enc:encode

(** Codec for a room subscription object. An absent "required_state" member
    decodes to the empty list; the other members are optional. *)
let room_subscription_jsont =
  Jsont.Object.(
    map (fun required_state timeline_limit include_old_rooms ->
        { required_state; timeline_limit; include_old_rooms })
    |> mem "required_state" state_pair_list_jsont ~dec_absent:[]
         ~enc:(fun t -> t.required_state)
    |> opt_mem "timeline_limit" Jsont.int ~enc:(fun t -> t.timeline_limit)
    |> opt_mem "include_old_rooms" Jsont.bool
         ~enc:(fun t -> t.include_old_rooms)
    |> finish)

(** List operation for sliding window *)
type list_op =
  | Sync of int * int (* start, end - request this range *)
  | Insert of int * string (* index, room_id *)
  | Delete of int (* index *)
  | Invalidate of int * int (* start, end *)

(** Sliding sync list configuration *)
type list_config = {
  ranges : (int * int) list;
  sort : string list option;
  required_state : (string * string) list;
  timeline_limit : int option;
  filters : list_filters option;
  bump_event_types : string list option;
}

(** Optional filters attached to a list configuration. Every member is
    optional and is omitted from the JSON when [None]. *)
and list_filters = {
  is_dm : bool option;
  spaces : string list option;
  is_encrypted : bool option;
  is_invite : bool option;
  room_types : string list option;
  not_room_types : string list option;
  room_name_like : string option;
  tags : string list option;
  not_tags : string list option;
}

(** Codec for [list_filters]; all members are optional. *)
let list_filters_jsont =
  let strings = Jsont.list Jsont.string in
  Jsont.Object.(
    map
      (fun is_dm spaces is_encrypted is_invite room_types not_room_types
           room_name_like tags not_tags ->
        {
          is_dm;
          spaces;
          is_encrypted;
          is_invite;
          room_types;
          not_room_types;
          room_name_like;
          tags;
          not_tags;
        })
    |> opt_mem "is_dm" Jsont.bool ~enc:(fun t -> t.is_dm)
    |> opt_mem "spaces" strings ~enc:(fun t -> t.spaces)
    |> opt_mem "is_encrypted" Jsont.bool ~enc:(fun t -> t.is_encrypted)
    |> opt_mem "is_invite" Jsont.bool ~enc:(fun t -> t.is_invite)
    |> opt_mem "room_types" strings ~enc:(fun t -> t.room_types)
    |> opt_mem "not_room_types" strings ~enc:(fun t -> t.not_room_types)
    |> opt_mem "room_name_like" Jsont.string ~enc:(fun t -> t.room_name_like)
    |> opt_mem "tags" strings ~enc:(fun t -> t.tags)
    |> opt_mem "not_tags" strings ~enc:(fun t -> t.not_tags)
    |> finish)

(** Codec for a two-element JSON integer array, exposed as an [(int * int)]
    pair (start, end).

    NOTE(review): an array that does not have exactly two elements decodes to
    [(0, 0)] instead of failing; confirm this leniency is intended. *)
let range_jsont =
  Jsont.list Jsont.int
  |> Jsont.map
       ~dec:(function
         | [ a; b ] -> (a, b)
         | _ -> (0, 0))
       ~enc:(fun (a, b) -> [ a; b ])

(** Codec for a list of ranges. *)
let ranges_jsont = Jsont.list range_jsont

(** Codec for a single two-element JSON string array, exposed as an
    (event_type, state_key) pair. An array of any other length decodes to a
    pair of empty strings. Prefer [state_pair_list_jsont] for whole
    "required_state" arrays: it drops malformed entries instead. *)
let state_pair_jsont =
  Jsont.list Jsont.string
  |> Jsont.map
       ~dec:(function
         | [ et; sk ] -> (et, sk)
         | _ -> ("", ""))
       ~enc:(fun (et, sk) -> [ et; sk ])

(** Codec for [list_config]. Absent "ranges" and "required_state" members
    decode to the empty list. "required_state" uses the same codec as
    [room_subscription_jsont], so malformed pairs are handled the same way in
    both places. *)
let list_config_jsont =
  Jsont.Object.(
    map
      (fun ranges sort required_state timeline_limit filters bump_event_types ->
        { ranges; sort; required_state; timeline_limit; filters;
          bump_event_types })
    |> mem "ranges" ranges_jsont ~dec_absent:[] ~enc:(fun t -> t.ranges)
    |> opt_mem "sort" (Jsont.list Jsont.string) ~enc:(fun t -> t.sort)
    |> mem "required_state" state_pair_list_jsont ~dec_absent:[]
         ~enc:(fun (t : list_config) -> t.required_state)
    |> opt_mem "timeline_limit" Jsont.int
         ~enc:(fun (t : list_config) -> t.timeline_limit)
    |> opt_mem "filters" list_filters_jsont ~enc:(fun t -> t.filters)
    |> opt_mem "bump_event_types" (Jsont.list Jsont.string)
         ~enc:(fun t -> t.bump_event_types)
    |> finish)

(** Sliding sync request *)
type request = {
  lists : (string * list_config) list;
  room_subscriptions : (string * room_subscription) list;
  unsubscribe_rooms : string list;
  extensions : extensions option;
  pos : string option;
  timeout : int option;
}

(** Per-extension request settings; each extension is optional. *)
and extensions = {
  to_device : to_device_ext option;
  e2ee : e2ee_ext option;
  account_data : account_data_ext option;
  typing : typing_ext option;
  receipts : receipts_ext option;
}

and to_device_ext = {
  enabled : bool;
  since : string option;
  limit : int option;
}

and e2ee_ext = {
  enabled : bool;
}

and account_data_ext = {
  enabled : bool;
  lists : string list option;
  rooms : string list option;
}

and typing_ext = {
  enabled : bool;
  lists : string list option;
  rooms : string list option;
}

and receipts_ext = {
  enabled : bool;
  lists : string list option;
  rooms : string list option;
}

(* The extension records share the [enabled], [lists] and [rooms] labels, so
   every codec below annotates its record type explicitly instead of relying
   on label resolution order. *)

(** Codec for the to-device extension request. *)
let to_device_ext_jsont =
  Jsont.Object.(
    map (fun enabled since limit -> ({ enabled; since; limit } : to_device_ext))
    |> mem "enabled" Jsont.bool ~enc:(fun (t : to_device_ext) -> t.enabled)
    |> opt_mem "since" Jsont.string ~enc:(fun (t : to_device_ext) -> t.since)
    |> opt_mem "limit" Jsont.int ~enc:(fun (t : to_device_ext) -> t.limit)
    |> finish)

(** Codec for the e2ee extension request. *)
let e2ee_ext_jsont =
  Jsont.Object.(
    map (fun enabled -> ({ enabled } : e2ee_ext))
    |> mem "enabled" Jsont.bool ~enc:(fun (t : e2ee_ext) -> t.enabled)
    |> finish)

(** Codec for the account-data extension request. *)
let account_data_ext_jsont =
  Jsont.Object.(
    map (fun enabled lists rooms ->
        ({ enabled; lists; rooms } : account_data_ext))
    |> mem "enabled" Jsont.bool ~enc:(fun (t : account_data_ext) -> t.enabled)
    |> opt_mem "lists" (Jsont.list Jsont.string)
         ~enc:(fun (t : account_data_ext) -> t.lists)
    |> opt_mem "rooms" (Jsont.list Jsont.string)
         ~enc:(fun (t : account_data_ext) -> t.rooms)
    |> finish)

(** Codec for the typing extension request. *)
let typing_ext_jsont =
  Jsont.Object.(
    map (fun enabled lists rooms -> ({ enabled; lists; rooms } : typing_ext))
    |> mem "enabled" Jsont.bool ~enc:(fun (t : typing_ext) -> t.enabled)
    |> opt_mem "lists" (Jsont.list Jsont.string)
         ~enc:(fun (t : typing_ext) -> t.lists)
    |> opt_mem "rooms" (Jsont.list Jsont.string)
         ~enc:(fun (t : typing_ext) -> t.rooms)
    |> finish)

(** Codec for the receipts extension request. *)
let receipts_ext_jsont =
  Jsont.Object.(
    map (fun enabled lists rooms -> ({ enabled; lists; rooms } : receipts_ext))
    |> mem "enabled" Jsont.bool ~enc:(fun (t : receipts_ext) -> t.enabled)
    |> opt_mem "lists" (Jsont.list Jsont.string)
         ~enc:(fun (t : receipts_ext) -> t.lists)
    |> opt_mem "rooms" (Jsont.list Jsont.string)
         ~enc:(fun (t : receipts_ext) -> t.rooms)
    |> finish)

(** Codec for the request-side [extensions] object; every member is
    optional. *)
let extensions_jsont =
  Jsont.Object.(
    map (fun to_device e2ee account_data typing receipts ->
        ({ to_device; e2ee; account_data; typing; receipts } : extensions))
    |> opt_mem "to_device" to_device_ext_jsont
         ~enc:(fun (t : extensions) -> t.to_device)
    |> opt_mem "e2ee" e2ee_ext_jsont ~enc:(fun (t : extensions) -> t.e2ee)
    |> opt_mem "account_data" account_data_ext_jsont
         ~enc:(fun (t : extensions) -> t.account_data)
    |> opt_mem "typing" typing_ext_jsont
         ~enc:(fun (t : extensions) -> t.typing)
    |> opt_mem "receipts" receipts_ext_jsont
         ~enc:(fun (t : extensions) -> t.receipts)
    |> finish)

module StringMap = Map.Make(String)

(** [string_map_jsont value_jsont] is a codec for a JSON object with arbitrary
    member names whose values use [value_jsont], exposed as an association
    list. Decoding returns the bindings of the intermediate string map;
    encoding builds that map from the list with [StringMap.of_seq]. *)
let string_map_jsont value_jsont =
  Jsont.Object.as_string_map value_jsont
  |> Jsont.map
       ~dec:(fun m -> StringMap.bindings m)
       ~enc:(fun l -> List.to_seq l |> StringMap.of_seq)

(** Codec for [request]. Absent "lists", "room_subscriptions" and
    "unsubscribe_rooms" members decode to the empty list. *)
let request_jsont =
  Jsont.Object.(
    map
      (fun lists room_subscriptions unsubscribe_rooms extensions pos timeout ->
        ({ lists; room_subscriptions; unsubscribe_rooms; extensions; pos;
           timeout }
          : request))
    |> mem "lists" (string_map_jsont list_config_jsont) ~dec_absent:[]
         ~enc:(fun (t : request) -> t.lists)
    |> mem "room_subscriptions" (string_map_jsont room_subscription_jsont)
         ~dec_absent:[]
         ~enc:(fun (t : request) -> t.room_subscriptions)
    |> mem "unsubscribe_rooms" (Jsont.list Jsont.string) ~dec_absent:[]
         ~enc:(fun (t : request) -> t.unsubscribe_rooms)
    |> opt_mem "extensions" extensions_jsont
         ~enc:(fun (t : request) -> t.extensions)
    |> opt_mem "pos" Jsont.string ~enc:(fun (t : request) -> t.pos)
    |> opt_mem "timeout" Jsont.int ~enc:(fun (t : request) -> t.timeout)
    |> finish)

(** Sliding sync response room data *)
type room_response = {
  name : string option;
  avatar : string option;
  heroes : hero list option;
  is_dm : bool option;
  initial : bool option;
  required_state : Jsont.json list;
  timeline : Jsont.json list;
  prev_batch : string option;
  limited : bool option;
  joined_count : int option;
  invited_count : int option;
  notification_count : int option;
  highlight_count : int option;
  num_live : int option;
  timestamp : int64 option;
}

(** An entry of the optional [heroes] list of a room response. *)
and hero = {
  user_id : string;
  name : string option;
  avatar : string option;
}

(** Codec for [hero]; "user_id" is required. *)
let hero_jsont =
  Jsont.Object.(
    map (fun user_id name avatar -> ({ user_id; name; avatar } : hero))
    |> mem "user_id" Jsont.string ~enc:(fun (t : hero) -> t.user_id)
    |> opt_mem "name" Jsont.string ~enc:(fun (t : hero) -> t.name)
    |> opt_mem "avatar" Jsont.string ~enc:(fun (t : hero) -> t.avatar)
    |> finish)

(** Codec for [room_response]. Absent "required_state" and "timeline" members
    decode to the empty list; their elements are kept as raw JSON. *)
let room_response_jsont =
  Jsont.Object.(
    map
      (fun name avatar heroes is_dm initial required_state timeline prev_batch
           limited joined_count invited_count notification_count
           highlight_count num_live timestamp ->
        ({ name; avatar; heroes; is_dm; initial; required_state; timeline;
           prev_batch; limited; joined_count; invited_count;
           notification_count; highlight_count; num_live; timestamp }
          : room_response))
    |> opt_mem "name" Jsont.string ~enc:(fun (t : room_response) -> t.name)
    |> opt_mem "avatar" Jsont.string
         ~enc:(fun (t : room_response) -> t.avatar)
    |> opt_mem "heroes" (Jsont.list hero_jsont)
         ~enc:(fun (t : room_response) -> t.heroes)
    |> opt_mem "is_dm" Jsont.bool ~enc:(fun (t : room_response) -> t.is_dm)
    |> opt_mem "initial" Jsont.bool
         ~enc:(fun (t : room_response) -> t.initial)
    |> mem "required_state" (Jsont.list Jsont.json) ~dec_absent:[]
         ~enc:(fun (t : room_response) -> t.required_state)
    |> mem "timeline" (Jsont.list Jsont.json) ~dec_absent:[]
         ~enc:(fun (t : room_response) -> t.timeline)
    |> opt_mem "prev_batch" Jsont.string
         ~enc:(fun (t : room_response) -> t.prev_batch)
    |> opt_mem "limited" Jsont.bool
         ~enc:(fun (t : room_response) -> t.limited)
    |> opt_mem "joined_count" Jsont.int
         ~enc:(fun (t : room_response) -> t.joined_count)
    |> opt_mem "invited_count" Jsont.int
         ~enc:(fun (t : room_response) -> t.invited_count)
    |> opt_mem "notification_count" Jsont.int
         ~enc:(fun (t : room_response) -> t.notification_count)
    |> opt_mem "highlight_count" Jsont.int
         ~enc:(fun (t : room_response) -> t.highlight_count)
    |> opt_mem "num_live" Jsont.int
         ~enc:(fun (t : room_response) -> t.num_live)
    |> opt_mem "timestamp" Jsont.int64
         ~enc:(fun (t : room_response) -> t.timestamp)
    |> finish)

(** Sliding sync list response *)
type list_response = {
  count : int;
  ops : list_op_response list;
}

(** One operation of a list response as it appears on the wire: the operation
    name in [op] plus whichever optional members accompany it. *)
and list_op_response = {
  op : string;
  range : (int * int) option;
  index : int option;
  room_ids : string list option;
  room_id : string option;
}

(** Codec for [list_op_response]; only "op" is required. *)
let list_op_response_jsont =
  Jsont.Object.(
    map (fun op range index room_ids room_id ->
        { op; range; index; room_ids; room_id })
    |> mem "op" Jsont.string ~enc:(fun t -> t.op)
    |> opt_mem "range" range_jsont ~enc:(fun t -> t.range)
    |> opt_mem "index" Jsont.int ~enc:(fun t -> t.index)
    |> opt_mem "room_ids" (Jsont.list Jsont.string) ~enc:(fun t -> t.room_ids)
    |> opt_mem "room_id" Jsont.string ~enc:(fun t -> t.room_id)
    |> finish)

(** Codec for [list_response]. An absent "count" decodes to 0 and an absent
    "ops" to the empty list. *)
let list_response_jsont =
  Jsont.Object.(
    map (fun count ops -> { count; ops })
    |> mem "count" Jsont.int ~dec_absent:0 ~enc:(fun t -> t.count)
    |> mem "ops" (Jsont.list list_op_response_jsont) ~dec_absent:[]
         ~enc:(fun t -> t.ops)
    |> finish)

(** Extensions response *)
type extensions_response = {
  to_device : to_device_response option;
  e2ee : e2ee_response option;
  account_data : account_data_response option;
  typing : typing_response option;
  receipts : receipts_response option;
}

and to_device_response = {
  next_batch : string;
  events : Jsont.json list;
}

and e2ee_response = {
  device_lists : device_lists option;
  device_one_time_keys_count : (string * int) list;
  device_unused_fallback_key_types : string list;
}

and device_lists = {
  changed : string list;
  left : string list;
}

and account_data_response = {
  global : Jsont.json list;
  rooms : (string * Jsont.json list) list;
}

and typing_response = {
  rooms : (string * string list) list; (* room_id -> typing user_ids *)
}

and receipts_response = {
  rooms : (string * Jsont.json) list; (* room_id -> receipt content *)
}

(** Codec for [to_device_response]. "next_batch" is required; an absent
    "events" decodes to the empty list. *)
let to_device_response_jsont =
  Jsont.Object.(
    map (fun next_batch events -> { next_batch; events })
    |> mem "next_batch" Jsont.string ~enc:(fun t -> t.next_batch)
    |> mem "events" (Jsont.list Jsont.json) ~dec_absent:[]
         ~enc:(fun t -> t.events)
    |> finish)

(** Codec for [device_lists]; absent members decode to the empty list. *)
let device_lists_jsont =
  Jsont.Object.(
    map (fun changed left -> { changed; left })
    |> mem "changed" (Jsont.list Jsont.string) ~dec_absent:[]
         ~enc:(fun t -> t.changed)
    |> mem "left" (Jsont.list Jsont.string) ~dec_absent:[]
         ~enc:(fun t -> t.left)
    |> finish)

(** Codec for a JSON object mapping arbitrary names to integers, exposed as an
    association list. *)
let int_map_jsont = string_map_jsont Jsont.int

(** Codec for [e2ee_response]. "device_lists" is optional; the two remaining
    members decode to the empty list when absent. *)
let e2ee_response_jsont =
  Jsont.Object.(
    map
      (fun device_lists device_one_time_keys_count
           device_unused_fallback_key_types ->
        { device_lists; device_one_time_keys_count;
          device_unused_fallback_key_types })
    |> opt_mem "device_lists" device_lists_jsont
         ~enc:(fun t -> t.device_lists)
    |> mem "device_one_time_keys_count" int_map_jsont ~dec_absent:[]
         ~enc:(fun t -> t.device_one_time_keys_count)
    |> mem "device_unused_fallback_key_types" (Jsont.list Jsont.string)
         ~dec_absent:[]
         ~enc:(fun t -> t.device_unused_fallback_key_types)
    |> finish)

(** Codec for a JSON object mapping arbitrary names to arrays of raw JSON
    values, exposed as an association list. *)
let json_list_map_jsont = string_map_jsont (Jsont.list Jsont.json)

(** Codec for [account_data_response]; absent members decode to the empty
    list. *)
let account_data_response_jsont =
  Jsont.Object.(
    map (fun global rooms -> ({ global; rooms } : account_data_response))
    |> mem "global" (Jsont.list Jsont.json) ~dec_absent:[]
         ~enc:(fun (t : account_data_response) -> t.global)
    |> mem "rooms" json_list_map_jsont ~dec_absent:[]
         ~enc:(fun (t : account_data_response) -> t.rooms)
    |> finish)

(** Codec for a JSON object mapping arbitrary names to arrays of strings,
    exposed as an association list. *)
let string_list_map_jsont = string_map_jsont (Jsont.list Jsont.string)

(** Codec for [typing_response]; an absent "rooms" decodes to the empty
    list. *)
let typing_response_jsont =
  Jsont.Object.(
    map (fun rooms -> ({ rooms } : typing_response))
    |> mem "rooms" string_list_map_jsont ~dec_absent:[]
         ~enc:(fun (t : typing_response) -> t.rooms)
    |> finish)

(** Codec for a JSON object mapping arbitrary names to raw JSON values,
    exposed as an association list. *)
let json_map_jsont = string_map_jsont Jsont.json

(** Codec for [receipts_response]; an absent "rooms" decodes to the empty
    list. *)
let receipts_response_jsont =
  Jsont.Object.(
    map (fun rooms -> ({ rooms } : receipts_response))
    |> mem "rooms" json_map_jsont ~dec_absent:[]
         ~enc:(fun (t : receipts_response) -> t.rooms)
    |> finish)

(** Codec for the response-side [extensions_response] object; every member is
    optional. *)
let extensions_response_jsont =
  Jsont.Object.(
    map (fun to_device e2ee account_data typing receipts ->
        ({ to_device; e2ee; account_data; typing; receipts }
          : extensions_response))
    |> opt_mem "to_device" to_device_response_jsont
         ~enc:(fun (t : extensions_response) -> t.to_device)
    |> opt_mem "e2ee" e2ee_response_jsont
         ~enc:(fun (t : extensions_response) -> t.e2ee)
    |> opt_mem "account_data" account_data_response_jsont
         ~enc:(fun (t : extensions_response) -> t.account_data)
    |> opt_mem "typing" typing_response_jsont
         ~enc:(fun (t : extensions_response) -> t.typing)
    |> opt_mem "receipts" receipts_response_jsont
         ~enc:(fun (t : extensions_response) -> t.receipts)
    |> finish)

(** Full sliding sync response *)
type response = {
  pos : string;
  lists : (string * list_response) list;
  rooms : (string * room_response) list;
  extensions : extensions_response option;
}

(** Codec for [response]. "pos" is required; absent "lists" and "rooms"
    members decode to the empty list. *)
let response_jsont =
  Jsont.Object.(
    map (fun pos lists rooms extensions ->
        ({ pos; lists; rooms; extensions } : response))
    |> mem "pos" Jsont.string ~enc:(fun (t : response) -> t.pos)
    |> mem "lists" (string_map_jsont list_response_jsont) ~dec_absent:[]
         ~enc:(fun (t : response) -> t.lists)
    |> mem "rooms" (string_map_jsont room_response_jsont) ~dec_absent:[]
         ~enc:(fun (t : response) -> t.rooms)
    |> opt_mem "extensions" extensions_response_jsont
         ~enc:(fun (t : response) -> t.extensions)
    |> finish)

(** Perform a sliding sync request.

    This is the main entry point for sliding sync. The client should maintain
    the position token and include it in subsequent requests.

    [request] is encoded with [request_jsont], posted to the "/sync" path
    through [Client.post], and the reply is decoded with [response_jsont].
    The first [Error] produced by any of these steps is returned unchanged.

    Note: [request.timeout] is only sent as part of the encoded body; timeout
    handling would need to be done at the HTTP client level. *)
let sync client ~request () =
  match Client.encode_body request_jsont request with
  | Error e -> Error e
  | Ok body ->
    (match Client.post client ~path:"/sync" ~body () with
     | Error e -> Error e
     | Ok resp_body -> Client.decode_response response_jsont resp_body)

(** Create a default request for initial sync.

    The request holds a single list named "all_rooms" covering indices 0
    through [room_limit - 1], enables every extension, carries no position
    token and sets a timeout of 30000.

    @param timeline_limit timeline limit of the list (default 20).
    @param room_limit number of rooms covered by the initial range (default
    20). A value of 0 or less yields no range at all rather than the invalid
    range [(0, -1)]. *)
let initial_request ?(timeline_limit = 20) ?(room_limit = 20) () : request =
  let ranges = if room_limit > 0 then [ (0, room_limit - 1) ] else [] in
  {
    lists = [
      ("all_rooms", {
        ranges;
        sort = Some ["by_recency"; "by_name"];
        required_state = [("m.room.name", ""); ("m.room.avatar", "")];
        timeline_limit = Some timeline_limit;
        filters = None;
        bump_event_types = Some ["m.room.message"; "m.room.encrypted"];
      })
    ];
    room_subscriptions = [];
    unsubscribe_rooms = [];
    extensions = Some {
      to_device = Some { enabled = true; since = None; limit = Some 100 };
      e2ee = Some { enabled = true };
      account_data = Some { enabled = true; lists = None; rooms = None };
      typing = Some { enabled = true; lists = None; rooms = None };
      receipts = Some { enabled = true; lists = None; rooms = None };
    };
    pos = None;
    timeout = Some 30000;
  }