forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use std::{
2 collections::HashMap,
3 env,
4 sync::{atomic::AtomicBool, Arc, Mutex},
5 thread,
6};
7
8use anyhow::Error;
9use async_nats::connect;
10use dotenv::dotenv;
11use owo_colors::OwoColorize;
12use rocksky_spotify::cache::Cache;
13use rocksky_spotify::{find_spotify_user, find_spotify_users, watch_currently_playing};
14use sqlx::postgres::PgPoolOptions;
15use tokio_stream::StreamExt;
16
17#[tokio::main]
18async fn main() -> Result<(), Box<dyn std::error::Error>> {
19 dotenv().ok();
20 let cache = Cache::new()?;
21 let pool = PgPoolOptions::new()
22 .max_connections(5)
23 .connect(&env::var("XATA_POSTGRES_URL")?)
24 .await?;
25
26 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
27 let nc = connect(&addr).await?;
28 println!("Connected to NATS server at {}", addr.bright_green());
29
30 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
31 println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
32
33 let users = find_spotify_users(&pool, 0, 100).await?;
34 println!("Found {} users", users.len().bright_green());
35
36 // Shared HashMap to manage threads and their stop flags
37 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
38 Arc::new(Mutex::new(HashMap::new()));
39
40 // Start threads for all users
41 for user in users {
42 let email = user.0.clone();
43 let token = user.1.clone();
44 let did = user.2.clone();
45 let stop_flag = Arc::new(AtomicBool::new(false));
46 let cache = cache.clone();
47 let nc = nc.clone();
48 let thread_map = Arc::clone(&thread_map);
49
50 thread_map
51 .lock()
52 .unwrap()
53 .insert(email.clone(), Arc::clone(&stop_flag));
54
55 thread::spawn(move || {
56 let rt = tokio::runtime::Runtime::new().unwrap();
57 match rt.block_on(async {
58 watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone())
59 .await?;
60 Ok::<(), Error>(())
61 }) {
62 Ok(_) => {}
63 Err(e) => {
64 println!(
65 "{} Error starting thread for user: {} - {}",
66 format!("[{}]", email).bright_green(),
67 email.bright_green(),
68 e.to_string().bright_red()
69 );
70
71 // If there's an error, publish a message to restart the thread
72 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
73 Ok(_) => {
74 println!(
75 "{} Published message to restart thread for user: {}",
76 format!("[{}]", email).bright_green(),
77 email.bright_green()
78 );
79 }
80 Err(e) => {
81 println!(
82 "{} Error publishing message to restart thread: {}",
83 format!("[{}]", email).bright_green(),
84 e.to_string().bright_red()
85 );
86 }
87 }
88 }
89 }
90 });
91 }
92
93 // Handle subscription messages
94 while let Some(message) = sub.next().await {
95 let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
96 println!(
97 "Received message to restart thread for user: {}",
98 user_id.bright_green()
99 );
100
101 let mut thread_map = thread_map.lock().unwrap();
102
103 // Check if the user exists in the thread map
104 if let Some(stop_flag) = thread_map.get(&user_id) {
105 // Stop the existing thread
106 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
107
108 // Create a new stop flag and restart the thread
109 let new_stop_flag = Arc::new(AtomicBool::new(false));
110 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
111
112 let user = find_spotify_user(&pool, &user_id).await?;
113
114 if user.is_none() {
115 println!(
116 "Spotify user not found: {}, skipping",
117 user_id.bright_green()
118 );
119 continue;
120 }
121
122 let user = user.unwrap();
123
124 let email = user.0.clone();
125 let token = user.1.clone();
126 let did = user.2.clone();
127 let cache = cache.clone();
128
129 thread::spawn(move || {
130 let rt = tokio::runtime::Runtime::new().unwrap();
131 match rt.block_on(async {
132 watch_currently_playing(
133 email.clone(),
134 token,
135 did,
136 new_stop_flag,
137 cache.clone(),
138 )
139 .await?;
140 Ok::<(), Error>(())
141 }) {
142 Ok(_) => {}
143 Err(e) => {
144 println!(
145 "{} Error restarting thread for user: {} - {}",
146 format!("[{}]", email).bright_green(),
147 email.bright_green(),
148 e.to_string().bright_red()
149 );
150 }
151 }
152 });
153
154 println!("Restarted thread for user: {}", user_id.bright_green());
155 } else {
156 println!(
157 "No thread found for user: {}, starting new thread",
158 user_id.bright_green()
159 );
160 let user = find_spotify_user(&pool, &user_id).await?;
161 if let Some(user) = user {
162 let email = user.0.clone();
163 let token = user.1.clone();
164 let did = user.2.clone();
165 let stop_flag = Arc::new(AtomicBool::new(false));
166 let cache = cache.clone();
167 let nc = nc.clone();
168
169 thread_map.insert(email.clone(), Arc::clone(&stop_flag));
170
171 thread::spawn(move || {
172 let rt = tokio::runtime::Runtime::new().unwrap();
173 match rt.block_on(async {
174 watch_currently_playing(
175 email.clone(),
176 token,
177 did,
178 stop_flag,
179 cache.clone(),
180 )
181 .await?;
182 Ok::<(), Error>(())
183 }) {
184 Ok(_) => {}
185 Err(e) => {
186 println!(
187 "{} Error starting thread for user: {} - {}",
188 format!("[{}]", email).bright_green(),
189 email.bright_green(),
190 e.to_string().bright_red()
191 );
192 match rt
193 .block_on(nc.publish("rocksky.spotify.user", email.clone().into()))
194 {
195 Ok(_) => {}
196 Err(e) => {
197 println!(
198 "{} Error publishing message to restart thread: {}",
199 format!("[{}]", email).bright_green(),
200 e.to_string().bright_red()
201 );
202 }
203 }
204 }
205 }
206 });
207 }
208 }
209 }
210
211 Ok(())
212}