forked from
slices.network/quickslice
Auto-indexing service and GraphQL API for AT Protocol Records
1import database/executor.{type Executor}
2import database/repositories/jetstream_activity
3import gleam/erlang/process
4import gleam/otp/actor
5import gleam/string
6import logging
7
8/// Message types for the cleanup actor
9pub type Message {
10 Cleanup
11 Shutdown
12}
13
14type State {
15 State(db: Executor, self: process.Subject(Message))
16}
17
18/// Start the cleanup scheduler
19/// Returns a Subject that can be used to send messages to the scheduler
20pub fn start(db: Executor) -> Result(process.Subject(Message), actor.StartError) {
21 let initial_state = State(db: db, self: process.new_subject())
22
23 let result =
24 actor.new(initial_state)
25 |> actor.on_message(handle_message)
26 |> actor.start
27
28 // Schedule first cleanup after 1 hour
29 case result {
30 Ok(started) -> {
31 let _ = process.send_after(started.data, 3_600_000, Cleanup)
32 Ok(started.data)
33 }
34 Error(reason) -> Error(reason)
35 }
36}
37
38fn handle_message(state: State, message: Message) -> actor.Next(State, Message) {
39 case message {
40 Cleanup -> {
41 // Clean up activity entries older than 7 days (168 hours)
42 case jetstream_activity.cleanup_old_activity(state.db, 168) {
43 Ok(_) -> Nil
44 Error(err) -> {
45 logging.log(
46 logging.Error,
47 "[cleanup] Failed to cleanup old activity: " <> string.inspect(err),
48 )
49 }
50 }
51
52 // Schedule next cleanup in 1 hour (3600000 milliseconds)
53 let _ = process.send_after(state.self, 3_600_000, Cleanup)
54
55 actor.continue(state)
56 }
57 Shutdown -> {
58 logging.log(logging.Info, "[cleanup] Shutting down cleanup scheduler")
59 actor.stop()
60 }
61 }
62}