My aggregated monorepo of OCaml code, automaintained

Run message handler concurrently with broadcast loop in poe

The loop command now responds to DMs and mentions while sleeping
between broadcast intervals, using Eio.Fiber.both to run both
loops concurrently.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+135 -58
+13
ocaml-zulip/lib/zulip_bot/bot.mli
··· 142 142 (** [fetch_identity client] retrieves the bot's identity from the Zulip server. 143 143 144 144 @raise Eio.Io on API errors *) 145 + 146 + val process_event : 147 + client:Zulip.Client.t -> 148 + storage:Storage.t -> 149 + identity:identity -> 150 + handler:handler -> 151 + Zulip.Event.t -> 152 + unit 153 + (** [process_event ~client ~storage ~identity ~handler event] processes a single 154 + Zulip event. 155 + 156 + This is useful for custom event loops that need finer control over event 157 + processing than [run] provides. *)
+12 -1
poe/bin/main.ml
··· 183 183 Eio_main.run @@ fun env -> 184 184 Eio.Switch.run @@ fun sw -> 185 185 let fs = Eio.Stdenv.fs env in 186 + let process_mgr = Eio.Stdenv.process_mgr env in 187 + let clock = Eio.Stdenv.clock env in 186 188 187 189 (* Load poe config: explicit path > XDG > current dir > defaults *) 188 190 let poe_config = ··· 198 200 in 199 201 200 202 let zulip_config = Zulip_bot.Config.load_or_env ~xdg_app:"poe" ~fs bot_name in 203 + 204 + (* Create handler environment *) 205 + let handler_env : _ Poe.Handler.env = 206 + { sw; process_mgr; clock; fs } 207 + in 208 + 209 + (* Create handler for message processing *) 210 + let handler = Poe.Handler.make_handler handler_env poe_config in 211 + 201 212 Logs.info (fun m -> 202 213 m "Starting loop, broadcasting to %s/%s every %d seconds" 203 214 poe_config.channel poe_config.topic interval); 204 - Poe.Loop.run ~sw ~env ~config:poe_config ~zulip_config ~interval 215 + Poe.Loop.run ~sw ~env ~config:poe_config ~zulip_config ~handler ~interval 205 216 in 206 217 let doc = "Run polling loop to check for and broadcast changes" in 207 218 let info = Cmd.info "loop" ~doc in
+96 -51
poe/lib/loop.ml
··· 74 74 let resp = Zulip.Messages.send client msg in 75 75 Log.info (fun m -> m "Broadcast sent, message ID: %d" (Zulip.Message_response.id resp)) 76 76 77 - let run ~sw ~env ~config ~zulip_config ~interval = 77 + let run ~sw ~env ~config ~zulip_config ~handler ~interval = 78 78 let fs = Eio.Stdenv.fs env in 79 79 let proc = Eio.Stdenv.process_mgr env in 80 80 let clock = Eio.Stdenv.clock env in ··· 88 88 89 89 Log.info (fun m -> m "Starting loop with %d second interval" interval); 90 90 91 - let rec loop () = 92 - Log.info (fun m -> m "Checking for changes..."); 91 + let broadcast_loop () = 92 + let rec loop () = 93 + Log.info (fun m -> m "Checking for changes..."); 93 94 94 - (* Pull latest changes from remote *) 95 - let _pull_ok = run_git_pull ~proc ~cwd:monorepo_path in 95 + (* Pull latest changes from remote *) 96 + let _pull_ok = run_git_pull ~proc ~cwd:monorepo_path in 96 97 97 - (* Get current git HEAD *) 98 - let current_head = get_git_head ~proc ~cwd:monorepo_path in 99 - let last_head = Admin.get_last_git_head storage in 98 + (* Get current git HEAD *) 99 + let current_head = get_git_head ~proc ~cwd:monorepo_path in 100 + let last_head = Admin.get_last_git_head storage in 100 101 101 - Log.debug (fun m -> m "Current HEAD: %s, Last HEAD: %s" 102 - (Option.value ~default:"unknown" current_head) 103 - (Option.value ~default:"unknown" last_head)); 102 + Log.debug (fun m -> m "Current HEAD: %s, Last HEAD: %s" 103 + (Option.value ~default:"unknown" current_head) 104 + (Option.value ~default:"unknown" last_head)); 104 105 105 - (* Check if HEAD has changed *) 106 - let head_changed = match (current_head, last_head) with 107 - | (Some c, Some l) -> c <> l 108 - | (Some _, None) -> true (* First run *) 109 - | _ -> false 110 - in 106 + (* Check if HEAD has changed *) 107 + let head_changed = match (current_head, last_head) with 108 + | (Some c, Some l) -> c <> l 109 + | (Some _, None) -> true (* First run *) 110 + | _ -> false 111 + in 111 112 112 - if head_changed then begin 113 - Log.info (fun m -> m "Git HEAD changed, generating changes..."); 113 + if head_changed then begin 114 + Log.info (fun m -> m "Git HEAD changed, generating changes..."); 114 115 115 - (* Run monopam to generate changes *) 116 - let _success = run_monopam_changes ~proc ~cwd:monorepo_path in 116 + (* Run monopam to generate changes *) 117 + let _success = run_monopam_changes ~proc ~cwd:monorepo_path in 117 118 118 - (* Load changes since last broadcast *) 119 - let last_broadcast = Admin.get_last_broadcast_time storage in 120 - let since = match last_broadcast with 121 - | None -> 122 - let now = Ptime_clock.now () in 123 - let day_ago = Ptime.Span.of_int_s (24 * 60 * 60) in 124 - Option.value ~default:Ptime.epoch (Ptime.sub_span now day_ago) 125 - | Some t -> t 126 - in 119 + (* Load changes since last broadcast *) 120 + let last_broadcast = Admin.get_last_broadcast_time storage in 121 + let since = match last_broadcast with 122 + | None -> 123 + let now = Ptime_clock.now () in 124 + let day_ago = Ptime.Span.of_int_s (24 * 60 * 60) in 125 + Option.value ~default:Ptime.epoch (Ptime.sub_span now day_ago) 126 + | Some t -> t 127 + in 127 128 128 - match Monopam.Changes.Query.changes_since ~fs ~changes_dir ~since with 129 - | Error e -> 130 - Log.warn (fun m -> m "Error loading changes: %s" e) 131 - | Ok entries when entries = [] -> 132 - Log.info (fun m -> m "No new changes to broadcast") 133 - | Ok entries -> 134 - Log.info (fun m -> m "Broadcasting %d new entries" (List.length entries)); 135 - send_changes ~client ~stream:config.Config.channel 136 - ~topic:config.Config.topic ~entries; 129 + match Monopam.Changes.Query.changes_since ~fs ~changes_dir ~since with 130 + | Error e -> 131 + Log.warn (fun m -> m "Error loading changes: %s" e) 132 + | Ok entries when entries = [] -> 133 + Log.info (fun m -> m "No new changes to broadcast") 134 + | Ok entries -> 135 + Log.info (fun m -> m "Broadcasting %d new entries" (List.length entries)); 136 + send_changes ~client ~stream:config.Config.channel 137 + ~topic:config.Config.topic ~entries; 137 138 138 - (* Update storage *) 139 - let now = Ptime_clock.now () in 140 - Admin.set_last_broadcast_time storage now; 141 - Option.iter (Admin.set_last_git_head storage) current_head; 142 - Log.info (fun m -> m "Updated broadcast time and git HEAD") 143 - end 144 - else 145 - Log.debug (fun m -> m "No HEAD change, skipping"); 139 + (* Update storage *) 140 + let now = Ptime_clock.now () in 141 + Admin.set_last_broadcast_time storage now; 142 + Option.iter (Admin.set_last_git_head storage) current_head; 143 + Log.info (fun m -> m "Updated broadcast time and git HEAD") 144 + end 145 + else 146 + Log.debug (fun m -> m "No HEAD change, skipping"); 146 147 147 - (* Sleep until next check *) 148 - Log.info (fun m -> m "Sleeping for %d seconds" interval); 149 - Eio.Time.sleep clock (float_of_int interval); 148 + (* Sleep until next check *) 149 + Log.info (fun m -> m "Sleeping for %d seconds" interval); 150 + Eio.Time.sleep clock (float_of_int interval); 151 + loop () 152 + in 150 153 loop () 151 154 in 152 155 153 - loop () 156 + (* Run broadcast loop and message handler concurrently *) 157 + Eio.Fiber.both 158 + broadcast_loop 159 + (fun () -> 160 + Log.info (fun m -> m "Starting message handler"); 161 + let identity = Zulip_bot.Bot.fetch_identity client in 162 + Log.info (fun m -> 163 + m "Bot identity: %s <%s> (id: %d)" identity.full_name identity.email 164 + identity.user_id); 165 + let queue = 166 + Zulip.Event_queue.register client 167 + ~event_types:[ Zulip.Event_type.Message ] 168 + () 169 + in 170 + Log.info (fun m -> 171 + m "Event queue registered: %s" (Zulip.Event_queue.id queue)); 172 + let rec event_loop last_event_id = 173 + try 174 + let events = 175 + Zulip.Event_queue.get_events queue client ~last_event_id () 176 + in 177 + if List.length events > 0 then 178 + Log.info (fun m -> m "Received %d event(s)" (List.length events)); 179 + List.iter 180 + (fun event -> 181 + Log.debug (fun m -> 182 + m "Event id=%d, type=%s" (Zulip.Event.id event) 183 + (Zulip.Event_type.to_string (Zulip.Event.type_ event))); 184 + Zulip_bot.Bot.process_event ~client ~storage ~identity ~handler event) 185 + events; 186 + let new_last_id = 187 + List.fold_left 188 + (fun max_id event -> max (Zulip.Event.id event) max_id) 189 + last_event_id events 190 + in 191 + event_loop new_last_id 192 + with Eio.Exn.Io (e, _) -> 193 + Log.warn (fun m -> 194 + m "Error getting events: %a (retrying in 2s)" Eio.Exn.pp_err e); 195 + Eio.Time.sleep clock 2.0; 196 + event_loop last_event_id 197 + in 198 + event_loop (-1))
+14 -6
poe/lib/loop.mli
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - (** Hourly loop for automated change detection and broadcast. 6 + (** Combined loop for change detection, broadcast, and message handling. 7 7 8 8 This module implements a polling loop that periodically checks for new 9 - changes in the monorepo and broadcasts them to Zulip. *) 9 + changes in the monorepo and broadcasts them to Zulip. It also runs a 10 + concurrent message handler to respond to DMs and mentions while the 11 + broadcast loop sleeps. *) 10 12 11 13 val run : 12 14 sw:Eio.Switch.t -> ··· 17 19 .. > -> 18 20 config:Config.t -> 19 21 zulip_config:Zulip_bot.Config.t -> 22 + handler:Zulip_bot.Bot.handler -> 20 23 interval:int -> 21 - 'a 22 - (** [run ~sw ~env ~config ~zulip_config ~interval] starts the polling loop. 24 + unit 25 + (** [run ~sw ~env ~config ~zulip_config ~handler ~interval] starts both the 26 + polling loop and the message handler concurrently. 23 27 24 - Loop flow: 28 + The broadcast loop flow: 25 29 1. Pull latest changes from remote (git pull --ff-only) 26 30 2. Check if git HEAD has changed (compare with stored last_git_head) 27 31 3. If changed: ··· 32 36 4. Sleep for interval seconds 33 37 5. Repeat 34 38 39 + Concurrently, the message handler listens for incoming Zulip messages 40 + (DMs and mentions) and processes them using the provided handler. 41 + 35 42 @param sw Eio switch for resource management 36 43 @param env Eio environment 37 44 @param config Poe configuration 38 45 @param zulip_config Zulip bot configuration 39 - @param interval Seconds between checks (default: 3600) *) 46 + @param handler Message handler function 47 + @param interval Seconds between broadcast checks (default: 3600) *)