forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use std::{
2 collections::HashMap,
3 env,
4 sync::{atomic::AtomicBool, Arc, Mutex},
5 thread,
6};
7
8use anyhow::Error;
9use async_nats::connect;
10use owo_colors::OwoColorize;
11use reqwest::Client;
12use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
13use tokio_stream::StreamExt;
14
15use crate::{
16 cache::Cache,
17 crypto::decrypt_aes_256_ctr,
18 rocksky::{scrobble, update_library},
19 types::{
20 album_tracks::AlbumTracks,
21 currently_playing::{Album, Artist, CurrentlyPlaying},
22 spotify_token::SpotifyTokenWithEmail,
23 token::AccessToken,
24 },
25};
26
27pub mod cache;
28pub mod crypto;
29pub mod rocksky;
30pub mod token;
31pub mod types;
32
33pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
34
35pub async fn run() -> Result<(), Error> {
36 let cache = Cache::new()?;
37 let pool = PgPoolOptions::new()
38 .max_connections(5)
39 .connect(&env::var("XATA_POSTGRES_URL")?)
40 .await?;
41
42 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
43 let nc = connect(&addr).await?;
44 println!("Connected to NATS server at {}", addr.bright_green());
45
46 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
47 println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
48
49 let users = find_spotify_users(&pool, 0, 100).await?;
50 println!("Found {} users", users.len().bright_green());
51
52 // Shared HashMap to manage threads and their stop flags
53 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
54 Arc::new(Mutex::new(HashMap::new()));
55
56 // Start threads for all users
57 for user in users {
58 let email = user.0.clone();
59 let token = user.1.clone();
60 let did = user.2.clone();
61 let stop_flag = Arc::new(AtomicBool::new(false));
62 let cache = cache.clone();
63 let nc = nc.clone();
64 let thread_map = Arc::clone(&thread_map);
65
66 thread_map
67 .lock()
68 .unwrap()
69 .insert(email.clone(), Arc::clone(&stop_flag));
70
71 thread::spawn(move || {
72 let rt = tokio::runtime::Runtime::new().unwrap();
73 match rt.block_on(async {
74 watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone())
75 .await?;
76 Ok::<(), Error>(())
77 }) {
78 Ok(_) => {}
79 Err(e) => {
80 println!(
81 "{} Error starting thread for user: {} - {}",
82 format!("[{}]", email).bright_green(),
83 email.bright_green(),
84 e.to_string().bright_red()
85 );
86
87 // If there's an error, publish a message to restart the thread
88 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
89 Ok(_) => {
90 println!(
91 "{} Published message to restart thread for user: {}",
92 format!("[{}]", email).bright_green(),
93 email.bright_green()
94 );
95 }
96 Err(e) => {
97 println!(
98 "{} Error publishing message to restart thread: {}",
99 format!("[{}]", email).bright_green(),
100 e.to_string().bright_red()
101 );
102 }
103 }
104 }
105 }
106 });
107 }
108
109 // Handle subscription messages
110 while let Some(message) = sub.next().await {
111 let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
112 println!(
113 "Received message to restart thread for user: {}",
114 user_id.bright_green()
115 );
116
117 let mut thread_map = thread_map.lock().unwrap();
118
119 // Check if the user exists in the thread map
120 if let Some(stop_flag) = thread_map.get(&user_id) {
121 // Stop the existing thread
122 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
123
124 // Create a new stop flag and restart the thread
125 let new_stop_flag = Arc::new(AtomicBool::new(false));
126 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
127
128 let user = find_spotify_user(&pool, &user_id).await?;
129
130 if user.is_none() {
131 println!(
132 "Spotify user not found: {}, skipping",
133 user_id.bright_green()
134 );
135 continue;
136 }
137
138 let user = user.unwrap();
139
140 let email = user.0.clone();
141 let token = user.1.clone();
142 let did = user.2.clone();
143 let cache = cache.clone();
144
145 thread::spawn(move || {
146 let rt = tokio::runtime::Runtime::new().unwrap();
147 match rt.block_on(async {
148 watch_currently_playing(
149 email.clone(),
150 token,
151 did,
152 new_stop_flag,
153 cache.clone(),
154 )
155 .await?;
156 Ok::<(), Error>(())
157 }) {
158 Ok(_) => {}
159 Err(e) => {
160 println!(
161 "{} Error restarting thread for user: {} - {}",
162 format!("[{}]", email).bright_green(),
163 email.bright_green(),
164 e.to_string().bright_red()
165 );
166 }
167 }
168 });
169
170 println!("Restarted thread for user: {}", user_id.bright_green());
171 } else {
172 println!(
173 "No thread found for user: {}, starting new thread",
174 user_id.bright_green()
175 );
176 let user = find_spotify_user(&pool, &user_id).await?;
177 if let Some(user) = user {
178 let email = user.0.clone();
179 let token = user.1.clone();
180 let did = user.2.clone();
181 let stop_flag = Arc::new(AtomicBool::new(false));
182 let cache = cache.clone();
183 let nc = nc.clone();
184
185 thread_map.insert(email.clone(), Arc::clone(&stop_flag));
186
187 thread::spawn(move || {
188 let rt = tokio::runtime::Runtime::new().unwrap();
189 match rt.block_on(async {
190 watch_currently_playing(
191 email.clone(),
192 token,
193 did,
194 stop_flag,
195 cache.clone(),
196 )
197 .await?;
198 Ok::<(), Error>(())
199 }) {
200 Ok(_) => {}
201 Err(e) => {
202 println!(
203 "{} Error starting thread for user: {} - {}",
204 format!("[{}]", email).bright_green(),
205 email.bright_green(),
206 e.to_string().bright_red()
207 );
208 match rt
209 .block_on(nc.publish("rocksky.spotify.user", email.clone().into()))
210 {
211 Ok(_) => {}
212 Err(e) => {
213 println!(
214 "{} Error publishing message to restart thread: {}",
215 format!("[{}]", email).bright_green(),
216 e.to_string().bright_red()
217 );
218 }
219 }
220 }
221 }
222 });
223 }
224 }
225 }
226
227 Ok(())
228}
229
230pub async fn refresh_token(token: &str) -> Result<AccessToken, Error> {
231 if env::var("SPOTIFY_CLIENT_ID").is_err() || env::var("SPOTIFY_CLIENT_SECRET").is_err() {
232 panic!("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables");
233 }
234
235 let client_id = env::var("SPOTIFY_CLIENT_ID")?;
236 let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
237
238 let client = Client::new();
239
240 let response = client
241 .post("https://accounts.spotify.com/api/token")
242 .basic_auth(&client_id, Some(client_secret))
243 .form(&[
244 ("grant_type", "refresh_token"),
245 ("refresh_token", token),
246 ("client_id", &client_id),
247 ])
248 .send()
249 .await?;
250 let token = response.json::<AccessToken>().await?;
251 Ok(token)
252}
253
254pub async fn get_currently_playing(
255 cache: Cache,
256 user_id: &str,
257 token: &str,
258) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
259 if let Ok(Some(data)) = cache.get(user_id) {
260 println!(
261 "{} {}",
262 format!("[{}]", user_id).bright_green(),
263 "Using cache".cyan()
264 );
265 if data == "No content" {
266 return Ok(None);
267 }
268 let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
269
270 if decoded_data.is_err() {
271 println!(
272 "{} {} {}",
273 format!("[{}]", user_id).bright_green(),
274 "Cache is invalid".red(),
275 data
276 );
277 cache.setex(user_id, "No content", 10)?;
278 cache.del(&format!("{}:current", user_id))?;
279 return Ok(None);
280 }
281
282 let data: CurrentlyPlaying = decoded_data.unwrap();
283 // detect if the song has changed
284 let previous = cache.get(&format!("{}:previous", user_id));
285
286 if previous.is_err() {
287 println!(
288 "{} redis error: {}",
289 format!("[{}]", user_id).bright_green(),
290 previous.unwrap_err().to_string().bright_red()
291 );
292 return Ok(None);
293 }
294
295 let previous = previous.unwrap();
296
297 let changed = match previous {
298 Some(previous) => {
299 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
300 println!(
301 "{} {} {}",
302 format!("[{}]", user_id).bright_green(),
303 "Previous cache is invalid",
304 previous
305 );
306 return Ok(None);
307 }
308
309 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
310 if previous.item.is_none() && data.item.is_some() {
311 return Ok(Some((data, true)));
312 }
313
314 if previous.item.is_some() && data.item.is_none() {
315 return Ok(Some((data, false)));
316 }
317
318 if previous.item.is_none() && data.item.is_none() {
319 return Ok(Some((data, false)));
320 }
321
322 let previous_item = previous.item.unwrap();
323 let data_item = data.clone().item.unwrap();
324 previous_item.id != data_item.id
325 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
326 }
327 _ => true,
328 };
329 return Ok(Some((data, changed)));
330 }
331
332 let token = refresh_token(token).await?;
333 let client = Client::new();
334 let response = client
335 .get(format!("{}/me/player/currently-playing", BASE_URL))
336 .bearer_auth(token.access_token)
337 .send()
338 .await?;
339
340 let headers = response.headers().clone();
341 let status = response.status().as_u16();
342 let data = response.text().await?;
343
344 if status == 429 {
345 println!(
346 "{} Too many requests, retry-after {}",
347 format!("[{}]", user_id).bright_green(),
348 headers
349 .get("retry-after")
350 .unwrap()
351 .to_str()
352 .unwrap()
353 .bright_green()
354 );
355 return Ok(None);
356 }
357
358 let previous = cache.get(&format!("{}:previous", user_id));
359 if previous.is_err() {
360 println!(
361 "{} redis error: {}",
362 format!("[{}]", user_id).bright_green(),
363 previous.unwrap_err().to_string().bright_red()
364 );
365 return Ok(None);
366 }
367
368 let previous = previous.unwrap();
369
370 // check if status code is 204
371 if status == 204 {
372 println!("No content");
373 match cache.setex(
374 user_id,
375 "No content",
376 match previous.is_none() {
377 true => 30,
378 false => 10,
379 },
380 ) {
381 Ok(_) => {}
382 Err(e) => {
383 println!(
384 "{} redis error: {}",
385 format!("[{}]", user_id).bright_green(),
386 e.to_string().bright_red()
387 );
388 return Ok(None);
389 }
390 }
391 match cache.del(&format!("{}:current", user_id)) {
392 Ok(_) => {}
393 Err(e) => {
394 println!(
395 "{} redis error: {}",
396 format!("[{}]", user_id).bright_green(),
397 e.to_string().bright_red()
398 );
399 return Ok(None);
400 }
401 }
402 return Ok(None);
403 }
404
405 if serde_json::from_str::<CurrentlyPlaying>(&data).is_err() {
406 println!(
407 "{} {} {}",
408 format!("[{}]", user_id).bright_green(),
409 "Invalid data received".red(),
410 data
411 );
412 match cache.setex(user_id, "No content", 10) {
413 Ok(_) => {}
414 Err(e) => {
415 println!(
416 "{} redis error: {}",
417 format!("[{}]", user_id).bright_green(),
418 e.to_string().bright_red()
419 );
420 return Ok(None);
421 }
422 }
423 match cache.del(&format!("{}:current", user_id)) {
424 Ok(_) => {}
425 Err(e) => {
426 println!(
427 "{} redis error: {}",
428 format!("[{}]", user_id).bright_green(),
429 e.to_string().bright_red()
430 );
431 return Ok(None);
432 }
433 }
434 return Ok(None);
435 }
436
437 let data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
438
439 match cache.setex(
440 user_id,
441 &serde_json::to_string(&data)?,
442 match previous.is_none() {
443 true => 30,
444 false => 15,
445 },
446 ) {
447 Ok(_) => {}
448 Err(e) => {
449 println!(
450 "{} redis error: {}",
451 format!("[{}]", user_id).bright_green(),
452 e.to_string().bright_red()
453 );
454 return Ok(None);
455 }
456 }
457 match cache.del(&format!("{}:current", user_id)) {
458 Ok(_) => {}
459 Err(e) => {
460 println!(
461 "{} redis error: {}",
462 format!("[{}]", user_id).bright_green(),
463 e.to_string().bright_red()
464 );
465 return Ok(None);
466 }
467 }
468
469 // detect if the song has changed
470 let previous = cache.get(&format!("{}:previous", user_id));
471
472 if previous.is_err() {
473 println!(
474 "{} redis error: {}",
475 format!("[{}]", user_id).bright_green(),
476 previous.unwrap_err().to_string().bright_red()
477 );
478 return Ok(None);
479 }
480
481 let previous = previous.unwrap();
482 let changed = match previous {
483 Some(previous) => {
484 if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
485 println!(
486 "{} {} {}",
487 format!("[{}]", user_id).bright_green(),
488 "Previous cache is invalid",
489 previous
490 );
491 return Ok(None);
492 }
493
494 let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
495 if previous.item.is_none() || data.item.is_none() {
496 return Ok(Some((data, false)));
497 }
498
499 let previous_item = previous.item.unwrap();
500 let data_item = data.clone().item.unwrap();
501
502 previous_item.id != data_item.id
503 && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
504 }
505 _ => false,
506 };
507
508 // save as previous song
509 match cache.setex(
510 &format!("{}:previous", user_id),
511 &serde_json::to_string(&data)?,
512 600,
513 ) {
514 Ok(_) => {}
515 Err(e) => {
516 println!(
517 "{} redis error: {}",
518 format!("[{}]", user_id).bright_green(),
519 e.to_string().bright_red()
520 );
521 return Ok(None);
522 }
523 }
524
525 Ok(Some((data, changed)))
526}
527
528pub async fn get_artist(
529 cache: Cache,
530 artist_id: &str,
531 token: &str,
532) -> Result<Option<Artist>, Error> {
533 if let Ok(Some(data)) = cache.get(artist_id) {
534 return Ok(Some(serde_json::from_str(&data)?));
535 }
536
537 let token = refresh_token(token).await?;
538 let client = Client::new();
539 let response = client
540 .get(&format!("{}/artists/{}", BASE_URL, artist_id))
541 .bearer_auth(token.access_token)
542 .send()
543 .await?;
544
545 let headers = response.headers().clone();
546 let data = response.text().await?;
547
548 if data == "Too many requests" {
549 println!(
550 "> retry-after {}",
551 headers.get("retry-after").unwrap().to_str().unwrap()
552 );
553 println!("> {} [get_artist]", data);
554 return Ok(None);
555 }
556
557 match cache.setex(artist_id, &data, 20) {
558 Ok(_) => {}
559 Err(e) => {
560 println!(
561 "{} redis error: {}",
562 format!("[{}]", artist_id).bright_green(),
563 e.to_string().bright_red()
564 );
565 return Ok(None);
566 }
567 }
568
569 Ok(Some(serde_json::from_str(&data)?))
570}
571
572pub async fn get_album(cache: Cache, album_id: &str, token: &str) -> Result<Option<Album>, Error> {
573 if let Ok(Some(data)) = cache.get(album_id) {
574 return Ok(Some(serde_json::from_str(&data)?));
575 }
576
577 let token = refresh_token(token).await?;
578 let client = Client::new();
579 let response = client
580 .get(&format!("{}/albums/{}", BASE_URL, album_id))
581 .bearer_auth(token.access_token)
582 .send()
583 .await?;
584
585 let headers = response.headers().clone();
586 let data = response.text().await?;
587
588 if data == "Too many requests" {
589 println!(
590 "> retry-after {}",
591 headers.get("retry-after").unwrap().to_str().unwrap()
592 );
593 println!("> {} [get_album]", data);
594 return Ok(None);
595 }
596
597 match cache.setex(album_id, &data, 20) {
598 Ok(_) => {}
599 Err(e) => {
600 println!(
601 "{} redis error: {}",
602 format!("[{}]", album_id).bright_green(),
603 e.to_string().bright_red()
604 );
605 return Ok(None);
606 }
607 }
608
609 Ok(Some(serde_json::from_str(&data)?))
610}
611
612pub async fn get_album_tracks(
613 cache: Cache,
614 album_id: &str,
615 token: &str,
616) -> Result<AlbumTracks, Error> {
617 if let Ok(Some(data)) = cache.get(&format!("{}:tracks", album_id)) {
618 return Ok(serde_json::from_str(&data)?);
619 }
620
621 let token = refresh_token(token).await?;
622 let client = Client::new();
623 let mut all_tracks = Vec::new();
624 let mut offset = 0;
625 let limit = 50;
626
627 loop {
628 let response = client
629 .get(&format!("{}/albums/{}/tracks", BASE_URL, album_id))
630 .bearer_auth(&token.access_token)
631 .query(&[
632 ("limit", &limit.to_string()),
633 ("offset", &offset.to_string()),
634 ])
635 .send()
636 .await?;
637
638 let headers = response.headers().clone();
639 let data = response.text().await?;
640 if data == "Too many requests" {
641 println!(
642 "> retry-after {}",
643 headers.get("retry-after").unwrap().to_str().unwrap()
644 );
645 println!("> {} [get_album_tracks]", data);
646 continue;
647 }
648
649 let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
650
651 if album_tracks.items.is_empty() {
652 break;
653 }
654
655 all_tracks.extend(album_tracks.items);
656 offset += limit;
657 }
658
659 let all_tracks_json = serde_json::to_string(&all_tracks)?;
660 match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) {
661 Ok(_) => {}
662 Err(e) => {
663 println!(
664 "{} redis error: {}",
665 format!("[{}]", album_id).bright_green(),
666 e.to_string().bright_red()
667 );
668 }
669 }
670
671 Ok(AlbumTracks {
672 items: all_tracks,
673 ..Default::default()
674 })
675}
676
677pub async fn find_spotify_users(
678 pool: &Pool<Postgres>,
679 offset: usize,
680 limit: usize,
681) -> Result<Vec<(String, String, String, String)>, Error> {
682 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
683 r#"
684 SELECT * FROM spotify_tokens
685 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
686 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
687 LIMIT $1 OFFSET $2
688 "#,
689 )
690 .bind(limit as i64)
691 .bind(offset as i64)
692 .fetch_all(pool)
693 .await?;
694
695 let mut user_tokens = vec![];
696
697 for result in &results {
698 let token = decrypt_aes_256_ctr(
699 &result.refresh_token,
700 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
701 )?;
702 user_tokens.push((
703 result.email.clone(),
704 token,
705 result.did.clone(),
706 result.user_id.clone(),
707 ));
708 }
709
710 Ok(user_tokens)
711}
712
713pub async fn find_spotify_user(
714 pool: &Pool<Postgres>,
715 email: &str,
716) -> Result<Option<(String, String, String)>, Error> {
717 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
718 r#"
719 SELECT * FROM spotify_tokens
720 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
721 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
722 WHERE spotify_accounts.email = $1
723 "#,
724 )
725 .bind(email)
726 .fetch_all(pool)
727 .await?;
728
729 match result.first() {
730 Some(result) => {
731 let token = decrypt_aes_256_ctr(
732 &result.refresh_token,
733 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
734 )?;
735 Ok(Some((result.email.clone(), token, result.did.clone())))
736 }
737 None => Ok(None),
738 }
739}
740
741pub async fn watch_currently_playing(
742 spotify_email: String,
743 token: String,
744 did: String,
745 stop_flag: Arc<AtomicBool>,
746 cache: Cache,
747) -> Result<(), Error> {
748 println!(
749 "{} {}",
750 format!("[{}]", spotify_email).bright_green(),
751 "Checking currently playing".cyan()
752 );
753
754 let stop_flag_clone = stop_flag.clone();
755 let spotify_email_clone = spotify_email.clone();
756 let cache_clone = cache.clone();
757 thread::spawn(move || {
758 loop {
759 if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
760 println!(
761 "{} Stopping Thread",
762 format!("[{}]", spotify_email_clone).bright_green()
763 );
764 break;
765 }
766 if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) {
767 if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() {
768 thread::sleep(std::time::Duration::from_millis(800));
769 continue;
770 }
771
772 let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?;
773
774 if let Some(item) = current_song.item.clone() {
775 if current_song.is_playing
776 && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into()
777 {
778 current_song.progress_ms =
779 Some(current_song.progress_ms.unwrap_or(0) + 800);
780 match cache_clone.setex(
781 &format!("{}:current", spotify_email_clone),
782 &serde_json::to_string(¤t_song)?,
783 16,
784 ) {
785 Ok(_) => {}
786 Err(e) => {
787 println!(
788 "{} redis error: {}",
789 format!("[{}]", spotify_email_clone).bright_green(),
790 e.to_string().bright_red()
791 );
792 }
793 }
794 thread::sleep(std::time::Duration::from_millis(800));
795 continue;
796 }
797 }
798 continue;
799 }
800
801 if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) {
802 if cached == "No content" {
803 thread::sleep(std::time::Duration::from_millis(800));
804 continue;
805 }
806 match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) {
807 Ok(_) => {}
808 Err(e) => {
809 println!(
810 "{} redis error: {}",
811 format!("[{}]", spotify_email_clone).bright_green(),
812 e.to_string().bright_red()
813 );
814 }
815 }
816 }
817
818 thread::sleep(std::time::Duration::from_millis(800));
819 }
820 Ok::<(), Error>(())
821 });
822
823 loop {
824 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
825 println!(
826 "{} Stopping Thread",
827 format!("[{}]", spotify_email).bright_green()
828 );
829 break;
830 }
831 let spotify_email = spotify_email.clone();
832 let token = token.clone();
833 let did = did.clone();
834 let cache = cache.clone();
835
836 let currently_playing = get_currently_playing(cache.clone(), &spotify_email, &token).await;
837 let currently_playing = match currently_playing {
838 Ok(currently_playing) => currently_playing,
839 Err(e) => {
840 println!(
841 "{} {}",
842 format!("[{}]", spotify_email).bright_green(),
843 e.to_string().bright_red()
844 );
845 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
846 continue;
847 }
848 };
849
850 if let Some((data, changed)) = currently_playing {
851 if data.item.is_none() {
852 println!(
853 "{} {}",
854 format!("[{}]", spotify_email).bright_green(),
855 "No song playing".yellow()
856 );
857 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
858 continue;
859 }
860 let data_item = data.item.unwrap();
861 println!(
862 "{} {} is_playing: {} changed: {}",
863 format!("[{}]", spotify_email).bright_green(),
864 format!("{} - {}", data_item.name, data_item.artists[0].name).yellow(),
865 data.is_playing,
866 changed
867 );
868
869 if changed {
870 scrobble(cache.clone(), &spotify_email, &did, &token).await?;
871
872 thread::spawn(move || {
873 let rt = tokio::runtime::Runtime::new().unwrap();
874 match rt.block_on(async {
875 get_album_tracks(cache.clone(), &data_item.album.id, &token).await?;
876 get_album(cache.clone(), &data_item.album.id, &token).await?;
877 update_library(cache.clone(), &spotify_email, &did, &token).await?;
878 Ok::<(), Error>(())
879 }) {
880 Ok(_) => {}
881 Err(e) => {
882 println!(
883 "{} {}",
884 format!("[{}]", spotify_email).bright_green(),
885 e.to_string().bright_red()
886 );
887 }
888 }
889 });
890 }
891 }
892
893 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
894 }
895
896 Ok(())
897}