···44 let cursor =
55 match Dream.query req "cursor" with
66 | Some s ->
77- Option.value (int_of_string_opt s) ~default:0
77+ max 0 (Option.value (int_of_string_opt s) ~default:0)
88 | None ->
99 0
1010 in
+19-14
pegasus/lib/sequencer.ml
···518518 (Uri.to_string crawler) (Printexc.to_string exn) ) ) )
519519 Env.crawlers
520520 end ;
521521+ let to_remove = ref [] in
521522 Hashtbl.iter
522523 (fun _ s ->
523524 if not s.closed then (
···525526 if Queue.length s.q > queue_max then (
526527 s.closed <- true ;
527528 s.close_reason <- Some "ConsumerTooSlow" ;
528528- Hashtbl.remove subs s.id ;
529529+ to_remove := s.id :: !to_remove ;
529530 Lwt_condition.broadcast s.cond () )
530531 else Lwt_condition.broadcast s.cond () ) )
531532 subs ;
533533+ List.iter (Hashtbl.remove subs) !to_remove ;
532534 Lwt.return_unit )
533535534536 let latest_seq () = !head_seq
···552554 Hashtbl.remove subs s.id ;
553555 Lwt.return_unit )
554556555555- let ring_after (after : int) : item list =
556556- if !head_seq <= after then []
557557- else
558558- let first = max (!head_seq - !count + 1) (after + 1) in
559559- if first > !head_seq then []
560560- else
561561- let rec collect acc seq =
562562- if seq > !head_seq then List.rev acc
557557+ let ring_after (after : int) : item list Lwt.t =
558558+ Lwt_mutex.with_lock lock (fun () ->
559559+ let head = !head_seq in
560560+ let cnt = !count in
561561+ if head <= after then Lwt.return []
562562+ else
563563+ let first = max (head - cnt + 1) (after + 1) in
564564+ if first > head then Lwt.return []
563565 else
564564- let it = ring.(seq mod ring_size) in
565565- collect (it :: acc) (seq + 1)
566566- in
567567- collect [] first
566566+ let rec collect acc seq =
567567+ if seq > head then List.rev acc
568568+ else
569569+ let it = ring.(seq mod ring_size) in
570570+ collect (it :: acc) (seq + 1)
571571+ in
572572+ Lwt.return (collect [] first) )
568573569574 let rec wait_next (s : subscriber) : item Lwt.t =
570575 if s.closed then failwith "subscriber closed"
···634639 let%lwt head_db = DB.latest_seq conn in
635640 let cutoff = head_db in
636641 (* try backfill from buffer first *)
637637- let ring = Bus.ring_after cursor in
642642+ let%lwt ring = Bus.ring_after cursor in
638643 let ring_covers =
639644 match ring with
640645 | [] ->