Auto-indexing service and GraphQL API for AT Protocol Records
at main 62 lines 1.6 kB view raw
import database/executor.{type Executor}
import database/repositories/jetstream_activity
import gleam/erlang/process
import gleam/otp/actor
import gleam/string
import logging

/// Delay between two cleanup runs: 1 hour, expressed in milliseconds.
const cleanup_interval_ms = 3_600_000

/// Value passed to `jetstream_activity.cleanup_old_activity`:
/// 7 days, expressed in hours.
const retention_hours = 168

/// Maximum time the actor's initialiser may take, in milliseconds.
const init_timeout_ms = 1000

/// Message types for the cleanup actor
pub type Message {
  /// Run one cleanup pass, then schedule the next one.
  Cleanup
  /// Stop the scheduler.
  Shutdown
}

/// Internal actor state.
///
/// `self` must be the actor's own subject: it is the target of the
/// self-addressed `Cleanup` message that keeps the schedule going.
type State {
  State(db: Executor, self: process.Subject(Message))
}

/// Start the cleanup scheduler
/// Returns a Subject that can be used to send messages to the scheduler
///
/// The first `Cleanup` is scheduled one interval after a successful start;
/// every handled `Cleanup` then schedules the following one. If the actor
/// fails to start, the `actor.StartError` is returned unchanged.
pub fn start(db: Executor) -> Result(process.Subject(Message), actor.StartError) {
  let result =
    // The initialiser receives the subject the actor listens on. Storing that
    // subject in the state (rather than one created by the caller with
    // `process.new_subject()`) is what lets the actor reschedule itself:
    // a caller-created subject would route every follow-up `Cleanup` to the
    // caller's mailbox and the cleanup would only ever run once.
    actor.new_with_initialiser(init_timeout_ms, fn(subject) {
      actor.initialised(State(db: db, self: subject))
      |> actor.returning(subject)
      |> Ok
    })
    |> actor.on_message(handle_message)
    |> actor.start

  // Schedule first cleanup after 1 hour
  case result {
    Ok(started) -> {
      let _ = process.send_after(started.data, cleanup_interval_ms, Cleanup)
      Ok(started.data)
    }
    Error(reason) -> Error(reason)
  }
}

/// Handle one scheduler message.
///
/// - `Cleanup`: calls `jetstream_activity.cleanup_old_activity`, logs (but
///   otherwise ignores) a failure so the schedule survives transient errors,
///   schedules the next `Cleanup` and keeps running.
/// - `Shutdown`: logs and stops the actor.
fn handle_message(state: State, message: Message) -> actor.Next(State, Message) {
  case message {
    Cleanup -> {
      // Clean up activity entries older than 7 days (168 hours)
      case jetstream_activity.cleanup_old_activity(state.db, retention_hours) {
        Ok(_) -> Nil
        Error(err) -> {
          logging.log(
            logging.Error,
            "[cleanup] Failed to cleanup old activity: " <> string.inspect(err),
          )
        }
      }

      // Schedule next cleanup in 1 hour (3600000 milliseconds)
      let _ = process.send_after(state.self, cleanup_interval_ms, Cleanup)

      actor.continue(state)
    }
    Shutdown -> {
      logging.log(logging.Info, "[cleanup] Shutting down cleanup scheduler")
      actor.stop()
    }
  }
}