Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 828 lines 25 kB view raw
import backfill
import database/executor.{type Executor}
import database/jetstream
import database/repositories/config as config_repo
import database/repositories/lexicons
import envoy
import event_handler
import gleam/dynamic/decode
import gleam/erlang/process
import gleam/int
import gleam/list
import gleam/option
import gleam/otp/actor
import gleam/string
import gleam/time/timestamp
import goose
import logging

// ============================================================================
// CONFIGURATION
// ============================================================================

/// How long to wait without messages before forcing a restart (in milliseconds)
const heartbeat_timeout_ms = 300_000

// 5 minutes - restart when stuck

/// How often to check for heartbeat timeouts (in milliseconds)
const heartbeat_check_interval_ms = 300_000

// 5 minutes - check at the same interval as timeout for consistent triggering

// ============================================================================
// TYPES
// ============================================================================

/// Messages that can be sent to the consumer manager
pub type ManagerMessage {
  /// Record when the last message was seen. The value is compared against the
  /// local wall clock (Unix milliseconds) by the heartbeat check.
  MessageReceived(timestamp: Int)
  /// Check if we should restart due to timeout
  CheckHeartbeat
  /// Update the self subject after actor starts
  UpdateSelfSubject(process.Subject(ManagerMessage))
  /// Update the consumer subject
  UpdateConsumerSubject(option.Option(process.Subject(Message)))
  /// Manual restart request
  ManualRestart(reply_with: process.Subject(Result(Nil, String)))
  /// Manual stop request
  ManualStop(reply_with: process.Subject(Nil))
}

/// State for the consumer manager actor
pub type ManagerState {
  ManagerState(
    db: Executor,
    last_message_time_ms: Int,
    consumer_subject: option.Option(process.Subject(Message)),
    self_subject: process.Subject(ManagerMessage),
  )
}

/// Messages that can be sent to the Jetstream consumer actor
pub type Message {
  /// Kill the consumer process (if any), reply, and shut the actor down
  Stop(reply_with: process.Subject(Nil))
  /// Kill the consumer process (if any) and start a fresh one
  Restart(reply_with: process.Subject(Result(Nil, String)))
}

/// Messages for cursor tracker actor
pub type CursorMessage {
  UpdateCursor(time_us: Int)
  FlushCursor(reply_with: process.Subject(Nil))
}

/// Internal state of the Jetstream consumer actor
type State {
  State(
    db: Executor,
    consumer_pid: option.Option(process.Pid),
    cursor_tracker_pid: option.Option(process.Pid),
  )
}

/// State for cursor tracker
type CursorState {
  CursorState(
    db: Executor,
    latest_cursor: option.Option(Int),
    last_flush_time: Int,
  )
}

// ============================================================================
// CONSUMER MANAGER
// ============================================================================

/// Start the consumer manager that spawns and monitors the consumer.
///
/// Returns the manager's subject. A failure to start the consumer actor is
/// logged but does not fail the call: the manager keeps running and the
/// heartbeat check will attempt a restart later. Only a failure to start the
/// manager actor itself yields `Error`.
pub fn start(db: Executor) -> Result(process.Subject(ManagerMessage), String) {
  // Placeholder until the actor learns its own subject via UpdateSelfSubject
  let temp_subject = process.new_subject()

  let state =
    ManagerState(
      db: db,
      last_message_time_ms: get_current_time_milliseconds(),
      consumer_subject: option.None,
      self_subject: temp_subject,
    )

  let result =
    actor.new(state)
    |> actor.on_message(handle_manager_message)
    |> actor.start

  case result {
    Ok(started) -> {
      let manager = started.data

      // Update the actor's state with its real subject
      process.send(manager, UpdateSelfSubject(manager))

      // Spawn the initial consumer actor
      case start_consumer_actor(db, option.Some(manager)) {
        Ok(consumer_subject) ->
          // Update manager state with consumer subject
          process.send(
            manager,
            UpdateConsumerSubject(option.Some(consumer_subject)),
          )
        Error(err) ->
          // Consumer failed to start, but manager is running for future restarts
          logging.log(
            logging.Warning,
            "[jetstream] Consumer failed to start: " <> err,
          )
      }

      // Schedule the first heartbeat check in either case (on failure the
      // check is what attempts the restart)
      schedule_heartbeat_check(manager)

      Ok(manager)
    }
    Error(err) ->
      Error("Failed to start manager actor: " <> string.inspect(err))
  }
}

