forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use anyhow::Error;
2use async_nats::connect;
3use owo_colors::OwoColorize;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
7use std::{
8 collections::HashMap,
9 env,
10 sync::{atomic::AtomicBool, Arc, Mutex},
11 thread,
12 time::{SystemTime, UNIX_EPOCH},
13};
14use tokio_stream::StreamExt;
15
16use crate::{
17 cache::Cache,
18 crypto::decrypt_aes_256_ctr,
19 rocksky::{scrobble, update_library},
20 types::{
21 album_tracks::AlbumTracks,
22 currently_playing::{Album, Artist, CurrentlyPlaying},
23 spotify_token::SpotifyTokenWithEmail,
24 token::AccessToken,
25 },
26};
27
28pub mod cache;
29pub mod crypto;
30pub mod rocksky;
31pub mod token;
32pub mod types;
33
/// Base URL used for the player, artist and album API requests made in this file.
pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
/// Maximum number of Spotify users loaded (and watched) when `run` starts.
pub const MAX_USERS: usize = 100;
36
/// Per-user playback snapshot stored as JSON in the cache under `{user_id}:track_state`.
///
/// `detect_track_change` reads the previous snapshot, compares it with the current
/// playback data and writes a new snapshot back.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TrackState {
    /// Id of the track that was playing when the snapshot was taken.
    track_id: String,
    /// Playback position at snapshot time, in milliseconds (0 when not reported).
    progress_ms: u64,
    /// Set by `detect_track_change` when it reports a change for this track.
    scrobbled: bool,
    /// Snapshot time, in seconds since the Unix epoch.
    last_updated: u64,
}
44
45pub async fn run() -> Result<(), Error> {
46 let cache = Cache::new()?;
47 let pool = PgPoolOptions::new()
48 .max_connections(5)
49 .connect(&env::var("XATA_POSTGRES_URL")?)
50 .await?;
51
52 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
53 let nc = connect(&addr).await?;
54 println!("Connected to NATS server at {}", addr.bright_green());
55
56 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
57 println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
58
59 let users = find_spotify_users(&pool, 0, MAX_USERS).await?;
60 println!("Found {} users", users.len().bright_green());
61
62 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
63 Arc::new(Mutex::new(HashMap::new()));
64
65 for user in users {
66 let email = user.0.clone();
67 let token = user.1.clone();
68 let did = user.2.clone();
69 let stop_flag = Arc::new(AtomicBool::new(false));
70 let cache = cache.clone();
71 let nc = nc.clone();
72 let thread_map = Arc::clone(&thread_map);
73
74 thread_map
75 .lock()
76 .unwrap()
77 .insert(email.clone(), Arc::clone(&stop_flag));
78
79 thread::spawn(move || {
80 let rt = tokio::runtime::Runtime::new().unwrap();
81 match rt.block_on(async {
82 watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone())
83 .await?;
84 Ok::<(), Error>(())
85 }) {
86 Ok(_) => {}
87 Err(e) => {
88 println!(
89 "{} Error starting thread for user: {} - {}",
90 format!("[{}]", email).bright_green(),
91 email.bright_green(),
92 e.to_string().bright_red()
93 );
94 // If there's an error, publish a message to restart the thread
95 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
96 Ok(_) => {
97 println!(
98 "{} Published message to restart thread for user: {}",
99 format!("[{}]", email).bright_green(),
100 email.bright_green()
101 );
102 }
103 Err(e) => {
104 println!(
105 "{} Error publishing message to restart thread: {}",
106 format!("[{}]", email).bright_green(),
107 e.to_string().bright_red()
108 );
109 }
110 }
111 }
112 }
113 });
114 }
115
116 while let Some(message) = sub.next().await {
117 let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
118 println!(
119 "Received message to restart thread for user: {}",
120 user_id.bright_green()
121 );
122
123 let mut thread_map = thread_map.lock().unwrap();
124
125 if let Some(stop_flag) = thread_map.get(&user_id) {
126 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
127
128 let new_stop_flag = Arc::new(AtomicBool::new(false));
129 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
130
131 let user = find_spotify_user(&pool, &user_id).await?;
132 if user.is_none() {
133 println!(
134 "Spotify user not found: {}, skipping",
135 user_id.bright_green()
136 );
137 continue;
138 }
139
140 let user = user.unwrap();
141 let email = user.0.clone();
142 let token = user.1.clone();
143 let did = user.2.clone();
144 let cache = cache.clone();
145
146 thread::spawn(move || {
147 let rt = tokio::runtime::Runtime::new().unwrap();
148 match rt.block_on(async {
149 watch_currently_playing(
150 email.clone(),
151 token,
152 did,
153 new_stop_flag,
154 cache.clone(),
155 )
156 .await?;
157 Ok::<(), Error>(())
158 }) {
159 Ok(_) => {}
160 Err(e) => {
161 println!(
162 "{} Error restarting thread for user: {} - {}",
163 format!("[{}]", email).bright_green(),
164 email.bright_green(),
165 e.to_string().bright_red()
166 );
167 }
168 }
169 });
170 println!("Restarted thread for user: {}", user_id.bright_green());
171 } else {
172 println!(
173 "No thread found for user: {}, starting new thread",
174 user_id.bright_green()
175 );
176 let user = find_spotify_user(&pool, &user_id).await?;
177 if let Some(user) = user {
178 let email = user.0.clone();
179 let token = user.1.clone();
180 let did = user.2.clone();
181 let stop_flag = Arc::new(AtomicBool::new(false));
182 let cache = cache.clone();
183 let nc = nc.clone();
184
185 thread_map.insert(email.clone(), Arc::clone(&stop_flag));
186
187 thread::spawn(move || {
188 let rt = tokio::runtime::Runtime::new().unwrap();
189 match rt.block_on(async {
190 watch_currently_playing(
191 email.clone(),
192 token,
193 did,
194 stop_flag,
195 cache.clone(),
196 )
197 .await?;
198 Ok::<(), Error>(())
199 }) {
200 Ok(_) => {}
201 Err(e) => {
202 println!(
203 "{} Error starting thread for user: {} - {}",
204 format!("[{}]", email).bright_green(),
205 email.bright_green(),
206 e.to_string().bright_red()
207 );
208 match rt
209 .block_on(nc.publish("rocksky.spotify.user", email.clone().into()))
210 {
211 Ok(_) => {}
212 Err(e) => {
213 println!(
214 "{} Error publishing message to restart thread: {}",
215 format!("[{}]", email).bright_green(),
216 e.to_string().bright_red()
217 );
218 }
219 }
220 }
221 }
222 });
223 }
224 }
225 }
226
227 Ok(())
228}
229
230pub async fn refresh_token(token: &str) -> Result<AccessToken, Error> {
231 if env::var("SPOTIFY_CLIENT_ID").is_err() || env::var("SPOTIFY_CLIENT_SECRET").is_err() {
232 panic!("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables");
233 }
234
235 let client_id = env::var("SPOTIFY_CLIENT_ID")?;
236 let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
237
238 let client = Client::new();
239 let response = client
240 .post("https://accounts.spotify.com/api/token")
241 .basic_auth(&client_id, Some(client_secret))
242 .form(&[
243 ("grant_type", "refresh_token"),
244 ("refresh_token", token),
245 ("client_id", &client_id),
246 ])
247 .send()
248 .await?;
249
250 let token = response.json::<AccessToken>().await?;
251 Ok(token)
252}
253
254pub async fn get_currently_playing(
255 cache: Cache,
256 user_id: &str,
257 token: &str,
258) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
259 // Check if we have cached data
260 if let Ok(Some(data)) = cache.get(user_id) {
261 println!(
262 "{} {}",
263 format!("[{}]", user_id).bright_green(),
264 "Using cache".cyan()
265 );
266 if data == "No content" {
267 return Ok(None);
268 }
269
270 let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
271 if decoded_data.is_err() {
272 println!(
273 "{} {} {}",
274 format!("[{}]", user_id).bright_green(),
275 "Cache is invalid".red(),
276 data
277 );
278 cache.setex(user_id, "No content", 10)?;
279 cache.del(&format!("{}:current", user_id))?;
280 cache.del(&format!("{}:track_state", user_id))?;
281 return Ok(None);
282 }
283
284 let data: CurrentlyPlaying = decoded_data.unwrap();
285
286 let changed = detect_track_change(&cache, user_id, &data)?;
287 return Ok(Some((data, changed)));
288 }
289
290 let token = refresh_token(token).await?;
291 let client = Client::new();
292 let response = client
293 .get(format!("{}/me/player/currently-playing", BASE_URL))
294 .bearer_auth(token.access_token)
295 .send()
296 .await?;
297
298 let headers = response.headers().clone();
299 let status = response.status().as_u16();
300 let data = response.text().await?;
301
302 if status == 429 {
303 println!(
304 "{} Too many requests, retry-after {}",
305 format!("[{}]", user_id).bright_green(),
306 headers
307 .get("retry-after")
308 .unwrap()
309 .to_str()
310 .unwrap()
311 .bright_green()
312 );
313 return Ok(None);
314 }
315
316 if status == 204 {
317 println!("No content");
318 // Clear track state when nothing is playing
319 cache.del(&format!("{}:track_state", user_id))?;
320
321 let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
322 30
323 } else {
324 10
325 };
326
327 cache.setex(user_id, "No content", ttl)?;
328 cache.del(&format!("{}:current", user_id))?;
329 return Ok(None);
330 }
331
332 if serde_json::from_str::<CurrentlyPlaying>(&data).is_err() {
333 println!(
334 "{} {} {}",
335 format!("[{}]", user_id).bright_green(),
336 "Invalid data received".red(),
337 data
338 );
339 cache.setex(user_id, "No content", 10)?;
340 cache.del(&format!("{}:current", user_id))?;
341 cache.del(&format!("{}:track_state", user_id))?;
342 return Ok(None);
343 }
344
345 let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
346
347 let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
348 30
349 } else {
350 15
351 };
352
353 cache.setex(
354 user_id,
355 &serde_json::to_string(¤tly_playing_data)?,
356 ttl,
357 )?;
358 cache.del(&format!("{}:current", user_id))?;
359
360 // Detect track change and update track state
361 let changed = detect_track_change(&cache, user_id, ¤tly_playing_data)?;
362
363 // Update previous song cache
364 cache.setex(
365 &format!("{}:previous", user_id),
366 &serde_json::to_string(¤tly_playing_data)?,
367 600,
368 )?;
369
370 Ok(Some((currently_playing_data, changed)))
371}
372
373fn detect_track_change(
374 cache: &Cache,
375 user_id: &str,
376 current: &CurrentlyPlaying,
377) -> Result<bool, Error> {
378 let track_state_key = format!("{}:track_state", user_id);
379
380 let now = SystemTime::now()
381 .duration_since(UNIX_EPOCH)
382 .unwrap()
383 .as_secs();
384
385 let current_item = match ¤t.item {
386 Some(item) => item,
387 None => {
388 let _ = cache.del(&track_state_key);
389 return Ok(false);
390 }
391 };
392
393 let previous_state = cache.get(&track_state_key)?;
394
395 let changed = match previous_state {
396 Some(state_str) => {
397 if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) {
398 if prev_state.track_id != current_item.id {
399 true
400 } else {
401 // Same track - check if we should scrobble based on progress and time
402 let progress_diff =
403 current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64;
404 let time_diff = now - prev_state.last_updated;
405
406 // Only consider it changed if:
407 // 1. We haven't scrobbled this track yet
408 // 2. Significant progress was made (more than 10 seconds or reasonable time passed)
409 // 3. Track is actually playing
410 !prev_state.scrobbled
411 && current.is_playing
412 && (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0))
413 }
414 } else {
415 // Invalid previous state, treat as changed
416 true
417 }
418 }
419 None => {
420 // No previous state, treat as new track
421 current.is_playing
422 }
423 };
424
425 let new_state = TrackState {
426 track_id: current_item.id.clone(),
427 progress_ms: current.progress_ms.unwrap_or(0),
428 scrobbled: changed, // Mark as scrobbled if we're reporting a change
429 last_updated: now,
430 };
431
432 cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?;
433
434 Ok(changed)
435}
436
437pub async fn get_artist(
438 cache: Cache,
439 artist_id: &str,
440 token: &str,
441) -> Result<Option<Artist>, Error> {
442 if let Ok(Some(data)) = cache.get(artist_id) {
443 return Ok(Some(serde_json::from_str(&data)?));
444 }
445
446 let token = refresh_token(token).await?;
447 let client = Client::new();
448 let response = client
449 .get(&format!("{}/artists/{}", BASE_URL, artist_id))
450 .bearer_auth(token.access_token)
451 .send()
452 .await?;
453
454 let headers = response.headers().clone();
455 let data = response.text().await?;
456
457 if data == "Too many requests" {
458 println!(
459 "> retry-after {}",
460 headers.get("retry-after").unwrap().to_str().unwrap()
461 );
462 println!("> {} [get_artist]", data);
463 return Ok(None);
464 }
465
466 cache.setex(artist_id, &data, 20)?;
467 Ok(Some(serde_json::from_str(&data)?))
468}
469
470pub async fn get_album(cache: Cache, album_id: &str, token: &str) -> Result<Option<Album>, Error> {
471 if let Ok(Some(data)) = cache.get(album_id) {
472 return Ok(Some(serde_json::from_str(&data)?));
473 }
474
475 let token = refresh_token(token).await?;
476 let client = Client::new();
477 let response = client
478 .get(&format!("{}/albums/{}", BASE_URL, album_id))
479 .bearer_auth(token.access_token)
480 .send()
481 .await?;
482
483 let headers = response.headers().clone();
484 let data = response.text().await?;
485
486 if data == "Too many requests" {
487 println!(
488 "> retry-after {}",
489 headers.get("retry-after").unwrap().to_str().unwrap()
490 );
491 println!("> {} [get_album]", data);
492 return Ok(None);
493 }
494
495 cache.setex(album_id, &data, 20)?;
496 Ok(Some(serde_json::from_str(&data)?))
497}
498
499pub async fn get_album_tracks(
500 cache: Cache,
501 album_id: &str,
502 token: &str,
503) -> Result<AlbumTracks, Error> {
504 if let Ok(Some(data)) = cache.get(&format!("{}:tracks", album_id)) {
505 return Ok(serde_json::from_str(&data)?);
506 }
507
508 let token = refresh_token(token).await?;
509 let client = Client::new();
510 let mut all_tracks = Vec::new();
511 let mut offset = 0;
512 let limit = 50;
513
514 loop {
515 let response = client
516 .get(&format!("{}/albums/{}/tracks", BASE_URL, album_id))
517 .bearer_auth(&token.access_token)
518 .query(&[
519 ("limit", &limit.to_string()),
520 ("offset", &offset.to_string()),
521 ])
522 .send()
523 .await?;
524
525 let headers = response.headers().clone();
526 let data = response.text().await?;
527
528 if data == "Too many requests" {
529 println!(
530 "> retry-after {}",
531 headers.get("retry-after").unwrap().to_str().unwrap()
532 );
533 println!("> {} [get_album_tracks]", data);
534 continue;
535 }
536
537 let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
538 if album_tracks.items.is_empty() {
539 break;
540 }
541
542 all_tracks.extend(album_tracks.items);
543 offset += limit;
544 }
545
546 let all_tracks_json = serde_json::to_string(&all_tracks)?;
547 cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?;
548
549 Ok(AlbumTracks {
550 items: all_tracks,
551 ..Default::default()
552 })
553}
554
555pub async fn find_spotify_users(
556 pool: &Pool<Postgres>,
557 offset: usize,
558 limit: usize,
559) -> Result<Vec<(String, String, String, String)>, Error> {
560 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
561 r#"
562 SELECT * FROM spotify_tokens
563 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
564 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
565 LIMIT $1 OFFSET $2
566 "#,
567 )
568 .bind(limit as i64)
569 .bind(offset as i64)
570 .fetch_all(pool)
571 .await?;
572
573 let mut user_tokens = vec![];
574 for result in &results {
575 let token = decrypt_aes_256_ctr(
576 &result.refresh_token,
577 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
578 )?;
579 user_tokens.push((
580 result.email.clone(),
581 token,
582 result.did.clone(),
583 result.user_id.clone(),
584 ));
585 }
586
587 Ok(user_tokens)
588}
589
590pub async fn find_spotify_user(
591 pool: &Pool<Postgres>,
592 email: &str,
593) -> Result<Option<(String, String, String)>, Error> {
594 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
595 r#"
596 SELECT * FROM spotify_tokens
597 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
598 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
599 WHERE spotify_accounts.email = $1
600 "#,
601 )
602 .bind(email)
603 .fetch_all(pool)
604 .await?;
605
606 match result.first() {
607 Some(result) => {
608 let token = decrypt_aes_256_ctr(
609 &result.refresh_token,
610 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
611 )?;
612 Ok(Some((result.email.clone(), token, result.did.clone())))
613 }
614 None => Ok(None),
615 }
616}
617
618pub async fn watch_currently_playing(
619 spotify_email: String,
620 token: String,
621 did: String,
622 stop_flag: Arc<AtomicBool>,
623 cache: Cache,
624) -> Result<(), Error> {
625 println!(
626 "{} {}",
627 format!("[{}]", spotify_email).bright_green(),
628 "Checking currently playing".cyan()
629 );
630
631 // Remove the separate progress tracking thread - it was causing race conditions
632 // and unnecessary complexity
633
634 loop {
635 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
636 println!(
637 "{} Stopping Thread",
638 format!("[{}]", spotify_email).bright_green()
639 );
640 break;
641 }
642
643 let spotify_email = spotify_email.clone();
644 let token = token.clone();
645 let did = did.clone();
646 let cache = cache.clone();
647
648 let currently_playing = get_currently_playing(cache.clone(), &spotify_email, &token).await;
649 let currently_playing = match currently_playing {
650 Ok(currently_playing) => currently_playing,
651 Err(e) => {
652 println!(
653 "{} {}",
654 format!("[{}]", spotify_email).bright_green(),
655 e.to_string().bright_red()
656 );
657 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
658 continue;
659 }
660 };
661
662 if let Some((data, changed)) = currently_playing {
663 if data.item.is_none() {
664 println!(
665 "{} {}",
666 format!("[{}]", spotify_email).bright_green(),
667 "No song playing".yellow()
668 );
669 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
670 continue;
671 }
672
673 let data_item = data.item.unwrap();
674 println!(
675 "{} {} is_playing: {} changed: {}",
676 format!("[{}]", spotify_email).bright_green(),
677 format!("{} - {}", data_item.name, data_item.artists[0].name).yellow(),
678 data.is_playing,
679 changed
680 );
681
682 // Only scrobble if there's a genuine track change and the track is playing
683 if changed && data.is_playing {
684 // Add a small delay to prevent rapid duplicate scrobbles
685 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
686
687 match scrobble(cache.clone(), &spotify_email, &did, &token).await {
688 Ok(_) => {
689 println!(
690 "{} {}",
691 format!("[{}]", spotify_email).bright_green(),
692 "Scrobbled successfully".green()
693 );
694 }
695 Err(e) => {
696 println!(
697 "{} Scrobble failed: {}",
698 format!("[{}]", spotify_email).bright_green(),
699 e.to_string().bright_red()
700 );
701 }
702 }
703
704 // Spawn background task for library updates
705 let cache_clone = cache.clone();
706 let token_clone = token.clone();
707 let spotify_email_clone = spotify_email.clone();
708 let did_clone = did.clone();
709 let album_id = data_item.album.id.clone();
710
711 thread::spawn(move || {
712 let rt = tokio::runtime::Runtime::new().unwrap();
713 match rt.block_on(async {
714 get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?;
715 get_album(cache_clone.clone(), &album_id, &token_clone).await?;
716 update_library(
717 cache_clone.clone(),
718 &spotify_email_clone,
719 &did_clone,
720 &token_clone,
721 )
722 .await?;
723 Ok::<(), Error>(())
724 }) {
725 Ok(_) => {
726 println!(
727 "{} Library updated successfully",
728 format!("[{}]", spotify_email_clone).bright_green()
729 );
730 }
731 Err(e) => {
732 println!(
733 "{} Library update failed: {}",
734 format!("[{}]", spotify_email_clone).bright_green(),
735 e.to_string().bright_red()
736 );
737 }
738 }
739 });
740 }
741 }
742
743 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
744 }
745
746 Ok(())
747}