A decentralized music tracking and discovery platform built on AT Protocol 🎵
at feat/pgpull 897 lines 29 kB view raw
1use std::{ 2 collections::HashMap, 3 env, 4 sync::{atomic::AtomicBool, Arc, Mutex}, 5 thread, 6}; 7 8use anyhow::Error; 9use async_nats::connect; 10use owo_colors::OwoColorize; 11use reqwest::Client; 12use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; 13use tokio_stream::StreamExt; 14 15use crate::{ 16 cache::Cache, 17 crypto::decrypt_aes_256_ctr, 18 rocksky::{scrobble, update_library}, 19 types::{ 20 album_tracks::AlbumTracks, 21 currently_playing::{Album, Artist, CurrentlyPlaying}, 22 spotify_token::SpotifyTokenWithEmail, 23 token::AccessToken, 24 }, 25}; 26 27pub mod cache; 28pub mod crypto; 29pub mod rocksky; 30pub mod token; 31pub mod types; 32 33pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1"; 34 35pub async fn run() -> Result<(), Error> { 36 let cache = Cache::new()?; 37 let pool = PgPoolOptions::new() 38 .max_connections(5) 39 .connect(&env::var("XATA_POSTGRES_URL")?) 40 .await?; 41 42 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 43 let nc = connect(&addr).await?; 44 println!("Connected to NATS server at {}", addr.bright_green()); 45 46 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?; 47 println!("Subscribed to {}", "rocksky.spotify.user".bright_green()); 48 49 let users = find_spotify_users(&pool, 0, 100).await?; 50 println!("Found {} users", users.len().bright_green()); 51 52 // Shared HashMap to manage threads and their stop flags 53 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 54 Arc::new(Mutex::new(HashMap::new())); 55 56 // Start threads for all users 57 for user in users { 58 let email = user.0.clone(); 59 let token = user.1.clone(); 60 let did = user.2.clone(); 61 let stop_flag = Arc::new(AtomicBool::new(false)); 62 let cache = cache.clone(); 63 let nc = nc.clone(); 64 let thread_map = Arc::clone(&thread_map); 65 66 thread_map 67 .lock() 68 .unwrap() 69 .insert(email.clone(), Arc::clone(&stop_flag)); 70 71 thread::spawn(move || { 72 let rt = 
tokio::runtime::Runtime::new().unwrap(); 73 match rt.block_on(async { 74 watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone()) 75 .await?; 76 Ok::<(), Error>(()) 77 }) { 78 Ok(_) => {} 79 Err(e) => { 80 println!( 81 "{} Error starting thread for user: {} - {}", 82 format!("[{}]", email).bright_green(), 83 email.bright_green(), 84 e.to_string().bright_red() 85 ); 86 87 // If there's an error, publish a message to restart the thread 88 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 89 Ok(_) => { 90 println!( 91 "{} Published message to restart thread for user: {}", 92 format!("[{}]", email).bright_green(), 93 email.bright_green() 94 ); 95 } 96 Err(e) => { 97 println!( 98 "{} Error publishing message to restart thread: {}", 99 format!("[{}]", email).bright_green(), 100 e.to_string().bright_red() 101 ); 102 } 103 } 104 } 105 } 106 }); 107 } 108 109 // Handle subscription messages 110 while let Some(message) = sub.next().await { 111 let user_id = String::from_utf8(message.payload.to_vec()).unwrap(); 112 println!( 113 "Received message to restart thread for user: {}", 114 user_id.bright_green() 115 ); 116 117 let mut thread_map = thread_map.lock().unwrap(); 118 119 // Check if the user exists in the thread map 120 if let Some(stop_flag) = thread_map.get(&user_id) { 121 // Stop the existing thread 122 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed); 123 124 // Create a new stop flag and restart the thread 125 let new_stop_flag = Arc::new(AtomicBool::new(false)); 126 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag)); 127 128 let user = find_spotify_user(&pool, &user_id).await?; 129 130 if user.is_none() { 131 println!( 132 "Spotify user not found: {}, skipping", 133 user_id.bright_green() 134 ); 135 continue; 136 } 137 138 let user = user.unwrap(); 139 140 let email = user.0.clone(); 141 let token = user.1.clone(); 142 let did = user.2.clone(); 143 let cache = cache.clone(); 144 145 
thread::spawn(move || { 146 let rt = tokio::runtime::Runtime::new().unwrap(); 147 match rt.block_on(async { 148 watch_currently_playing( 149 email.clone(), 150 token, 151 did, 152 new_stop_flag, 153 cache.clone(), 154 ) 155 .await?; 156 Ok::<(), Error>(()) 157 }) { 158 Ok(_) => {} 159 Err(e) => { 160 println!( 161 "{} Error restarting thread for user: {} - {}", 162 format!("[{}]", email).bright_green(), 163 email.bright_green(), 164 e.to_string().bright_red() 165 ); 166 } 167 } 168 }); 169 170 println!("Restarted thread for user: {}", user_id.bright_green()); 171 } else { 172 println!( 173 "No thread found for user: {}, starting new thread", 174 user_id.bright_green() 175 ); 176 let user = find_spotify_user(&pool, &user_id).await?; 177 if let Some(user) = user { 178 let email = user.0.clone(); 179 let token = user.1.clone(); 180 let did = user.2.clone(); 181 let stop_flag = Arc::new(AtomicBool::new(false)); 182 let cache = cache.clone(); 183 let nc = nc.clone(); 184 185 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 186 187 thread::spawn(move || { 188 let rt = tokio::runtime::Runtime::new().unwrap(); 189 match rt.block_on(async { 190 watch_currently_playing( 191 email.clone(), 192 token, 193 did, 194 stop_flag, 195 cache.clone(), 196 ) 197 .await?; 198 Ok::<(), Error>(()) 199 }) { 200 Ok(_) => {} 201 Err(e) => { 202 println!( 203 "{} Error starting thread for user: {} - {}", 204 format!("[{}]", email).bright_green(), 205 email.bright_green(), 206 e.to_string().bright_red() 207 ); 208 match rt 209 .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 210 { 211 Ok(_) => {} 212 Err(e) => { 213 println!( 214 "{} Error publishing message to restart thread: {}", 215 format!("[{}]", email).bright_green(), 216 e.to_string().bright_red() 217 ); 218 } 219 } 220 } 221 } 222 }); 223 } 224 } 225 } 226 227 Ok(()) 228} 229 230pub async fn refresh_token(token: &str) -> Result<AccessToken, Error> { 231 if env::var("SPOTIFY_CLIENT_ID").is_err() || 
env::var("SPOTIFY_CLIENT_SECRET").is_err() { 232 panic!("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables"); 233 } 234 235 let client_id = env::var("SPOTIFY_CLIENT_ID")?; 236 let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?; 237 238 let client = Client::new(); 239 240 let response = client 241 .post("https://accounts.spotify.com/api/token") 242 .basic_auth(&client_id, Some(client_secret)) 243 .form(&[ 244 ("grant_type", "refresh_token"), 245 ("refresh_token", token), 246 ("client_id", &client_id), 247 ]) 248 .send() 249 .await?; 250 let token = response.json::<AccessToken>().await?; 251 Ok(token) 252} 253 254pub async fn get_currently_playing( 255 cache: Cache, 256 user_id: &str, 257 token: &str, 258) -> Result<Option<(CurrentlyPlaying, bool)>, Error> { 259 if let Ok(Some(data)) = cache.get(user_id) { 260 println!( 261 "{} {}", 262 format!("[{}]", user_id).bright_green(), 263 "Using cache".cyan() 264 ); 265 if data == "No content" { 266 return Ok(None); 267 } 268 let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data); 269 270 if decoded_data.is_err() { 271 println!( 272 "{} {} {}", 273 format!("[{}]", user_id).bright_green(), 274 "Cache is invalid".red(), 275 data 276 ); 277 cache.setex(user_id, "No content", 10)?; 278 cache.del(&format!("{}:current", user_id))?; 279 return Ok(None); 280 } 281 282 let data: CurrentlyPlaying = decoded_data.unwrap(); 283 // detect if the song has changed 284 let previous = cache.get(&format!("{}:previous", user_id)); 285 286 if previous.is_err() { 287 println!( 288 "{} redis error: {}", 289 format!("[{}]", user_id).bright_green(), 290 previous.unwrap_err().to_string().bright_red() 291 ); 292 return Ok(None); 293 } 294 295 let previous = previous.unwrap(); 296 297 let changed = match previous { 298 Some(previous) => { 299 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() { 300 println!( 301 "{} {} {}", 302 format!("[{}]", user_id).bright_green(), 303 "Previous cache is 
invalid", 304 previous 305 ); 306 return Ok(None); 307 } 308 309 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?; 310 if previous.item.is_none() && data.item.is_some() { 311 return Ok(Some((data, true))); 312 } 313 314 if previous.item.is_some() && data.item.is_none() { 315 return Ok(Some((data, false))); 316 } 317 318 if previous.item.is_none() && data.item.is_none() { 319 return Ok(Some((data, false))); 320 } 321 322 let previous_item = previous.item.unwrap(); 323 let data_item = data.clone().item.unwrap(); 324 previous_item.id != data_item.id 325 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0) 326 } 327 _ => true, 328 }; 329 return Ok(Some((data, changed))); 330 } 331 332 let token = refresh_token(token).await?; 333 let client = Client::new(); 334 let response = client 335 .get(format!("{}/me/player/currently-playing", BASE_URL)) 336 .bearer_auth(token.access_token) 337 .send() 338 .await?; 339 340 let headers = response.headers().clone(); 341 let status = response.status().as_u16(); 342 let data = response.text().await?; 343 344 if status == 429 { 345 println!( 346 "{} Too many requests, retry-after {}", 347 format!("[{}]", user_id).bright_green(), 348 headers 349 .get("retry-after") 350 .unwrap() 351 .to_str() 352 .unwrap() 353 .bright_green() 354 ); 355 return Ok(None); 356 } 357 358 let previous = cache.get(&format!("{}:previous", user_id)); 359 if previous.is_err() { 360 println!( 361 "{} redis error: {}", 362 format!("[{}]", user_id).bright_green(), 363 previous.unwrap_err().to_string().bright_red() 364 ); 365 return Ok(None); 366 } 367 368 let previous = previous.unwrap(); 369 370 // check if status code is 204 371 if status == 204 { 372 println!("No content"); 373 match cache.setex( 374 user_id, 375 "No content", 376 match previous.is_none() { 377 true => 30, 378 false => 10, 379 }, 380 ) { 381 Ok(_) => {} 382 Err(e) => { 383 println!( 384 "{} redis error: {}", 385 format!("[{}]", user_id).bright_green(), 386 
e.to_string().bright_red() 387 ); 388 return Ok(None); 389 } 390 } 391 match cache.del(&format!("{}:current", user_id)) { 392 Ok(_) => {} 393 Err(e) => { 394 println!( 395 "{} redis error: {}", 396 format!("[{}]", user_id).bright_green(), 397 e.to_string().bright_red() 398 ); 399 return Ok(None); 400 } 401 } 402 return Ok(None); 403 } 404 405 if serde_json::from_str::<CurrentlyPlaying>(&data).is_err() { 406 println!( 407 "{} {} {}", 408 format!("[{}]", user_id).bright_green(), 409 "Invalid data received".red(), 410 data 411 ); 412 match cache.setex(user_id, "No content", 10) { 413 Ok(_) => {} 414 Err(e) => { 415 println!( 416 "{} redis error: {}", 417 format!("[{}]", user_id).bright_green(), 418 e.to_string().bright_red() 419 ); 420 return Ok(None); 421 } 422 } 423 match cache.del(&format!("{}:current", user_id)) { 424 Ok(_) => {} 425 Err(e) => { 426 println!( 427 "{} redis error: {}", 428 format!("[{}]", user_id).bright_green(), 429 e.to_string().bright_red() 430 ); 431 return Ok(None); 432 } 433 } 434 return Ok(None); 435 } 436 437 let data = serde_json::from_str::<CurrentlyPlaying>(&data)?; 438 439 match cache.setex( 440 user_id, 441 &serde_json::to_string(&data)?, 442 match previous.is_none() { 443 true => 30, 444 false => 15, 445 }, 446 ) { 447 Ok(_) => {} 448 Err(e) => { 449 println!( 450 "{} redis error: {}", 451 format!("[{}]", user_id).bright_green(), 452 e.to_string().bright_red() 453 ); 454 return Ok(None); 455 } 456 } 457 match cache.del(&format!("{}:current", user_id)) { 458 Ok(_) => {} 459 Err(e) => { 460 println!( 461 "{} redis error: {}", 462 format!("[{}]", user_id).bright_green(), 463 e.to_string().bright_red() 464 ); 465 return Ok(None); 466 } 467 } 468 469 // detect if the song has changed 470 let previous = cache.get(&format!("{}:previous", user_id)); 471 472 if previous.is_err() { 473 println!( 474 "{} redis error: {}", 475 format!("[{}]", user_id).bright_green(), 476 previous.unwrap_err().to_string().bright_red() 477 ); 478 return Ok(None); 
479 } 480 481 let previous = previous.unwrap(); 482 let changed = match previous { 483 Some(previous) => { 484 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() { 485 println!( 486 "{} {} {}", 487 format!("[{}]", user_id).bright_green(), 488 "Previous cache is invalid", 489 previous 490 ); 491 return Ok(None); 492 } 493 494 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?; 495 if previous.item.is_none() || data.item.is_none() { 496 return Ok(Some((data, false))); 497 } 498 499 let previous_item = previous.item.unwrap(); 500 let data_item = data.clone().item.unwrap(); 501 502 previous_item.id != data_item.id 503 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0) 504 } 505 _ => false, 506 }; 507 508 // save as previous song 509 match cache.setex( 510 &format!("{}:previous", user_id), 511 &serde_json::to_string(&data)?, 512 600, 513 ) { 514 Ok(_) => {} 515 Err(e) => { 516 println!( 517 "{} redis error: {}", 518 format!("[{}]", user_id).bright_green(), 519 e.to_string().bright_red() 520 ); 521 return Ok(None); 522 } 523 } 524 525 Ok(Some((data, changed))) 526} 527 528pub async fn get_artist( 529 cache: Cache, 530 artist_id: &str, 531 token: &str, 532) -> Result<Option<Artist>, Error> { 533 if let Ok(Some(data)) = cache.get(artist_id) { 534 return Ok(Some(serde_json::from_str(&data)?)); 535 } 536 537 let token = refresh_token(token).await?; 538 let client = Client::new(); 539 let response = client 540 .get(&format!("{}/artists/{}", BASE_URL, artist_id)) 541 .bearer_auth(token.access_token) 542 .send() 543 .await?; 544 545 let headers = response.headers().clone(); 546 let data = response.text().await?; 547 548 if data == "Too many requests" { 549 println!( 550 "> retry-after {}", 551 headers.get("retry-after").unwrap().to_str().unwrap() 552 ); 553 println!("> {} [get_artist]", data); 554 return Ok(None); 555 } 556 557 match cache.setex(artist_id, &data, 20) { 558 Ok(_) => {} 559 Err(e) => { 560 println!( 561 "{} redis 
error: {}", 562 format!("[{}]", artist_id).bright_green(), 563 e.to_string().bright_red() 564 ); 565 return Ok(None); 566 } 567 } 568 569 Ok(Some(serde_json::from_str(&data)?)) 570} 571 572pub async fn get_album(cache: Cache, album_id: &str, token: &str) -> Result<Option<Album>, Error> { 573 if let Ok(Some(data)) = cache.get(album_id) { 574 return Ok(Some(serde_json::from_str(&data)?)); 575 } 576 577 let token = refresh_token(token).await?; 578 let client = Client::new(); 579 let response = client 580 .get(&format!("{}/albums/{}", BASE_URL, album_id)) 581 .bearer_auth(token.access_token) 582 .send() 583 .await?; 584 585 let headers = response.headers().clone(); 586 let data = response.text().await?; 587 588 if data == "Too many requests" { 589 println!( 590 "> retry-after {}", 591 headers.get("retry-after").unwrap().to_str().unwrap() 592 ); 593 println!("> {} [get_album]", data); 594 return Ok(None); 595 } 596 597 match cache.setex(album_id, &data, 20) { 598 Ok(_) => {} 599 Err(e) => { 600 println!( 601 "{} redis error: {}", 602 format!("[{}]", album_id).bright_green(), 603 e.to_string().bright_red() 604 ); 605 return Ok(None); 606 } 607 } 608 609 Ok(Some(serde_json::from_str(&data)?)) 610} 611 612pub async fn get_album_tracks( 613 cache: Cache, 614 album_id: &str, 615 token: &str, 616) -> Result<AlbumTracks, Error> { 617 if let Ok(Some(data)) = cache.get(&format!("{}:tracks", album_id)) { 618 return Ok(serde_json::from_str(&data)?); 619 } 620 621 let token = refresh_token(token).await?; 622 let client = Client::new(); 623 let mut all_tracks = Vec::new(); 624 let mut offset = 0; 625 let limit = 50; 626 627 loop { 628 let response = client 629 .get(&format!("{}/albums/{}/tracks", BASE_URL, album_id)) 630 .bearer_auth(&token.access_token) 631 .query(&[ 632 ("limit", &limit.to_string()), 633 ("offset", &offset.to_string()), 634 ]) 635 .send() 636 .await?; 637 638 let headers = response.headers().clone(); 639 let data = response.text().await?; 640 if data == "Too many 
requests" { 641 println!( 642 "> retry-after {}", 643 headers.get("retry-after").unwrap().to_str().unwrap() 644 ); 645 println!("> {} [get_album_tracks]", data); 646 continue; 647 } 648 649 let album_tracks: AlbumTracks = serde_json::from_str(&data)?; 650 651 if album_tracks.items.is_empty() { 652 break; 653 } 654 655 all_tracks.extend(album_tracks.items); 656 offset += limit; 657 } 658 659 let all_tracks_json = serde_json::to_string(&all_tracks)?; 660 match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) { 661 Ok(_) => {} 662 Err(e) => { 663 println!( 664 "{} redis error: {}", 665 format!("[{}]", album_id).bright_green(), 666 e.to_string().bright_red() 667 ); 668 } 669 } 670 671 Ok(AlbumTracks { 672 items: all_tracks, 673 ..Default::default() 674 }) 675} 676 677pub async fn find_spotify_users( 678 pool: &Pool<Postgres>, 679 offset: usize, 680 limit: usize, 681) -> Result<Vec<(String, String, String, String)>, Error> { 682 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 683 r#" 684 SELECT * FROM spotify_tokens 685 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 686 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 687 LIMIT $1 OFFSET $2 688 "#, 689 ) 690 .bind(limit as i64) 691 .bind(offset as i64) 692 .fetch_all(pool) 693 .await?; 694 695 let mut user_tokens = vec![]; 696 697 for result in &results { 698 let token = decrypt_aes_256_ctr( 699 &result.refresh_token, 700 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 701 )?; 702 user_tokens.push(( 703 result.email.clone(), 704 token, 705 result.did.clone(), 706 result.user_id.clone(), 707 )); 708 } 709 710 Ok(user_tokens) 711} 712 713pub async fn find_spotify_user( 714 pool: &Pool<Postgres>, 715 email: &str, 716) -> Result<Option<(String, String, String)>, Error> { 717 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 718 r#" 719 SELECT * FROM spotify_tokens 720 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = 
spotify_accounts.user_id 721 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 722 WHERE spotify_accounts.email = $1 723 "#, 724 ) 725 .bind(email) 726 .fetch_all(pool) 727 .await?; 728 729 match result.first() { 730 Some(result) => { 731 let token = decrypt_aes_256_ctr( 732 &result.refresh_token, 733 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 734 )?; 735 Ok(Some((result.email.clone(), token, result.did.clone()))) 736 } 737 None => Ok(None), 738 } 739} 740 741pub async fn watch_currently_playing( 742 spotify_email: String, 743 token: String, 744 did: String, 745 stop_flag: Arc<AtomicBool>, 746 cache: Cache, 747) -> Result<(), Error> { 748 println!( 749 "{} {}", 750 format!("[{}]", spotify_email).bright_green(), 751 "Checking currently playing".cyan() 752 ); 753 754 let stop_flag_clone = stop_flag.clone(); 755 let spotify_email_clone = spotify_email.clone(); 756 let cache_clone = cache.clone(); 757 thread::spawn(move || { 758 loop { 759 if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { 760 println!( 761 "{} Stopping Thread", 762 format!("[{}]", spotify_email_clone).bright_green() 763 ); 764 break; 765 } 766 if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) { 767 if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() { 768 thread::sleep(std::time::Duration::from_millis(800)); 769 continue; 770 } 771 772 let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?; 773 774 if let Some(item) = current_song.item.clone() { 775 if current_song.is_playing 776 && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into() 777 { 778 current_song.progress_ms = 779 Some(current_song.progress_ms.unwrap_or(0) + 800); 780 match cache_clone.setex( 781 &format!("{}:current", spotify_email_clone), 782 &serde_json::to_string(&current_song)?, 783 16, 784 ) { 785 Ok(_) => {} 786 Err(e) => { 787 println!( 788 "{} redis error: {}", 789 format!("[{}]", spotify_email_clone).bright_green(), 790 
e.to_string().bright_red() 791 ); 792 } 793 } 794 thread::sleep(std::time::Duration::from_millis(800)); 795 continue; 796 } 797 } 798 continue; 799 } 800 801 if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) { 802 if cached == "No content" { 803 thread::sleep(std::time::Duration::from_millis(800)); 804 continue; 805 } 806 match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) { 807 Ok(_) => {} 808 Err(e) => { 809 println!( 810 "{} redis error: {}", 811 format!("[{}]", spotify_email_clone).bright_green(), 812 e.to_string().bright_red() 813 ); 814 } 815 } 816 } 817 818 thread::sleep(std::time::Duration::from_millis(800)); 819 } 820 Ok::<(), Error>(()) 821 }); 822 823 loop { 824 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { 825 println!( 826 "{} Stopping Thread", 827 format!("[{}]", spotify_email).bright_green() 828 ); 829 break; 830 } 831 let spotify_email = spotify_email.clone(); 832 let token = token.clone(); 833 let did = did.clone(); 834 let cache = cache.clone(); 835 836 let currently_playing = get_currently_playing(cache.clone(), &spotify_email, &token).await; 837 let currently_playing = match currently_playing { 838 Ok(currently_playing) => currently_playing, 839 Err(e) => { 840 println!( 841 "{} {}", 842 format!("[{}]", spotify_email).bright_green(), 843 e.to_string().bright_red() 844 ); 845 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 846 continue; 847 } 848 }; 849 850 if let Some((data, changed)) = currently_playing { 851 if data.item.is_none() { 852 println!( 853 "{} {}", 854 format!("[{}]", spotify_email).bright_green(), 855 "No song playing".yellow() 856 ); 857 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 858 continue; 859 } 860 let data_item = data.item.unwrap(); 861 println!( 862 "{} {} is_playing: {} changed: {}", 863 format!("[{}]", spotify_email).bright_green(), 864 format!("{} - {}", data_item.name, data_item.artists[0].name).yellow(), 865 
data.is_playing, 866 changed 867 ); 868 869 if changed { 870 scrobble(cache.clone(), &spotify_email, &did, &token).await?; 871 872 thread::spawn(move || { 873 let rt = tokio::runtime::Runtime::new().unwrap(); 874 match rt.block_on(async { 875 get_album_tracks(cache.clone(), &data_item.album.id, &token).await?; 876 get_album(cache.clone(), &data_item.album.id, &token).await?; 877 update_library(cache.clone(), &spotify_email, &did, &token).await?; 878 Ok::<(), Error>(()) 879 }) { 880 Ok(_) => {} 881 Err(e) => { 882 println!( 883 "{} {}", 884 format!("[{}]", spotify_email).bright_green(), 885 e.to_string().bright_red() 886 ); 887 } 888 } 889 }); 890 } 891 } 892 893 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 894 } 895 896 Ok(()) 897}