/// Stop the Jetstream consumer
pub fn stop(manager: process.Subject(ManagerMessage)) -> Nil {
  // Send stop request through manager
  let _ = actor.call(manager, waiting: 1000, sending: ManualStop)
  Nil
}

/// Restart the Jetstream consumer with fresh lexicon data
pub fn restart(manager: process.Subject(ManagerMessage)) -> Result(Nil, String) {
  actor.call(manager, waiting: 5000, sending: ManualRestart)
}

/// Queue the next `CheckHeartbeat` for the manager after the check interval.
fn schedule_heartbeat_check(manager: process.Subject(ManagerMessage)) -> Nil {
  let _timer =
    process.send_after(manager, heartbeat_check_interval_ms, CheckHeartbeat)
  Nil
}

/// Ask the current consumer actor (if any) to stop and wait for its reply.
fn stop_consumer(consumer: option.Option(process.Subject(Message))) -> Nil {
  case consumer {
    option.Some(subject) -> {
      let _ = actor.call(subject, waiting: 1000, sending: Stop)
      Nil
    }
    option.None -> Nil
  }
}

/// Handle messages sent to the consumer manager
fn handle_manager_message(
  state: ManagerState,
  message: ManagerMessage,
) -> actor.Next(ManagerState, ManagerMessage) {
  case message {
    UpdateSelfSubject(subject) ->
      // Update state with the real actor subject
      actor.continue(ManagerState(..state, self_subject: subject))

    UpdateConsumerSubject(subject) ->
      actor.continue(ManagerState(..state, consumer_subject: subject))

    MessageReceived(timestamp) ->
      // Update the last seen message time
      actor.continue(ManagerState(..state, last_message_time_ms: timestamp))

    CheckHeartbeat -> {
      let current_time = get_current_time_milliseconds()
      let time_since_last_message = current_time - state.last_message_time_ms

      // Debug logging for slow message rates
      case time_since_last_message > 60_000 {
        True ->
          logging.log(
            logging.Debug,
            "[jetstream] Health check: "
              <> int.to_string(time_since_last_message / 1000)
              <> "s since last message",
          )
        False -> Nil
      }

      let next_state = case time_since_last_message > heartbeat_timeout_ms {
        True -> {
          // No messages received within timeout - force restart
          logging.log(
            logging.Warning,
            "[jetstream] No messages received for "
              <> int.to_string(time_since_last_message / 1000)
              <> " seconds. Restarting consumer...",
          )

          // Stop old consumer if running
          stop_consumer(state.consumer_subject)

          // Start new consumer
          case start_consumer_actor(state.db, option.Some(state.self_subject)) {
            Ok(new_subject) -> {
              logging.log(logging.Info, "[jetstream] Consumer restarted")

              // Reset the timer and update state
              ManagerState(
                ..state,
                last_message_time_ms: current_time,
                consumer_subject: option.Some(new_subject),
              )
            }
            Error(err) -> {
              logging.log(
                logging.Error,
                "[jetstream] Failed to restart consumer: " <> err,
              )

              // Timer is deliberately not reset so the next check retries
              ManagerState(..state, consumer_subject: option.None)
            }
          }
        }
        // Still receiving messages
        False -> state
      }

      // Every outcome schedules the next check
      schedule_heartbeat_check(state.self_subject)
      actor.continue(next_state)
    }

    ManualRestart(client) -> {
      logging.log(logging.Info, "[jetstream] Manual restart requested")

      // Stop old consumer if running
      stop_consumer(state.consumer_subject)

      // Start new consumer
      case start_consumer_actor(state.db, option.Some(state.self_subject)) {
        Ok(new_subject) -> {
          process.send(client, Ok(Nil))
          actor.continue(
            ManagerState(
              ..state,
              last_message_time_ms: get_current_time_milliseconds(),
              consumer_subject: option.Some(new_subject),
            ),
          )
        }
        Error(err) -> {
          process.send(client, Error(err))
          actor.continue(ManagerState(..state, consumer_subject: option.None))
        }
      }
    }

    ManualStop(client) -> {
      logging.log(logging.Info, "[jetstream] Manual stop requested")

      // Stop consumer if running, then acknowledge
      stop_consumer(state.consumer_subject)
      process.send(client, Nil)

      actor.continue(ManagerState(..state, consumer_subject: option.None))
    }
  }
}

