Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1import backfill
2import database/executor.{type Executor}
3import database/jetstream
4import database/repositories/config as config_repo
5import database/repositories/lexicons
6import envoy
7import event_handler
8import gleam/dynamic/decode
9import gleam/erlang/process
10import gleam/int
11import gleam/list
12import gleam/option
13import gleam/otp/actor
14import gleam/string
15import gleam/time/timestamp
16import goose
17import logging
18
19// ============================================================================
20// CONFIGURATION
21// ============================================================================
22
/// Longest period of silence, in milliseconds, tolerated before the manager
/// forces a consumer restart (300_000 ms = 5 minutes).
const heartbeat_timeout_ms: Int = 300_000
27
/// Delay, in milliseconds, between two heartbeat checks (300_000 ms = 5
/// minutes). Deliberately equal to the timeout so checks trigger consistently.
const heartbeat_check_interval_ms: Int = 300_000
32
33// ============================================================================
34// TYPES
35// ============================================================================
36
/// Messages understood by the consumer manager actor.
pub type ManagerMessage {
  /// Record the time (in milliseconds) of the most recently seen message; the
  /// manager stores it as `last_message_time_ms`.
  MessageReceived(timestamp: Int)
  /// Periodic tick: restart the consumer if no message has been recorded
  /// within the heartbeat timeout.
  CheckHeartbeat
  /// Give the manager its own subject once the actor is running, replacing
  /// the placeholder it was created with.
  UpdateSelfSubject(process.Subject(ManagerMessage))
  /// Replace the tracked consumer subject (`None` when no consumer is known).
  UpdateConsumerSubject(option.Option(process.Subject(Message)))
  /// Manually restart the consumer; the outcome is sent to `reply_with`.
  ManualRestart(reply_with: process.Subject(Result(Nil, String)))
  /// Manually stop the consumer; `Nil` is sent to `reply_with` when done.
  ManualStop(reply_with: process.Subject(Nil))
}
52
/// State carried by the consumer manager actor.
pub type ManagerState {
  ManagerState(
    // Database handle passed on to every consumer the manager starts.
    db: Executor,
    // Time of the last recorded message, in milliseconds; compared against
    // the current wall-clock time on every heartbeat check.
    last_message_time_ms: Int,
    // Subject of the current consumer actor, or `None` when there is none.
    consumer_subject: option.Option(process.Subject(Message)),
    // The manager's own subject, used to schedule `CheckHeartbeat` messages.
    self_subject: process.Subject(ManagerMessage),
  )
}
62
/// Messages understood by the Jetstream consumer actor.
pub type Message {
  /// Kill the consumer process if one is running; `Nil` is sent to
  /// `reply_with` afterwards.
  Stop(reply_with: process.Subject(Nil))
  /// Kill the consumer process if one is running and start a new one; the
  /// outcome is sent to `reply_with`.
  Restart(reply_with: process.Subject(Result(Nil, String)))
}
68
/// Messages understood by the cursor tracker actor.
pub type CursorMessage {
  /// Report the `time_us` of an event; it is persisted at most once every
  /// five seconds.
  UpdateCursor(time_us: Int)
  /// Persist the pending cursor (if any) immediately; `Nil` is sent to
  /// `reply_with` whether or not the write succeeded.
  FlushCursor(reply_with: process.Subject(Nil))
}
74
/// Internal state of the Jetstream consumer actor.
type State {
  State(
    // Database handle used when (re)starting the consumer process.
    db: Executor,
    // Pid of the spawned consumer process, or `None` when it is not running.
    consumer_pid: option.Option(process.Pid),
    // NOTE(review): only ever set to `None` in this file and never read —
    // looks unused; confirm before relying on it.
    cursor_tracker_pid: option.Option(process.Pid),
  )
}
83
/// State carried by the cursor tracker actor.
type CursorState {
  CursorState(
    // Database handle the cursor is written to.
    db: Executor,
    // Most recent cursor value not yet persisted; reset to `None` after a
    // successful write.
    latest_cursor: option.Option(Int),
    // Time of the last successful write, in seconds.
    last_flush_time: Int,
  )
}
92
93// ============================================================================
94// CONSUMER MANAGER
95// ============================================================================
96
/// Boot the consumer manager actor, launch the first Jetstream consumer and
/// arm the periodic heartbeat check.
///
/// Returns the manager's subject. A consumer that fails to start is only
/// logged: the manager keeps running so a later heartbeat check can retry.
/// An error is returned only when the manager actor itself cannot start.
pub fn start(db: Executor) -> Result(process.Subject(ManagerMessage), String) {
  // Placeholder: the actor's real subject is only known once it is running.
  let placeholder = process.new_subject()

  let initial =
    ManagerState(
      db: db,
      last_message_time_ms: get_current_time_milliseconds(),
      consumer_subject: option.None,
      self_subject: placeholder,
    )

  let boot =
    actor.new(initial)
    |> actor.on_message(handle_manager_message)
    |> actor.start

  case boot {
    Error(reason) ->
      Error("Failed to start manager actor: " <> string.inspect(reason))

    Ok(started) -> {
      let manager = started.data

      // Hand the actor its own subject so it can message itself later.
      process.send(manager, UpdateSelfSubject(manager))

      // Launch the first consumer; a failure here is not fatal.
      case start_consumer_actor(db, option.Some(manager)) {
        Ok(consumer) ->
          process.send(manager, UpdateConsumerSubject(option.Some(consumer)))
        Error(reason) ->
          logging.log(
            logging.Warning,
            "[jetstream] Consumer failed to start: " <> reason,
          )
      }

      // Armed in both outcomes so a failed start can be retried later.
      process.send_after(manager, heartbeat_check_interval_ms, CheckHeartbeat)

      Ok(manager)
    }
  }
}
159
/// Ask the manager to stop the Jetstream consumer, waiting up to one second
/// for its acknowledgement.
pub fn stop(manager: process.Subject(ManagerMessage)) -> Nil {
  // `ManualStop` replies with `Nil`, which is exactly what we return.
  actor.call(manager, waiting: 1000, sending: ManualStop)
}
166
/// Ask the manager to replace the Jetstream consumer with a freshly started
/// one (which re-reads the lexicons), waiting up to five seconds for the
/// outcome.
pub fn restart(manager: process.Subject(ManagerMessage)) -> Result(Nil, String) {
  manager
  |> actor.call(waiting: 5000, sending: ManualRestart)
}
171
/// Handle one message sent to the consumer manager.
///
/// - `UpdateSelfSubject`, `UpdateConsumerSubject` and `MessageReceived` only
///   store their payload in the state.
/// - `CheckHeartbeat` replaces the consumer when more than
///   `heartbeat_timeout_ms` has elapsed since the last recorded message, and
///   always schedules the next check.
/// - `ManualRestart` replaces the consumer and replies with the outcome.
/// - `ManualStop` stops the consumer and replies with `Nil`.
///
/// NOTE(review): heartbeat checks keep being scheduled after `ManualStop`, so
/// a manually stopped consumer is started again once the timeout elapses —
/// confirm this is intended.
fn handle_manager_message(
  state: ManagerState,
  message: ManagerMessage,
) -> actor.Next(ManagerState, ManagerMessage) {
  case message {
    UpdateSelfSubject(subject) ->
      actor.continue(ManagerState(..state, self_subject: subject))

    UpdateConsumerSubject(subject) ->
      actor.continue(ManagerState(..state, consumer_subject: subject))

    MessageReceived(timestamp) ->
      actor.continue(ManagerState(..state, last_message_time_ms: timestamp))

    CheckHeartbeat -> {
      let now = get_current_time_milliseconds()
      let idle_ms = now - state.last_message_time_ms

      // Debug logging for slow message rates
      case idle_ms > 60_000 {
        True ->
          logging.log(
            logging.Debug,
            "[jetstream] Health check: "
              <> int.to_string(idle_ms / 1000)
              <> "s since last message",
          )
        False -> Nil
      }

      let next_state = case idle_ms > heartbeat_timeout_ms {
        // Still receiving messages: nothing to change.
        False -> state

        // Silent for too long: force a restart.
        True -> {
          logging.log(
            logging.Warning,
            "[jetstream] No messages received for "
              <> int.to_string(idle_ms / 1000)
              <> " seconds. Restarting consumer...",
          )

          case replace_consumer(state) {
            Ok(new_subject) -> {
              logging.log(logging.Info, "[jetstream] Consumer restarted")
              // Reset the timer so the new consumer gets a full timeout.
              ManagerState(
                ..state,
                last_message_time_ms: now,
                consumer_subject: option.Some(new_subject),
              )
            }
            Error(err) -> {
              logging.log(
                logging.Error,
                "[jetstream] Failed to restart consumer: " <> err,
              )
              // The timer is left untouched so the next check retries.
              ManagerState(..state, consumer_subject: option.None)
            }
          }
        }
      }

      // Every path schedules the next check.
      schedule_heartbeat_check(state.self_subject)
      actor.continue(next_state)
    }

    ManualRestart(client) -> {
      logging.log(logging.Info, "[jetstream] Manual restart requested")

      case replace_consumer(state) {
        Ok(new_subject) -> {
          process.send(client, Ok(Nil))
          actor.continue(
            ManagerState(
              ..state,
              last_message_time_ms: get_current_time_milliseconds(),
              consumer_subject: option.Some(new_subject),
            ),
          )
        }
        Error(err) -> {
          process.send(client, Error(err))
          actor.continue(ManagerState(..state, consumer_subject: option.None))
        }
      }
    }

    ManualStop(client) -> {
      logging.log(logging.Info, "[jetstream] Manual stop requested")

      stop_current_consumer(state.consumer_subject)
      process.send(client, Nil)

      actor.continue(ManagerState(..state, consumer_subject: option.None))
    }
  }
}

