//! PubSub infrastructure for broadcasting GraphQL subscription events //! //! This module provides a publish-subscribe mechanism for broadcasting record //! updates from the Jetstream consumer to active GraphQL subscriptions. use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{RwLock, broadcast}; use tracing::{debug, info}; /// Event broadcast when a record is created or updated #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RecordUpdateEvent { pub uri: String, pub cid: String, pub did: String, pub collection: String, pub value: serde_json::Value, pub slice_uri: String, pub indexed_at: String, pub operation: RecordOperation, } /// Type of record operation #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub enum RecordOperation { Create, Update, Delete, } /// PubSub manager for broadcasting events to subscribers /// /// Each slice has its own broadcast channel to avoid cross-slice event leaking. /// Channels are created lazily when the first subscriber for a slice connects. pub struct GraphQLPubSub { /// Map of slice_uri -> broadcast sender /// Using broadcast channel allows multiple subscribers per slice channels: Arc>>>, /// Channel capacity (number of events to buffer) capacity: usize, } impl GraphQLPubSub { /// Create a new PubSub manager /// /// # Arguments /// * `capacity` - Number of events to buffer per slice (default: 1000) pub fn new(capacity: usize) -> Self { info!("Initializing GraphQL PubSub with capacity {}", capacity); Self { channels: Arc::new(RwLock::new(HashMap::new())), capacity, } } /// Publish an event to all subscribers of a slice /// /// If no subscribers exist, the event is dropped silently. pub async fn publish(&self, event: RecordUpdateEvent) { let slice_uri = event.slice_uri.clone(); let channels = self.channels.read().await; if let Some(sender) = channels.get(&slice_uri) { // Try to send, ignore if no active receivers match sender.send(event.clone()) { Ok(receiver_count) => { debug!( "Published {} event for {} to {} subscriber(s)", match event.operation { RecordOperation::Create => "CREATE", RecordOperation::Update => "UPDATE", RecordOperation::Delete => "DELETE", }, event.collection, receiver_count ); } Err(_) => { // No receivers, which is fine debug!("No active subscribers for slice {}", slice_uri); } } } } /// Subscribe to events for a specific slice /// /// Returns a receiver that will receive all future events for the slice. /// Creates a new broadcast channel if one doesn't exist yet. pub async fn subscribe(&self, slice_uri: &str) -> broadcast::Receiver { let mut channels = self.channels.write().await; let sender = channels.entry(slice_uri.to_string()).or_insert_with(|| { info!("Creating new broadcast channel for slice: {}", slice_uri); let (tx, _) = broadcast::channel(self.capacity); tx }); sender.subscribe() } /// Get statistics about active channels and subscribers pub async fn stats(&self) -> PubSubStats { let channels = self.channels.read().await; PubSubStats { active_channels: channels.len(), total_subscribers: channels.values().map(|s| s.receiver_count()).sum(), } } /// Clean up channels with no subscribers /// /// Should be called periodically to prevent memory leaks pub async fn cleanup_empty_channels(&self) { let mut channels = self.channels.write().await; let before_count = channels.len(); channels.retain(|slice_uri, sender| { let has_subscribers = sender.receiver_count() > 0; if !has_subscribers { debug!("Removing empty broadcast channel for slice: {}", slice_uri); } has_subscribers }); let removed = before_count - channels.len(); if removed > 0 { info!("Cleaned up {} empty broadcast channel(s)", removed); } } } /// Statistics about the PubSub system #[derive(Debug, Clone)] pub struct PubSubStats { pub active_channels: usize, pub total_subscribers: usize, } impl Default for GraphQLPubSub { fn default() -> Self { Self::new(1000) } } // Global PubSub instance // This is initialized once at application startup and shared across // the Jetstream consumer and GraphQL subscription handlers. lazy_static::lazy_static! { pub static ref PUBSUB: GraphQLPubSub = GraphQLPubSub::default(); } /// Start periodic cleanup task for empty channels pub fn start_cleanup_task() { tokio::spawn(async { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes loop { interval.tick().await; PUBSUB.cleanup_empty_channels().await; let stats = PUBSUB.stats().await; info!( "PubSub stats: {} active channels, {} total subscribers", stats.active_channels, stats.total_subscribers ); } }); } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_pubsub_broadcast() { let pubsub = GraphQLPubSub::new(100); // Subscribe to events let mut rx = pubsub.subscribe("test://slice").await; // Publish an event let event = RecordUpdateEvent { uri: "at://did:plc:test/app.test/123".to_string(), cid: "bafytest".to_string(), did: "did:plc:test".to_string(), collection: "app.test".to_string(), value: serde_json::json!({"text": "Hello"}), slice_uri: "test://slice".to_string(), indexed_at: "2024-01-01T00:00:00Z".to_string(), operation: RecordOperation::Create, }; pubsub.publish(event.clone()).await; // Receive the event let received = rx.recv().await.unwrap(); assert_eq!(received.uri, event.uri); assert_eq!(received.collection, event.collection); } #[tokio::test] async fn test_multiple_subscribers() { let pubsub = GraphQLPubSub::new(100); let mut rx1 = pubsub.subscribe("test://slice").await; let mut rx2 = pubsub.subscribe("test://slice").await; let event = RecordUpdateEvent { uri: "at://did:plc:test/app.test/123".to_string(), cid: "bafytest".to_string(), did: "did:plc:test".to_string(), collection: "app.test".to_string(), value: serde_json::json!({"text": "Hello"}), slice_uri: "test://slice".to_string(), indexed_at: "2024-01-01T00:00:00Z".to_string(), operation: RecordOperation::Create, }; pubsub.publish(event.clone()).await; // Both subscribers should receive the event let received1 = rx1.recv().await.unwrap(); let received2 = rx2.recv().await.unwrap(); assert_eq!(received1.uri, event.uri); assert_eq!(received2.uri, event.uri); } #[tokio::test] async fn test_cleanup_empty_channels() { let pubsub = GraphQLPubSub::new(100); // Create a subscriber and drop it { let _rx = pubsub.subscribe("test://slice").await; assert_eq!(pubsub.stats().await.active_channels, 1); } // Cleanup should remove the empty channel pubsub.cleanup_empty_channels().await; assert_eq!(pubsub.stats().await.active_channels, 0); } }