// ============================================================================
// CONSUMER ACTOR
// ============================================================================

/// Start the Jetstream consumer actor (called by manager).
///
/// If the underlying consumer process cannot be started, the failure is logged
/// and the actor is still created (with no consumer pid) so it can be
/// restarted later. `Error` is returned only when the actor itself fails to
/// start.
fn start_consumer_actor(
  db: Executor,
  manager: option.Option(process.Subject(ManagerMessage)),
) -> Result(process.Subject(Message), String) {
  let consumer_pid = case start_consumer_process(db, manager) {
    Ok(pid) -> option.Some(pid)
    Error(err) -> {
      // Consumer failed to start, but we still create the actor so it can be restarted later
      logging.log(logging.Warning, "[jetstream] " <> err)
      option.None
    }
  }

  let initial_state =
    State(
      db: db,
      consumer_pid: consumer_pid,
      cursor_tracker_pid: option.None,
    )

  let result =
    actor.new(initial_state)
    |> actor.on_message(handle_message)
    |> actor.start

  case result {
    Ok(started) -> Ok(started.data)
    Error(err) -> {
      // Nothing would own the consumer process now; don't leave it running
      case consumer_pid {
        option.Some(pid) -> process.kill(pid)
        option.None -> Nil
      }
      Error("Failed to start consumer actor: " <> string.inspect(err))
    }
  }
}

/// Handle messages sent to the consumer actor
fn handle_message(state: State, message: Message) -> actor.Next(State, Message) {
  case message {
    Stop(client) -> {
      // Stop the consumer if it's running
      case state.consumer_pid {
        option.Some(pid) -> {
          logging.log(logging.Info, "[jetstream] Stopping consumer...")
          process.kill(pid)
        }
        option.None -> Nil
      }

      // Reply first so the caller's `actor.call` completes, then terminate.
      // The manager discards this actor's subject after every Stop, so keeping
      // the actor alive would leak one process per restart.
      process.send(client, Nil)
      actor.stop()
    }

    Restart(client) -> {
      // Stop old consumer if running
      case state.consumer_pid {
        option.Some(pid) -> {
          logging.log(logging.Info, "[jetstream] Stopping old consumer...")
          process.kill(pid)
        }
        option.None -> Nil
      }

      // Start new consumer with fresh lexicon data
      // Note: We pass option.None for manager since restarts go through the manager
      case start_consumer_process(state.db, option.None) {
        Ok(new_pid) -> {
          process.send(client, Ok(Nil))
          actor.continue(State(..state, consumer_pid: option.Some(new_pid)))
        }
        Error(err) -> {
          process.send(client, Error(err))
          actor.continue(State(..state, consumer_pid: option.None))
        }
      }
    }
  }
}

/// Get current timestamp in seconds (for tracking flush intervals)
fn get_current_time_seconds() -> Int {
  let #(seconds, _nanoseconds) =
    timestamp.system_time()
    |> timestamp.to_unix_seconds_and_nanoseconds
  seconds
}

