forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use crate::webhook::discord::{self, model::WebhookEnvelope};
2use anyhow::Error;
3use std::{
4 env,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8use tokio::{sync::Mutex, time::interval};
9
10#[derive(Clone)]
11pub struct AppState {
12 pub redis: redis::Client,
13 pub queue_key: String,
14}
15
16pub async fn start_worker(state: Arc<Mutex<AppState>>) -> Result<(), Error> {
17 let max_rps: u32 = env::var("MAX_REQUESTS_PER_SEC")
18 .ok()
19 .and_then(|s| s.parse().ok())
20 .unwrap_or(5);
21 let max_embeds_per: usize = env::var("MAX_EMBEDS_PER_REQUEST")
22 .ok()
23 .and_then(|s| s.parse().ok())
24 .unwrap_or(10);
25 let batch_window_ms: u64 = env::var("BATCH_WINDOW_MS")
26 .ok()
27 .and_then(|s| s.parse().ok())
28 .unwrap_or(400);
29 let discord_webhook_url = env::var("DISCORD_WEBHOOK_URL").unwrap_or(String::new());
30
31 tokio::spawn(run_worker(
32 state.clone(),
33 discord_webhook_url,
34 max_rps,
35 max_embeds_per,
36 Duration::from_millis(batch_window_ms),
37 ));
38
39 Ok(())
40}
41
42async fn run_worker(
43 st: Arc<Mutex<AppState>>,
44 discord_webhook_url: String,
45 max_rps: u32,
46 max_embeds_per: usize,
47 batch_window: Duration,
48) {
49 let http = reqwest::Client::builder()
50 .user_agent("rocksky-discord-bridge/0.1")
51 .build()
52 .expect("http client");
53
54 let mut tokens = max_rps as i32;
55 let mut refill = interval(Duration::from_secs(1));
56 refill.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
57
58 loop {
59 tokio::select! {
60 _ = refill.tick() => {
61 tokens = (tokens + max_rps as i32).min(max_rps as i32);
62 }
63 _ = tokio::time::sleep(Duration::from_millis(10)) => { /* tick */ }
64 }
65
66 if tokens <= 0 {
67 continue;
68 }
69
70 let start = Instant::now();
71 let mut embeds = Vec::with_capacity(max_embeds_per);
72
73 while embeds.len() < max_embeds_per && start.elapsed() < batch_window {
74 match brpop_once(st.clone(), 1).await {
75 Ok(Some(json_str)) => {
76 if let Ok(env) = serde_json::from_str::<WebhookEnvelope>(&json_str) {
77 embeds.push(discord::embed_from_scrobble(&env.data, &env.id));
78 }
79 }
80 Ok(None) => break,
81 Err(e) => {
82 tracing::error!(error = %e, "Failed to pop from Redis");
83 break;
84 }
85 }
86 }
87
88 if embeds.is_empty() {
89 tokio::time::sleep(Duration::from_millis(50)).await;
90 continue;
91 }
92
93 tokens -= 1;
94
95 if let Err(e) = discord::post_embeds(&http, &discord_webhook_url, embeds).await {
96 tracing::error!(error = %e, "Failed to post to Discord webhook");
97 }
98 }
99}
100
101async fn brpop_once(
102 state: Arc<Mutex<AppState>>,
103 timeout_secs: u64,
104) -> redis::RedisResult<Option<String>> {
105 let AppState {
106 redis: client,
107 queue_key: key,
108 } = &*state.lock().await;
109 let mut conn = client.get_multiplexed_async_connection().await?;
110 let res: Option<(String, String)> = redis::cmd("BRPOP")
111 .arg(key)
112 .arg(timeout_secs as usize)
113 .query_async(&mut conn)
114 .await?;
115 Ok(res.map(|(_, v)| v))
116}
117
118pub async fn push_to_queue(
119 state: Arc<Mutex<AppState>>,
120 item: &WebhookEnvelope,
121) -> redis::RedisResult<()> {
122 let payload = serde_json::to_string(item).unwrap();
123 let AppState {
124 redis: client,
125 queue_key: key,
126 } = &*state.lock().await;
127 let mut conn = client.get_multiplexed_async_connection().await?;
128 let _: () = redis::pipe()
129 .cmd("RPUSH")
130 .arg(key)
131 .arg(payload)
132 .ignore()
133 .cmd("EXPIRE")
134 .arg(key)
135 .arg(60 * 60 * 24) // 24h
136 .query_async(&mut conn)
137 .await?;
138 Ok(())
139}