/// Ask the given consumer actor (if any) to stop, waiting up to one second
/// for its acknowledgement.
fn stop_current_consumer(
  consumer: option.Option(process.Subject(Message)),
) -> Nil {
  case consumer {
    option.Some(subject) -> actor.call(subject, waiting: 1000, sending: Stop)
    option.None -> Nil
  }
}

/// Stop the manager's current consumer, then start a new consumer actor that
/// reports back to this manager.
fn replace_consumer(
  state: ManagerState,
) -> Result(process.Subject(Message), String) {
  stop_current_consumer(state.consumer_subject)
  start_consumer_actor(state.db, option.Some(state.self_subject))
}

/// Schedule the next `CheckHeartbeat` for the manager.
fn schedule_heartbeat_check(manager: process.Subject(ManagerMessage)) -> Nil {
  let _timer =
    process.send_after(manager, heartbeat_check_interval_ms, CheckHeartbeat)
  Nil
}
330
331// ============================================================================
332// CONSUMER ACTOR
333// ============================================================================
334
/// Start the Jetstream consumer actor (called by the manager).
///
/// The consumer process is started first. If that fails the reason is logged
/// and the actor is still created with no consumer pid. If the actor itself
/// cannot be started, an already spawned consumer process is killed so it is
/// not left running without an owner, and an error is returned.
fn start_consumer_actor(
  db: Executor,
  manager: option.Option(process.Subject(ManagerMessage)),
) -> Result(process.Subject(Message), String) {
  let consumer_pid = case start_consumer_process(db, manager) {
    Ok(pid) -> option.Some(pid)
    Error(reason) -> {
      // Not fatal: the actor is created anyway, just without a consumer.
      logging.log(logging.Warning, "[jetstream] " <> reason)
      option.None
    }
  }

  let boot =
    State(db: db, consumer_pid: consumer_pid, cursor_tracker_pid: option.None)
    |> actor.new
    |> actor.on_message(handle_message)
    |> actor.start

  case boot {
    Ok(started) -> Ok(started.data)
    Error(reason) -> {
      // The consumer process was spawned unlinked; without its actor nothing
      // could ever stop it, so kill it here.
      case consumer_pid {
        option.Some(pid) -> process.kill(pid)
        option.None -> Nil
      }
      Error("Failed to start consumer actor: " <> string.inspect(reason))
    }
  }
}
383
/// Handle one message sent to the consumer actor.
///
/// - `Stop` kills the consumer process (if any), acknowledges the caller and
///   then terminates this actor. The manager drops the actor's subject after
///   every `Stop`, so keeping the actor alive would leak one process per
///   restart.
/// - `Restart` kills the consumer process (if any), starts a new one and
///   replies with the outcome; the actor keeps running.
fn handle_message(state: State, message: Message) -> actor.Next(State, Message) {
  case message {
    Stop(client) -> {
      case state.consumer_pid {
        option.Some(pid) -> {
          logging.log(logging.Info, "[jetstream] Stopping consumer...")
          process.kill(pid)
        }
        option.None -> Nil
      }

      // Reply before exiting so the caller's `actor.call` completes.
      process.send(client, Nil)
      actor.stop()
    }

    Restart(client) -> {
      // Stop old consumer if running
      case state.consumer_pid {
        option.Some(pid) -> {
          logging.log(logging.Info, "[jetstream] Stopping old consumer...")
          process.kill(pid)
        }
        option.None -> Nil
      }

      // Start new consumer with fresh lexicon data.
      // No manager is passed here: restarts are expected to go through the
      // manager, so this consumer sends no heartbeats.
      case start_consumer_process(state.db, option.None) {
        Ok(new_pid) -> {
          process.send(client, Ok(Nil))
          actor.continue(State(..state, consumer_pid: option.Some(new_pid)))
        }
        Error(err) -> {
          process.send(client, Error(err))
          actor.continue(State(..state, consumer_pid: option.None))
        }
      }
    }
  }
}
428
/// Current system time as whole Unix seconds (used to pace cursor flushes).
fn get_current_time_seconds() -> Int {
  let now = timestamp.system_time()
  let #(whole_seconds, _) = timestamp.to_unix_seconds_and_nanoseconds(now)
  whole_seconds
}
436
/// Current system time as Unix milliseconds.
fn get_current_time_milliseconds() -> Int {
  let now = timestamp.system_time()
  let #(whole_seconds, nanos) = timestamp.to_unix_seconds_and_nanoseconds(now)

  let seconds_as_ms = whole_seconds * 1000
  let nanos_as_ms = nanos / 1_000_000
  seconds_as_ms + nanos_as_ms
}
444
/// Handle one message sent to the cursor tracker.
///
/// - `UpdateCursor` remembers the reported value and, once at least five
///   seconds have passed since the last successful write, persists it.
/// - `FlushCursor` persists the pending value (if any) right away and always
///   acknowledges the caller, even when the write fails.
fn handle_cursor_message(
  state: CursorState,
  message: CursorMessage,
) -> actor.Next(CursorState, CursorMessage) {
  case message {
    UpdateCursor(time_us) -> {
      let now = get_current_time_seconds()
      let flush_due = now - state.last_flush_time >= 5

      // State to fall back on whenever nothing gets written.
      let pending = CursorState(..state, latest_cursor: option.Some(time_us))

      case flush_due {
        False -> actor.continue(pending)
        True ->
          // Write the value that just arrived.
          case jetstream.set_cursor(state.db, time_us) {
            Ok(_) ->
              actor.continue(
                CursorState(
                  ..state,
                  latest_cursor: option.None,
                  last_flush_time: now,
                ),
              )
            Error(err) -> {
              logging.log(
                logging.Error,
                "[jetstream] Failed to update cursor: " <> string.inspect(err),
              )
              // Keep the value so the next update can retry the write.
              actor.continue(pending)
            }
          }
      }
    }

    FlushCursor(client) -> {
      let next_state = case state.latest_cursor {
        // Nothing pending: nothing to write.
        option.None -> state
        option.Some(cursor) ->
          case jetstream.set_cursor(state.db, cursor) {
            Ok(_) ->
              CursorState(
                ..state,
                latest_cursor: option.None,
                last_flush_time: get_current_time_seconds(),
              )
            Error(err) -> {
              logging.log(
                logging.Error,
                "[jetstream] Failed to flush cursor: " <> string.inspect(err),
              )
              state
            }
          }
      }

      // The caller is acknowledged on every path.
      process.send(client, Nil)
      actor.continue(next_state)
    }
  }
}
517
/// Start the cursor tracker actor.
///
/// Returns `None` when cursor tracking is disabled or when the actor fails to
/// start (the failure is logged); otherwise returns the tracker's subject.
fn start_cursor_tracker(
  db: Executor,
  disable_cursor: Bool,
) -> option.Option(process.Subject(CursorMessage)) {
  case disable_cursor {
    True -> option.None
    False -> {
      let boot =
        CursorState(
          db: db,
          latest_cursor: option.None,
          last_flush_time: get_current_time_seconds(),
        )
        |> actor.new
        |> actor.on_message(handle_cursor_message)
        |> actor.start

      case boot {
        Ok(started) -> option.Some(started.data)
        Error(reason) -> {
          logging.log(
            logging.Error,
            "[jetstream] Failed to start cursor tracker: "
              <> string.inspect(reason),
          )
          option.None
        }
      }
    }
  }
}
551
/// Start the actual Jetstream consumer process.
///
/// Reads the PLC directory URL, domain authority and record-type lexicons
/// from the database, splits the collections into local ones (matching the
/// domain authority) and external ones, resolves the resume cursor, starts
/// the cursor tracker and finally spawns an unlinked process that runs the
/// consumer. Every incoming event is handled in its own unlinked process.
///
/// Returns the consumer's pid, or an error when the lexicons cannot be read
/// or when there are no collections to listen to.
fn start_consumer_process(
  db: Executor,
  manager: option.Option(process.Subject(ManagerMessage)),
) -> Result(process.Pid, String) {
  logging.log(logging.Info, "")
  logging.log(logging.Info, "[jetstream] Starting Jetstream consumer...")

  // Get PLC directory URL from database config
  let plc_url = config_repo.get_plc_directory_url(db)

  // Get domain authority from database; fall back to "" when it is not set
  let domain_authority = case config_repo.get(db, "domain_authority") {
    Ok(authority) -> authority
    Error(_) -> ""
  }

  // Get all record-type lexicons from the database
  case lexicons.get_record_types(db) {
    Error(err) -> Error("Failed to fetch lexicons: " <> string.inspect(err))

    Ok(record_types) -> {
      // Separate lexicons by domain authority
      let #(local_lexicons, external_lexicons) =
        record_types
        |> list.partition(fn(lex) {
          backfill.nsid_matches_domain_authority(lex.id, domain_authority)
        })

      let local_ids = list.map(local_lexicons, fn(lex) { lex.id })
      let external_ids = list.map(external_lexicons, fn(lex) { lex.id })

      // One consumer listens to everything: local first, then external
      let all_ids = list.append(local_ids, external_ids)

      case list.is_empty(all_ids) {
        True -> {
          logging.log(
            logging.Warning,
            "[jetstream] No collections found - skipping Jetstream consumer",
          )
          logging.log(logging.Info, "[jetstream] Import lexicons first")
          logging.log(logging.Info, "")
          Error("No collections found")
        }

        False -> {
          log_watched_collections(local_ids, external_ids)

          // Get Jetstream URL from database config
          let jetstream_url = config_repo.get_jetstream_url(db)

          let disable_cursor = cursor_tracking_disabled()
          let cursor = read_resume_cursor(db, disable_cursor)

          // Unified config for all collections (no DID filter - listen to all)
          let unified_config =
            goose.JetstreamConfig(
              endpoint: jetstream_url,
              wanted_collections: all_ids,
              wanted_dids: [],
              cursor: cursor,
              max_message_size_bytes: option.None,
              compress: False,
              require_hello: False,
            )

          logging.log(logging.Info, "")
          logging.log(logging.Info, "[jetstream] Connecting to Jetstream...")
          logging.log(logging.Info, "[jetstream] Endpoint: " <> jetstream_url)
          logging.log(
            logging.Info,
            "[jetstream] Collections: "
              <> int.to_string(list.length(all_ids))
              <> " (all DIDs, filtered client-side for external)",
          )

          // Start cursor tracker
          let cursor_tracker = start_cursor_tracker(db, disable_cursor)

          // Start the unified consumer
          let pid =
            process.spawn_unlinked(fn() {
              goose.start_consumer(unified_config, fn(event_json) {
                // Each event gets its own process so a slow one cannot block
                // the others
                let _pid =
                  process.spawn_unlinked(fn() {
                    handle_jetstream_event(
                      db,
                      event_json,
                      local_ids,
                      external_ids,
                      plc_url,
                      cursor_tracker,
                      manager,
                    )
                  })
                Nil
              })
            })

          logging.log(logging.Info, "")
          logging.log(logging.Info, "[jetstream] Jetstream consumer started")
          logging.log(logging.Info, "")

          Ok(pid)
        }
      }
    }
  }
}

