A decentralized music tracking and discovery platform built on AT Protocol 🎵
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1use std::{
2 collections::HashMap,
3 env,
4 sync::{atomic::AtomicBool, Arc, Mutex},
5 thread,
6};
7
8use anyhow::Error;
9use async_nats::connect;
10use owo_colors::OwoColorize;
11use reqwest::Client;
12use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
13use tokio_stream::StreamExt;
14
15use crate::{
16 cache::Cache,
17 crypto::decrypt_aes_256_ctr,
18 rocksky::{scrobble, update_library},
19 types::{
20 album_tracks::AlbumTracks,
21 currently_playing::{Album, Artist, CurrentlyPlaying},
22 spotify_token::SpotifyTokenWithEmail,
23 token::AccessToken,
24 },
25};
26
27pub mod cache;
28pub mod crypto;
29pub mod rocksky;
30pub mod token;
31pub mod types;
32
33pub const BASE_URL: &str = "https://api.spotify.com/v1";
34
35pub async fn run() -> Result<(), Error> {
36 let cache = Cache::new()?;
37 let pool = PgPoolOptions::new()
38 .max_connections(5)
39 .connect(&env::var("XATA_POSTGRES_URL")?)
40 .await?;
41
42 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
43 let nc = connect(&addr).await?;
44 println!("Connected to NATS server at {}", addr.bright_green());
45
46 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
47 println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
48
49 let users = find_spotify_users(&pool, 0, 500).await?;
50 println!("Found {} users", users.len().bright_green());
51
52 // Shared HashMap to manage threads and their stop flags
53 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
54 Arc::new(Mutex::new(HashMap::new()));
55
56 // Helper function to start a user thread with auto-recovery
57 let start_user_thread = |email: String,
58 token: String,
59 did: String,
60 client_id: String,
61 client_secret: String,
62 stop_flag: Arc<AtomicBool>,
63 cache: Cache,
64 nc: async_nats::Client| {
65 thread::spawn(move || {
66 let rt = tokio::runtime::Runtime::new().unwrap();
67 let mut retry_count = 0;
68 let max_retries = 5;
69
70 loop {
71 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
72 println!(
73 "{} Stop flag set, exiting recovery loop",
74 format!("[{}]", email).bright_green()
75 );
76 break;
77 }
78
79 match rt.block_on(async {
80 watch_currently_playing(
81 email.clone(),
82 token.clone(),
83 did.clone(),
84 stop_flag.clone(),
85 cache.clone(),
86 client_id.clone(),
87 client_secret.clone(),
88 )
89 .await
90 }) {
91 Ok(_) => {
92 println!(
93 "{} Thread completed normally",
94 format!("[{}]", email).bright_green()
95 );
96 break;
97 }
98 Err(e) => {
99 retry_count += 1;
100 println!(
101 "{} Thread crashed (attempt {}/{}): {}",
102 format!("[{}]", email).bright_green(),
103 retry_count,
104 max_retries,
105 e.to_string().bright_red()
106 );
107
108 if retry_count >= max_retries {
109 println!(
110 "{} Max retries reached, publishing to NATS for external restart",
111 format!("[{}]", email).bright_green()
112 );
113 match rt
114 .block_on(nc.publish("rocksky.spotify.user", email.clone().into()))
115 {
116 Ok(_) => {
117 println!(
118 "{} Published message to restart thread",
119 format!("[{}]", email).bright_green()
120 );
121 }
122 Err(e) => {
123 println!(
124 "{} Error publishing message to restart thread: {}",
125 format!("[{}]", email).bright_green(),
126 e.to_string().bright_red()
127 );
128 }
129 }
130 break;
131 }
132
133 // Exponential backoff: 2^retry_count seconds, max 60 seconds
134 let backoff_seconds = std::cmp::min(2_u64.pow(retry_count as u32), 60);
135 println!(
136 "{} Retrying in {} seconds...",
137 format!("[{}]", email).bright_green(),
138 backoff_seconds
139 );
140 std::thread::sleep(std::time::Duration::from_secs(backoff_seconds));
141 }
142 }
143 }
144 })
145 };
146
147 // Start threads for all users
148 for user in users {
149 let email = user.0.clone();
150 let token = user.1.clone();
151 let did = user.2.clone();
152 let client_id = user.3.clone();
153 let client_secret = user.4.clone();
154 let stop_flag = Arc::new(AtomicBool::new(false));
155 let cache = cache.clone();
156 let nc = nc.clone();
157 let thread_map = Arc::clone(&thread_map);
158
159 thread_map
160 .lock()
161 .unwrap()
162 .insert(email.clone(), Arc::clone(&stop_flag));
163
164 start_user_thread(
165 email,
166 token,
167 did,
168 client_id,
169 client_secret,
170 stop_flag,
171 cache,
172 nc,
173 );
174 }
175
176 // Handle subscription messages
177 while let Some(message) = sub.next().await {
178 let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
179 println!(
180 "Received message to restart thread for user: {}",
181 user_id.bright_green()
182 );
183
184 let mut thread_map = thread_map.lock().unwrap();
185
186 // Check if the user exists in the thread map
187 if let Some(stop_flag) = thread_map.get(&user_id) {
188 // Stop the existing thread
189 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
190
191 // Create a new stop flag and restart the thread
192 let new_stop_flag = Arc::new(AtomicBool::new(false));
193 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
194
195 let user = find_spotify_user(&pool, &user_id).await?;
196
197 if user.is_none() {
198 println!(
199 "Spotify user not found: {}, skipping",
200 user_id.bright_green()
201 );
202 continue;
203 }
204
205 let user = user.unwrap();
206
207 let email = user.0.clone();
208 let token = user.1.clone();
209 let did = user.2.clone();
210 let client_id = user.3.clone();
211 let client_secret = user.4.clone();
212 let cache = cache.clone();
213 let nc = nc.clone();
214
215 start_user_thread(
216 email,
217 token,
218 did,
219 client_id,
220 client_secret,
221 new_stop_flag,
222 cache,
223 nc,
224 );
225
226 println!("Restarted thread for user: {}", user_id.bright_green());
227 } else {
228 println!(
229 "No thread found for user: {}, starting new thread",
230 user_id.bright_green()
231 );
232 let user = find_spotify_user(&pool, &user_id).await?;
233 if let Some(user) = user {
234 let email = user.0.clone();
235 let token = user.1.clone();
236 let did = user.2.clone();
237 let client_id = user.3.clone();
238 let client_secret = user.4.clone();
239 let stop_flag = Arc::new(AtomicBool::new(false));
240 let cache = cache.clone();
241 let nc = nc.clone();
242
243 thread_map.insert(email.clone(), Arc::clone(&stop_flag));
244
245 start_user_thread(
246 email,
247 token,
248 did,
249 client_id,
250 client_secret,
251 stop_flag,
252 cache,
253 nc,
254 );
255 }
256 }
257 }
258
259 Ok(())
260}
261
262pub async fn refresh_token(
263 token: &str,
264 client_id: &str,
265 client_secret: &str,
266) -> Result<AccessToken, Error> {
267 let client = Client::new();
268
269 let response = client
270 .post("https://accounts.spotify.com/api/token")
271 .basic_auth(&client_id, Some(client_secret))
272 .form(&[
273 ("grant_type", "refresh_token"),
274 ("refresh_token", token),
275 ("client_id", &client_id),
276 ])
277 .send()
278 .await?;
279 let token = response.text().await?;
280 let json_token = serde_json::from_str::<AccessToken>(&token);
281 if let Err(e) = json_token {
282 println!("Error parsing token: {}", token);
283 return Err(Error::from(e));
284 }
285 Ok(json_token.unwrap())
286}
287
288pub async fn get_currently_playing(
289 cache: Cache,
290 user_id: &str,
291 token: &str,
292 client_id: &str,
293 client_secret: &str,
294) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
295 if let Ok(Some(data)) = cache.get(user_id) {
296 println!(
297 "{} {}",
298 format!("[{}]", user_id).bright_green(),
299 "Using cache".cyan()
300 );
301 if data == "No content" {
302 return Ok(None);
303 }
304 let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
305
306 if decoded_data.is_err() {
307 println!(
308 "{} {} {}",
309 format!("[{}]", user_id).bright_green(),
310 "Cache is invalid".red(),
311 data
312 );
313 cache.setex(user_id, "No content", 10)?;
314 cache.del(&format!("{}:current", user_id))?;
315 return Ok(None);
316 }
317
318 let data: CurrentlyPlaying = decoded_data.unwrap();
319 // detect if the song has changed
320 let previous = cache.get(&format!("{}:previous", user_id));
321
322 if previous.is_err() {
323 println!(
324 "{} redis error: {}",
325 format!("[{}]", user_id).bright_green(),
326 previous.unwrap_err().to_string().bright_red()
327 );
328 return Ok(None);
329 }
330
331 let previous = previous.unwrap();
332
333 let changed = match previous {
334 Some(previous) => {
335 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
336 println!(
337 "{} {} {}",
338 format!("[{}]", user_id).bright_green(),
339 "Previous cache is invalid",
340 previous
341 );
342 return Ok(None);
343 }
344
345 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
346 if previous.item.is_none() && data.item.is_some() {
347 return Ok(Some((data, true)));
348 }
349
350 if previous.item.is_some() && data.item.is_none() {
351 return Ok(Some((data, false)));
352 }
353
354 if previous.item.is_none() && data.item.is_none() {
355 return Ok(Some((data, false)));
356 }
357
358 let previous_item = previous.item.unwrap();
359 let data_item = data.clone().item.unwrap();
360 previous_item.id != data_item.id
361 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
362 }
363 _ => true,
364 };
365 return Ok(Some((data, changed)));
366 }
367
368 let token = refresh_token(token, client_id, client_secret).await?;
369 let client = Client::new();
370 let response = client
371 .get(format!("{}/me/player/currently-playing", BASE_URL))
372 .bearer_auth(token.access_token)
373 .send()
374 .await?;
375
376 let headers = response.headers().clone();
377 let status = response.status().as_u16();
378 let data = response.text().await?;
379
380 if !data.contains("is_playing") && !data.contains("context") {
381 println!("> Currently playing: {}", data);
382 }
383
384 if status == 429 {
385 println!(
386 "{} Too many requests, retry-after {}",
387 format!("[{}]", user_id).bright_green(),
388 headers
389 .get("retry-after")
390 .unwrap()
391 .to_str()
392 .unwrap()
393 .bright_green()
394 );
395 return Ok(None);
396 }
397
398 let previous = cache.get(&format!("{}:previous", user_id));
399 if previous.is_err() {
400 println!(
401 "{} redis error: {}",
402 format!("[{}]", user_id).bright_green(),
403 previous.unwrap_err().to_string().bright_red()
404 );
405 return Ok(None);
406 }
407
408 let previous = previous.unwrap();
409
410 // check if status code is 204
411 if status == 204 {
412 println!("No content");
413 match cache.setex(
414 user_id,
415 "No content",
416 match previous.is_none() {
417 true => 30,
418 false => 10,
419 },
420 ) {
421 Ok(_) => {}
422 Err(e) => {
423 println!(
424 "{} redis error: {}",
425 format!("[{}]", user_id).bright_green(),
426 e.to_string().bright_red()
427 );
428 return Ok(None);
429 }
430 }
431 match cache.del(&format!("{}:current", user_id)) {
432 Ok(_) => {}
433 Err(e) => {
434 println!(
435 "{} redis error: {}",
436 format!("[{}]", user_id).bright_green(),
437 e.to_string().bright_red()
438 );
439 return Ok(None);
440 }
441 }
442 return Ok(None);
443 }
444
445 if serde_json::from_str::<CurrentlyPlaying>(&data).is_err() {
446 println!(
447 "{} {} {}",
448 format!("[{}]", user_id).bright_green(),
449 "Invalid data received".red(),
450 data
451 );
452 match cache.setex(user_id, "No content", 10) {
453 Ok(_) => {}
454 Err(e) => {
455 println!(
456 "{} redis error: {}",
457 format!("[{}]", user_id).bright_green(),
458 e.to_string().bright_red()
459 );
460 return Ok(None);
461 }
462 }
463 match cache.del(&format!("{}:current", user_id)) {
464 Ok(_) => {}
465 Err(e) => {
466 println!(
467 "{} redis error: {}",
468 format!("[{}]", user_id).bright_green(),
469 e.to_string().bright_red()
470 );
471 return Ok(None);
472 }
473 }
474 return Ok(None);
475 }
476
477 let data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
478
479 match cache.setex(
480 user_id,
481 &serde_json::to_string(&data)?,
482 match previous.is_none() {
483 true => 30,
484 false => 15,
485 },
486 ) {
487 Ok(_) => {}
488 Err(e) => {
489 println!(
490 "{} redis error: {}",
491 format!("[{}]", user_id).bright_green(),
492 e.to_string().bright_red()
493 );
494 return Ok(None);
495 }
496 }
497 match cache.del(&format!("{}:current", user_id)) {
498 Ok(_) => {}
499 Err(e) => {
500 println!(
501 "{} redis error: {}",
502 format!("[{}]", user_id).bright_green(),
503 e.to_string().bright_red()
504 );
505 return Ok(None);
506 }
507 }
508
509 // detect if the song has changed
510 let previous = cache.get(&format!("{}:previous", user_id));
511
512 if previous.is_err() {
513 println!(
514 "{} redis error: {}",
515 format!("[{}]", user_id).bright_green(),
516 previous.unwrap_err().to_string().bright_red()
517 );
518 return Ok(None);
519 }
520
521 let previous = previous.unwrap();
522 let changed = match previous {
523 Some(previous) => {
524 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
525 println!(
526 "{} {} {}",
527 format!("[{}]", user_id).bright_green(),
528 "Previous cache is invalid",
529 previous
530 );
531 return Ok(None);
532 }
533
534 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
535 if previous.item.is_none() || data.item.is_none() {
536 return Ok(Some((data, false)));
537 }
538
539 let previous_item = previous.item.unwrap();
540 let data_item = data.clone().item.unwrap();
541
542 previous_item.id != data_item.id
543 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
544 }
545 _ => data.item.is_some(),
546 };
547
548 // save as previous song
549 match cache.setex(
550 &format!("{}:previous", user_id),
551 &serde_json::to_string(&data)?,
552 600,
553 ) {
554 Ok(_) => {}
555 Err(e) => {
556 println!(
557 "{} redis error: {}",
558 format!("[{}]", user_id).bright_green(),
559 e.to_string().bright_red()
560 );
561 return Ok(None);
562 }
563 }
564
565 Ok(Some((data, changed)))
566}
567
568pub async fn get_artist(
569 cache: Cache,
570 artist_id: &str,
571 token: &str,
572 client_id: &str,
573 client_secret: &str,
574) -> Result<Option<Artist>, Error> {
575 if let Ok(Some(data)) = cache.get(artist_id) {
576 return Ok(Some(serde_json::from_str(&data)?));
577 }
578
579 let token = refresh_token(token, client_id, client_secret).await?;
580 let client = Client::new();
581 let response = client
582 .get(&format!("{}/artists/{}", BASE_URL, artist_id))
583 .bearer_auth(token.access_token)
584 .send()
585 .await?;
586
587 let headers = response.headers().clone();
588 let data = response.text().await?;
589
590 if data == "Too many requests" {
591 println!(
592 "> retry-after {}",
593 headers.get("retry-after").unwrap().to_str().unwrap()
594 );
595 println!("> {} [get_artist]", data);
596 return Ok(None);
597 }
598
599 match cache.setex(artist_id, &data, 20) {
600 Ok(_) => {}
601 Err(e) => {
602 println!(
603 "{} redis error: {}",
604 format!("[{}]", artist_id).bright_green(),
605 e.to_string().bright_red()
606 );
607 return Ok(None);
608 }
609 }
610
611 Ok(Some(serde_json::from_str(&data)?))
612}
613
614pub async fn get_album(
615 cache: Cache,
616 album_id: &str,
617 token: &str,
618 client_id: &str,
619 client_secret: &str,
620) -> Result<Option<Album>, Error> {
621 if let Ok(Some(data)) = cache.get(album_id) {
622 return Ok(Some(serde_json::from_str(&data)?));
623 }
624
625 let token = refresh_token(token, client_id, client_secret).await?;
626 let client = Client::new();
627 let response = client
628 .get(&format!("{}/albums/{}", BASE_URL, album_id))
629 .bearer_auth(token.access_token)
630 .send()
631 .await?;
632
633 let headers = response.headers().clone();
634 let data = response.text().await?;
635
636 if data == "Too many requests" {
637 println!(
638 "> retry-after {}",
639 headers.get("retry-after").unwrap().to_str().unwrap()
640 );
641 println!("> {} [get_album]", data);
642 return Ok(None);
643 }
644
645 match cache.setex(album_id, &data, 20) {
646 Ok(_) => {}
647 Err(e) => {
648 println!(
649 "{} redis error: {}",
650 format!("[{}]", album_id).bright_green(),
651 e.to_string().bright_red()
652 );
653 return Ok(None);
654 }
655 }
656
657 Ok(Some(serde_json::from_str(&data)?))
658}
659
660pub async fn get_album_tracks(
661 cache: Cache,
662 album_id: &str,
663 token: &str,
664 client_id: &str,
665 client_secret: &str,
666) -> Result<AlbumTracks, Error> {
667 if let Ok(Some(data)) = cache.get(&format!("{}:tracks", album_id)) {
668 return Ok(serde_json::from_str(&data)?);
669 }
670
671 let token = refresh_token(token, client_id, client_secret).await?;
672 let client = Client::new();
673 let mut all_tracks = Vec::new();
674 let mut offset = 0;
675 let limit = 50;
676
677 loop {
678 let response = client
679 .get(&format!("{}/albums/{}/tracks", BASE_URL, album_id))
680 .bearer_auth(&token.access_token)
681 .query(&[
682 ("limit", &limit.to_string()),
683 ("offset", &offset.to_string()),
684 ])
685 .send()
686 .await?;
687
688 let headers = response.headers().clone();
689 let data = response.text().await?;
690 if data == "Too many requests" {
691 println!(
692 "> retry-after {}",
693 headers.get("retry-after").unwrap().to_str().unwrap()
694 );
695 println!("> {} [get_album_tracks]", data);
696 continue;
697 }
698
699 let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
700
701 if album_tracks.items.is_empty() {
702 break;
703 }
704
705 all_tracks.extend(album_tracks.items);
706 offset += limit;
707 }
708
709 let all_tracks_json = serde_json::to_string(&all_tracks)?;
710 match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) {
711 Ok(_) => {}
712 Err(e) => {
713 println!(
714 "{} redis error: {}",
715 format!("[{}]", album_id).bright_green(),
716 e.to_string().bright_red()
717 );
718 }
719 }
720
721 Ok(AlbumTracks {
722 items: all_tracks,
723 ..Default::default()
724 })
725}
726
727pub async fn find_spotify_users(
728 pool: &Pool<Postgres>,
729 offset: usize,
730 limit: usize,
731) -> Result<Vec<(String, String, String, String, String)>, Error> {
732 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
733 r#"
734 SELECT * FROM spotify_tokens
735 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
736 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
737 LEFT JOIN spotify_apps ON spotify_tokens.spotify_app_id = spotify_apps.spotify_app_id
738 LIMIT $1 OFFSET $2
739 "#,
740 )
741 .bind(limit as i64)
742 .bind(offset as i64)
743 .fetch_all(pool)
744 .await?;
745
746 let mut user_tokens = vec![];
747
748 for result in &results {
749 let token = decrypt_aes_256_ctr(
750 &result.refresh_token,
751 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
752 )?;
753 let spotify_secret = decrypt_aes_256_ctr(
754 &result.spotify_secret,
755 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
756 )?;
757 user_tokens.push((
758 result.email.clone(),
759 token,
760 result.did.clone(),
761 result.spotify_app_id.clone(),
762 spotify_secret,
763 ));
764 }
765
766 Ok(user_tokens)
767}
768
769pub async fn find_spotify_user(
770 pool: &Pool<Postgres>,
771 email: &str,
772) -> Result<Option<(String, String, String, String, String)>, Error> {
773 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
774 r#"
775 SELECT * FROM spotify_tokens
776 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
777 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
778 LEFT JOIN spotify_apps ON spotify_tokens.spotify_app_id = spotify_apps.spotify_app_id
779 WHERE spotify_accounts.email = $1
780 "#,
781 )
782 .bind(email)
783 .fetch_all(pool)
784 .await?;
785
786 match result.first() {
787 Some(result) => {
788 let token = decrypt_aes_256_ctr(
789 &result.refresh_token,
790 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
791 )?;
792 let spotify_secret = decrypt_aes_256_ctr(
793 &result.spotify_secret,
794 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
795 )?;
796 Ok(Some((
797 result.email.clone(),
798 token,
799 result.did.clone(),
800 result.spotify_app_id.clone(),
801 spotify_secret,
802 )))
803 }
804 None => Ok(None),
805 }
806}
807
808pub async fn watch_currently_playing(
809 spotify_email: String,
810 token: String,
811 did: String,
812 stop_flag: Arc<AtomicBool>,
813 cache: Cache,
814 client_id: String,
815 client_secret: String,
816) -> Result<(), Error> {
817 println!(
818 "{} {}",
819 format!("[{}]", spotify_email).bright_green(),
820 "Checking currently playing".cyan()
821 );
822
823 let stop_flag_clone = stop_flag.clone();
824 let spotify_email_clone = spotify_email.clone();
825 let cache_clone = cache.clone();
826 thread::spawn(move || {
827 // Inner thread with error recovery
828 let result: Result<(), Error> = (|| {
829 loop {
830 if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
831 println!(
832 "{} Stopping progress tracker thread",
833 format!("[{}]", spotify_email_clone).bright_green()
834 );
835 break;
836 }
837
838 if let Ok(Some(cached)) =
839 cache_clone.get(&format!("{}:current", spotify_email_clone))
840 {
841 if let Ok(mut current_song) = serde_json::from_str::<CurrentlyPlaying>(&cached)
842 {
843 if let Some(item) = current_song.item.clone() {
844 if current_song.is_playing
845 && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into()
846 {
847 current_song.progress_ms =
848 Some(current_song.progress_ms.unwrap_or(0) + 800);
849 match cache_clone.setex(
850 &format!("{}:current", spotify_email_clone),
851 &serde_json::to_string(¤t_song).unwrap_or_default(),
852 16,
853 ) {
854 Ok(_) => {}
855 Err(e) => {
856 println!(
857 "{} redis error: {}",
858 format!("[{}]", spotify_email_clone).bright_green(),
859 e.to_string().bright_red()
860 );
861 }
862 }
863 thread::sleep(std::time::Duration::from_millis(800));
864 continue;
865 }
866 }
867 }
868 }
869
870 if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) {
871 if cached != "No content" {
872 match cache_clone.setex(
873 &format!("{}:current", spotify_email_clone),
874 &cached,
875 16,
876 ) {
877 Ok(_) => {}
878 Err(e) => {
879 println!(
880 "{} redis error: {}",
881 format!("[{}]", spotify_email_clone).bright_green(),
882 e.to_string().bright_red()
883 );
884 }
885 }
886 }
887 }
888
889 thread::sleep(std::time::Duration::from_millis(800));
890 }
891 Ok(())
892 })();
893
894 if let Err(e) = result {
895 println!(
896 "{} Progress tracker thread error: {}",
897 format!("[{}]", spotify_email_clone).bright_green(),
898 e.to_string().bright_red()
899 );
900 }
901 });
902
903 loop {
904 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
905 println!(
906 "{} Stopping Thread",
907 format!("[{}]", spotify_email).bright_green()
908 );
909 break;
910 }
911 let spotify_email = spotify_email.clone();
912 let token = token.clone();
913 let did = did.clone();
914 let cache = cache.clone();
915 let client_id = client_id.clone();
916 let client_secret = client_secret.clone();
917
918 let currently_playing = get_currently_playing(
919 cache.clone(),
920 &spotify_email,
921 &token,
922 &client_id,
923 &client_secret,
924 )
925 .await;
926 let currently_playing = match currently_playing {
927 Ok(currently_playing) => currently_playing,
928 Err(e) => {
929 println!(
930 "{} {}",
931 format!("[{}]", spotify_email).bright_green(),
932 e.to_string().bright_red()
933 );
934 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
935 continue;
936 }
937 };
938
939 if let Some((data, changed)) = currently_playing {
940 if data.item.is_none() {
941 println!(
942 "{} {}",
943 format!("[{}]", spotify_email).bright_green(),
944 "No song playing".yellow()
945 );
946 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
947 continue;
948 }
949 let data_item = data.item.unwrap();
950 println!(
951 "{} {} is_playing: {} changed: {}",
952 format!("[{}]", spotify_email).bright_green(),
953 format!("{} - {}", data_item.name, data_item.artists[0].name).yellow(),
954 data.is_playing,
955 changed
956 );
957
958 if changed {
959 cache.setex(
960 &format!("changed:{}:{}", spotify_email, data_item.id),
961 &data_item.duration_ms.to_string(),
962 3600 * 24,
963 )?;
964 }
965
966 let current_track =
967 match cache.get(&format!("changed:{}:{}", spotify_email, data_item.id)) {
968 Ok(x) => x.is_some(),
969 Err(_) => false,
970 };
971
972 if let Ok(Some(cached)) = cache.get(&format!("{}:current", spotify_email)) {
973 let current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?;
974 if let Some(item) = current_song.item {
975 let percentage = current_song.progress_ms.unwrap_or(0) as f32
976 / data_item.duration_ms as f32
977 * 100.0;
978 if current_track && percentage >= 40.0 && item.id == data_item.id {
979 println!(
980 "{} Scrobbling track: {} {}",
981 format!("[{}]", spotify_email).bright_green(),
982 item.name.yellow(),
983 format!("{:.2}%", percentage)
984 );
985 scrobble(
986 cache.clone(),
987 &spotify_email,
988 &did,
989 &token,
990 &client_id,
991 &client_secret,
992 )
993 .await?;
994
995 match cache.del(&format!("changed:{}:{}", spotify_email, data_item.id)) {
996 Ok(_) => {}
997 Err(_) => tracing::error!("Failed to delete cache entry"),
998 };
999
1000 thread::spawn(move || {
1001 let rt = tokio::runtime::Runtime::new().unwrap();
1002 match rt.block_on(async {
1003 get_album_tracks(
1004 cache.clone(),
1005 &data_item.album.id,
1006 &token,
1007 &client_id,
1008 &client_secret,
1009 )
1010 .await?;
1011 get_album(
1012 cache.clone(),
1013 &data_item.album.id,
1014 &token,
1015 &client_id,
1016 &client_secret,
1017 )
1018 .await?;
1019 update_library(
1020 cache.clone(),
1021 &spotify_email,
1022 &did,
1023 &token,
1024 &client_id,
1025 &client_secret,
1026 )
1027 .await?;
1028 Ok::<(), Error>(())
1029 }) {
1030 Ok(_) => {}
1031 Err(e) => {
1032 println!(
1033 "{} {}",
1034 format!("[{}]", spotify_email).bright_green(),
1035 e.to_string().bright_red()
1036 );
1037 }
1038 }
1039 });
1040 }
1041 }
1042 }
1043 }
1044
1045 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1046 }
1047
1048 Ok(())
1049}