A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

[spotify] cache current track progression

+49 -3
+6
crates/spotify/src/cache.rs
··· 38 38 .query::<()>(&mut con)?; 39 39 Ok(()) 40 40 } 41 + 42 + pub fn del(&self, key: &str) -> Result<(), Error> { 43 + let mut con = self.client.get_connection()?; 44 + redis::cmd("DEL").arg(key).query::<()>(&mut con)?; 45 + Ok(()) 46 + } 41 47 }
+43 -3
crates/spotify/src/main.rs
··· 169 169 if decoded_data.is_err() { 170 170 println!("{} {} {}", format!("[{}]", user_id).bright_green(), "Cache is invalid".red(), data); 171 171 cache.setex(user_id, "No content", 10)?; 172 + cache.del(&format!("{}:current", user_id))?; 172 173 return Ok(None); 173 174 } 174 175 ··· 221 222 if status == 204 { 222 223 println!("No content"); 223 224 cache.setex(user_id, "No content", 10)?; 225 + cache.del(&format!("{}:current", user_id))?; 224 226 return Ok(None); 225 227 } 226 228 227 229 let data = serde_json::from_str::<CurrentlyPlaying>(&data)?; 228 230 229 231 cache.setex(user_id, &serde_json::to_string(&data)?, 15)?; 232 + cache.del(&format!("{}:current", user_id))?; 233 + 230 234 // detect if the song has changed 231 235 let previous = cache.get(&format!("{}:previous", user_id))?; 232 236 let changed = match previous { ··· 407 411 pub async fn watch_currently_playing(spotify_email: String, token: String, did: String, stop_flag: Arc<AtomicBool>, cache: Cache) -> Result<(), Error> { 408 412 println!("{} {}", format!("[{}]", spotify_email).bright_green(), "Checking currently playing".cyan()); 409 413 414 + let stop_flag_clone = stop_flag.clone(); 415 + let spotify_email_clone = spotify_email.clone(); 416 + let cache_clone = cache.clone(); 417 + thread::spawn(move || { 418 + loop { 419 + if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { 420 + println!("{} Stopping Thread", format!("[{}]", spotify_email_clone).bright_green()); 421 + break; 422 + } 423 + if let Some(cached) = cache_clone.get(&format!("{}:current", spotify_email_clone))? { 424 + let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?; 425 + 426 + if current_song.is_playing && current_song.progress_ms.unwrap_or(0) < current_song.item.clone().unwrap().duration_ms.into() { 427 + current_song.progress_ms = Some(current_song.progress_ms.unwrap_or(0) + 500); 428 + cache_clone.setex(&format!("{}:current", spotify_email_clone), &serde_json::to_string(&current_song)?, 16)?; 429 + thread::sleep(std::time::Duration::from_millis(500)); 430 + continue; 431 + } 432 + continue; 433 + } 434 + 435 + if let Some(cached) = cache_clone.get(&spotify_email_clone)? { 436 + if cached == "No content" { 437 + thread::sleep(std::time::Duration::from_millis(500)); 438 + continue; 439 + } 440 + let data = serde_json::from_str::<CurrentlyPlaying>(&cached)?; 441 + cache_clone.setex(&format!("{}:current", spotify_email_clone), &serde_json::to_string(&data)?, 16)?; 442 + } 443 + 444 + 445 + thread::sleep(std::time::Duration::from_millis(500)); 446 + } 447 + Ok::<(), Error>(()) 448 + }); 449 + 410 450 loop { 411 451 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { 412 452 println!("{} Stopping Thread", format!("[{}]", spotify_email).bright_green()); ··· 426 466 Ok(currently_playing) => currently_playing, 427 467 Err(e) => { 428 468 println!("{} {}", format!("[{}]", spotify_email).bright_green(), e.to_string().bright_red()); 429 - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; 469 + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 430 470 continue; 431 471 } 432 472 }; ··· 434 474 if let Some((data, changed)) = currently_playing { 435 475 if data.item.is_none() { 436 476 println!("{} {}", format!("[{}]", spotify_email).bright_green(), "No song playing".yellow()); 437 - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; 477 + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 438 478 continue; 439 479 } 440 480 let data_item = data.item.unwrap(); ··· 465 505 } 466 506 } 467 507 468 - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; 508 + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 469 509 } 470 510 471 511 Ok(())