A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 1049 lines 35 kB view raw
1use std::{ 2 collections::HashMap, 3 env, 4 sync::{atomic::AtomicBool, Arc, Mutex}, 5 thread, 6}; 7 8use anyhow::Error; 9use async_nats::connect; 10use owo_colors::OwoColorize; 11use reqwest::Client; 12use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; 13use tokio_stream::StreamExt; 14 15use crate::{ 16 cache::Cache, 17 crypto::decrypt_aes_256_ctr, 18 rocksky::{scrobble, update_library}, 19 types::{ 20 album_tracks::AlbumTracks, 21 currently_playing::{Album, Artist, CurrentlyPlaying}, 22 spotify_token::SpotifyTokenWithEmail, 23 token::AccessToken, 24 }, 25}; 26 27pub mod cache; 28pub mod crypto; 29pub mod rocksky; 30pub mod token; 31pub mod types; 32 33pub const BASE_URL: &str = "https://api.spotify.com/v1"; 34 35pub async fn run() -> Result<(), Error> { 36 let cache = Cache::new()?; 37 let pool = PgPoolOptions::new() 38 .max_connections(5) 39 .connect(&env::var("XATA_POSTGRES_URL")?) 40 .await?; 41 42 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 43 let nc = connect(&addr).await?; 44 println!("Connected to NATS server at {}", addr.bright_green()); 45 46 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?; 47 println!("Subscribed to {}", "rocksky.spotify.user".bright_green()); 48 49 let users = find_spotify_users(&pool, 0, 500).await?; 50 println!("Found {} users", users.len().bright_green()); 51 52 // Shared HashMap to manage threads and their stop flags 53 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 54 Arc::new(Mutex::new(HashMap::new())); 55 56 // Helper function to start a user thread with auto-recovery 57 let start_user_thread = |email: String, 58 token: String, 59 did: String, 60 client_id: String, 61 client_secret: String, 62 stop_flag: Arc<AtomicBool>, 63 cache: Cache, 64 nc: async_nats::Client| { 65 thread::spawn(move || { 66 let rt = tokio::runtime::Runtime::new().unwrap(); 67 let mut retry_count = 0; 68 let max_retries = 5; 69 70 loop { 71 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { 72 println!( 73 "{} Stop flag set, exiting recovery loop", 74 format!("[{}]", email).bright_green() 75 ); 76 break; 77 } 78 79 match rt.block_on(async { 80 watch_currently_playing( 81 email.clone(), 82 token.clone(), 83 did.clone(), 84 stop_flag.clone(), 85 cache.clone(), 86 client_id.clone(), 87 client_secret.clone(), 88 ) 89 .await 90 }) { 91 Ok(_) => { 92 println!( 93 "{} Thread completed normally", 94 format!("[{}]", email).bright_green() 95 ); 96 break; 97 } 98 Err(e) => { 99 retry_count += 1; 100 println!( 101 "{} Thread crashed (attempt {}/{}): {}", 102 format!("[{}]", email).bright_green(), 103 retry_count, 104 max_retries, 105 e.to_string().bright_red() 106 ); 107 108 if retry_count >= max_retries { 109 println!( 110 "{} Max retries reached, publishing to NATS for external restart", 111 format!("[{}]", email).bright_green() 112 ); 113 match rt 114 .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 115 { 116 Ok(_) => { 117 println!( 118 "{} Published message to restart thread", 119 format!("[{}]", email).bright_green() 120 ); 121 } 122 Err(e) => { 123 println!( 124 "{} Error publishing message to restart thread: {}", 125 format!("[{}]", email).bright_green(), 126 e.to_string().bright_red() 127 ); 128 } 129 } 130 break; 131 } 132 133 // Exponential backoff: 2^retry_count seconds, max 60 seconds 134 let backoff_seconds = std::cmp::min(2_u64.pow(retry_count as u32), 60); 135 println!( 136 "{} Retrying in {} seconds...", 137 format!("[{}]", email).bright_green(), 138 backoff_seconds 139 ); 140 std::thread::sleep(std::time::Duration::from_secs(backoff_seconds)); 141 } 142 } 143 } 144 }) 145 }; 146 147 // Start threads for all users 148 for user in users { 149 let email = user.0.clone(); 150 let token = user.1.clone(); 151 let did = user.2.clone(); 152 let client_id = user.3.clone(); 153 let client_secret = user.4.clone(); 154 let stop_flag = Arc::new(AtomicBool::new(false)); 155 let cache = cache.clone(); 156 let nc = nc.clone(); 157 let thread_map = Arc::clone(&thread_map); 158 159 thread_map 160 .lock() 161 .unwrap() 162 .insert(email.clone(), Arc::clone(&stop_flag)); 163 164 start_user_thread( 165 email, 166 token, 167 did, 168 client_id, 169 client_secret, 170 stop_flag, 171 cache, 172 nc, 173 ); 174 } 175 176 // Handle subscription messages 177 while let Some(message) = sub.next().await { 178 let user_id = String::from_utf8(message.payload.to_vec()).unwrap(); 179 println!( 180 "Received message to restart thread for user: {}", 181 user_id.bright_green() 182 ); 183 184 let mut thread_map = thread_map.lock().unwrap(); 185 186 // Check if the user exists in the thread map 187 if let Some(stop_flag) = thread_map.get(&user_id) { 188 // Stop the existing thread 189 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed); 190 191 // Create a new stop flag and restart the thread 192 let new_stop_flag = Arc::new(AtomicBool::new(false)); 193 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag)); 194 195 let user = find_spotify_user(&pool, &user_id).await?; 196 197 if user.is_none() { 198 println!( 199 "Spotify user not found: {}, skipping", 200 user_id.bright_green() 201 ); 202 continue; 203 } 204 205 let user = user.unwrap(); 206 207 let email = user.0.clone(); 208 let token = user.1.clone(); 209 let did = user.2.clone(); 210 let client_id = user.3.clone(); 211 let client_secret = user.4.clone(); 212 let cache = cache.clone(); 213 let nc = nc.clone(); 214 215 start_user_thread( 216 email, 217 token, 218 did, 219 client_id, 220 client_secret, 221 new_stop_flag, 222 cache, 223 nc, 224 ); 225 226 println!("Restarted thread for user: {}", user_id.bright_green()); 227 } else { 228 println!( 229 "No thread found for user: {}, starting new thread", 230 user_id.bright_green() 231 ); 232 let user = find_spotify_user(&pool, &user_id).await?; 233 if let Some(user) = user { 234 let email = user.0.clone(); 235 let token = user.1.clone(); 236 let did = user.2.clone(); 237 let client_id = user.3.clone(); 238 let client_secret = user.4.clone(); 239 let stop_flag = Arc::new(AtomicBool::new(false)); 240 let cache = cache.clone(); 241 let nc = nc.clone(); 242 243 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 244 245 start_user_thread( 246 email, 247 token, 248 did, 249 client_id, 250 client_secret, 251 stop_flag, 252 cache, 253 nc, 254 ); 255 } 256 } 257 } 258 259 Ok(()) 260} 261 262pub async fn refresh_token( 263 token: &str, 264 client_id: &str, 265 client_secret: &str, 266) -> Result<AccessToken, Error> { 267 let client = Client::new(); 268 269 let response = client 270 .post("https://accounts.spotify.com/api/token") 271 .basic_auth(&client_id, Some(client_secret)) 272 .form(&[ 273 ("grant_type", "refresh_token"), 274 ("refresh_token", token), 275 ("client_id", &client_id), 276 ]) 277 .send() 278 .await?; 279 let token = response.text().await?; 280 let json_token = serde_json::from_str::<AccessToken>(&token); 281 if let Err(e) = json_token { 282 println!("Error parsing token: {}", token); 283 return Err(Error::from(e)); 284 } 285 Ok(json_token.unwrap()) 286} 287 288pub async fn get_currently_playing( 289 cache: Cache, 290 user_id: &str, 291 token: &str, 292 client_id: &str, 293 client_secret: &str, 294) -> Result<Option<(CurrentlyPlaying, bool)>, Error> { 295 if let Ok(Some(data)) = cache.get(user_id) { 296 println!( 297 "{} {}", 298 format!("[{}]", user_id).bright_green(), 299 "Using cache".cyan() 300 ); 301 if data == "No content" { 302 return Ok(None); 303 } 304 let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data); 305 306 if decoded_data.is_err() { 307 println!( 308 "{} {} {}", 309 format!("[{}]", user_id).bright_green(), 310 "Cache is invalid".red(), 311 data 312 ); 313 cache.setex(user_id, "No content", 10)?; 314 cache.del(&format!("{}:current", user_id))?; 315 return Ok(None); 316 } 317 318 let data: CurrentlyPlaying = decoded_data.unwrap(); 319 // detect if the song has changed 320 let previous = cache.get(&format!("{}:previous", user_id)); 321 322 if previous.is_err() { 323 println!( 324 "{} redis error: {}", 325 format!("[{}]", user_id).bright_green(), 326 previous.unwrap_err().to_string().bright_red() 327 ); 328 return Ok(None); 329 } 330 331 let previous = previous.unwrap(); 332 333 let changed = match previous { 334 Some(previous) => { 335 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() { 336 println!( 337 "{} {} {}", 338 format!("[{}]", user_id).bright_green(), 339 "Previous cache is invalid", 340 previous 341 ); 342 return Ok(None); 343 } 344 345 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?; 346 if previous.item.is_none() && data.item.is_some() { 347 return Ok(Some((data, true))); 348 } 349 350 if previous.item.is_some() && data.item.is_none() { 351 return Ok(Some((data, false))); 352 } 353 354 if previous.item.is_none() && data.item.is_none() { 355 return Ok(Some((data, false))); 356 } 357 358 let previous_item = previous.item.unwrap(); 359 let data_item = data.clone().item.unwrap(); 360 previous_item.id != data_item.id 361 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0) 362 } 363 _ => true, 364 }; 365 return Ok(Some((data, changed))); 366 } 367 368 let token = refresh_token(token, client_id, client_secret).await?; 369 let client = Client::new(); 370 let response = client 371 .get(format!("{}/me/player/currently-playing", BASE_URL)) 372 .bearer_auth(token.access_token) 373 .send() 374 .await?; 375 376 let headers = response.headers().clone(); 377 let status = response.status().as_u16(); 378 let data = response.text().await?; 379 380 if !data.contains("is_playing") && !data.contains("context") { 381 println!("> Currently playing: {}", data); 382 } 383 384 if status == 429 { 385 println!( 386 "{} Too many requests, retry-after {}", 387 format!("[{}]", user_id).bright_green(), 388 headers 389 .get("retry-after") 390 .unwrap() 391 .to_str() 392 .unwrap() 393 .bright_green() 394 ); 395 return Ok(None); 396 } 397 398 let previous = cache.get(&format!("{}:previous", user_id)); 399 if previous.is_err() { 400 println!( 401 "{} redis error: {}", 402 format!("[{}]", user_id).bright_green(), 403 previous.unwrap_err().to_string().bright_red() 404 ); 405 return Ok(None); 406 } 407 408 let previous = previous.unwrap(); 409 410 // check if status code is 204 411 if status == 204 { 412 println!("No content"); 413 match cache.setex( 414 user_id, 415 "No content", 416 match previous.is_none() { 417 true => 30, 418 false => 10, 419 }, 420 ) { 421 Ok(_) => {} 422 Err(e) => { 423 println!( 424 "{} redis error: {}", 425 format!("[{}]", user_id).bright_green(), 426 e.to_string().bright_red() 427 ); 428 return Ok(None); 429 } 430 } 431 match cache.del(&format!("{}:current", user_id)) { 432 Ok(_) => {} 433 Err(e) => { 434 println!( 435 "{} redis error: {}", 436 format!("[{}]", user_id).bright_green(), 437 e.to_string().bright_red() 438 ); 439 return Ok(None); 440 } 441 } 442 return Ok(None); 443 } 444 445 if serde_json::from_str::<CurrentlyPlaying>(&data).is_err() { 446 println!( 447 "{} {} {}", 448 format!("[{}]", user_id).bright_green(), 449 "Invalid data received".red(), 450 data 451 ); 452 match cache.setex(user_id, "No content", 10) { 453 Ok(_) => {} 454 Err(e) => { 455 println!( 456 "{} redis error: {}", 457 format!("[{}]", user_id).bright_green(), 458 e.to_string().bright_red() 459 ); 460 return Ok(None); 461 } 462 } 463 match cache.del(&format!("{}:current", user_id)) { 464 Ok(_) => {} 465 Err(e) => { 466 println!( 467 "{} redis error: {}", 468 format!("[{}]", user_id).bright_green(), 469 e.to_string().bright_red() 470 ); 471 return Ok(None); 472 } 473 } 474 return Ok(None); 475 } 476 477 let data = serde_json::from_str::<CurrentlyPlaying>(&data)?; 478 479 match cache.setex( 480 user_id, 481 &serde_json::to_string(&data)?, 482 match previous.is_none() { 483 true => 30, 484 false => 15, 485 }, 486 ) { 487 Ok(_) => {} 488 Err(e) => { 489 println!( 490 "{} redis error: {}", 491 format!("[{}]", user_id).bright_green(), 492 e.to_string().bright_red() 493 ); 494 return Ok(None); 495 } 496 } 497 match cache.del(&format!("{}:current", user_id)) { 498 Ok(_) => {} 499 Err(e) => { 500 println!( 501 "{} redis error: {}", 502 format!("[{}]", user_id).bright_green(), 503 e.to_string().bright_red() 504 ); 505 return Ok(None); 506 } 507 } 508 509 // detect if the song has changed 510 let previous = cache.get(&format!("{}:previous", user_id)); 511 512 if previous.is_err() { 513 println!( 514 "{} redis error: {}", 515 format!("[{}]", user_id).bright_green(), 516 previous.unwrap_err().to_string().bright_red() 517 ); 518 return Ok(None); 519 } 520 521 let previous = previous.unwrap(); 522 let changed = match previous { 523 Some(previous) => { 524 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() { 525 println!( 526 "{} {} {}", 527 format!("[{}]", user_id).bright_green(), 528 "Previous cache is invalid", 529 previous 530 ); 531 return Ok(None); 532 } 533 534 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?; 535 if previous.item.is_none() || data.item.is_none() { 536 return Ok(Some((data, false))); 537 } 538 539 let previous_item = previous.item.unwrap(); 540 let data_item = data.clone().item.unwrap(); 541 542 previous_item.id != data_item.id 543 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0) 544 } 545 _ => data.item.is_some(), 546 }; 547 548 // save as previous song 549 match cache.setex( 550 &format!("{}:previous", user_id), 551 &serde_json::to_string(&data)?, 552 600, 553 ) { 554 Ok(_) => {} 555 Err(e) => { 556 println!( 557 "{} redis error: {}", 558 format!("[{}]", user_id).bright_green(), 559 e.to_string().bright_red() 560 ); 561 return Ok(None); 562 } 563 } 564 565 Ok(Some((data, changed))) 566} 567 568pub async fn get_artist( 569 cache: Cache, 570 artist_id: &str, 571 token: &str, 572 client_id: &str, 573 client_secret: &str, 574) -> Result<Option<Artist>, Error> { 575 if let Ok(Some(data)) = cache.get(artist_id) { 576 return Ok(Some(serde_json::from_str(&data)?)); 577 } 578 579 let token = refresh_token(token, client_id, client_secret).await?; 580 let client = Client::new(); 581 let response = client 582 .get(&format!("{}/artists/{}", BASE_URL, artist_id)) 583 .bearer_auth(token.access_token) 584 .send() 585 .await?; 586 587 let headers = response.headers().clone(); 588 let data = response.text().await?; 589 590 if data == "Too many requests" { 591 println!( 592 "> retry-after {}", 593 headers.get("retry-after").unwrap().to_str().unwrap() 594 ); 595 println!("> {} [get_artist]", data); 596 return Ok(None); 597 } 598 599 match cache.setex(artist_id, &data, 20) { 600 Ok(_) => {} 601 Err(e) => { 602 println!( 603 "{} redis error: {}", 604 format!("[{}]", artist_id).bright_green(), 605 e.to_string().bright_red() 606 ); 607 return Ok(None); 608 } 609 } 610 611 Ok(Some(serde_json::from_str(&data)?)) 612} 613 614pub async fn get_album( 615 cache: Cache, 616 album_id: &str, 617 token: &str, 618 client_id: &str, 619 client_secret: &str, 620) -> Result<Option<Album>, Error> { 621 if let Ok(Some(data)) = cache.get(album_id) { 622 return Ok(Some(serde_json::from_str(&data)?)); 623 } 624 625 let token = refresh_token(token, client_id, client_secret).await?; 626 let client = Client::new(); 627 let response = client 628 .get(&format!("{}/albums/{}", BASE_URL, album_id)) 629 .bearer_auth(token.access_token) 630 .send() 631 .await?; 632 633 let headers = response.headers().clone(); 634 let data = response.text().await?; 635 636 if data == "Too many requests" { 637 println!( 638 "> retry-after {}", 639 headers.get("retry-after").unwrap().to_str().unwrap() 640 ); 641 println!("> {} [get_album]", data); 642 return Ok(None); 643 } 644 645 match cache.setex(album_id, &data, 20) { 646 Ok(_) => {} 647 Err(e) => { 648 println!( 649 "{} redis error: {}", 650 format!("[{}]", album_id).bright_green(), 651 e.to_string().bright_red() 652 ); 653 return Ok(None); 654 } 655 } 656 657 Ok(Some(serde_json::from_str(&data)?)) 658} 659 660pub async fn get_album_tracks( 661 cache: Cache, 662 album_id: &str, 663 token: &str, 664 client_id: &str, 665 client_secret: &str, 666) -> Result<AlbumTracks, Error> { 667 if let Ok(Some(data)) = cache.get(&format!("{}:tracks", album_id)) { 668 return Ok(serde_json::from_str(&data)?); 669 } 670 671 let token = refresh_token(token, client_id, client_secret).await?; 672 let client = Client::new(); 673 let mut all_tracks = Vec::new(); 674 let mut offset = 0; 675 let limit = 50; 676 677 loop { 678 let response = client 679 .get(&format!("{}/albums/{}/tracks", BASE_URL, album_id)) 680 .bearer_auth(&token.access_token) 681 .query(&[ 682 ("limit", &limit.to_string()), 683 ("offset", &offset.to_string()), 684 ]) 685 .send() 686 .await?; 687 688 let headers = response.headers().clone(); 689 let data = response.text().await?; 690 if data == "Too many requests" { 691 println!( 692 "> retry-after {}", 693 headers.get("retry-after").unwrap().to_str().unwrap() 694 ); 695 println!("> {} [get_album_tracks]", data); 696 continue; 697 } 698 699 let album_tracks: AlbumTracks = serde_json::from_str(&data)?; 700 701 if album_tracks.items.is_empty() { 702 break; 703 } 704 705 all_tracks.extend(album_tracks.items); 706 offset += limit; 707 } 708 709 let all_tracks_json = serde_json::to_string(&all_tracks)?; 710 match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) { 711 Ok(_) => {} 712 Err(e) => { 713 println!( 714 "{} redis error: {}", 715 format!("[{}]", album_id).bright_green(), 716 e.to_string().bright_red() 717 ); 718 } 719 } 720 721 Ok(AlbumTracks { 722 items: all_tracks, 723 ..Default::default() 724 }) 725} 726 727pub async fn find_spotify_users( 728 pool: &Pool<Postgres>, 729 offset: usize, 730 limit: usize, 731) -> Result<Vec<(String, String, String, String, String)>, Error> { 732 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 733 r#" 734 SELECT * FROM spotify_tokens 735 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 736 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 737 LEFT JOIN spotify_apps ON spotify_tokens.spotify_app_id = spotify_apps.spotify_app_id 738 LIMIT $1 OFFSET $2 739 "#, 740 ) 741 .bind(limit as i64) 742 .bind(offset as i64) 743 .fetch_all(pool) 744 .await?; 745 746 let mut user_tokens = vec![]; 747 748 for result in &results { 749 let token = decrypt_aes_256_ctr( 750 &result.refresh_token, 751 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 752 )?; 753 let spotify_secret = decrypt_aes_256_ctr( 754 &result.spotify_secret, 755 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 756 )?; 757 user_tokens.push(( 758 result.email.clone(), 759 token, 760 result.did.clone(), 761 result.spotify_app_id.clone(), 762 spotify_secret, 763 )); 764 } 765 766 Ok(user_tokens) 767} 768 769pub async fn find_spotify_user( 770 pool: &Pool<Postgres>, 771 email: &str, 772) -> Result<Option<(String, String, String, String, String)>, Error> { 773 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 774 r#" 775 SELECT * FROM spotify_tokens 776 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 777 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 778 LEFT JOIN spotify_apps ON spotify_tokens.spotify_app_id = spotify_apps.spotify_app_id 779 WHERE spotify_accounts.email = $1 780 "#, 781 ) 782 .bind(email) 783 .fetch_all(pool) 784 .await?; 785 786 match result.first() { 787 Some(result) => { 788 let token = decrypt_aes_256_ctr( 789 &result.refresh_token, 790 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 791 )?; 792 let spotify_secret = decrypt_aes_256_ctr( 793 &result.spotify_secret, 794 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 795 )?; 796 Ok(Some(( 797 result.email.clone(), 798 token, 799 result.did.clone(), 800 result.spotify_app_id.clone(), 801 spotify_secret, 802 ))) 803 } 804 None => Ok(None), 805 } 806} 807 808pub async fn watch_currently_playing( 809 spotify_email: String, 810 token: String, 811 did: String, 812 stop_flag: Arc<AtomicBool>, 813 cache: Cache, 814 client_id: String, 815 client_secret: String, 816) -> Result<(), Error> { 817 println!( 818 "{} {}", 819 format!("[{}]", spotify_email).bright_green(), 820 "Checking currently playing".cyan() 821 ); 822 823 let stop_flag_clone = stop_flag.clone(); 824 let spotify_email_clone = spotify_email.clone(); 825 let cache_clone = cache.clone(); 826 thread::spawn(move || { 827 // Inner thread with error recovery 828 let result: Result<(), Error> = (|| { 829 loop { 830 if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { 831 println!( 832 "{} Stopping progress tracker thread", 833 format!("[{}]", spotify_email_clone).bright_green() 834 ); 835 break; 836 } 837 838 if let Ok(Some(cached)) = 839 cache_clone.get(&format!("{}:current", spotify_email_clone)) 840 { 841 if let Ok(mut current_song) = serde_json::from_str::<CurrentlyPlaying>(&cached) 842 { 843 if let Some(item) = current_song.item.clone() { 844 if current_song.is_playing 845 && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into() 846 { 847 current_song.progress_ms = 848 Some(current_song.progress_ms.unwrap_or(0) + 800); 849 match cache_clone.setex( 850 &format!("{}:current", spotify_email_clone), 851 &serde_json::to_string(&current_song).unwrap_or_default(), 852 16, 853 ) { 854 Ok(_) => {} 855 Err(e) => { 856 println!( 857 "{} redis error: {}", 858 format!("[{}]", spotify_email_clone).bright_green(), 859 e.to_string().bright_red() 860 ); 861 } 862 } 863 thread::sleep(std::time::Duration::from_millis(800)); 864 continue; 865 } 866 } 867 } 868 } 869 870 if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) { 871 if cached != "No content" { 872 match cache_clone.setex( 873 &format!("{}:current", spotify_email_clone), 874 &cached, 875 16, 876 ) { 877 Ok(_) => {} 878 Err(e) => { 879 println!( 880 "{} redis error: {}", 881 format!("[{}]", spotify_email_clone).bright_green(), 882 e.to_string().bright_red() 883 ); 884 } 885 } 886 } 887 } 888 889 thread::sleep(std::time::Duration::from_millis(800)); 890 } 891 Ok(()) 892 })(); 893 894 if let Err(e) = result { 895 println!( 896 "{} Progress tracker thread error: {}", 897 format!("[{}]", spotify_email_clone).bright_green(), 898 e.to_string().bright_red() 899 ); 900 } 901 }); 902 903 loop { 904 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { 905 println!( 906 "{} Stopping Thread", 907 format!("[{}]", spotify_email).bright_green() 908 ); 909 break; 910 } 911 let spotify_email = spotify_email.clone(); 912 let token = token.clone(); 913 let did = did.clone(); 914 let cache = cache.clone(); 915 let client_id = client_id.clone(); 916 let client_secret = client_secret.clone(); 917 918 let currently_playing = get_currently_playing( 919 cache.clone(), 920 &spotify_email, 921 &token, 922 &client_id, 923 &client_secret, 924 ) 925 .await; 926 let currently_playing = match currently_playing { 927 Ok(currently_playing) => currently_playing, 928 Err(e) => { 929 println!( 930 "{} {}", 931 format!("[{}]", spotify_email).bright_green(), 932 e.to_string().bright_red() 933 ); 934 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 935 continue; 936 } 937 }; 938 939 if let Some((data, changed)) = currently_playing { 940 if data.item.is_none() { 941 println!( 942 "{} {}", 943 format!("[{}]", spotify_email).bright_green(), 944 "No song playing".yellow() 945 ); 946 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 947 continue; 948 } 949 let data_item = data.item.unwrap(); 950 println!( 951 "{} {} is_playing: {} changed: {}", 952 format!("[{}]", spotify_email).bright_green(), 953 format!("{} - {}", data_item.name, data_item.artists[0].name).yellow(), 954 data.is_playing, 955 changed 956 ); 957 958 if changed { 959 cache.setex( 960 &format!("changed:{}:{}", spotify_email, data_item.id), 961 &data_item.duration_ms.to_string(), 962 3600 * 24, 963 )?; 964 } 965 966 let current_track = 967 match cache.get(&format!("changed:{}:{}", spotify_email, data_item.id)) { 968 Ok(x) => x.is_some(), 969 Err(_) => false, 970 }; 971 972 if let Ok(Some(cached)) = cache.get(&format!("{}:current", spotify_email)) { 973 let current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?; 974 if let Some(item) = current_song.item { 975 let percentage = current_song.progress_ms.unwrap_or(0) as f32 976 / data_item.duration_ms as f32 977 * 100.0; 978 if current_track && percentage >= 40.0 && item.id == data_item.id { 979 println!( 980 "{} Scrobbling track: {} {}", 981 format!("[{}]", spotify_email).bright_green(), 982 item.name.yellow(), 983 format!("{:.2}%", percentage) 984 ); 985 scrobble( 986 cache.clone(), 987 &spotify_email, 988 &did, 989 &token, 990 &client_id, 991 &client_secret, 992 ) 993 .await?; 994 995 match cache.del(&format!("changed:{}:{}", spotify_email, data_item.id)) { 996 Ok(_) => {} 997 Err(_) => tracing::error!("Failed to delete cache entry"), 998 }; 999 1000 thread::spawn(move || { 1001 let rt = tokio::runtime::Runtime::new().unwrap(); 1002 match rt.block_on(async { 1003 get_album_tracks( 1004 cache.clone(), 1005 &data_item.album.id, 1006 &token, 1007 &client_id, 1008 &client_secret, 1009 ) 1010 .await?; 1011 get_album( 1012 cache.clone(), 1013 &data_item.album.id, 1014 &token, 1015 &client_id, 1016 &client_secret, 1017 ) 1018 .await?; 1019 update_library( 1020 cache.clone(), 1021 &spotify_email, 1022 &did, 1023 &token, 1024 &client_id, 1025 &client_secret, 1026 ) 1027 .await?; 1028 Ok::<(), Error>(()) 1029 }) { 1030 Ok(_) => {} 1031 Err(e) => { 1032 println!( 1033 "{} {}", 1034 format!("[{}]", spotify_email).bright_green(), 1035 e.to_string().bright_red() 1036 ); 1037 } 1038 } 1039 }); 1040 } 1041 } 1042 } 1043 } 1044 1045 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 1046 } 1047 1048 Ok(()) 1049}