forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use std::{collections::BTreeMap, env};
2
3use anyhow::Error;
4use rand::Rng;
5use sqlx::{Pool, Postgres};
6
7use crate::{
8 auth::{decode_token, extract_did},
9 cache::Cache,
10 crypto::decrypt_aes_256_ctr,
11 listenbrainz::types::SubmitListensRequest,
12 musicbrainz::{
13 client::MusicbrainzClient, get_best_release_from_recordings, recording::Recording,
14 },
15 repo::{self},
16 rocksky,
17 spotify::{client::SpotifyClient, refresh_token},
18 types::{Scrobble, Track},
19 xata::user::User,
20};
21
22const MAX_SPOTIFY_RETRIES: u32 = 3;
23const INITIAL_RETRY_DELAY_MS: u64 = 1000;
24
25fn parse_batch(form: &BTreeMap<String, String>) -> Result<Vec<Scrobble>, Error> {
26 let mut result = vec![];
27 let mut index = 0;
28
29 loop {
30 let artist = form.get(&format!("artist[{}]", index));
31 let track = form.get(&format!("track[{}]", index));
32 let timestamp = form.get(&format!("timestamp[{}]", index));
33
34 if artist.is_none() || track.is_none() || timestamp.is_none() {
35 break;
36 }
37
38 let album = form
39 .get(&format!("album[{}]", index))
40 .cloned()
41 .map(|x| x.trim().to_string());
42 let context = form
43 .get(&format!("context[{}]", index))
44 .cloned()
45 .map(|x| x.trim().to_string());
46 let stream_id = form
47 .get(&format!("streamId[{}]", index))
48 .and_then(|s| s.trim().parse().ok());
49 let chosen_by_user = form
50 .get(&format!("chosenByUser[{}]", index))
51 .and_then(|s| s.trim().parse().ok());
52 let track_number = form
53 .get(&format!("trackNumber[{}]", index))
54 .and_then(|s| s.trim().parse().ok());
55 let mbid = form.get(&format!("mbid[{}]", index)).cloned();
56 let album_artist = form
57 .get(&format!("albumArtist[{}]", index))
58 .map(|x| x.trim().to_string());
59 let duration = form
60 .get(&format!("duration[{}]", index))
61 .and_then(|s| s.trim().parse().ok());
62
63 let timestamp = timestamp
64 .unwrap()
65 .trim()
66 .parse()
67 .unwrap_or(chrono::Utc::now().timestamp() as u64);
68
69 // validate timestamp, must be in the past (between 14 days before to present)
70 let now = chrono::Utc::now().timestamp() as u64;
71 if timestamp > now {
72 return Err(Error::msg("Timestamp is in the future"));
73 }
74
75 if timestamp < now - 14 * 24 * 60 * 60 {
76 return Err(Error::msg("Timestamp is too old"));
77 }
78
79 result.push(Scrobble {
80 artist: artist.unwrap().trim().to_string(),
81 track: track.unwrap().trim().to_string(),
82 timestamp,
83 album,
84 context,
85 stream_id,
86 chosen_by_user,
87 track_number,
88 mbid,
89 album_artist,
90 duration,
91 ignored: None,
92 });
93
94 index += 1;
95 }
96
97 Ok(result)
98}
99
100pub async fn scrobble(
101 pool: &Pool<Postgres>,
102 cache: &Cache,
103 mb_client: &MusicbrainzClient,
104 form: &BTreeMap<String, String>,
105) -> Result<Vec<Scrobble>, Error> {
106 let mut scrobbles = parse_batch(form)?;
107
108 if scrobbles.is_empty() {
109 return Err(Error::msg("No scrobbles found"));
110 }
111
112 let did = extract_did(pool, form).await?;
113
114 let spofity_tokens = repo::spotify_token::get_spotify_tokens(pool, 100).await?;
115
116 if spofity_tokens.is_empty() {
117 return Err(Error::msg("No Spotify tokens found"));
118 }
119
120 for scrobble in &mut scrobbles {
121 /*
122 0. check if scrobble is cached
123 1. if mbid is present, check if it exists in the database
124 2. if it exists, scrobble
125 3. if it doesn't exist, check if it exists in Musicbrainz (using mbid)
126 4. if it exists, get album art from spotify and scrobble
127 5. if it doesn't exist, check if it exists in Spotify
128 6. if it exists, scrobble
129 7. if it doesn't exist, check if it exists in Musicbrainz (using track and artist)
130 8. if it exists, scrobble
131 9. if it doesn't exist, skip unknown track
132 */
133 let key = format!(
134 "{} - {}",
135 scrobble.artist.to_lowercase(),
136 scrobble.track.to_lowercase()
137 );
138 let cached = cache.get(&key)?;
139 if cached.is_some() {
140 tracing::info!(key = %key, "Cached:");
141 let track = serde_json::from_str::<Track>(&cached.unwrap())?;
142 scrobble.album = Some(track.album.clone());
143 rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
144 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
145 continue;
146 }
147
148 if let Some(mbid) = &scrobble.mbid {
149 // let result = repo::track::get_track_by_mbid(pool, mbid).await?;
150 let result = mb_client.get_recording(mbid).await?;
151 tracing::info!(%scrobble.artist, %scrobble.track, "Musicbrainz (mbid)");
152 scrobble.album = Some(Track::from(result.clone()).album);
153 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
154 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
155 continue;
156 }
157
158 let result = repo::track::get_track(pool, &scrobble.track, &scrobble.artist).await?;
159
160 if let Some(track) = result {
161 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Xata (track)");
162 scrobble.album = Some(track.album.clone());
163 let album = repo::album::get_album_by_track_id(pool, &track.xata_id).await?;
164 let artist = repo::artist::get_artist_by_track_id(pool, &track.xata_id).await?;
165 let mut track: Track = track.into();
166 track.year = match album.year {
167 Some(year) => Some(year as u32),
168 None => match album.release_date.clone() {
169 Some(release_date) => {
170 let year = release_date.split("-").next();
171 year.and_then(|x| x.parse::<u32>().ok())
172 }
173 None => None,
174 },
175 };
176 track.release_date = album
177 .release_date
178 .map(|x| x.split("T").next().unwrap().to_string());
179 track.artist_picture = artist.picture.clone();
180
181 rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
182 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
183 continue;
184 }
185
186 // we need to pick a random token to avoid Spotify rate limiting
187 // and to avoid using the same token for all scrobbles
188 // this is a simple way to do it, but we can improve it later
189 // by using a more sophisticated algorithm
190 // or by using a token pool
191 let mut rng = rand::rng();
192 let random_index = rng.random_range(0..spofity_tokens.len());
193 let spotify_token = &spofity_tokens[random_index];
194 let client_id = spotify_token.spotify_app_id.clone();
195
196 let client_secret = decrypt_aes_256_ctr(
197 &spotify_token.spotify_secret,
198 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
199 )?;
200
201 let spotify_token = decrypt_aes_256_ctr(
202 &spotify_token.refresh_token,
203 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
204 )?;
205
206 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?;
207 let spotify_client = SpotifyClient::new(&spotify_token.access_token);
208
209 let result = retry_spotify_call(
210 || async {
211 tokio::time::timeout(
212 std::time::Duration::from_secs(5),
213 spotify_client.search(&format!(
214 r#"track:"{}" artist:"{}""#,
215 scrobble.track, scrobble.artist
216 )),
217 )
218 .await?
219 },
220 "search",
221 )
222 .await?;
223
224 if let Some(track) = result.tracks.items.first() {
225 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Spotify (track)");
226 scrobble.album = Some(track.album.name.clone());
227 let mut track = track.clone();
228
229 if let Some(album) = retry_spotify_call(
230 || async { spotify_client.get_album(&track.album.id).await },
231 "get_album",
232 )
233 .await?
234 {
235 track.album = album;
236 }
237
238 if let Some(artist) = retry_spotify_call(
239 || async { spotify_client.get_artist(&track.album.artists[0].id).await },
240 "get_artist",
241 )
242 .await?
243 {
244 track.album.artists[0] = artist;
245 }
246
247 rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?;
248 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
249 continue;
250 }
251
252 let query = format!(
253 r#"recording:"{}" AND artist:"{}" AND status:Official"#,
254 scrobble.track, scrobble.artist
255 );
256 let result = mb_client.search(&query).await?;
257
258 if let Some(recording) = result.recordings.first() {
259 let result = mb_client.get_recording(&recording.id).await?;
260 tracing::info!(%scrobble.artist, %scrobble.track, "Musicbrainz (recording)");
261 scrobble.album = Some(Track::from(result.clone()).album);
262 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
263 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
264 continue;
265 }
266
267 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Track not found, skipping");
268 scrobble.ignored = Some(true);
269 }
270
271 Ok(scrobbles.clone())
272}
273
274pub async fn scrobble_v1(
275 pool: &Pool<Postgres>,
276 cache: &Cache,
277 mb_client: &MusicbrainzClient,
278 form: &BTreeMap<String, String>,
279) -> Result<(), Error> {
280 let session_id = form.get("s").unwrap().to_string();
281 let artist = form.get("a[0]").unwrap().to_string();
282 let track = form.get("t[0]").unwrap().to_string();
283 let timestamp = form.get("i[0]").unwrap().to_string();
284
285 let user = cache.get(&format!("lastfm:{}", session_id))?;
286 if user.is_none() {
287 return Err(Error::msg("Session ID not found"));
288 }
289
290 let user = user.unwrap();
291 let user = serde_json::from_str::<User>(&user)?;
292
293 let spofity_tokens = repo::spotify_token::get_spotify_tokens(pool, 100).await?;
294
295 if spofity_tokens.is_empty() {
296 return Err(Error::msg("No Spotify tokens found"));
297 }
298
299 let mut scrobble = Scrobble {
300 artist: artist.trim().to_string(),
301 track: track.trim().to_string(),
302 timestamp: timestamp.parse::<u64>()?,
303 album: None,
304 context: None,
305 stream_id: None,
306 chosen_by_user: None,
307 track_number: None,
308 mbid: None,
309 album_artist: None,
310 duration: None,
311 ignored: None,
312 };
313
314 let did = user.did.clone();
315
316 /*
317 0. check if scrobble is cached
318 1. if mbid is present, check if it exists in the database
319 2. if it exists, scrobble
320 3. if it doesn't exist, check if it exists in Musicbrainz (using mbid)
321 4. if it exists, get album art from spotify and scrobble
322 5. if it doesn't exist, check if it exists in Spotify
323 6. if it exists, scrobble
324 7. if it doesn't exist, check if it exists in Musicbrainz (using track and artist)
325 8. if it exists, scrobble
326 9. if it doesn't exist, skip unknown track
327 */
328 let key = format!(
329 "{} - {}",
330 scrobble.artist.to_lowercase(),
331 scrobble.track.to_lowercase()
332 );
333 let cached = cache.get(&key)?;
334 if cached.is_some() {
335 tracing::info!(key = %key, "Cached:");
336 let track = serde_json::from_str::<Track>(&cached.unwrap())?;
337 scrobble.album = Some(track.album.clone());
338 rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
339 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
340 return Ok(());
341 }
342
343 if let Some(mbid) = &scrobble.mbid {
344 // let result = repo::track::get_track_by_mbid(pool, mbid).await?;
345 let result = mb_client.get_recording(mbid).await?;
346 tracing::info!(%scrobble.artist, %scrobble.track, "Musicbrainz (mbid)");
347 scrobble.album = Some(Track::from(result.clone()).album);
348 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
349 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
350 return Ok(());
351 }
352
353 let result = repo::track::get_track(pool, &scrobble.track, &scrobble.artist).await?;
354
355 if let Some(track) = result {
356 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Xata (track)");
357 scrobble.album = Some(track.album.clone());
358 let album = repo::album::get_album_by_track_id(pool, &track.xata_id).await?;
359 let artist = repo::artist::get_artist_by_track_id(pool, &track.xata_id).await?;
360 let mut track: Track = track.into();
361 track.year = match album.year {
362 Some(year) => Some(year as u32),
363 None => match album.release_date.clone() {
364 Some(release_date) => {
365 let year = release_date.split("-").next();
366 year.and_then(|x| x.parse::<u32>().ok())
367 }
368 None => None,
369 },
370 };
371 track.release_date = album
372 .release_date
373 .map(|x| x.split("T").next().unwrap().to_string());
374 track.artist_picture = artist.picture.clone();
375
376 rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
377 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
378 return Ok(());
379 }
380
381 // we need to pick a random token to avoid Spotify rate limiting
382 // and to avoid using the same token for all scrobbles
383 // this is a simple way to do it, but we can improve it later
384 // by using a more sophisticated algorithm
385 // or by using a token pool
386 let mut rng = rand::rng();
387 let random_index = rng.random_range(0..spofity_tokens.len());
388 let spotify_token = &spofity_tokens[random_index];
389 let client_id = spotify_token.spotify_app_id.clone();
390
391 let client_secret = decrypt_aes_256_ctr(
392 &spotify_token.spotify_secret,
393 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
394 )?;
395
396 let spotify_token = decrypt_aes_256_ctr(
397 &spotify_token.refresh_token,
398 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
399 )?;
400
401 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?;
402 let spotify_client = SpotifyClient::new(&spotify_token.access_token);
403
404 let result = retry_spotify_call(
405 || async {
406 spotify_client
407 .search(&format!(
408 r#"track:"{}" artist:"{}""#,
409 scrobble.track, scrobble.artist
410 ))
411 .await
412 },
413 "search",
414 )
415 .await?;
416
417 if let Some(track) = result.tracks.items.first() {
418 let normalize = |s: &str| -> String {
419 s.to_lowercase()
420 .chars()
421 .filter_map(|c| match c {
422 '谩' | '脿' | '盲' | '芒' | '茫' | '氓' => Some('a'),
423 '茅' | '猫' | '毛' | '锚' => Some('e'),
424 '铆' | '矛' | '茂' | '卯' => Some('i'),
425 '贸' | '貌' | '枚' | '么' | '玫' => Some('o'),
426 '煤' | '霉' | '眉' | '没' => Some('u'),
427 '帽' => Some('n'),
428 '莽' => Some('c'),
429 _ => Some(c),
430 })
431 .collect()
432 };
433
434 let spotify_artists: Vec<String> =
435 track.artists.iter().map(|a| normalize(&a.name)).collect();
436
437 // check if artists don't contain the scrobble artist (to avoid wrong matches)
438 // scrobble artist can contain multiple artists separated by ", "
439 let scrobble_artists: Vec<String> = scrobble
440 .artist
441 .split(", ")
442 .map(|a| normalize(a.trim()))
443 .collect();
444
445 // Check for matches with partial matching:
446 // 1. Check if any scrobble artist is contained in any Spotify artist
447 // 2. Check if any Spotify artist is contained in any scrobble artist
448 let has_artist_match = scrobble_artists.iter().any(|scrobble_artist| {
449 spotify_artists.iter().any(|spotify_artist| {
450 scrobble_artist.contains(spotify_artist) || spotify_artist.contains(scrobble_artist)
451 })
452 });
453
454 if !has_artist_match {
455 tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping");
456 } else {
457 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Spotify (track)");
458 scrobble.album = Some(track.album.name.clone());
459 let mut track = track.clone();
460
461 if let Some(album) = retry_spotify_call(
462 || async { spotify_client.get_album(&track.album.id).await },
463 "get_album",
464 )
465 .await?
466 {
467 track.album = album;
468 }
469
470 if let Some(artist) = retry_spotify_call(
471 || async { spotify_client.get_artist(&track.album.artists[0].id).await },
472 "get_artist",
473 )
474 .await?
475 {
476 track.album.artists[0] = artist;
477 }
478
479 rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?;
480 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
481 return Ok(());
482 }
483 }
484
485 let query = format!(
486 r#"recording:"{}" AND artist:"{}" AND status:Official"#,
487 scrobble.track, scrobble.artist
488 );
489 let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await;
490 if let Err(e) = result {
491 tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Musicbrainz search error: {}", e);
492 return Ok(());
493 }
494 let result = result.unwrap();
495 if let Some(recording) = result {
496 let result = mb_client.get_recording(&recording.id).await?;
497 tracing::info!(%scrobble.artist, %scrobble.track, "Musicbrainz (recording)");
498 scrobble.album = Some(Track::from(result.clone()).album);
499 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
500 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
501 return Ok(());
502 }
503
504 tracing::info!(artist = %artist, track = %track, "Track not found, skipping");
505
506 Ok(())
507}
508
509pub async fn scrobble_listenbrainz(
510 pool: &Pool<Postgres>,
511 cache: &Cache,
512 mb_client: &MusicbrainzClient,
513 req: &SubmitListensRequest,
514 token: &str,
515) -> Result<(), Error> {
516 tracing::info!(req = ?req, "Listenbrainz submission");
517
518 if req.payload.is_empty() {
519 return Err(Error::msg("No payload found"));
520 }
521
522 let artist = req.payload[0].track_metadata.artist_name.clone();
523 let track = req.payload[0].track_metadata.track_name.clone();
524 let timestamp = match req.payload[0].listened_at {
525 Some(timestamp) => timestamp.to_string(),
526 None => chrono::Utc::now().timestamp().to_string(),
527 };
528
529 let did = match decode_token(token) {
530 Ok(claims) => claims.did,
531 Err(e) => {
532 let user = repo::user::get_user_by_apikey(pool, token)
533 .await?
534 .map(|user| user.did);
535 if let Some(did) = user {
536 did
537 } else {
538 return Err(Error::msg(format!(
539 "Failed to decode token: {} {}",
540 e, token
541 )));
542 }
543 }
544 };
545
546 let user = repo::user::get_user_by_did(pool, &did).await?;
547
548 if user.is_none() {
549 return Err(Error::msg("User not found"));
550 }
551
552 cache.setex(
553 &format!("listenbrainz:emby:{}:{}:{}", artist, track, did),
554 "1",
555 60 * 5, // 5 minutes
556 )?;
557
558 if cache
559 .get(&format!("listenbrainz:cache:{}:{}:{}", artist, track, did))?
560 .is_some()
561 {
562 tracing::info!(artist= %artist, track = %track, "Recently scrobbled, skipping");
563 return Ok(());
564 }
565
566 let spotify_user = repo::spotify_account::get_spotify_account(pool, &did).await?;
567 if let Some(spotify_user) = spotify_user {
568 if cache
569 .get(&format!("{}:current", spotify_user.email))?
570 .is_some()
571 {
572 tracing::info!(artist= %artist, track = %track, "Currently scrobbling, skipping");
573 return Ok(());
574 }
575 }
576
577 if cache.get(&format!("nowplaying:{}", did))?.is_some() {
578 tracing::info!(artist= %artist, track = %track, "Currently scrobbling, skipping");
579 return Ok(());
580 }
581
582 // set cache for 60 seconds to avoid duplicate scrobbles
583 cache.setex(
584 &format!("listenbrainz:cache:{}:{}:{}", artist, track, did),
585 "1",
586 30, // 30 seconds
587 )?;
588
589 let spofity_tokens = repo::spotify_token::get_spotify_tokens(pool, 100).await?;
590
591 if spofity_tokens.is_empty() {
592 return Err(Error::msg("No Spotify tokens found"));
593 }
594
595 let mut scrobble = Scrobble {
596 artist: artist.trim().to_string(),
597 track: track.trim().to_string(),
598 timestamp: timestamp.parse::<u64>()?,
599 album: None,
600 context: None,
601 stream_id: None,
602 chosen_by_user: None,
603 track_number: None,
604 mbid: None,
605 album_artist: None,
606 duration: None,
607 ignored: None,
608 };
609
610 /*
611 0. check if scrobble is cached
612 1. if mbid is present, check if it exists in the database
613 2. if it exists, scrobble
614 3. if it doesn't exist, check if it exists in Musicbrainz (using mbid)
615 4. if it exists, get album art from spotify and scrobble
616 5. if it doesn't exist, check if it exists in Spotify
617 6. if it exists, scrobble
618 7. if it doesn't exist, check if it exists in Musicbrainz (using track and artist)
619 8. if it exists, scrobble
620 9. if it doesn't exist, skip unknown track
621 */
622 let key = format!(
623 "{} - {}",
624 scrobble.artist.to_lowercase(),
625 scrobble.track.to_lowercase()
626 );
627 let cached = cache.get(&key)?;
628 if cached.is_some() {
629 tracing::info!(key = %key, "Cached");
630 let track = serde_json::from_str::<Track>(&cached.unwrap())?;
631 scrobble.album = Some(track.album.clone());
632 rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
633 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
634 return Ok(());
635 }
636
637 if let Some(mbid) = &scrobble.mbid {
638 // let result = repo::track::get_track_by_mbid(pool, mbid).await?;
639 let result = mb_client.get_recording(mbid).await?;
640 tracing::info!("Musicbrainz (mbid)");
641 scrobble.album = Some(Track::from(result.clone()).album);
642 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
643 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
644 return Ok(());
645 }
646
647 let result = repo::track::get_track(pool, &scrobble.track, &scrobble.artist).await?;
648
649 if let Some(track) = result {
650 tracing::info!(id = %track.xata_id, artist = %track.artist, album = %track.album, album_atist = %track.album_artist, album_uri = ?track.album_uri, artist_uri = ?track.artist_uri, "Xata (track)");
651 scrobble.album = Some(track.album.clone());
652 let album =
653 repo::album::get_album_by_uri(pool, &track.album_uri.clone().unwrap_or_default())
654 .await?;
655 let artist =
656 repo::artist::get_artist_by_uri(pool, &track.artist_uri.clone().unwrap_or_default())
657 .await?;
658 let mut track: Track = track.into();
659 track.year = match album.year {
660 Some(year) => Some(year as u32),
661 None => match album.release_date.clone() {
662 Some(release_date) => {
663 let year = release_date.split("-").next();
664 year.and_then(|x| x.parse::<u32>().ok())
665 }
666 None => None,
667 },
668 };
669 track.release_date = album
670 .release_date
671 .map(|x| x.split("T").next().unwrap().to_string());
672 track.artist_picture = artist.picture.clone();
673
674 rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
675 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
676 return Ok(());
677 }
678
679 // we need to pick a random token to avoid Spotify rate limiting
680 // and to avoid using the same token for all scrobbles
681 // this is a simple way to do it, but we can improve it later
682 // by using a more sophisticated algorithm
683 // or by using a token pool
684 let mut rng = rand::rng();
685 let random_index = rng.random_range(0..spofity_tokens.len());
686 let spotify_token = &spofity_tokens[random_index];
687
688 let client_id = spotify_token.spotify_app_id.clone();
689 let client_secret = decrypt_aes_256_ctr(
690 &spotify_token.spotify_secret,
691 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
692 )?;
693
694 let spotify_token = decrypt_aes_256_ctr(
695 &spotify_token.refresh_token,
696 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
697 )?;
698
699 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?;
700 let spotify_client = SpotifyClient::new(&spotify_token.access_token);
701
702 let result = retry_spotify_call(
703 || async {
704 spotify_client
705 .search(&format!(
706 r#"track:"{}" artist:"{}""#,
707 scrobble.track, scrobble.artist
708 ))
709 .await
710 },
711 "search",
712 )
713 .await?;
714
715 if let Some(track) = result.tracks.items.first() {
716 let normalize = |s: &str| -> String {
717 s.to_lowercase()
718 .chars()
719 .filter_map(|c| match c {
720 '谩' | '脿' | '盲' | '芒' | '茫' | '氓' => Some('a'),
721 '茅' | '猫' | '毛' | '锚' => Some('e'),
722 '铆' | '矛' | '茂' | '卯' => Some('i'),
723 '贸' | '貌' | '枚' | '么' | '玫' => Some('o'),
724 '煤' | '霉' | '眉' | '没' => Some('u'),
725 '帽' => Some('n'),
726 '莽' => Some('c'),
727 _ => Some(c),
728 })
729 .collect()
730 };
731
732 let spotify_artists: Vec<String> =
733 track.artists.iter().map(|a| normalize(&a.name)).collect();
734
735 let scrobble_artist_normalized = normalize(&scrobble.artist);
736
737 // Check for matches with partial matching:
738 // 1. Check if scrobble artist is contained in any Spotify artist
739 // 2. Check if any Spotify artist is contained in scrobble artist
740 let has_artist_match = spotify_artists.iter().any(|spotify_artist| {
741 scrobble_artist_normalized.contains(spotify_artist)
742 || spotify_artist.contains(&scrobble_artist_normalized)
743 });
744
745 if !has_artist_match {
746 tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping");
747 } else {
748 tracing::info!("Spotify (track)");
749 scrobble.album = Some(track.album.name.clone());
750 let mut track = track.clone();
751
752 if let Some(album) = retry_spotify_call(
753 || async { spotify_client.get_album(&track.album.id).await },
754 "get_album",
755 )
756 .await?
757 {
758 track.album = album;
759 }
760
761 if let Some(artist) = retry_spotify_call(
762 || async { spotify_client.get_artist(&track.album.artists[0].id).await },
763 "get_artist",
764 )
765 .await?
766 {
767 track.album.artists[0] = artist;
768 }
769
770 rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?;
771 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
772 return Ok(());
773 }
774 }
775
776 let query = format!(
777 r#"recording:"{}" AND artist:"{}" AND status:Official"#,
778 scrobble.track, scrobble.artist
779 );
780 let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await;
781 if let Err(e) = result {
782 tracing::warn!(artist = %artist, track = %track, "Musicbrainz search error: {}", e);
783 return Ok(());
784 }
785 let result = result.unwrap();
786 if let Some(result) = result {
787 tracing::info!("Musicbrainz (recording)");
788 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
789 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
790 return Ok(());
791 }
792
793 tracing::warn!(artist = %artist, track = %track, "Track not found, skipping");
794
795 Ok(())
796}
797
798async fn search_musicbrainz_recording(
799 query: &str,
800 mb_client: &MusicbrainzClient,
801 scrobble: &Scrobble,
802) -> Result<Option<Recording>, Error> {
803 let result = mb_client.search(&query).await;
804 if let Err(e) = result {
805 tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Musicbrainz search error: {}", e);
806 return Ok(None);
807 }
808 let result = result.unwrap();
809
810 let release = get_best_release_from_recordings(&result, &scrobble.artist);
811
812 if let Some(release) = release {
813 let recording = result.recordings.into_iter().find(|r| {
814 r.releases
815 .as_ref()
816 .map(|releases| releases.iter().any(|rel| rel.id == release.id))
817 .unwrap_or(false)
818 });
819 if recording.is_none() {
820 tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Recording not found in MusicBrainz result, skipping");
821 return Ok(None);
822 }
823 let recording = recording.unwrap();
824 let mut result = mb_client.get_recording(&recording.id).await?;
825 tracing::info!("Musicbrainz (recording)");
826 result.releases = Some(vec![release]);
827 return Ok(Some(result));
828 }
829
830 Ok(None)
831}
832
833async fn retry_spotify_call<F, Fut, T>(mut f: F, operation: &str) -> Result<T, Error>
834where
835 F: FnMut() -> Fut,
836 Fut: std::future::Future<Output = Result<T, Error>>,
837{
838 let mut last_error = None;
839
840 for attempt in 0..MAX_SPOTIFY_RETRIES {
841 match f().await {
842 Ok(result) => return Ok(result),
843 Err(e) => {
844 let is_timeout = e.to_string().contains("timed out")
845 || e.to_string().contains("timeout")
846 || e.to_string().contains("operation timed out");
847
848 if is_timeout && attempt < MAX_SPOTIFY_RETRIES - 1 {
849 let delay = INITIAL_RETRY_DELAY_MS * 2_u64.pow(attempt);
850 tracing::warn!(
851 attempt = attempt + 1,
852 max_attempts = MAX_SPOTIFY_RETRIES,
853 delay_ms = delay,
854 operation = operation,
855 "Spotify API timeout, retrying..."
856 );
857 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
858 last_error = Some(e);
859 } else {
860 return Err(e);
861 }
862 }
863 }
864 }
865
866 Err(last_error.unwrap_or_else(|| Error::msg("Max retries exceeded")))
867}