/// Log the local collections being listened to and, when there are any, the
/// external collections being tracked.
fn log_watched_collections(
  local_ids: List(String),
  external_ids: List(String),
) -> Nil {
  logging.log(
    logging.Info,
    "[jetstream] Listening to "
      <> int.to_string(list.length(local_ids))
      <> " local collection(s) (all DIDs):",
  )
  list.each(local_ids, fn(col) {
    logging.log(logging.Info, "[jetstream] - " <> col)
  })

  case external_ids {
    [] -> Nil
    _ -> {
      logging.log(logging.Info, "")
      logging.log(
        logging.Info,
        "[jetstream] Tracking "
          <> int.to_string(list.length(external_ids))
          <> " external collection(s) (known DIDs only, filtered client-side):",
      )
      list.each(external_ids, fn(col) {
        logging.log(logging.Info, "[jetstream] - " <> col)
      })
    }
  }
}

/// Whether the `JETSTREAM_DISABLE_CURSOR` environment variable is set to
/// "true", "1" or "yes" (case-insensitive).
fn cursor_tracking_disabled() -> Bool {
  case envoy.get("JETSTREAM_DISABLE_CURSOR") {
    Ok(value) ->
      case string.lowercase(value) {
        "true" | "1" | "yes" -> True
        _ -> False
      }
    Error(_) -> False
  }
}

