Forked from slices.network/slices.
A highly ambitious ATProtocol AppView service and SDKs.
1use anyhow::Result;
2use async_trait::async_trait;
3use atproto_jetstream::{
4 CancellationToken, Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent,
5};
6use chrono::Utc;
7use reqwest::Client;
8use std::collections::HashSet;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tracing::{error, info, warn};
12
13use crate::actor_resolver::resolve_actor_data;
14use crate::cache::{CacheBackend, CacheFactory, SliceCache};
15use crate::database::Database;
16use crate::errors::JetstreamError;
17use crate::jetstream_cursor::PostgresCursorHandler;
18use crate::logging::{LogLevel, Logger};
19use crate::models::{Actor, Record};
20
/// Consumes the Bluesky Jetstream firehose and indexes matching records into
/// slices.
///
/// Owns the underlying websocket `Consumer`, the database handle, and the
/// caches that are shared with the `SliceEventHandler` registered in
/// `start_consuming`.
pub struct JetstreamConsumer {
    // Underlying jetstream websocket consumer.
    consumer: Consumer,
    // Database for slice configuration, records, and actors.
    database: Database,
    // HTTP client used by the event handler to resolve actor data.
    http_client: Client,
    // Cache of per-slice actor membership (DID -> is actor).
    actor_cache: Arc<Mutex<SliceCache>>,
    // Cache of per-slice lexicons used for record validation.
    lexicon_cache: Arc<Mutex<SliceCache>>,
    // Cache of slice URI -> slice domain.
    domain_cache: Arc<Mutex<SliceCache>>,
    // Cache of slice URI -> set of indexed collections.
    collections_cache: Arc<Mutex<SliceCache>>,
    /// Total number of jetstream events processed (shared with the handler).
    pub event_count: Arc<std::sync::atomic::AtomicU64>,
    // Optional persistence of the jetstream cursor for resumable consumption.
    cursor_handler: Option<Arc<PostgresCursorHandler>>,
    // Cached list of all slice URIs, refreshed by the configuration reloader.
    slices_list: Arc<RwLock<Vec<String>>>,
}
33
// Event handler that implements the EventHandler trait.
// Validates and indexes commit/delete events against every configured slice.
// All fields are clones of the owning `JetstreamConsumer`'s state, so both
// sides observe the same caches, counters, and cursor.
struct SliceEventHandler {
    // Database for slice configuration, records, and actors.
    database: Database,
    // HTTP client used to resolve actor data (handle/PDS).
    http_client: Client,
    // Shared counter of processed events.
    event_count: Arc<std::sync::atomic::AtomicU64>,
    // Cache of per-slice actor membership.
    actor_cache: Arc<Mutex<SliceCache>>,
    // Cache of per-slice lexicons.
    lexicon_cache: Arc<Mutex<SliceCache>>,
    // Cache of slice domains.
    domain_cache: Arc<Mutex<SliceCache>>,
    // Cache of per-slice collection sets.
    collections_cache: Arc<Mutex<SliceCache>>,
    // Optional cursor persistence, updated on every event.
    cursor_handler: Option<Arc<PostgresCursorHandler>>,
    // Cached list of all slice URIs.
    slices_list: Arc<RwLock<Vec<String>>>,
}
46
47#[async_trait]
48impl EventHandler for SliceEventHandler {
49 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
50 let count = self
51 .event_count
52 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
53 + 1;
54
55 if count.is_multiple_of(10000) {
56 info!("Jetstream consumer has processed {} events", count);
57 }
58
59 // Extract and update cursor position from event
60 let time_us = match &event {
61 JetstreamEvent::Commit { time_us, .. } => *time_us,
62 JetstreamEvent::Delete { time_us, .. } => *time_us,
63 JetstreamEvent::Identity { time_us, .. } => *time_us,
64 JetstreamEvent::Account { time_us, .. } => *time_us,
65 };
66
67 if let Some(cursor_handler) = &self.cursor_handler {
68 cursor_handler.update_position(time_us);
69
70 // Periodically write cursor to DB (debounced by handler)
71 if let Err(e) = cursor_handler.maybe_write_cursor().await {
72 error!("Failed to write cursor: {}", e);
73 }
74 }
75
76 match event {
77 JetstreamEvent::Commit { did, commit, .. } => {
78 if let Err(e) = self.handle_commit_event(&did, commit).await {
79 let message = format!("Error handling commit event: {}", e);
80 error!("{}", message);
81 Logger::global().log_jetstream(
82 LogLevel::Error,
83 &message,
84 Some(serde_json::json!({
85 "error": e.to_string(),
86 "did": did,
87 "event_type": "commit"
88 })),
89 );
90 }
91 }
92 JetstreamEvent::Delete { did, commit, .. } => {
93 if let Err(e) = self.handle_delete_event(&did, commit).await {
94 let message = format!("Error handling delete event: {}", e);
95 error!("{}", message);
96 Logger::global().log_jetstream(
97 LogLevel::Error,
98 &message,
99 Some(serde_json::json!({
100 "error": e.to_string(),
101 "did": did,
102 "event_type": "delete"
103 })),
104 );
105 }
106 }
107 _ => {
108 // Ignore other event types (identity, account, etc.)
109 }
110 }
111 Ok(())
112 }
113
114 fn handler_id(&self) -> String {
115 "slice-records-indexer".to_string()
116 }
117}
118
119impl SliceEventHandler {
120 /// Check if DID is an actor for the given slice
121 async fn is_actor_cached(
122 &self,
123 did: &str,
124 slice_uri: &str,
125 ) -> Result<Option<bool>, anyhow::Error> {
126 match self.actor_cache.lock().await.is_actor(did, slice_uri).await {
127 Ok(result) => Ok(result),
128 Err(e) => {
129 warn!(
130 error = ?e,
131 did = did,
132 slice_uri = slice_uri,
133 "Actor cache error"
134 );
135 Ok(None)
136 }
137 }
138 }
139
140 /// Cache that an actor exists
141 async fn cache_actor_exists(&self, did: &str, slice_uri: &str) {
142 if let Err(e) = self
143 .actor_cache
144 .lock()
145 .await
146 .cache_actor_exists(did, slice_uri)
147 .await
148 {
149 warn!(
150 error = ?e,
151 did = did,
152 slice_uri = slice_uri,
153 "Failed to cache actor exists"
154 );
155 }
156 }
157
158 /// Remove actor from cache
159 async fn remove_actor_from_cache(&self, did: &str, slice_uri: &str) {
160 if let Err(e) = self
161 .actor_cache
162 .lock()
163 .await
164 .remove_actor(did, slice_uri)
165 .await
166 {
167 warn!(
168 error = ?e,
169 did = did,
170 slice_uri = slice_uri,
171 "Failed to remove actor from cache"
172 );
173 }
174 }
175
176 /// Get slice collections from cache with database fallback
177 async fn get_slice_collections(
178 &self,
179 slice_uri: &str,
180 ) -> Result<Option<HashSet<String>>, anyhow::Error> {
181 // Try cache first
182 let cache_result = {
183 let mut cache = self.collections_cache.lock().await;
184 cache.get_slice_collections(slice_uri).await
185 };
186
187 match cache_result {
188 Ok(Some(collections)) => Ok(Some(collections)),
189 Ok(None) => {
190 // Cache miss - load from database
191 match self.database.get_slice_collections_list(slice_uri).await {
192 Ok(collections) => {
193 let collections_set: HashSet<String> = collections.into_iter().collect();
194 // Cache the result
195 let _ = self
196 .collections_cache
197 .lock()
198 .await
199 .cache_slice_collections(slice_uri, &collections_set)
200 .await;
201 Ok(Some(collections_set))
202 }
203 Err(e) => Err(e.into()),
204 }
205 }
206 Err(e) => Err(e),
207 }
208 }
209
210 /// Get slice domain from cache with database fallback
211 async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, anyhow::Error> {
212 // Try cache first
213 let cache_result = {
214 let mut cache = self.domain_cache.lock().await;
215 cache.get_slice_domain(slice_uri).await
216 };
217
218 match cache_result {
219 Ok(Some(domain)) => Ok(Some(domain)),
220 Ok(None) => {
221 // Cache miss - load from database
222 match self.database.get_slice_domain(slice_uri).await {
223 Ok(Some(domain)) => {
224 // Cache the result
225 let _ = self
226 .domain_cache
227 .lock()
228 .await
229 .cache_slice_domain(slice_uri, &domain)
230 .await;
231 Ok(Some(domain))
232 }
233 Ok(None) => Ok(None),
234 Err(e) => Err(e.into()),
235 }
236 }
237 Err(e) => Err(e),
238 }
239 }
240
241 /// Get slice lexicons from cache with database fallback
242 async fn get_slice_lexicons(
243 &self,
244 slice_uri: &str,
245 ) -> Result<Option<Vec<serde_json::Value>>, anyhow::Error> {
246 // Try cache first
247 let cache_result = {
248 let mut cache = self.lexicon_cache.lock().await;
249 cache.get_lexicons(slice_uri).await
250 };
251
252 match cache_result {
253 Ok(Some(lexicons)) => Ok(Some(lexicons)),
254 Ok(None) => {
255 // Cache miss - load from database
256 match self.database.get_lexicons_by_slice(slice_uri).await {
257 Ok(lexicons) if !lexicons.is_empty() => {
258 // Cache the result
259 let _ = self
260 .lexicon_cache
261 .lock()
262 .await
263 .cache_lexicons(slice_uri, &lexicons)
264 .await;
265 Ok(Some(lexicons))
266 }
267 Ok(_) => Ok(None), // Empty lexicons
268 Err(e) => Err(e.into()),
269 }
270 }
271 Err(e) => Err(e),
272 }
273 }
274 async fn handle_commit_event(
275 &self,
276 did: &str,
277 commit: atproto_jetstream::JetstreamEventCommit,
278 ) -> Result<()> {
279 // Get all slices from cached list
280 let slices = self.slices_list.read().await.clone();
281
282 // Process each slice
283 for slice_uri in slices {
284 // Get collections for this slice (with caching)
285 let collections = match self.get_slice_collections(&slice_uri).await {
286 Ok(Some(collections)) => collections,
287 Ok(None) => continue, // No collections for this slice
288 Err(e) => {
289 error!("Failed to get collections for slice {}: {}", slice_uri, e);
290 continue;
291 }
292 };
293
294 if collections.contains(&commit.collection) {
295 // Special handling for network.slices.lexicon records
296 // These should only be indexed to the slice specified in their JSON data
297 if commit.collection == "network.slices.lexicon" {
298 if let Some(target_slice_uri) =
299 commit.record.get("slice").and_then(|v| v.as_str())
300 {
301 // Skip this slice if it's not the target slice for this lexicon
302 if slice_uri != target_slice_uri {
303 continue;
304 }
305 } else {
306 // No target slice specified, skip this lexicon record entirely
307 continue;
308 }
309 }
310 // Get the domain for this slice (with caching)
311 let domain = match self.get_slice_domain(&slice_uri).await {
312 Ok(Some(domain)) => domain,
313 Ok(None) => continue, // No domain, skip
314 Err(e) => {
315 error!("Failed to get domain for slice {}: {}", slice_uri, e);
316 continue;
317 }
318 };
319
320 // Check if this is a primary collection (starts with slice domain)
321 let is_primary_collection = commit.collection.starts_with(&domain);
322
323 // For external collections, check actor status BEFORE expensive validation
324 if !is_primary_collection {
325 let is_actor = match self.is_actor_cached(did, &slice_uri).await {
326 Ok(Some(cached_result)) => cached_result,
327 Ok(None) => {
328 // Cache miss means this DID is not an actor we've synced
329 // For external collections, we only care about actors we've already added
330 false
331 }
332 Err(e) => {
333 error!("Error checking actor status: {}", e);
334 continue;
335 }
336 };
337
338 if !is_actor {
339 // Not an actor - skip validation entirely for external collections
340 continue;
341 }
342 }
343
344 // Get lexicons for validation (after actor check for external collections)
345 let lexicons = match self.get_slice_lexicons(&slice_uri).await {
346 Ok(Some(lexicons)) => lexicons,
347 Ok(None) => {
348 info!(
349 "No lexicons found for slice {} - skipping validation",
350 slice_uri
351 );
352 continue;
353 }
354 Err(e) => {
355 error!("Failed to get lexicons for slice {}: {}", slice_uri, e);
356 continue;
357 }
358 };
359
360 // Validate the record against the slice's lexicons
361 let validation_result = match slices_lexicon::validate_record(
362 lexicons.clone(),
363 &commit.collection,
364 commit.record.clone(),
365 ) {
366 Ok(_) => {
367 info!(
368 "Record validated for collection {} in slice {}",
369 commit.collection, slice_uri
370 );
371 true
372 }
373 Err(e) => {
374 let message = format!(
375 "Validation failed for collection {} in slice {}",
376 commit.collection, slice_uri
377 );
378 error!("{}: {}", message, e);
379 Logger::global().log_jetstream_with_slice(
380 LogLevel::Warn,
381 &message,
382 Some(serde_json::json!({
383 "collection": commit.collection,
384 "slice_uri": slice_uri,
385 "did": did
386 })),
387 Some(&slice_uri),
388 );
389 false
390 }
391 };
392
393 if !validation_result {
394 continue; // Skip this slice if validation fails
395 }
396
397 if is_primary_collection {
398 // Primary collection - ensure actor exists and index ALL records
399 info!(
400 "Primary collection {} for slice {} (domain: {}) - indexing record",
401 commit.collection, slice_uri, domain
402 );
403
404 // Ensure actor exists for primary collections
405 let is_cached =
406 matches!(self.is_actor_cached(did, &slice_uri).await, Ok(Some(_)));
407
408 if !is_cached {
409 // Actor not in cache - create it
410 info!("Creating new actor {} for slice {}", did, slice_uri);
411
412 // Resolve actor data (handle, PDS)
413 match resolve_actor_data(&self.http_client, did).await {
414 Ok(actor_data) => {
415 let actor = Actor {
416 did: actor_data.did.clone(),
417 handle: actor_data.handle,
418 slice_uri: slice_uri.clone(),
419 indexed_at: Utc::now().to_rfc3339(),
420 };
421
422 // Insert into database
423 if let Err(e) = self.database.batch_insert_actors(&[actor]).await {
424 error!("Failed to create actor {}: {}", did, e);
425 } else {
426 // Add to cache after successful database insert
427 self.cache_actor_exists(did, &slice_uri).await;
428 info!("Created actor {} for slice {}", did, slice_uri);
429 }
430 }
431 Err(e) => {
432 error!("Failed to resolve actor data for {}: {}", did, e);
433 }
434 }
435 }
436
437 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
438
439 let record = Record {
440 uri: uri.clone(),
441 cid: commit.cid.clone(),
442 did: did.to_string(),
443 collection: commit.collection.clone(),
444 json: commit.record.clone(),
445 indexed_at: Utc::now(),
446 slice_uri: Some(slice_uri.clone()),
447 };
448
449 match self.database.upsert_record(&record).await {
450 Ok(is_insert) => {
451 let message = if is_insert {
452 format!("Record inserted in {}", commit.collection)
453 } else {
454 format!("Record updated in {}", commit.collection)
455 };
456 let operation = if is_insert { "insert" } else { "update" };
457 Logger::global().log_jetstream_with_slice(
458 LogLevel::Info,
459 &message,
460 Some(serde_json::json!({
461 "operation": operation,
462 "collection": commit.collection,
463 "slice_uri": slice_uri,
464 "did": did,
465 "record_type": "primary"
466 })),
467 Some(&slice_uri),
468 );
469 }
470 Err(e) => {
471 let message = "Failed to insert/update record";
472 Logger::global().log_jetstream_with_slice(
473 LogLevel::Error,
474 message,
475 Some(serde_json::json!({
476 "operation": "upsert",
477 "collection": commit.collection,
478 "slice_uri": slice_uri,
479 "did": did,
480 "error": e.to_string(),
481 "record_type": "primary"
482 })),
483 Some(&slice_uri),
484 );
485 return Err(anyhow::anyhow!("Database error: {}", e));
486 }
487 }
488
489 info!(
490 "Successfully indexed {} record from primary collection: {}",
491 commit.operation, uri
492 );
493 break;
494 } else {
495 // External collection - we already checked actor status, so just index
496 info!(
497 "External collection {} - DID {} is actor in slice {} - indexing",
498 commit.collection, did, slice_uri
499 );
500
501 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
502
503 let record = Record {
504 uri: uri.clone(),
505 cid: commit.cid.clone(),
506 did: did.to_string(),
507 collection: commit.collection.clone(),
508 json: commit.record.clone(),
509 indexed_at: Utc::now(),
510 slice_uri: Some(slice_uri.clone()),
511 };
512
513 match self.database.upsert_record(&record).await {
514 Ok(is_insert) => {
515 let message = if is_insert {
516 format!("Record inserted in {}", commit.collection)
517 } else {
518 format!("Record updated in {}", commit.collection)
519 };
520 let operation = if is_insert { "insert" } else { "update" };
521 Logger::global().log_jetstream_with_slice(
522 LogLevel::Info,
523 &message,
524 Some(serde_json::json!({
525 "operation": operation,
526 "collection": commit.collection,
527 "slice_uri": slice_uri,
528 "did": did,
529 "record_type": "external"
530 })),
531 Some(&slice_uri),
532 );
533 }
534 Err(e) => {
535 let message = "Failed to insert/update record";
536 Logger::global().log_jetstream_with_slice(
537 LogLevel::Error,
538 message,
539 Some(serde_json::json!({
540 "operation": "upsert",
541 "collection": commit.collection,
542 "slice_uri": slice_uri,
543 "did": did,
544 "error": e.to_string(),
545 "record_type": "external"
546 })),
547 Some(&slice_uri),
548 );
549 return Err(anyhow::anyhow!("Database error: {}", e));
550 }
551 }
552
553 info!(
554 "Successfully indexed {} record from external collection: {}",
555 commit.operation, uri
556 );
557 break;
558 }
559 }
560 }
561
562 Ok(())
563 }
564
    /// Handle a jetstream delete event.
    ///
    /// Determines which slices the deleted record was relevant to (primary
    /// collections always; external collections only when `did` is a cached
    /// actor of the slice), runs cascade deletion, deletes the record, logs
    /// per relevant slice, and cleans up actors left with zero records.
    async fn handle_delete_event(
        &self,
        did: &str,
        commit: atproto_jetstream::JetstreamEventDelete,
    ) -> Result<()> {
        let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);

        // Get all slices from cached list
        let slices = self.slices_list.read().await.clone();

        // Slices for which this delete should be processed/logged.
        let mut relevant_slices: Vec<String> = Vec::new();

        for slice_uri in slices {
            // Get collections for this slice (with caching)
            let collections = match self.get_slice_collections(&slice_uri).await {
                Ok(Some(collections)) => collections,
                Ok(None) => continue, // No collections for this slice
                Err(e) => {
                    error!("Failed to get collections for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            if !collections.contains(&commit.collection) {
                continue;
            }

            // Get the domain for this slice (with caching)
            let domain = match self.get_slice_domain(&slice_uri).await {
                Ok(Some(domain)) => domain,
                Ok(None) => continue, // No domain, skip
                Err(e) => {
                    error!("Failed to get domain for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            // Check if this is a primary collection (starts with slice domain)
            let is_primary_collection = commit.collection.starts_with(&domain);

            if is_primary_collection {
                // Primary collection - always process deletes
                relevant_slices.push(slice_uri.clone());
            } else {
                // External collection - only process if DID is an actor in this slice.
                // A cache error or miss is treated as "not an actor".
                let is_actor = match self.is_actor_cached(did, &slice_uri).await {
                    Ok(Some(cached_result)) => cached_result,
                    _ => false,
                };
                if is_actor {
                    relevant_slices.push(slice_uri.clone());
                }
            }
        }

        if relevant_slices.is_empty() {
            // No relevant slices found, skip deletion
            return Ok(());
        }

        // Handle cascade deletion before deleting the record (best-effort).
        if let Err(e) = self.database.handle_cascade_deletion(&uri, &commit.collection).await {
            warn!("Cascade deletion failed for {}: {}", uri, e);
        }

        // Delete the record and log only for relevant slices.
        // NOTE(review): the `None` slice filter appears to delete the URI
        // across all slices, not only the relevant ones - confirm intended.
        match self.database.delete_record_by_uri(&uri, None).await {
            Ok(rows_affected) => {
                if rows_affected > 0 {
                    info!(
                        "Deleted record: {} ({} rows) for {} slice(s)",
                        uri,
                        rows_affected,
                        relevant_slices.len()
                    );
                    let message = format!("Record deleted from {}", commit.collection);

                    // Log to each relevant slice and check if actor cleanup is needed
                    for slice_uri in &relevant_slices {
                        Logger::global().log_jetstream_with_slice(
                            LogLevel::Info,
                            &message,
                            Some(serde_json::json!({
                                "operation": "delete",
                                "collection": commit.collection,
                                "did": did,
                                "uri": uri,
                                "rows_affected": rows_affected
                            })),
                            Some(slice_uri),
                        );
                    }

                    // Check if actor should be cleaned up (no more records)
                    for slice_uri in &relevant_slices {
                        match self.database.actor_has_records(did, slice_uri).await {
                            Ok(has_records) => {
                                if !has_records {
                                    // No more records for this actor in this slice - clean up
                                    match self.database.delete_actor(did, slice_uri).await {
                                        Ok(deleted) => {
                                            if deleted > 0 {
                                                info!(
                                                    "Cleaned up actor {} from slice {} (no records remaining)",
                                                    did, slice_uri
                                                );
                                                // Remove from cache
                                                self.remove_actor_from_cache(did, slice_uri).await;
                                            }
                                        }
                                        Err(e) => {
                                            error!(
                                                "Failed to delete actor {} from slice {}: {}",
                                                did, slice_uri, e
                                            );
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                error!(
                                    "Failed to check if actor {} has records in slice {}: {}",
                                    did, slice_uri, e
                                );
                            }
                        }
                    }
                }
            }
            Err(e) => {
                // Deletion failures are logged per slice but do not error
                // out the event (handler still returns Ok).
                let message = "Failed to delete record";
                error!("{}: {}", message, e);

                // Log error to each relevant slice
                for slice_uri in relevant_slices {
                    Logger::global().log_jetstream_with_slice(
                        LogLevel::Error,
                        message,
                        Some(serde_json::json!({
                            "operation": "delete",
                            "collection": commit.collection,
                            "did": did,
                            "uri": uri,
                            "error": e.to_string()
                        })),
                        Some(&slice_uri),
                    );
                }
            }
        }

        Ok(())
    }
718}
719
720impl JetstreamConsumer {
721 /// Create a new Jetstream consumer with optional cursor support and Redis cache
722 ///
723 /// # Arguments
724 /// * `database` - Database connection for slice configurations and record storage
725 /// * `jetstream_hostname` - Optional custom jetstream hostname
726 /// * `cursor_handler` - Optional cursor handler for resumable event processing
727 /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from
728 /// * `redis_url` - Optional Redis URL for caching (falls back to in-memory if not provided)
729 pub async fn new(
730 database: Database,
731 jetstream_hostname: Option<String>,
732 cursor_handler: Option<Arc<PostgresCursorHandler>>,
733 initial_cursor: Option<i64>,
734 redis_url: Option<String>,
735 ) -> Result<Self, JetstreamError> {
736 let config = ConsumerTaskConfig {
737 user_agent: "slice-server/1.0".to_string(),
738 compression: false,
739 zstd_dictionary_location: String::new(),
740 jetstream_hostname: jetstream_hostname
741 .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()),
742 collections: Vec::new(),
743 dids: Vec::new(),
744 max_message_size_bytes: None,
745 cursor: initial_cursor,
746 require_hello: true,
747 };
748
749 let consumer = Consumer::new(config);
750 let http_client = Client::new();
751
752 // Determine cache backend based on Redis URL
753 let cache_backend = if let Some(redis_url) = redis_url {
754 CacheBackend::Redis {
755 url: redis_url,
756 ttl_seconds: None,
757 }
758 } else {
759 CacheBackend::InMemory { ttl_seconds: None }
760 };
761
762 // Create cache instances
763 let actor_cache = Arc::new(Mutex::new(
764 CacheFactory::create_slice_cache(cache_backend.clone())
765 .await
766 .map_err(|e| JetstreamError::ConnectionFailed {
767 message: format!("Failed to create actor cache: {}", e),
768 })?,
769 ));
770
771 let lexicon_cache = Arc::new(Mutex::new(
772 CacheFactory::create_slice_cache(cache_backend.clone())
773 .await
774 .map_err(|e| JetstreamError::ConnectionFailed {
775 message: format!("Failed to create lexicon cache: {}", e),
776 })?,
777 ));
778
779 let domain_cache = Arc::new(Mutex::new(
780 CacheFactory::create_slice_cache(cache_backend.clone())
781 .await
782 .map_err(|e| JetstreamError::ConnectionFailed {
783 message: format!("Failed to create domain cache: {}", e),
784 })?,
785 ));
786
787 let collections_cache = Arc::new(Mutex::new(
788 CacheFactory::create_slice_cache(cache_backend)
789 .await
790 .map_err(|e| JetstreamError::ConnectionFailed {
791 message: format!("Failed to create collections cache: {}", e),
792 })?,
793 ));
794
795 Ok(Self {
796 consumer,
797 database,
798 http_client,
799 actor_cache,
800 lexicon_cache,
801 domain_cache,
802 collections_cache,
803 event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
804 cursor_handler,
805 slices_list: Arc::new(RwLock::new(Vec::new())),
806 })
807 }
808
809 /// Load slice configurations
810 pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> {
811 info!("Loading slice configurations...");
812
813 // Get all slices and update cached list
814 let slices = self.database.get_all_slices().await?;
815 *self.slices_list.write().await = slices.clone();
816 info!("Found {} total slices in database", slices.len());
817
818 Ok(())
819 }
820
821 /// Preload actor cache to avoid database hits during event processing
822 async fn preload_actor_cache(&self) -> Result<(), JetstreamError> {
823 info!("Preloading actor cache...");
824
825 let actors = self.database.get_all_actors().await?;
826 info!("Found {} actors to cache", actors.len());
827
828 match self.actor_cache.lock().await.preload_actors(actors).await {
829 Ok(_) => {
830 info!("Actor cache preloaded successfully");
831 Ok(())
832 }
833 Err(e) => {
834 warn!(error = ?e, "Failed to preload actors to cache");
835 Ok(()) // Don't fail startup if preload fails
836 }
837 }
838 }
839
    /// Start consuming events from Jetstream.
    ///
    /// Loads slice configuration, warms the actor cache, registers the event
    /// handler, spawns a periodic status reporter, then runs the consumer
    /// until `cancellation_token` fires or the consumer fails. On shutdown
    /// the cursor is force-written so the latest position is persisted.
    pub async fn start_consuming(
        &self,
        cancellation_token: CancellationToken,
    ) -> Result<(), JetstreamError> {
        info!("Starting Jetstream consumer");

        // Load initial slice configurations
        self.load_slice_configurations().await?;

        // Preload actor cache
        self.preload_actor_cache().await?;

        // Create and register the event handler. It shares the caches,
        // counters, cursor handler, and slice list with this consumer.
        let handler = Arc::new(SliceEventHandler {
            database: self.database.clone(),
            http_client: self.http_client.clone(),
            event_count: self.event_count.clone(),
            actor_cache: self.actor_cache.clone(),
            lexicon_cache: self.lexicon_cache.clone(),
            domain_cache: self.domain_cache.clone(),
            collections_cache: self.collections_cache.clone(),
            cursor_handler: self.cursor_handler.clone(),
            slices_list: self.slices_list.clone(),
        });

        self.consumer.register_handler(handler).await.map_err(|e| {
            JetstreamError::ConnectionFailed {
                message: format!("Failed to register event handler: {}", e),
            }
        })?;

        // Start periodic status reporting (with cancellation support)
        let event_count_for_status = self.event_count.clone();
        let cancellation_token_for_status = cancellation_token.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed);
                        info!(
                            "Jetstream consumer status: {} total events processed",
                            count
                        );
                    }
                    _ = cancellation_token_for_status.cancelled() => {
                        info!("Status reporting task cancelled");
                        break;
                    }
                }
            }
        });

        // Start the consumer; this awaits until cancellation or failure.
        info!("Starting Jetstream background consumer...");
        let result = self
            .consumer
            .run_background(cancellation_token)
            .await
            .map_err(|e| JetstreamError::ConnectionFailed {
                message: format!("Consumer failed: {}", e),
            });

        // Force write cursor on shutdown to ensure latest position is
        // persisted. This runs even when the consumer exited with an error;
        // the error itself is propagated afterwards via `result?`.
        if let Some(cursor_handler) = &self.cursor_handler {
            if let Err(e) = cursor_handler.force_write_cursor().await {
                error!("Failed to write final cursor position: {}", e);
            } else {
                info!("Final cursor position written to database");
            }
        }

        result?;
        Ok(())
    }
916
917 /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors
918 pub fn start_configuration_reloader(consumer: Arc<Self>) {
919 tokio::spawn(async move {
920 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes
921 interval.tick().await; // Skip first immediate tick
922
923 loop {
924 interval.tick().await;
925
926 if let Err(e) = consumer.load_slice_configurations().await {
927 error!("Failed to reload slice configurations: {}", e);
928 }
929
930 if let Err(e) = consumer.preload_actor_cache().await {
931 error!("Failed to reload actor cache: {}", e);
932 }
933 }
934 });
935 }
936}