/// Get current timestamp in milliseconds
fn get_current_time_milliseconds() -> Int {
  let #(seconds, nanoseconds) =
    timestamp.system_time()
    |> timestamp.to_unix_seconds_and_nanoseconds
  seconds * 1000 + nanoseconds / 1_000_000
}

/// Handle cursor tracker messages.
///
/// `UpdateCursor` remembers the newest cursor and writes it to the database
/// once at least 5 seconds have passed since the last successful flush.
/// `FlushCursor` writes any pending cursor immediately and always replies.
fn handle_cursor_message(
  state: CursorState,
  message: CursorMessage,
) -> actor.Next(CursorState, CursorMessage) {
  case message {
    UpdateCursor(time_us) -> {
      let current_time = get_current_time_seconds()
      let time_since_last_flush = current_time - state.last_flush_time

      // Update latest cursor
      let new_state = CursorState(..state, latest_cursor: option.Some(time_us))

      // Flush every 5 seconds
      case time_since_last_flush >= 5 {
        True -> {
          // Flush the new cursor value (time_us)
          case jetstream.set_cursor(state.db, time_us) {
            Ok(_) ->
              actor.continue(CursorState(
                db: state.db,
                last_flush_time: current_time,
                latest_cursor: option.None,
              ))
            Error(err) -> {
              logging.log(
                logging.Error,
                "[jetstream] Failed to update cursor: " <> string.inspect(err),
              )
              // Keep the cursor in state so we can retry on next flush
              actor.continue(new_state)
            }
          }
        }
        False -> actor.continue(new_state)
      }
    }

    FlushCursor(client) -> {
      // Force flush current cursor
      case state.latest_cursor {
        option.Some(cursor) -> {
          case jetstream.set_cursor(state.db, cursor) {
            Ok(_) -> {
              process.send(client, Nil)
              actor.continue(
                CursorState(
                  ..state,
                  latest_cursor: option.None,
                  last_flush_time: get_current_time_seconds(),
                ),
              )
            }
            Error(err) -> {
              logging.log(
                logging.Error,
                "[jetstream] Failed to flush cursor: " <> string.inspect(err),
              )
              process.send(client, Nil)
              actor.continue(state)
            }
          }
        }
        option.None -> {
          process.send(client, Nil)
          actor.continue(state)
        }
      }
    }
  }
}

/// Start cursor tracker actor.
///
/// Returns `option.None` when cursor tracking is disabled or when the actor
/// fails to start (the failure is logged).
fn start_cursor_tracker(
  db: Executor,
  disable_cursor: Bool,
) -> option.Option(process.Subject(CursorMessage)) {
  case disable_cursor {
    True -> option.None
    False -> {
      let initial_state =
        CursorState(
          db: db,
          latest_cursor: option.None,
          last_flush_time: get_current_time_seconds(),
        )

      case
        actor.new(initial_state)
        |> actor.on_message(handle_cursor_message)
        |> actor.start
      {
        Ok(started) -> option.Some(started.data)
        Error(err) -> {
          logging.log(
            logging.Error,
            "[jetstream] Failed to start cursor tracker: "
              <> string.inspect(err),
          )
          option.None
        }
      }
    }
  }
}