/// Resolve the cursor to resume from: `None` when tracking is disabled, when
/// no cursor is stored, or when reading it fails; each case is logged.
fn read_resume_cursor(db: Executor, disable_cursor: Bool) -> option.Option(Int) {
  case disable_cursor {
    True -> {
      logging.log(
        logging.Info,
        "[jetstream] Cursor tracking disabled via JETSTREAM_DISABLE_CURSOR",
      )
      option.None
    }
    False ->
      case jetstream.get_cursor(db) {
        Ok(option.Some(cursor)) -> {
          logging.log(
            logging.Info,
            "[jetstream] Resuming from cursor: " <> int.to_string(cursor),
          )
          option.Some(cursor)
        }
        Ok(option.None) -> {
          logging.log(
            logging.Info,
            "[jetstream] No cursor found, starting from live stream",
          )
          option.None
        }
        Error(err) -> {
          logging.log(
            logging.Error,
            "[jetstream] Failed to read cursor: "
              <> string.inspect(err)
              <> ", starting from live stream",
          )
          option.None
        }
      }
  }
}
740
/// Check whether a DID exists in the `actor` table.
///
/// Returns `True` when the query yields at least one row. A failed query is
/// logged and reported as `False`, so the caller treats the DID as unknown.
fn is_known_did(db: Executor, did: String) -> Bool {
  // Placeholder syntax differs between the two supported dialects.
  let sql = case executor.dialect(db) {
    executor.SQLite -> "SELECT 1 FROM actor WHERE did = ? LIMIT 1"
    executor.PostgreSQL -> "SELECT 1 FROM actor WHERE did = $1 LIMIT 1"
  }

  case
    executor.query(db, sql, [executor.Text(did)], decode.at([0], decode.int))
  {
    Ok(rows) -> !list.is_empty(rows)
    Error(err) -> {
      // Surface the failure instead of silently dropping the event.
      logging.log(
        logging.Error,
        "[jetstream] Failed to look up DID "
          <> did
          <> " in actor table: "
          <> string.inspect(err),
      )
      False
    }
  }
}
755
/// Handle a raw Jetstream event JSON string.
///
/// For commit events this:
/// 1. sends a heartbeat to the manager (when one is given),
/// 2. reports the event's `time_us` to the cursor tracker (when one is given),
/// 3. forwards the commit to `event_handler.handle_commit_event`, except for
///    external collections whose DID is not in the actor table.
///
/// Identity, account and unknown events are ignored.
fn handle_jetstream_event(
  db: Executor,
  event_json: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
  plc_url: String,
  cursor_tracker: option.Option(process.Subject(CursorMessage)),
  manager: option.Option(process.Subject(ManagerMessage)),
) -> Nil {
  case goose.parse_event(event_json) {
    goose.CommitEvent(did, time_us, commit) -> {
      // The heartbeat carries the local receipt time, not the event's own
      // timestamp. The manager compares it against the wall clock, so event
      // time would make a backlog replayed from an old cursor look like
      // silence and trigger needless restarts.
      case manager {
        option.Some(mgr) ->
          process.send(mgr, MessageReceived(get_current_time_milliseconds()))
        option.None -> Nil
      }

      // Update cursor tracker with latest time_us
      case cursor_tracker {
        option.Some(tracker) -> process.send(tracker, UpdateCursor(time_us))
        option.None -> Nil
      }

      // Local collections are always processed; external ones only when the
      // DID is already known.
      let should_process = case
        list.contains(external_collection_ids, commit.collection)
      {
        True -> is_known_did(db, did)
        False -> True
      }

      case should_process {
        True ->
          event_handler.handle_commit_event(
            db,
            did,
            time_us,
            commit,
            plc_url,
            collection_ids,
            external_collection_ids,
          )
        False -> Nil
      }
    }

    // Silently ignore identity, account and unknown events
    goose.IdentityEvent(_, _, _)
    | goose.AccountEvent(_, _, _)
    | goose.UnknownEvent(_) -> Nil
  }
}