A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

spotify: Add resilient user threads with recovery and backoff

Introduce start_user_thread helper to spawn per-user threads with
automatic retries, exponential backoff, and NATS publish after max
retries for external restart. Replace direct thread spawns with the
helper. Improve progress tracker by adding an inner recovery loop,
fixing cache update logic, and handling Redis errors gracefully.

+174 -149
+174 -149
crates/spotify/src/lib.rs
··· 53 53 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 54 54 Arc::new(Mutex::new(HashMap::new())); 55 55 56 + // Helper function to start a user thread with auto-recovery 57 + let start_user_thread = |email: String, 58 + token: String, 59 + did: String, 60 + client_id: String, 61 + client_secret: String, 62 + stop_flag: Arc<AtomicBool>, 63 + cache: Cache, 64 + nc: async_nats::Client| { 65 + thread::spawn(move || { 66 + let rt = tokio::runtime::Runtime::new().unwrap(); 67 + let mut retry_count = 0; 68 + let max_retries = 5; 69 + 70 + loop { 71 + if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { 72 + println!( 73 + "{} Stop flag set, exiting recovery loop", 74 + format!("[{}]", email).bright_green() 75 + ); 76 + break; 77 + } 78 + 79 + match rt.block_on(async { 80 + watch_currently_playing( 81 + email.clone(), 82 + token.clone(), 83 + did.clone(), 84 + stop_flag.clone(), 85 + cache.clone(), 86 + client_id.clone(), 87 + client_secret.clone(), 88 + ) 89 + .await 90 + }) { 91 + Ok(_) => { 92 + println!( 93 + "{} Thread completed normally", 94 + format!("[{}]", email).bright_green() 95 + ); 96 + break; 97 + } 98 + Err(e) => { 99 + retry_count += 1; 100 + println!( 101 + "{} Thread crashed (attempt {}/{}): {}", 102 + format!("[{}]", email).bright_green(), 103 + retry_count, 104 + max_retries, 105 + e.to_string().bright_red() 106 + ); 107 + 108 + if retry_count >= max_retries { 109 + println!( 110 + "{} Max retries reached, publishing to NATS for external restart", 111 + format!("[{}]", email).bright_green() 112 + ); 113 + match rt 114 + .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 115 + { 116 + Ok(_) => { 117 + println!( 118 + "{} Published message to restart thread", 119 + format!("[{}]", email).bright_green() 120 + ); 121 + } 122 + Err(e) => { 123 + println!( 124 + "{} Error publishing message to restart thread: {}", 125 + format!("[{}]", email).bright_green(), 126 + e.to_string().bright_red() 127 + ); 128 + } 129 + } 130 + break; 131 + } 132 + 133 + // Exponential backoff: 2^retry_count seconds, max 60 seconds 134 + let backoff_seconds = std::cmp::min(2_u64.pow(retry_count as u32), 60); 135 + println!( 136 + "{} Retrying in {} seconds...", 137 + format!("[{}]", email).bright_green(), 138 + backoff_seconds 139 + ); 140 + std::thread::sleep(std::time::Duration::from_secs(backoff_seconds)); 141 + } 142 + } 143 + } 144 + }) 145 + }; 146 + 56 147 // Start threads for all users 57 148 for user in users { 58 149 let email = user.0.clone(); ··· 70 161 .unwrap() 71 162 .insert(email.clone(), Arc::clone(&stop_flag)); 72 163 73 - thread::spawn(move || { 74 - let rt = tokio::runtime::Runtime::new().unwrap(); 75 - match rt.block_on(async { 76 - watch_currently_playing( 77 - email.clone(), 78 - token, 79 - did, 80 - stop_flag, 81 - cache.clone(), 82 - client_id, 83 - client_secret, 84 - ) 85 - .await?; 86 - Ok::<(), Error>(()) 87 - }) { 88 - Ok(_) => {} 89 - Err(e) => { 90 - println!( 91 - "{} Error starting thread for user: {} - {}", 92 - format!("[{}]", email).bright_green(), 93 - email.bright_green(), 94 - e.to_string().bright_red() 95 - ); 96 - 97 - // If there's an error, publish a message to restart the thread 98 - match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 99 - Ok(_) => { 100 - println!( 101 - "{} Published message to restart thread for user: {}", 102 - format!("[{}]", email).bright_green(), 103 - email.bright_green() 104 - ); 105 - } 106 - Err(e) => { 107 - println!( 108 - "{} Error publishing message to restart thread: {}", 109 - format!("[{}]", email).bright_green(), 110 - e.to_string().bright_red() 111 - ); 112 - } 113 - } 114 - } 115 - } 116 - }); 164 + start_user_thread( 165 + email, 166 + token, 167 + did, 168 + client_id, 169 + client_secret, 170 + stop_flag, 171 + cache, 172 + nc, 173 + ); 117 174 } 118 175 119 176 // Handle subscription messages ··· 153 210 let client_id = user.3.clone(); 154 211 let client_secret = user.4.clone(); 155 212 let cache = cache.clone(); 213 + let nc = nc.clone(); 156 214 157 - thread::spawn(move || { 158 - let rt = tokio::runtime::Runtime::new().unwrap(); 159 - match rt.block_on(async { 160 - watch_currently_playing( 161 - email.clone(), 162 - token, 163 - did, 164 - new_stop_flag, 165 - cache.clone(), 166 - client_id, 167 - client_secret, 168 - ) 169 - .await?; 170 - Ok::<(), Error>(()) 171 - }) { 172 - Ok(_) => {} 173 - Err(e) => { 174 - println!( 175 - "{} Error restarting thread for user: {} - {}", 176 - format!("[{}]", email).bright_green(), 177 - email.bright_green(), 178 - e.to_string().bright_red() 179 - ); 180 - } 181 - } 182 - }); 215 + start_user_thread( 216 + email, 217 + token, 218 + did, 219 + client_id, 220 + client_secret, 221 + new_stop_flag, 222 + cache, 223 + nc, 224 + ); 183 225 184 226 println!("Restarted thread for user: {}", user_id.bright_green()); 185 227 } else { ··· 200 242 201 243 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 202 244 203 - thread::spawn(move || { 204 - let rt = tokio::runtime::Runtime::new().unwrap(); 205 - match rt.block_on(async { 206 - watch_currently_playing( 207 - email.clone(), 208 - token, 209 - did, 210 - stop_flag, 211 - cache.clone(), 212 - client_id, 213 - client_secret, 214 - ) 215 - .await?; 216 - Ok::<(), Error>(()) 217 - }) { 218 - Ok(_) => {} 219 - Err(e) => { 220 - println!( 221 - "{} Error starting thread for user: {} - {}", 222 - format!("[{}]", email).bright_green(), 223 - email.bright_green(), 224 - e.to_string().bright_red() 225 - ); 226 - match rt 227 - .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 228 - { 229 - Ok(_) => {} 230 - Err(e) => { 231 - println!( 232 - "{} Error publishing message to restart thread: {}", 233 - format!("[{}]", email).bright_green(), 234 - e.to_string().bright_red() 235 - ); 236 - } 237 - } 238 - } 239 - } 240 - }); 245 + start_user_thread( 246 + email, 247 + token, 248 + did, 249 + client_id, 250 + client_secret, 251 + stop_flag, 252 + cache, 253 + nc, 254 + ); 241 255 } 242 256 } 243 257 } ··· 810 824 let spotify_email_clone = spotify_email.clone(); 811 825 let cache_clone = cache.clone(); 812 826 thread::spawn(move || { 813 - loop { 814 - if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { 815 - println!( 816 - "{} Stopping Thread", 817 - format!("[{}]", spotify_email_clone).bright_green() 818 - ); 819 - break; 820 - } 821 - if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) { 822 - if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() { 823 - thread::sleep(std::time::Duration::from_millis(800)); 824 - continue; 827 + // Inner thread with error recovery 828 + let result: Result<(), Error> = (|| { 829 + loop { 830 + if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { 831 + println!( 832 + "{} Stopping progress tracker thread", 833 + format!("[{}]", spotify_email_clone).bright_green() 834 + ); 835 + break; 825 836 } 826 837 827 - let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?; 838 + if let Ok(Some(cached)) = 839 + cache_clone.get(&format!("{}:current", spotify_email_clone)) 840 + { 841 + if let Ok(mut current_song) = serde_json::from_str::<CurrentlyPlaying>(&cached) 842 + { 843 + if let Some(item) = current_song.item.clone() { 844 + if current_song.is_playing 845 + && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into() 846 + { 847 + current_song.progress_ms = 848 + Some(current_song.progress_ms.unwrap_or(0) + 800); 849 + match cache_clone.setex( 850 + &format!("{}:current", spotify_email_clone), 851 + &serde_json::to_string(&current_song).unwrap_or_default(), 852 + 16, 853 + ) { 854 + Ok(_) => {} 855 + Err(e) => { 856 + println!( 857 + "{} redis error: {}", 858 + format!("[{}]", spotify_email_clone).bright_green(), 859 + e.to_string().bright_red() 860 + ); 861 + } 862 + } 863 + thread::sleep(std::time::Duration::from_millis(800)); 864 + continue; 865 + } 866 + } 867 + } 868 + } 828 869 829 - if let Some(item) = current_song.item.clone() { 830 - if current_song.is_playing 831 - && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into() 832 - { 833 - current_song.progress_ms = 834 - Some(current_song.progress_ms.unwrap_or(0) + 800); 870 + if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) { 871 + if cached != "No content" { 835 872 match cache_clone.setex( 836 873 &format!("{}:current", spotify_email_clone), 837 - &serde_json::to_string(&current_song)?, 874 + &cached, 838 875 16, 839 876 ) { 840 877 Ok(_) => {} ··· 846 883 ); 847 884 } 848 885 } 849 - thread::sleep(std::time::Duration::from_millis(800)); 850 - continue; 851 886 } 852 887 } 853 - continue; 854 - } 855 888 856 - if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) { 857 - if cached == "No content" { 858 - thread::sleep(std::time::Duration::from_millis(800)); 859 - continue; 860 - } 861 - match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) { 862 - Ok(_) => {} 863 - Err(e) => { 864 - println!( 865 - "{} redis error: {}", 866 - format!("[{}]", spotify_email_clone).bright_green(), 867 - e.to_string().bright_red() 868 - ); 869 - } 870 - } 889 + thread::sleep(std::time::Duration::from_millis(800)); 871 890 } 891 + Ok(()) 892 + })(); 872 893 873 - thread::sleep(std::time::Duration::from_millis(800)); 894 + if let Err(e) = result { 895 + println!( 896 + "{} Progress tracker thread error: {}", 897 + format!("[{}]", spotify_email_clone).bright_green(), 898 + e.to_string().bright_red() 899 + ); 874 900 } 875 - Ok::<(), Error>(()) 876 901 }); 877 902 878 903 loop {