/// Start the actual consumer process (extracted from original start function).
///
/// Loads the record-type lexicons, splits them into local and external
/// collections by domain authority, and spawns an unlinked process running a
/// single Jetstream consumer for all of them. Returns the pid of that process,
/// or `Error` when lexicons cannot be fetched or no collections exist.
fn start_consumer_process(
  db: Executor,
  manager: option.Option(process.Subject(ManagerMessage)),
) -> Result(process.Pid, String) {
  logging.log(logging.Info, "")
  logging.log(logging.Info, "[jetstream] Starting Jetstream consumer...")

  // Get PLC directory URL from database config
  let plc_url = config_repo.get_plc_directory_url(db)

  // Get domain authority from database
  let domain_authority = case config_repo.get(db, "domain_authority") {
    Ok(authority) -> authority
    Error(_) -> ""
  }

  // Get all record-type lexicons from the database
  case lexicons.get_record_types(db) {
    Ok(lexicons) -> {
      // Separate lexicons by domain authority
      let #(local_lexicons, external_lexicons) =
        lexicons
        |> list.partition(fn(lex) {
          backfill.nsid_matches_domain_authority(lex.id, domain_authority)
        })

      let local_collection_ids = list.map(local_lexicons, fn(lex) { lex.id })
      let external_collection_ids =
        list.map(external_lexicons, fn(lex) { lex.id })

      // Combine all collections into a single list for unified consumer
      let all_collection_ids =
        list.append(local_collection_ids, external_collection_ids)

      case all_collection_ids {
        [] -> {
          logging.log(
            logging.Warning,
            "[jetstream] No collections found - skipping Jetstream consumer",
          )
          logging.log(logging.Info, "[jetstream] Import lexicons first")
          logging.log(logging.Info, "")
          Error("No collections found")
        }
        _ -> {
          logging.log(
            logging.Info,
            "[jetstream] Listening to "
              <> int.to_string(list.length(local_collection_ids))
              <> " local collection(s) (all DIDs):",
          )
          list.each(local_collection_ids, fn(col) {
            logging.log(logging.Info, "[jetstream] - " <> col)
          })

          case external_collection_ids {
            [] -> Nil
            _ -> {
              logging.log(logging.Info, "")
              logging.log(
                logging.Info,
                "[jetstream] Tracking "
                  <> int.to_string(list.length(external_collection_ids))
                  <> " external collection(s) (known DIDs only, filtered client-side):",
              )
              list.each(external_collection_ids, fn(col) {
                logging.log(logging.Info, "[jetstream] - " <> col)
              })
            }
          }

          // Get Jetstream URL from database config
          let jetstream_url = config_repo.get_jetstream_url(db)

          // Check if cursor tracking is disabled via environment variable
          let disable_cursor = case envoy.get("JETSTREAM_DISABLE_CURSOR") {
            Ok(value) ->
              case string.lowercase(value) {
                "true" | "1" | "yes" -> True
                _ -> False
              }
            Error(_) -> False
          }

          // Read cursor from database unless disabled
          let cursor = case disable_cursor {
            True -> {
              logging.log(
                logging.Info,
                "[jetstream] Cursor tracking disabled via JETSTREAM_DISABLE_CURSOR",
              )
              option.None
            }
            False -> {
              case jetstream.get_cursor(db) {
                Ok(option.Some(cursor)) -> {
                  logging.log(
                    logging.Info,
                    "[jetstream] Resuming from cursor: "
                      <> int.to_string(cursor),
                  )
                  option.Some(cursor)
                }
                Ok(option.None) -> {
                  logging.log(
                    logging.Info,
                    "[jetstream] No cursor found, starting from live stream",
                  )
                  option.None
                }
                Error(err) -> {
                  logging.log(
                    logging.Error,
                    "[jetstream] Failed to read cursor: "
                      <> string.inspect(err)
                      <> ", starting from live stream",
                  )
                  option.None
                }
              }
            }
          }

          // Create unified Jetstream config for all collections (no DID filter - listen to all)
          let unified_config =
            goose.JetstreamConfig(
              endpoint: jetstream_url,
              wanted_collections: all_collection_ids,
              wanted_dids: [],
              cursor: cursor,
              max_message_size_bytes: option.None,
              compress: False,
              require_hello: False,
            )

          logging.log(logging.Info, "")
          logging.log(logging.Info, "[jetstream] Connecting to Jetstream...")
          logging.log(
            logging.Info,
            "[jetstream] Endpoint: " <> jetstream_url,
          )
          logging.log(
            logging.Info,
            "[jetstream] Collections: "
              <> int.to_string(list.length(all_collection_ids))
              <> " (all DIDs, filtered client-side for external)",
          )

          // Start cursor tracker
          // NOTE(review): the tracker actor is not stopped or flushed when the
          // consumer is stopped, and State.cursor_tracker_pid is never
          // populated in this file - confirm whether that is intended.
          let cursor_tracker = start_cursor_tracker(db, disable_cursor)

          // Start the unified consumer
          let pid =
            process.spawn_unlinked(fn() {
              goose.start_consumer(unified_config, fn(event_json) {
                // Spawn each event into its own process so they don't block each other
                let _pid =
                  process.spawn_unlinked(fn() {
                    handle_jetstream_event(
                      db,
                      event_json,
                      local_collection_ids,
                      external_collection_ids,
                      plc_url,
                      cursor_tracker,
                      manager,
                    )
                  })
                Nil
              })
            })

          logging.log(logging.Info, "")
          logging.log(logging.Info, "[jetstream] Jetstream consumer started")
          logging.log(logging.Info, "")

          Ok(pid)
        }
      }
    }
    Error(err) -> {
      Error("Failed to fetch lexicons: " <> string.inspect(err))
    }
  }
}

