tangled
alpha
login
or
join now
danabra.mov
/
slices
forked from
slices.network/slices
0
fork
atom
Highly ambitious ATProtocol AppView service and sdks
0
fork
atom
overview
issues
pulls
pipelines
use redis for sync pub/sub if available
chadtmiller.com
4 months ago
7b9f4d02
466a7481
+131
-3
3 changed files
expand all
collapse all
unified
split
api
Cargo.toml
src
graphql
schema_ext
sync.rs
main.rs
+2
-2
api/Cargo.toml
···
63
63
sqlxmq = "0.6"
64
64
regex = "1.11.2"
65
65
66
66
-
# Redis for caching
67
67
-
redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] }
66
66
+
# Redis for caching and pub/sub
67
67
+
redis = { version = "0.32", features = ["tokio-comp", "connection-manager", "aio"] }
68
68
69
69
# GraphQL server
70
70
async-graphql = { version = "7.0", features = ["dynamic-schema", "dataloader"] }
+125
-1
api/src/graphql/schema_ext/sync.rs
···
10
10
use uuid::Uuid;
11
11
use base64::engine::general_purpose;
12
12
use base64::Engine;
13
13
+
use redis::aio::ConnectionManager;
14
14
+
use redis::{Client, AsyncCommands};
15
15
+
use futures_util::StreamExt;
13
16
14
17
/// Global broadcast channel for sync job status updates
15
18
/// This allows real-time job status streaming to GraphQL subscriptions
16
19
static JOB_CHANNEL: OnceLock<Arc<Mutex<broadcast::Sender<JobStatus>>>> = OnceLock::new();
20
20
+
21
21
+
/// Global Redis client for cross-process pub/sub (optional)
22
22
+
static REDIS_CLIENT: OnceLock<Option<Client>> = OnceLock::new();
17
23
18
24
/// Initialize or get the global job channel
19
25
fn get_job_channel() -> Arc<Mutex<broadcast::Sender<JobStatus>>> {
···
27
33
28
34
/// Publish a sync job status update to subscribers
29
35
pub async fn publish_sync_job_update(job_status: JobStatus) {
36
36
+
// Publish to in-memory broadcast channel (for same-process subscribers)
30
37
let sender = get_job_channel();
31
38
let sender_lock = sender.lock().await;
32
32
-
let _ = sender_lock.send(job_status); // Ignore errors if no subscribers
39
39
+
let _ = sender_lock.send(job_status.clone()); // Ignore errors if no subscribers
40
40
+
drop(sender_lock);
41
41
+
42
42
+
// Also publish to Redis for cross-process communication (if Redis is configured)
43
43
+
if let Some(Some(client)) = REDIS_CLIENT.get() {
44
44
+
if let Err(e) = publish_to_redis(client, &job_status).await {
45
45
+
tracing::warn!("Failed to publish job status to Redis: {}", e);
46
46
+
}
47
47
+
}
48
48
+
}
49
49
+
50
50
+
/// Publish job status to Redis for cross-process communication
51
51
+
async fn publish_to_redis(client: &Client, job_status: &JobStatus) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
52
52
+
let mut conn = ConnectionManager::new(client.clone()).await?;
53
53
+
let payload = serde_json::to_string(job_status)?;
54
54
+
let _: () = conn.publish("sync_job_updates", payload).await?;
55
55
+
Ok(())
33
56
}
34
57
35
58
/// Container for JobStatus to implement Any trait for GraphQL
···
837
860
.description("Delete a sync job from the database")
838
861
)
839
862
}
863
863
+
864
864
+
/// Initialize Redis pub/sub for sync job updates
865
865
+
///
866
866
+
/// This function should be called once at application startup.
867
867
+
/// It initializes the Redis client and starts a background task to listen for
868
868
+
/// job updates from other processes (e.g., worker processes).
869
869
+
///
870
870
+
/// # Arguments
871
871
+
/// * `redis_url` - Optional Redis connection URL. If None, Redis pub/sub is disabled.
872
872
+
pub fn initialize_redis_pubsub(redis_url: Option<String>) {
873
873
+
// Initialize Redis client (or None if not configured)
874
874
+
let client = redis_url.and_then(|url| {
875
875
+
match Client::open(url.as_str()) {
876
876
+
Ok(client) => {
877
877
+
tracing::info!("Initialized Redis client for sync job pub/sub");
878
878
+
Some(client)
879
879
+
}
880
880
+
Err(e) => {
881
881
+
tracing::error!("Failed to create Redis client for sync job pub/sub: {}", e);
882
882
+
None
883
883
+
}
884
884
+
}
885
885
+
});
886
886
+
887
887
+
let has_redis = client.is_some();
888
888
+
REDIS_CLIENT.get_or_init(|| client);
889
889
+
890
890
+
// Start Redis subscription listener task if Redis is available
891
891
+
if has_redis {
892
892
+
start_redis_listener();
893
893
+
} else {
894
894
+
tracing::info!("Redis not configured - sync job updates will use in-memory broadcast only");
895
895
+
}
896
896
+
}
897
897
+
898
898
+
/// Start a background task that subscribes to Redis and forwards messages to the in-memory broadcast channel
899
899
+
fn start_redis_listener() {
900
900
+
tokio::spawn(async {
901
901
+
tracing::info!("Starting Redis subscription listener for sync job updates");
902
902
+
903
903
+
loop {
904
904
+
// Get Redis client
905
905
+
let client = match REDIS_CLIENT.get() {
906
906
+
Some(Some(client)) => client,
907
907
+
_ => {
908
908
+
tracing::error!("Redis client not available for subscription");
909
909
+
return;
910
910
+
}
911
911
+
};
912
912
+
913
913
+
// Connect and subscribe
914
914
+
match subscribe_to_redis(client).await {
915
915
+
Ok(_) => {
916
916
+
tracing::warn!("Redis subscription ended, reconnecting in 5 seconds...");
917
917
+
}
918
918
+
Err(e) => {
919
919
+
tracing::error!("Redis subscription error: {}, reconnecting in 5 seconds...", e);
920
920
+
}
921
921
+
}
922
922
+
923
923
+
// Wait before reconnecting
924
924
+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
925
925
+
}
926
926
+
});
927
927
+
}
928
928
+
929
929
+
/// Subscribe to Redis channel and forward messages to in-memory broadcast
930
930
+
async fn subscribe_to_redis(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
931
931
+
// Create a pub/sub connection from the client
932
932
+
let mut pubsub = client.get_async_pubsub().await?;
933
933
+
934
934
+
pubsub.subscribe("sync_job_updates").await?;
935
935
+
tracing::info!("Subscribed to Redis channel: sync_job_updates");
936
936
+
937
937
+
// Get the in-memory broadcast sender
938
938
+
let sender = get_job_channel();
939
939
+
940
940
+
loop {
941
941
+
let msg = pubsub.on_message().next().await;
942
942
+
if let Some(msg) = msg {
943
943
+
let payload: String = msg.get_payload()?;
944
944
+
945
945
+
// Deserialize JobStatus from JSON
946
946
+
match serde_json::from_str::<JobStatus>(&payload) {
947
947
+
Ok(job_status) => {
948
948
+
// Forward to in-memory broadcast channel
949
949
+
let sender_lock = sender.lock().await;
950
950
+
if let Err(e) = sender_lock.send(job_status.clone()) {
951
951
+
tracing::debug!("No local subscribers for job update: {}", e);
952
952
+
}
953
953
+
drop(sender_lock);
954
954
+
955
955
+
tracing::debug!("Forwarded job update from Redis: job_id={}", job_status.job_id);
956
956
+
}
957
957
+
Err(e) => {
958
958
+
tracing::warn!("Failed to deserialize job status from Redis: {}", e);
959
959
+
}
960
960
+
}
961
961
+
}
962
962
+
}
963
963
+
}
+4
api/src/main.rs
···
109
109
// Start GraphQL PubSub cleanup task
110
110
graphql::pubsub::start_cleanup_task();
111
111
112
112
+
// Initialize Redis pub/sub for cross-process sync job updates
113
113
+
let redis_url = env::var("REDIS_URL").ok();
114
114
+
graphql::schema_ext::sync::initialize_redis_pubsub(redis_url);
115
115
+
112
116
// Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
113
117
let process_type = env::var("PROCESS_TYPE")
114
118
.or_else(|_| env::var("FLY_PROCESS_GROUP"))