Highly ambitious ATProtocol AppView service and SDKs
at cbae53eab6250ae7a50190a2397761dcb46dfd96 936 lines 38 kB view raw
1use anyhow::Result; 2use async_trait::async_trait; 3use atproto_jetstream::{ 4 CancellationToken, Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, 5}; 6use chrono::Utc; 7use reqwest::Client; 8use std::collections::HashSet; 9use std::sync::Arc; 10use tokio::sync::{Mutex, RwLock}; 11use tracing::{error, info, warn}; 12 13use crate::actor_resolver::resolve_actor_data; 14use crate::cache::{CacheBackend, CacheFactory, SliceCache}; 15use crate::database::Database; 16use crate::errors::JetstreamError; 17use crate::jetstream_cursor::PostgresCursorHandler; 18use crate::logging::{LogLevel, Logger}; 19use crate::models::{Actor, Record}; 20 21pub struct JetstreamConsumer { 22 consumer: Consumer, 23 database: Database, 24 http_client: Client, 25 actor_cache: Arc<Mutex<SliceCache>>, 26 lexicon_cache: Arc<Mutex<SliceCache>>, 27 domain_cache: Arc<Mutex<SliceCache>>, 28 collections_cache: Arc<Mutex<SliceCache>>, 29 pub event_count: Arc<std::sync::atomic::AtomicU64>, 30 cursor_handler: Option<Arc<PostgresCursorHandler>>, 31 slices_list: Arc<RwLock<Vec<String>>>, 32} 33 34// Event handler that implements the EventHandler trait 35struct SliceEventHandler { 36 database: Database, 37 http_client: Client, 38 event_count: Arc<std::sync::atomic::AtomicU64>, 39 actor_cache: Arc<Mutex<SliceCache>>, 40 lexicon_cache: Arc<Mutex<SliceCache>>, 41 domain_cache: Arc<Mutex<SliceCache>>, 42 collections_cache: Arc<Mutex<SliceCache>>, 43 cursor_handler: Option<Arc<PostgresCursorHandler>>, 44 slices_list: Arc<RwLock<Vec<String>>>, 45} 46 47#[async_trait] 48impl EventHandler for SliceEventHandler { 49 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 50 let count = self 51 .event_count 52 .fetch_add(1, std::sync::atomic::Ordering::Relaxed) 53 + 1; 54 55 if count.is_multiple_of(10000) { 56 info!("Jetstream consumer has processed {} events", count); 57 } 58 59 // Extract and update cursor position from event 60 let time_us = match &event { 61 JetstreamEvent::Commit { time_us, 
.. } => *time_us, 62 JetstreamEvent::Delete { time_us, .. } => *time_us, 63 JetstreamEvent::Identity { time_us, .. } => *time_us, 64 JetstreamEvent::Account { time_us, .. } => *time_us, 65 }; 66 67 if let Some(cursor_handler) = &self.cursor_handler { 68 cursor_handler.update_position(time_us); 69 70 // Periodically write cursor to DB (debounced by handler) 71 if let Err(e) = cursor_handler.maybe_write_cursor().await { 72 error!("Failed to write cursor: {}", e); 73 } 74 } 75 76 match event { 77 JetstreamEvent::Commit { did, commit, .. } => { 78 if let Err(e) = self.handle_commit_event(&did, commit).await { 79 let message = format!("Error handling commit event: {}", e); 80 error!("{}", message); 81 Logger::global().log_jetstream( 82 LogLevel::Error, 83 &message, 84 Some(serde_json::json!({ 85 "error": e.to_string(), 86 "did": did, 87 "event_type": "commit" 88 })), 89 ); 90 } 91 } 92 JetstreamEvent::Delete { did, commit, .. } => { 93 if let Err(e) = self.handle_delete_event(&did, commit).await { 94 let message = format!("Error handling delete event: {}", e); 95 error!("{}", message); 96 Logger::global().log_jetstream( 97 LogLevel::Error, 98 &message, 99 Some(serde_json::json!({ 100 "error": e.to_string(), 101 "did": did, 102 "event_type": "delete" 103 })), 104 ); 105 } 106 } 107 _ => { 108 // Ignore other event types (identity, account, etc.) 
109 } 110 } 111 Ok(()) 112 } 113 114 fn handler_id(&self) -> String { 115 "slice-records-indexer".to_string() 116 } 117} 118 119impl SliceEventHandler { 120 /// Check if DID is an actor for the given slice 121 async fn is_actor_cached( 122 &self, 123 did: &str, 124 slice_uri: &str, 125 ) -> Result<Option<bool>, anyhow::Error> { 126 match self.actor_cache.lock().await.is_actor(did, slice_uri).await { 127 Ok(result) => Ok(result), 128 Err(e) => { 129 warn!( 130 error = ?e, 131 did = did, 132 slice_uri = slice_uri, 133 "Actor cache error" 134 ); 135 Ok(None) 136 } 137 } 138 } 139 140 /// Cache that an actor exists 141 async fn cache_actor_exists(&self, did: &str, slice_uri: &str) { 142 if let Err(e) = self 143 .actor_cache 144 .lock() 145 .await 146 .cache_actor_exists(did, slice_uri) 147 .await 148 { 149 warn!( 150 error = ?e, 151 did = did, 152 slice_uri = slice_uri, 153 "Failed to cache actor exists" 154 ); 155 } 156 } 157 158 /// Remove actor from cache 159 async fn remove_actor_from_cache(&self, did: &str, slice_uri: &str) { 160 if let Err(e) = self 161 .actor_cache 162 .lock() 163 .await 164 .remove_actor(did, slice_uri) 165 .await 166 { 167 warn!( 168 error = ?e, 169 did = did, 170 slice_uri = slice_uri, 171 "Failed to remove actor from cache" 172 ); 173 } 174 } 175 176 /// Get slice collections from cache with database fallback 177 async fn get_slice_collections( 178 &self, 179 slice_uri: &str, 180 ) -> Result<Option<HashSet<String>>, anyhow::Error> { 181 // Try cache first 182 let cache_result = { 183 let mut cache = self.collections_cache.lock().await; 184 cache.get_slice_collections(slice_uri).await 185 }; 186 187 match cache_result { 188 Ok(Some(collections)) => Ok(Some(collections)), 189 Ok(None) => { 190 // Cache miss - load from database 191 match self.database.get_slice_collections_list(slice_uri).await { 192 Ok(collections) => { 193 let collections_set: HashSet<String> = collections.into_iter().collect(); 194 // Cache the result 195 let _ = self 196 
.collections_cache 197 .lock() 198 .await 199 .cache_slice_collections(slice_uri, &collections_set) 200 .await; 201 Ok(Some(collections_set)) 202 } 203 Err(e) => Err(e.into()), 204 } 205 } 206 Err(e) => Err(e), 207 } 208 } 209 210 /// Get slice domain from cache with database fallback 211 async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, anyhow::Error> { 212 // Try cache first 213 let cache_result = { 214 let mut cache = self.domain_cache.lock().await; 215 cache.get_slice_domain(slice_uri).await 216 }; 217 218 match cache_result { 219 Ok(Some(domain)) => Ok(Some(domain)), 220 Ok(None) => { 221 // Cache miss - load from database 222 match self.database.get_slice_domain(slice_uri).await { 223 Ok(Some(domain)) => { 224 // Cache the result 225 let _ = self 226 .domain_cache 227 .lock() 228 .await 229 .cache_slice_domain(slice_uri, &domain) 230 .await; 231 Ok(Some(domain)) 232 } 233 Ok(None) => Ok(None), 234 Err(e) => Err(e.into()), 235 } 236 } 237 Err(e) => Err(e), 238 } 239 } 240 241 /// Get slice lexicons from cache with database fallback 242 async fn get_slice_lexicons( 243 &self, 244 slice_uri: &str, 245 ) -> Result<Option<Vec<serde_json::Value>>, anyhow::Error> { 246 // Try cache first 247 let cache_result = { 248 let mut cache = self.lexicon_cache.lock().await; 249 cache.get_lexicons(slice_uri).await 250 }; 251 252 match cache_result { 253 Ok(Some(lexicons)) => Ok(Some(lexicons)), 254 Ok(None) => { 255 // Cache miss - load from database 256 match self.database.get_lexicons_by_slice(slice_uri).await { 257 Ok(lexicons) if !lexicons.is_empty() => { 258 // Cache the result 259 let _ = self 260 .lexicon_cache 261 .lock() 262 .await 263 .cache_lexicons(slice_uri, &lexicons) 264 .await; 265 Ok(Some(lexicons)) 266 } 267 Ok(_) => Ok(None), // Empty lexicons 268 Err(e) => Err(e.into()), 269 } 270 } 271 Err(e) => Err(e), 272 } 273 } 274 async fn handle_commit_event( 275 &self, 276 did: &str, 277 commit: atproto_jetstream::JetstreamEventCommit, 278 
) -> Result<()> { 279 // Get all slices from cached list 280 let slices = self.slices_list.read().await.clone(); 281 282 // Process each slice 283 for slice_uri in slices { 284 // Get collections for this slice (with caching) 285 let collections = match self.get_slice_collections(&slice_uri).await { 286 Ok(Some(collections)) => collections, 287 Ok(None) => continue, // No collections for this slice 288 Err(e) => { 289 error!("Failed to get collections for slice {}: {}", slice_uri, e); 290 continue; 291 } 292 }; 293 294 if collections.contains(&commit.collection) { 295 // Special handling for network.slices.lexicon records 296 // These should only be indexed to the slice specified in their JSON data 297 if commit.collection == "network.slices.lexicon" { 298 if let Some(target_slice_uri) = 299 commit.record.get("slice").and_then(|v| v.as_str()) 300 { 301 // Skip this slice if it's not the target slice for this lexicon 302 if slice_uri != target_slice_uri { 303 continue; 304 } 305 } else { 306 // No target slice specified, skip this lexicon record entirely 307 continue; 308 } 309 } 310 // Get the domain for this slice (with caching) 311 let domain = match self.get_slice_domain(&slice_uri).await { 312 Ok(Some(domain)) => domain, 313 Ok(None) => continue, // No domain, skip 314 Err(e) => { 315 error!("Failed to get domain for slice {}: {}", slice_uri, e); 316 continue; 317 } 318 }; 319 320 // Check if this is a primary collection (starts with slice domain) 321 let is_primary_collection = commit.collection.starts_with(&domain); 322 323 // For external collections, check actor status BEFORE expensive validation 324 if !is_primary_collection { 325 let is_actor = match self.is_actor_cached(did, &slice_uri).await { 326 Ok(Some(cached_result)) => cached_result, 327 Ok(None) => { 328 // Cache miss means this DID is not an actor we've synced 329 // For external collections, we only care about actors we've already added 330 false 331 } 332 Err(e) => { 333 error!("Error checking 
actor status: {}", e); 334 continue; 335 } 336 }; 337 338 if !is_actor { 339 // Not an actor - skip validation entirely for external collections 340 continue; 341 } 342 } 343 344 // Get lexicons for validation (after actor check for external collections) 345 let lexicons = match self.get_slice_lexicons(&slice_uri).await { 346 Ok(Some(lexicons)) => lexicons, 347 Ok(None) => { 348 info!( 349 "No lexicons found for slice {} - skipping validation", 350 slice_uri 351 ); 352 continue; 353 } 354 Err(e) => { 355 error!("Failed to get lexicons for slice {}: {}", slice_uri, e); 356 continue; 357 } 358 }; 359 360 // Validate the record against the slice's lexicons 361 let validation_result = match slices_lexicon::validate_record( 362 lexicons.clone(), 363 &commit.collection, 364 commit.record.clone(), 365 ) { 366 Ok(_) => { 367 info!( 368 "Record validated for collection {} in slice {}", 369 commit.collection, slice_uri 370 ); 371 true 372 } 373 Err(e) => { 374 let message = format!( 375 "Validation failed for collection {} in slice {}", 376 commit.collection, slice_uri 377 ); 378 error!("{}: {}", message, e); 379 Logger::global().log_jetstream_with_slice( 380 LogLevel::Warn, 381 &message, 382 Some(serde_json::json!({ 383 "collection": commit.collection, 384 "slice_uri": slice_uri, 385 "did": did 386 })), 387 Some(&slice_uri), 388 ); 389 false 390 } 391 }; 392 393 if !validation_result { 394 continue; // Skip this slice if validation fails 395 } 396 397 if is_primary_collection { 398 // Primary collection - ensure actor exists and index ALL records 399 info!( 400 "Primary collection {} for slice {} (domain: {}) - indexing record", 401 commit.collection, slice_uri, domain 402 ); 403 404 // Ensure actor exists for primary collections 405 let is_cached = 406 matches!(self.is_actor_cached(did, &slice_uri).await, Ok(Some(_))); 407 408 if !is_cached { 409 // Actor not in cache - create it 410 info!("Creating new actor {} for slice {}", did, slice_uri); 411 412 // Resolve actor data 
(handle, PDS) 413 match resolve_actor_data(&self.http_client, did).await { 414 Ok(actor_data) => { 415 let actor = Actor { 416 did: actor_data.did.clone(), 417 handle: actor_data.handle, 418 slice_uri: slice_uri.clone(), 419 indexed_at: Utc::now().to_rfc3339(), 420 }; 421 422 // Insert into database 423 if let Err(e) = self.database.batch_insert_actors(&[actor]).await { 424 error!("Failed to create actor {}: {}", did, e); 425 } else { 426 // Add to cache after successful database insert 427 self.cache_actor_exists(did, &slice_uri).await; 428 info!("Created actor {} for slice {}", did, slice_uri); 429 } 430 } 431 Err(e) => { 432 error!("Failed to resolve actor data for {}: {}", did, e); 433 } 434 } 435 } 436 437 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 438 439 let record = Record { 440 uri: uri.clone(), 441 cid: commit.cid.clone(), 442 did: did.to_string(), 443 collection: commit.collection.clone(), 444 json: commit.record.clone(), 445 indexed_at: Utc::now(), 446 slice_uri: Some(slice_uri.clone()), 447 }; 448 449 match self.database.upsert_record(&record).await { 450 Ok(is_insert) => { 451 let message = if is_insert { 452 format!("Record inserted in {}", commit.collection) 453 } else { 454 format!("Record updated in {}", commit.collection) 455 }; 456 let operation = if is_insert { "insert" } else { "update" }; 457 Logger::global().log_jetstream_with_slice( 458 LogLevel::Info, 459 &message, 460 Some(serde_json::json!({ 461 "operation": operation, 462 "collection": commit.collection, 463 "slice_uri": slice_uri, 464 "did": did, 465 "record_type": "primary" 466 })), 467 Some(&slice_uri), 468 ); 469 } 470 Err(e) => { 471 let message = "Failed to insert/update record"; 472 Logger::global().log_jetstream_with_slice( 473 LogLevel::Error, 474 message, 475 Some(serde_json::json!({ 476 "operation": "upsert", 477 "collection": commit.collection, 478 "slice_uri": slice_uri, 479 "did": did, 480 "error": e.to_string(), 481 "record_type": "primary" 
482 })), 483 Some(&slice_uri), 484 ); 485 return Err(anyhow::anyhow!("Database error: {}", e)); 486 } 487 } 488 489 info!( 490 "Successfully indexed {} record from primary collection: {}", 491 commit.operation, uri 492 ); 493 break; 494 } else { 495 // External collection - we already checked actor status, so just index 496 info!( 497 "External collection {} - DID {} is actor in slice {} - indexing", 498 commit.collection, did, slice_uri 499 ); 500 501 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 502 503 let record = Record { 504 uri: uri.clone(), 505 cid: commit.cid.clone(), 506 did: did.to_string(), 507 collection: commit.collection.clone(), 508 json: commit.record.clone(), 509 indexed_at: Utc::now(), 510 slice_uri: Some(slice_uri.clone()), 511 }; 512 513 match self.database.upsert_record(&record).await { 514 Ok(is_insert) => { 515 let message = if is_insert { 516 format!("Record inserted in {}", commit.collection) 517 } else { 518 format!("Record updated in {}", commit.collection) 519 }; 520 let operation = if is_insert { "insert" } else { "update" }; 521 Logger::global().log_jetstream_with_slice( 522 LogLevel::Info, 523 &message, 524 Some(serde_json::json!({ 525 "operation": operation, 526 "collection": commit.collection, 527 "slice_uri": slice_uri, 528 "did": did, 529 "record_type": "external" 530 })), 531 Some(&slice_uri), 532 ); 533 } 534 Err(e) => { 535 let message = "Failed to insert/update record"; 536 Logger::global().log_jetstream_with_slice( 537 LogLevel::Error, 538 message, 539 Some(serde_json::json!({ 540 "operation": "upsert", 541 "collection": commit.collection, 542 "slice_uri": slice_uri, 543 "did": did, 544 "error": e.to_string(), 545 "record_type": "external" 546 })), 547 Some(&slice_uri), 548 ); 549 return Err(anyhow::anyhow!("Database error: {}", e)); 550 } 551 } 552 553 info!( 554 "Successfully indexed {} record from external collection: {}", 555 commit.operation, uri 556 ); 557 break; 558 } 559 } 560 } 561 562 
Ok(()) 563 } 564 565 async fn handle_delete_event( 566 &self, 567 did: &str, 568 commit: atproto_jetstream::JetstreamEventDelete, 569 ) -> Result<()> { 570 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 571 572 // Get all slices from cached list 573 let slices = self.slices_list.read().await.clone(); 574 575 let mut relevant_slices: Vec<String> = Vec::new(); 576 577 for slice_uri in slices { 578 // Get collections for this slice (with caching) 579 let collections = match self.get_slice_collections(&slice_uri).await { 580 Ok(Some(collections)) => collections, 581 Ok(None) => continue, // No collections for this slice 582 Err(e) => { 583 error!("Failed to get collections for slice {}: {}", slice_uri, e); 584 continue; 585 } 586 }; 587 588 if !collections.contains(&commit.collection) { 589 continue; 590 } 591 592 // Get the domain for this slice (with caching) 593 let domain = match self.get_slice_domain(&slice_uri).await { 594 Ok(Some(domain)) => domain, 595 Ok(None) => continue, // No domain, skip 596 Err(e) => { 597 error!("Failed to get domain for slice {}: {}", slice_uri, e); 598 continue; 599 } 600 }; 601 602 // Check if this is a primary collection (starts with slice domain) 603 let is_primary_collection = commit.collection.starts_with(&domain); 604 605 if is_primary_collection { 606 // Primary collection - always process deletes 607 relevant_slices.push(slice_uri.clone()); 608 } else { 609 // External collection - only process if DID is an actor in this slice 610 let is_actor = match self.is_actor_cached(did, &slice_uri).await { 611 Ok(Some(cached_result)) => cached_result, 612 _ => false, 613 }; 614 if is_actor { 615 relevant_slices.push(slice_uri.clone()); 616 } 617 } 618 } 619 620 if relevant_slices.is_empty() { 621 // No relevant slices found, skip deletion 622 return Ok(()); 623 } 624 625 // Handle cascade deletion before deleting the record 626 if let Err(e) = self.database.handle_cascade_deletion(&uri, &commit.collection).await 
{ 627 warn!("Cascade deletion failed for {}: {}", uri, e); 628 } 629 630 // Delete the record and log only for relevant slices 631 match self.database.delete_record_by_uri(&uri, None).await { 632 Ok(rows_affected) => { 633 if rows_affected > 0 { 634 info!( 635 "Deleted record: {} ({} rows) for {} slice(s)", 636 uri, 637 rows_affected, 638 relevant_slices.len() 639 ); 640 let message = format!("Record deleted from {}", commit.collection); 641 642 // Log to each relevant slice and check if actor cleanup is needed 643 for slice_uri in &relevant_slices { 644 Logger::global().log_jetstream_with_slice( 645 LogLevel::Info, 646 &message, 647 Some(serde_json::json!({ 648 "operation": "delete", 649 "collection": commit.collection, 650 "did": did, 651 "uri": uri, 652 "rows_affected": rows_affected 653 })), 654 Some(slice_uri), 655 ); 656 } 657 658 // Check if actor should be cleaned up (no more records) 659 for slice_uri in &relevant_slices { 660 match self.database.actor_has_records(did, slice_uri).await { 661 Ok(has_records) => { 662 if !has_records { 663 // No more records for this actor in this slice - clean up 664 match self.database.delete_actor(did, slice_uri).await { 665 Ok(deleted) => { 666 if deleted > 0 { 667 info!( 668 "Cleaned up actor {} from slice {} (no records remaining)", 669 did, slice_uri 670 ); 671 // Remove from cache 672 self.remove_actor_from_cache(did, slice_uri).await; 673 } 674 } 675 Err(e) => { 676 error!( 677 "Failed to delete actor {} from slice {}: {}", 678 did, slice_uri, e 679 ); 680 } 681 } 682 } 683 } 684 Err(e) => { 685 error!( 686 "Failed to check if actor {} has records in slice {}: {}", 687 did, slice_uri, e 688 ); 689 } 690 } 691 } 692 } 693 } 694 Err(e) => { 695 let message = "Failed to delete record"; 696 error!("{}: {}", message, e); 697 698 // Log error to each relevant slice 699 for slice_uri in relevant_slices { 700 Logger::global().log_jetstream_with_slice( 701 LogLevel::Error, 702 message, 703 Some(serde_json::json!({ 704 
"operation": "delete", 705 "collection": commit.collection, 706 "did": did, 707 "uri": uri, 708 "error": e.to_string() 709 })), 710 Some(&slice_uri), 711 ); 712 } 713 } 714 } 715 716 Ok(()) 717 } 718} 719 720impl JetstreamConsumer { 721 /// Create a new Jetstream consumer with optional cursor support and Redis cache 722 /// 723 /// # Arguments 724 /// * `database` - Database connection for slice configurations and record storage 725 /// * `jetstream_hostname` - Optional custom jetstream hostname 726 /// * `cursor_handler` - Optional cursor handler for resumable event processing 727 /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from 728 /// * `redis_url` - Optional Redis URL for caching (falls back to in-memory if not provided) 729 pub async fn new( 730 database: Database, 731 jetstream_hostname: Option<String>, 732 cursor_handler: Option<Arc<PostgresCursorHandler>>, 733 initial_cursor: Option<i64>, 734 redis_url: Option<String>, 735 ) -> Result<Self, JetstreamError> { 736 let config = ConsumerTaskConfig { 737 user_agent: "slice-server/1.0".to_string(), 738 compression: false, 739 zstd_dictionary_location: String::new(), 740 jetstream_hostname: jetstream_hostname 741 .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()), 742 collections: Vec::new(), 743 dids: Vec::new(), 744 max_message_size_bytes: None, 745 cursor: initial_cursor, 746 require_hello: true, 747 }; 748 749 let consumer = Consumer::new(config); 750 let http_client = Client::new(); 751 752 // Determine cache backend based on Redis URL 753 let cache_backend = if let Some(redis_url) = redis_url { 754 CacheBackend::Redis { 755 url: redis_url, 756 ttl_seconds: None, 757 } 758 } else { 759 CacheBackend::InMemory { ttl_seconds: None } 760 }; 761 762 // Create cache instances 763 let actor_cache = Arc::new(Mutex::new( 764 CacheFactory::create_slice_cache(cache_backend.clone()) 765 .await 766 .map_err(|e| JetstreamError::ConnectionFailed { 767 message: 
format!("Failed to create actor cache: {}", e), 768 })?, 769 )); 770 771 let lexicon_cache = Arc::new(Mutex::new( 772 CacheFactory::create_slice_cache(cache_backend.clone()) 773 .await 774 .map_err(|e| JetstreamError::ConnectionFailed { 775 message: format!("Failed to create lexicon cache: {}", e), 776 })?, 777 )); 778 779 let domain_cache = Arc::new(Mutex::new( 780 CacheFactory::create_slice_cache(cache_backend.clone()) 781 .await 782 .map_err(|e| JetstreamError::ConnectionFailed { 783 message: format!("Failed to create domain cache: {}", e), 784 })?, 785 )); 786 787 let collections_cache = Arc::new(Mutex::new( 788 CacheFactory::create_slice_cache(cache_backend) 789 .await 790 .map_err(|e| JetstreamError::ConnectionFailed { 791 message: format!("Failed to create collections cache: {}", e), 792 })?, 793 )); 794 795 Ok(Self { 796 consumer, 797 database, 798 http_client, 799 actor_cache, 800 lexicon_cache, 801 domain_cache, 802 collections_cache, 803 event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)), 804 cursor_handler, 805 slices_list: Arc::new(RwLock::new(Vec::new())), 806 }) 807 } 808 809 /// Load slice configurations 810 pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> { 811 info!("Loading slice configurations..."); 812 813 // Get all slices and update cached list 814 let slices = self.database.get_all_slices().await?; 815 *self.slices_list.write().await = slices.clone(); 816 info!("Found {} total slices in database", slices.len()); 817 818 Ok(()) 819 } 820 821 /// Preload actor cache to avoid database hits during event processing 822 async fn preload_actor_cache(&self) -> Result<(), JetstreamError> { 823 info!("Preloading actor cache..."); 824 825 let actors = self.database.get_all_actors().await?; 826 info!("Found {} actors to cache", actors.len()); 827 828 match self.actor_cache.lock().await.preload_actors(actors).await { 829 Ok(_) => { 830 info!("Actor cache preloaded successfully"); 831 Ok(()) 832 } 833 Err(e) => { 834 
warn!(error = ?e, "Failed to preload actors to cache"); 835 Ok(()) // Don't fail startup if preload fails 836 } 837 } 838 } 839 840 /// Start consuming events from Jetstream 841 pub async fn start_consuming( 842 &self, 843 cancellation_token: CancellationToken, 844 ) -> Result<(), JetstreamError> { 845 info!("Starting Jetstream consumer"); 846 847 // Load initial slice configurations 848 self.load_slice_configurations().await?; 849 850 // Preload actor cache 851 self.preload_actor_cache().await?; 852 853 // Create and register the event handler 854 let handler = Arc::new(SliceEventHandler { 855 database: self.database.clone(), 856 http_client: self.http_client.clone(), 857 event_count: self.event_count.clone(), 858 actor_cache: self.actor_cache.clone(), 859 lexicon_cache: self.lexicon_cache.clone(), 860 domain_cache: self.domain_cache.clone(), 861 collections_cache: self.collections_cache.clone(), 862 cursor_handler: self.cursor_handler.clone(), 863 slices_list: self.slices_list.clone(), 864 }); 865 866 self.consumer.register_handler(handler).await.map_err(|e| { 867 JetstreamError::ConnectionFailed { 868 message: format!("Failed to register event handler: {}", e), 869 } 870 })?; 871 872 // Start periodic status reporting (with cancellation support) 873 let event_count_for_status = self.event_count.clone(); 874 let cancellation_token_for_status = cancellation_token.clone(); 875 tokio::spawn(async move { 876 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute 877 loop { 878 tokio::select! 
{ 879 _ = interval.tick() => { 880 let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed); 881 info!( 882 "Jetstream consumer status: {} total events processed", 883 count 884 ); 885 } 886 _ = cancellation_token_for_status.cancelled() => { 887 info!("Status reporting task cancelled"); 888 break; 889 } 890 } 891 } 892 }); 893 894 // Start the consumer 895 info!("Starting Jetstream background consumer..."); 896 let result = self 897 .consumer 898 .run_background(cancellation_token) 899 .await 900 .map_err(|e| JetstreamError::ConnectionFailed { 901 message: format!("Consumer failed: {}", e), 902 }); 903 904 // Force write cursor on shutdown to ensure latest position is persisted 905 if let Some(cursor_handler) = &self.cursor_handler { 906 if let Err(e) = cursor_handler.force_write_cursor().await { 907 error!("Failed to write final cursor position: {}", e); 908 } else { 909 info!("Final cursor position written to database"); 910 } 911 } 912 913 result?; 914 Ok(()) 915 } 916 917 /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors 918 pub fn start_configuration_reloader(consumer: Arc<Self>) { 919 tokio::spawn(async move { 920 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes 921 interval.tick().await; // Skip first immediate tick 922 923 loop { 924 interval.tick().await; 925 926 if let Err(e) = consumer.load_slice_configurations().await { 927 error!("Failed to reload slice configurations: {}", e); 928 } 929 930 if let Err(e) = consumer.preload_actor_cache().await { 931 error!("Failed to reload actor cache: {}", e); 932 } 933 } 934 }); 935 } 936}