A decentralized music tracking and discovery platform built on AT Protocol 🎵
at fix/spotify 212 lines 7.9 kB view raw
1use std::{ 2 collections::HashMap, 3 env, 4 sync::{atomic::AtomicBool, Arc, Mutex}, 5 thread, 6}; 7 8use anyhow::Error; 9use async_nats::connect; 10use dotenv::dotenv; 11use owo_colors::OwoColorize; 12use rocksky_spotify::cache::Cache; 13use rocksky_spotify::{find_spotify_user, find_spotify_users, watch_currently_playing}; 14use sqlx::postgres::PgPoolOptions; 15use tokio_stream::StreamExt; 16 17#[tokio::main] 18async fn main() -> Result<(), Box<dyn std::error::Error>> { 19 dotenv().ok(); 20 let cache = Cache::new()?; 21 let pool = PgPoolOptions::new() 22 .max_connections(5) 23 .connect(&env::var("XATA_POSTGRES_URL")?) 24 .await?; 25 26 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 27 let nc = connect(&addr).await?; 28 println!("Connected to NATS server at {}", addr.bright_green()); 29 30 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?; 31 println!("Subscribed to {}", "rocksky.spotify.user".bright_green()); 32 33 let users = find_spotify_users(&pool, 0, 100).await?; 34 println!("Found {} users", users.len().bright_green()); 35 36 // Shared HashMap to manage threads and their stop flags 37 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 38 Arc::new(Mutex::new(HashMap::new())); 39 40 // Start threads for all users 41 for user in users { 42 let email = user.0.clone(); 43 let token = user.1.clone(); 44 let did = user.2.clone(); 45 let stop_flag = Arc::new(AtomicBool::new(false)); 46 let cache = cache.clone(); 47 let nc = nc.clone(); 48 let thread_map = Arc::clone(&thread_map); 49 50 thread_map 51 .lock() 52 .unwrap() 53 .insert(email.clone(), Arc::clone(&stop_flag)); 54 55 thread::spawn(move || { 56 let rt = tokio::runtime::Runtime::new().unwrap(); 57 match rt.block_on(async { 58 watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone()) 59 .await?; 60 Ok::<(), Error>(()) 61 }) { 62 Ok(_) => {} 63 Err(e) => { 64 println!( 65 "{} Error starting thread for user: {} - {}", 66 
format!("[{}]", email).bright_green(), 67 email.bright_green(), 68 e.to_string().bright_red() 69 ); 70 71 // If there's an error, publish a message to restart the thread 72 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 73 Ok(_) => { 74 println!( 75 "{} Published message to restart thread for user: {}", 76 format!("[{}]", email).bright_green(), 77 email.bright_green() 78 ); 79 } 80 Err(e) => { 81 println!( 82 "{} Error publishing message to restart thread: {}", 83 format!("[{}]", email).bright_green(), 84 e.to_string().bright_red() 85 ); 86 } 87 } 88 } 89 } 90 }); 91 } 92 93 // Handle subscription messages 94 while let Some(message) = sub.next().await { 95 let user_id = String::from_utf8(message.payload.to_vec()).unwrap(); 96 println!( 97 "Received message to restart thread for user: {}", 98 user_id.bright_green() 99 ); 100 101 let mut thread_map = thread_map.lock().unwrap(); 102 103 // Check if the user exists in the thread map 104 if let Some(stop_flag) = thread_map.get(&user_id) { 105 // Stop the existing thread 106 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed); 107 108 // Create a new stop flag and restart the thread 109 let new_stop_flag = Arc::new(AtomicBool::new(false)); 110 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag)); 111 112 let user = find_spotify_user(&pool, &user_id).await?; 113 114 if user.is_none() { 115 println!( 116 "Spotify user not found: {}, skipping", 117 user_id.bright_green() 118 ); 119 continue; 120 } 121 122 let user = user.unwrap(); 123 124 let email = user.0.clone(); 125 let token = user.1.clone(); 126 let did = user.2.clone(); 127 let cache = cache.clone(); 128 129 thread::spawn(move || { 130 let rt = tokio::runtime::Runtime::new().unwrap(); 131 match rt.block_on(async { 132 watch_currently_playing( 133 email.clone(), 134 token, 135 did, 136 new_stop_flag, 137 cache.clone(), 138 ) 139 .await?; 140 Ok::<(), Error>(()) 141 }) { 142 Ok(_) => {} 143 Err(e) => { 144 println!( 145 
"{} Error restarting thread for user: {} - {}", 146 format!("[{}]", email).bright_green(), 147 email.bright_green(), 148 e.to_string().bright_red() 149 ); 150 } 151 } 152 }); 153 154 println!("Restarted thread for user: {}", user_id.bright_green()); 155 } else { 156 println!( 157 "No thread found for user: {}, starting new thread", 158 user_id.bright_green() 159 ); 160 let user = find_spotify_user(&pool, &user_id).await?; 161 if let Some(user) = user { 162 let email = user.0.clone(); 163 let token = user.1.clone(); 164 let did = user.2.clone(); 165 let stop_flag = Arc::new(AtomicBool::new(false)); 166 let cache = cache.clone(); 167 let nc = nc.clone(); 168 169 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 170 171 thread::spawn(move || { 172 let rt = tokio::runtime::Runtime::new().unwrap(); 173 match rt.block_on(async { 174 watch_currently_playing( 175 email.clone(), 176 token, 177 did, 178 stop_flag, 179 cache.clone(), 180 ) 181 .await?; 182 Ok::<(), Error>(()) 183 }) { 184 Ok(_) => {} 185 Err(e) => { 186 println!( 187 "{} Error starting thread for user: {} - {}", 188 format!("[{}]", email).bright_green(), 189 email.bright_green(), 190 e.to_string().bright_red() 191 ); 192 match rt 193 .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 194 { 195 Ok(_) => {} 196 Err(e) => { 197 println!( 198 "{} Error publishing message to restart thread: {}", 199 format!("[{}]", email).bright_green(), 200 e.to_string().bright_red() 201 ); 202 } 203 } 204 } 205 } 206 }); 207 } 208 } 209 } 210 211 Ok(()) 212}