/// Check if a DID exists in the actor table.
///
/// A failed query is logged and treated as "not known", so the event that
/// triggered the lookup is skipped rather than crashing its handler process.
fn is_known_did(db: Executor, did: String) -> Bool {
  let sql = case executor.dialect(db) {
    executor.SQLite -> "SELECT 1 FROM actor WHERE did = ? LIMIT 1"
    executor.PostgreSQL -> "SELECT 1 FROM actor WHERE did = $1 LIMIT 1"
  }

  case
    executor.query(db, sql, [executor.Text(did)], decode.at([0], decode.int))
  {
    Ok(results) -> results != []
    Error(err) -> {
      logging.log(
        logging.Warning,
        "[jetstream] Failed to look up DID "
          <> did
          <> ": "
          <> string.inspect(err),
      )
      False
    }
  }
}

/// Handle a raw Jetstream event JSON string.
///
/// Commit events notify the manager (heartbeat) and the cursor tracker, then
/// are passed to the event handler. Events for external collections are only
/// processed when the DID is already in the actor table. Identity, account and
/// unknown events are ignored.
fn handle_jetstream_event(
  db: Executor,
  event_json: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
  plc_url: String,
  cursor_tracker: option.Option(process.Subject(CursorMessage)),
  manager: option.Option(process.Subject(ManagerMessage)),
) -> Nil {
  case goose.parse_event(event_json) {
    goose.CommitEvent(did, time_us, commit) -> {
      // Send heartbeat to manager using the local receive time. The manager
      // compares this value against its own clock, so the event's time_us
      // (which is old while replaying from a cursor) would make a healthy
      // consumer look stalled and trigger needless restarts.
      case manager {
        option.Some(mgr) ->
          process.send(mgr, MessageReceived(get_current_time_milliseconds()))
        option.None -> Nil
      }

      // Update cursor tracker with latest time_us
      case cursor_tracker {
        option.Some(tracker) -> process.send(tracker, UpdateCursor(time_us))
        option.None -> Nil
      }

      // Check if this is an external collection event
      let is_external =
        list.contains(external_collection_ids, commit.collection)

      // Local collections are always processed; external ones only for known
      // DIDs. `||` short-circuits, so local events never hit the database.
      case !is_external || is_known_did(db, did) {
        True ->
          event_handler.handle_commit_event(
            db,
            did,
            time_us,
            commit,
            plc_url,
            collection_ids,
            external_collection_ids,
          )
        False -> Nil
      }
    }
    goose.IdentityEvent(_did, _time_us, _identity) -> {
      // Silently ignore identity events
      Nil
    }
    goose.AccountEvent(_did, _time_us, _account) -> {
      // Silently ignore account events
      Nil
    }
    goose.UnknownEvent(_raw) -> {
      // Silently ignore unknown events
      Nil
    }
  }
}