A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at fix/spotify 747 lines 25 kB view raw
1use anyhow::Error; 2use async_nats::connect; 3use owo_colors::OwoColorize; 4use reqwest::Client; 5use serde::{Deserialize, Serialize}; 6use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; 7use std::{ 8 collections::HashMap, 9 env, 10 sync::{atomic::AtomicBool, Arc, Mutex}, 11 thread, 12 time::{SystemTime, UNIX_EPOCH}, 13}; 14use tokio_stream::StreamExt; 15 16use crate::{ 17 cache::Cache, 18 crypto::decrypt_aes_256_ctr, 19 rocksky::{scrobble, update_library}, 20 types::{ 21 album_tracks::AlbumTracks, 22 currently_playing::{Album, Artist, CurrentlyPlaying}, 23 spotify_token::SpotifyTokenWithEmail, 24 token::AccessToken, 25 }, 26}; 27 28pub mod cache; 29pub mod crypto; 30pub mod rocksky; 31pub mod token; 32pub mod types; 33 34pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1"; 35pub const MAX_USERS: usize = 100; 36 37#[derive(Serialize, Deserialize, Debug, Clone)] 38struct TrackState { 39 track_id: String, 40 progress_ms: u64, 41 scrobbled: bool, 42 last_updated: u64, 43} 44 45pub async fn run() -> Result<(), Error> { 46 let cache = Cache::new()?; 47 let pool = PgPoolOptions::new() 48 .max_connections(5) 49 .connect(&env::var("XATA_POSTGRES_URL")?) 50 .await?; 51 52 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 53 let nc = connect(&addr).await?; 54 println!("Connected to NATS server at {}", addr.bright_green()); 55 56 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?; 57 println!("Subscribed to {}", "rocksky.spotify.user".bright_green()); 58 59 let users = find_spotify_users(&pool, 0, MAX_USERS).await?; 60 println!("Found {} users", users.len().bright_green()); 61 62 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 63 Arc::new(Mutex::new(HashMap::new())); 64 65 for user in users { 66 let email = user.0.clone(); 67 let token = user.1.clone(); 68 let did = user.2.clone(); 69 let stop_flag = Arc::new(AtomicBool::new(false)); 70 let cache = cache.clone(); 71 let nc = nc.clone(); 72 let thread_map = Arc::clone(&thread_map); 73 74 thread_map 75 .lock() 76 .unwrap() 77 .insert(email.clone(), Arc::clone(&stop_flag)); 78 79 thread::spawn(move || { 80 let rt = tokio::runtime::Runtime::new().unwrap(); 81 match rt.block_on(async { 82 watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone()) 83 .await?; 84 Ok::<(), Error>(()) 85 }) { 86 Ok(_) => {} 87 Err(e) => { 88 println!( 89 "{} Error starting thread for user: {} - {}", 90 format!("[{}]", email).bright_green(), 91 email.bright_green(), 92 e.to_string().bright_red() 93 ); 94 // If there's an error, publish a message to restart the thread 95 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 96 Ok(_) => { 97 println!( 98 "{} Published message to restart thread for user: {}", 99 format!("[{}]", email).bright_green(), 100 email.bright_green() 101 ); 102 } 103 Err(e) => { 104 println!( 105 "{} Error publishing message to restart thread: {}", 106 format!("[{}]", email).bright_green(), 107 e.to_string().bright_red() 108 ); 109 } 110 } 111 } 112 } 113 }); 114 } 115 116 while let Some(message) = sub.next().await { 117 let user_id = String::from_utf8(message.payload.to_vec()).unwrap(); 118 println!( 119 "Received message to restart thread for user: {}", 120 user_id.bright_green() 121 ); 122 123 let mut thread_map = thread_map.lock().unwrap(); 124 125 if let Some(stop_flag) = thread_map.get(&user_id) { 126 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed); 127 128 let new_stop_flag = Arc::new(AtomicBool::new(false)); 129 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag)); 130 131 let user = find_spotify_user(&pool, &user_id).await?; 132 if user.is_none() { 133 println!( 134 "Spotify user not found: {}, skipping", 135 user_id.bright_green() 136 ); 137 continue; 138 } 139 140 let user = user.unwrap(); 141 let email = user.0.clone(); 142 let token = user.1.clone(); 143 let did = user.2.clone(); 144 let cache = cache.clone(); 145 146 thread::spawn(move || { 147 let rt = tokio::runtime::Runtime::new().unwrap(); 148 match rt.block_on(async { 149 watch_currently_playing( 150 email.clone(), 151 token, 152 did, 153 new_stop_flag, 154 cache.clone(), 155 ) 156 .await?; 157 Ok::<(), Error>(()) 158 }) { 159 Ok(_) => {} 160 Err(e) => { 161 println!( 162 "{} Error restarting thread for user: {} - {}", 163 format!("[{}]", email).bright_green(), 164 email.bright_green(), 165 e.to_string().bright_red() 166 ); 167 } 168 } 169 }); 170 println!("Restarted thread for user: {}", user_id.bright_green()); 171 } else { 172 println!( 173 "No thread found for user: {}, starting new thread", 174 user_id.bright_green() 175 ); 176 let user = find_spotify_user(&pool, &user_id).await?; 177 if let Some(user) = user { 178 let email = user.0.clone(); 179 let token = user.1.clone(); 180 let did = user.2.clone(); 181 let stop_flag = Arc::new(AtomicBool::new(false)); 182 let cache = cache.clone(); 183 let nc = nc.clone(); 184 185 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 186 187 thread::spawn(move || { 188 let rt = tokio::runtime::Runtime::new().unwrap(); 189 match rt.block_on(async { 190 watch_currently_playing( 191 email.clone(), 192 token, 193 did, 194 stop_flag, 195 cache.clone(), 196 ) 197 .await?; 198 Ok::<(), Error>(()) 199 }) { 200 Ok(_) => {} 201 Err(e) => { 202 println!( 203 "{} Error starting thread for user: {} - {}", 204 format!("[{}]", email).bright_green(), 205 email.bright_green(), 206 e.to_string().bright_red() 207 ); 208 match rt 209 .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 210 { 211 Ok(_) => {} 212 Err(e) => { 213 println!( 214 "{} Error publishing message to restart thread: {}", 215 format!("[{}]", email).bright_green(), 216 e.to_string().bright_red() 217 ); 218 } 219 } 220 } 221 } 222 }); 223 } 224 } 225 } 226 227 Ok(()) 228} 229 230pub async fn refresh_token(token: &str) -> Result<AccessToken, Error> { 231 if env::var("SPOTIFY_CLIENT_ID").is_err() || env::var("SPOTIFY_CLIENT_SECRET").is_err() { 232 panic!("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables"); 233 } 234 235 let client_id = env::var("SPOTIFY_CLIENT_ID")?; 236 let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?; 237 238 let client = Client::new(); 239 let response = client 240 .post("https://accounts.spotify.com/api/token") 241 .basic_auth(&client_id, Some(client_secret)) 242 .form(&[ 243 ("grant_type", "refresh_token"), 244 ("refresh_token", token), 245 ("client_id", &client_id), 246 ]) 247 .send() 248 .await?; 249 250 let token = response.json::<AccessToken>().await?; 251 Ok(token) 252} 253 254pub async fn get_currently_playing( 255 cache: Cache, 256 user_id: &str, 257 token: &str, 258) -> Result<Option<(CurrentlyPlaying, bool)>, Error> { 259 // Check if we have cached data 260 if let Ok(Some(data)) = cache.get(user_id) { 261 println!( 262 "{} {}", 263 format!("[{}]", user_id).bright_green(), 264 "Using cache".cyan() 265 ); 266 if data == "No content" { 267 return Ok(None); 268 } 269 270 let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data); 271 if decoded_data.is_err() { 272 println!( 273 "{} {} {}", 274 format!("[{}]", user_id).bright_green(), 275 "Cache is invalid".red(), 276 data 277 ); 278 cache.setex(user_id, "No content", 10)?; 279 cache.del(&format!("{}:current", user_id))?; 280 cache.del(&format!("{}:track_state", user_id))?; 281 return Ok(None); 282 } 283 284 let data: CurrentlyPlaying = decoded_data.unwrap(); 285 286 let changed = detect_track_change(&cache, user_id, &data)?; 287 return Ok(Some((data, changed))); 288 } 289 290 let token = refresh_token(token).await?; 291 let client = Client::new(); 292 let response = client 293 .get(format!("{}/me/player/currently-playing", BASE_URL)) 294 .bearer_auth(token.access_token) 295 .send() 296 .await?; 297 298 let headers = response.headers().clone(); 299 let status = response.status().as_u16(); 300 let data = response.text().await?; 301 302 if status == 429 { 303 println!( 304 "{} Too many requests, retry-after {}", 305 format!("[{}]", user_id).bright_green(), 306 headers 307 .get("retry-after") 308 .unwrap() 309 .to_str() 310 .unwrap() 311 .bright_green() 312 ); 313 return Ok(None); 314 } 315 316 if status == 204 { 317 println!("No content"); 318 // Clear track state when nothing is playing 319 cache.del(&format!("{}:track_state", user_id))?; 320 321 let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() { 322 30 323 } else { 324 10 325 }; 326 327 cache.setex(user_id, "No content", ttl)?; 328 cache.del(&format!("{}:current", user_id))?; 329 return Ok(None); 330 } 331 332 if serde_json::from_str::<CurrentlyPlaying>(&data).is_err() { 333 println!( 334 "{} {} {}", 335 format!("[{}]", user_id).bright_green(), 336 "Invalid data received".red(), 337 data 338 ); 339 cache.setex(user_id, "No content", 10)?; 340 cache.del(&format!("{}:current", user_id))?; 341 cache.del(&format!("{}:track_state", user_id))?; 342 return Ok(None); 343 } 344 345 let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?; 346 347 let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() { 348 30 349 } else { 350 15 351 }; 352 353 cache.setex( 354 user_id, 355 &serde_json::to_string(&currently_playing_data)?, 356 ttl, 357 )?; 358 cache.del(&format!("{}:current", user_id))?; 359 360 // Detect track change and update track state 361 let changed = detect_track_change(&cache, user_id, &currently_playing_data)?; 362 363 // Update previous song cache 364 cache.setex( 365 &format!("{}:previous", user_id), 366 &serde_json::to_string(&currently_playing_data)?, 367 600, 368 )?; 369 370 Ok(Some((currently_playing_data, changed))) 371} 372 373fn detect_track_change( 374 cache: &Cache, 375 user_id: &str, 376 current: &CurrentlyPlaying, 377) -> Result<bool, Error> { 378 let track_state_key = format!("{}:track_state", user_id); 379 380 let now = SystemTime::now() 381 .duration_since(UNIX_EPOCH) 382 .unwrap() 383 .as_secs(); 384 385 let current_item = match &current.item { 386 Some(item) => item, 387 None => { 388 let _ = cache.del(&track_state_key); 389 return Ok(false); 390 } 391 }; 392 393 let previous_state = cache.get(&track_state_key)?; 394 395 let changed = match previous_state { 396 Some(state_str) => { 397 if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) { 398 if prev_state.track_id != current_item.id { 399 true 400 } else { 401 // Same track - check if we should scrobble based on progress and time 402 let progress_diff = 403 current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64; 404 let time_diff = now - prev_state.last_updated; 405 406 // Only consider it changed if: 407 // 1. We haven't scrobbled this track yet 408 // 2. Significant progress was made (more than 10 seconds or reasonable time passed) 409 // 3. Track is actually playing 410 !prev_state.scrobbled 411 && current.is_playing 412 && (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0)) 413 } 414 } else { 415 // Invalid previous state, treat as changed 416 true 417 } 418 } 419 None => { 420 // No previous state, treat as new track 421 current.is_playing 422 } 423 }; 424 425 let new_state = TrackState { 426 track_id: current_item.id.clone(), 427 progress_ms: current.progress_ms.unwrap_or(0), 428 scrobbled: changed, // Mark as scrobbled if we're reporting a change 429 last_updated: now, 430 }; 431 432 cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?; 433 434 Ok(changed) 435} 436 437pub async fn get_artist( 438 cache: Cache, 439 artist_id: &str, 440 token: &str, 441) -> Result<Option<Artist>, Error> { 442 if let Ok(Some(data)) = cache.get(artist_id) { 443 return Ok(Some(serde_json::from_str(&data)?)); 444 } 445 446 let token = refresh_token(token).await?; 447 let client = Client::new(); 448 let response = client 449 .get(&format!("{}/artists/{}", BASE_URL, artist_id)) 450 .bearer_auth(token.access_token) 451 .send() 452 .await?; 453 454 let headers = response.headers().clone(); 455 let data = response.text().await?; 456 457 if data == "Too many requests" { 458 println!( 459 "> retry-after {}", 460 headers.get("retry-after").unwrap().to_str().unwrap() 461 ); 462 println!("> {} [get_artist]", data); 463 return Ok(None); 464 } 465 466 cache.setex(artist_id, &data, 20)?; 467 Ok(Some(serde_json::from_str(&data)?)) 468} 469 470pub async fn get_album(cache: Cache, album_id: &str, token: &str) -> Result<Option<Album>, Error> { 471 if let Ok(Some(data)) = cache.get(album_id) { 472 return Ok(Some(serde_json::from_str(&data)?)); 473 } 474 475 let token = refresh_token(token).await?; 476 let client = Client::new(); 477 let response = client 478 .get(&format!("{}/albums/{}", BASE_URL, album_id)) 479 .bearer_auth(token.access_token) 480 .send() 481 .await?; 482 483 let headers = response.headers().clone(); 484 let data = response.text().await?; 485 486 if data == "Too many requests" { 487 println!( 488 "> retry-after {}", 489 headers.get("retry-after").unwrap().to_str().unwrap() 490 ); 491 println!("> {} [get_album]", data); 492 return Ok(None); 493 } 494 495 cache.setex(album_id, &data, 20)?; 496 Ok(Some(serde_json::from_str(&data)?)) 497} 498 499pub async fn get_album_tracks( 500 cache: Cache, 501 album_id: &str, 502 token: &str, 503) -> Result<AlbumTracks, Error> { 504 if let Ok(Some(data)) = cache.get(&format!("{}:tracks", album_id)) { 505 return Ok(serde_json::from_str(&data)?); 506 } 507 508 let token = refresh_token(token).await?; 509 let client = Client::new(); 510 let mut all_tracks = Vec::new(); 511 let mut offset = 0; 512 let limit = 50; 513 514 loop { 515 let response = client 516 .get(&format!("{}/albums/{}/tracks", BASE_URL, album_id)) 517 .bearer_auth(&token.access_token) 518 .query(&[ 519 ("limit", &limit.to_string()), 520 ("offset", &offset.to_string()), 521 ]) 522 .send() 523 .await?; 524 525 let headers = response.headers().clone(); 526 let data = response.text().await?; 527 528 if data == "Too many requests" { 529 println!( 530 "> retry-after {}", 531 headers.get("retry-after").unwrap().to_str().unwrap() 532 ); 533 println!("> {} [get_album_tracks]", data); 534 continue; 535 } 536 537 let album_tracks: AlbumTracks = serde_json::from_str(&data)?; 538 if album_tracks.items.is_empty() { 539 break; 540 } 541 542 all_tracks.extend(album_tracks.items); 543 offset += limit; 544 } 545 546 let all_tracks_json = serde_json::to_string(&all_tracks)?; 547 cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?; 548 549 Ok(AlbumTracks { 550 items: all_tracks, 551 ..Default::default() 552 }) 553} 554 555pub async fn find_spotify_users( 556 pool: &Pool<Postgres>, 557 offset: usize, 558 limit: usize, 559) -> Result<Vec<(String, String, String, String)>, Error> { 560 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 561 r#" 562 SELECT * FROM spotify_tokens 563 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 564 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 565 LIMIT $1 OFFSET $2 566 "#, 567 ) 568 .bind(limit as i64) 569 .bind(offset as i64) 570 .fetch_all(pool) 571 .await?; 572 573 let mut user_tokens = vec![]; 574 for result in &results { 575 let token = decrypt_aes_256_ctr( 576 &result.refresh_token, 577 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 578 )?; 579 user_tokens.push(( 580 result.email.clone(), 581 token, 582 result.did.clone(), 583 result.user_id.clone(), 584 )); 585 } 586 587 Ok(user_tokens) 588} 589 590pub async fn find_spotify_user( 591 pool: &Pool<Postgres>, 592 email: &str, 593) -> Result<Option<(String, String, String)>, Error> { 594 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 595 r#" 596 SELECT * FROM spotify_tokens 597 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 598 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 599 WHERE spotify_accounts.email = $1 600 "#, 601 ) 602 .bind(email) 603 .fetch_all(pool) 604 .await?; 605 606 match result.first() { 607 Some(result) => { 608 let token = decrypt_aes_256_ctr( 609 &result.refresh_token, 610 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 611 )?; 612 Ok(Some((result.email.clone(), token, result.did.clone()))) 613 } 614 None => Ok(None), 615 } 616} 617 618pub async fn watch_currently_playing( 619 spotify_email: String, 620 token: String, 621 did: String, 622 stop_flag: Arc<AtomicBool>, 623 cache: Cache, 624) -> Result<(), Error> { 625 println!( 626 "{} {}", 627 format!("[{}]", spotify_email).bright_green(), 628 "Checking currently playing".cyan() 629 ); 630 631 // Remove the separate progress tracking thread - it was causing race conditions 632 // and unnecessary complexity 633 634 loop { 635 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { 636 println!( 637 "{} Stopping Thread", 638 format!("[{}]", spotify_email).bright_green() 639 ); 640 break; 641 } 642 643 let spotify_email = spotify_email.clone(); 644 let token = token.clone(); 645 let did = did.clone(); 646 let cache = cache.clone(); 647 648 let currently_playing = get_currently_playing(cache.clone(), &spotify_email, &token).await; 649 let currently_playing = match currently_playing { 650 Ok(currently_playing) => currently_playing, 651 Err(e) => { 652 println!( 653 "{} {}", 654 format!("[{}]", spotify_email).bright_green(), 655 e.to_string().bright_red() 656 ); 657 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; 658 continue; 659 } 660 }; 661 662 if let Some((data, changed)) = currently_playing { 663 if data.item.is_none() { 664 println!( 665 "{} {}", 666 format!("[{}]", spotify_email).bright_green(), 667 "No song playing".yellow() 668 ); 669 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; 670 continue; 671 } 672 673 let data_item = data.item.unwrap(); 674 println!( 675 "{} {} is_playing: {} changed: {}", 676 format!("[{}]", spotify_email).bright_green(), 677 format!("{} - {}", data_item.name, data_item.artists[0].name).yellow(), 678 data.is_playing, 679 changed 680 ); 681 682 // Only scrobble if there's a genuine track change and the track is playing 683 if changed && data.is_playing { 684 // Add a small delay to prevent rapid duplicate scrobbles 685 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; 686 687 match scrobble(cache.clone(), &spotify_email, &did, &token).await { 688 Ok(_) => { 689 println!( 690 "{} {}", 691 format!("[{}]", spotify_email).bright_green(), 692 "Scrobbled successfully".green() 693 ); 694 } 695 Err(e) => { 696 println!( 697 "{} Scrobble failed: {}", 698 format!("[{}]", spotify_email).bright_green(), 699 e.to_string().bright_red() 700 ); 701 } 702 } 703 704 // Spawn background task for library updates 705 let cache_clone = cache.clone(); 706 let token_clone = token.clone(); 707 let spotify_email_clone = spotify_email.clone(); 708 let did_clone = did.clone(); 709 let album_id = data_item.album.id.clone(); 710 711 thread::spawn(move || { 712 let rt = tokio::runtime::Runtime::new().unwrap(); 713 match rt.block_on(async { 714 get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?; 715 get_album(cache_clone.clone(), &album_id, &token_clone).await?; 716 update_library( 717 cache_clone.clone(), 718 &spotify_email_clone, 719 &did_clone, 720 &token_clone, 721 ) 722 .await?; 723 Ok::<(), Error>(()) 724 }) { 725 Ok(_) => { 726 println!( 727 "{} Library updated successfully", 728 format!("[{}]", spotify_email_clone).bright_green() 729 ); 730 } 731 Err(e) => { 732 println!( 733 "{} Library update failed: {}", 734 format!("[{}]", spotify_email_clone).bright_green(), 735 e.to_string().bright_red() 736 ); 737 } 738 } 739 }); 740 } 741 } 742 743 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; 744 } 745 746 Ok(()) 747}