···11+use anyhow::Error;
22+use reqwest::Client;
33+44+use crate::{auth::generate_token, cache::Cache, types::Track};
55+66+const ROCKSKY_API: &str = "https://api.rocksky.app";
77+88+pub async fn scrobble(cache: &Cache, did: &str, track: Track, timestamp: u64) -> Result<(), Error> {
99+ let key = format!("{} - {}", track.artist.to_lowercase(), track.title.to_lowercase());
1010+1111+ // Check if the track is already in the cache, if not add it
1212+ if !cache.exists(&key)? {
1313+ let value = serde_json::to_string(&track)?;
1414+ let ttl = 15 * 60; // 15 minutes
1515+ cache.setex(&key, &value, ttl)?;
1616+ }
1717+1818+ let mut track = track;
1919+ track.timestamp = Some(timestamp);
2020+2121+ let token = generate_token(did)?;
2222+ let client = Client::new();
2323+2424+ println!("Scrobbling track: \n {:#?}", track);
2525+2626+ let response= client
2727+ .post(&format!("{}/now-playing", ROCKSKY_API))
2828+ .bearer_auth(token)
2929+ .json(&track)
3030+ .send()
3131+ .await?;
3232+3333+ if !response.status().is_success() {
3434+ return Err(Error::msg(format!("Failed to scrobble track: {}", response.text().await?)));
3535+ }
3636+3737+ Ok(())
3838+}
+204
crates/scrobbler/src/scrobbler.rs
···11+use std::{collections::BTreeMap, env};
22+33+use anyhow::Error;
44+use owo_colors::OwoColorize;
55+use rand::Rng;
66+use sqlx::{Pool, Postgres};
77+88+use crate::{
99+ auth::extract_did,
1010+ cache::Cache, crypto::decrypt_aes_256_ctr,
1111+ types::Scrobble,
1212+ musicbrainz::client::MusicbrainzClient,
1313+ repo,
1414+ rocksky,
1515+ spotify::{
1616+ client::SpotifyClient,
1717+ refresh_token
1818+ }, types::Track
1919+};
2020+2121+fn parse_batch(form: &BTreeMap<String, String>) -> Result<Vec<Scrobble>, Error> {
2222+ let mut result = vec![];
2323+ let mut index = 0;
2424+2525+ loop {
2626+ let artist = form.get(&format!("artist[{}]", index));
2727+ let track = form.get(&format!("track[{}]", index));
2828+ let timestamp = form.get(&format!("timestamp[{}]", index));
2929+3030+ if artist.is_none() || track.is_none() || timestamp.is_none() {
3131+ break;
3232+ }
3333+3434+ let album = form.get(&format!("album[{}]", index))
3535+ .cloned()
3636+ .map(|x| x.trim().to_string());
3737+ let context = form.get(&format!("context[{}]", index))
3838+ .cloned()
3939+ .map(|x| x.trim().to_string());
4040+ let stream_id = form.get(&format!("streamId[{}]", index))
4141+ .and_then(|s| s.trim().parse().ok());
4242+ let chosen_by_user = form
4343+ .get(&format!("chosenByUser[{}]", index))
4444+ .and_then(|s| s.trim().parse().ok());
4545+ let track_number = form
4646+ .get(&format!("trackNumber[{}]", index))
4747+ .and_then(|s| s.trim().parse().ok());
4848+ let mbid = form.get(&format!("mbid[{}]", index)).cloned();
4949+ let album_artist = form.get(&format!("albumArtist[{}]", index)).map(|x| x.trim().to_string());
5050+ let duration = form
5151+ .get(&format!("duration[{}]", index))
5252+ .and_then(|s| s.trim().parse().ok());
5353+5454+ let timestamp = timestamp.unwrap().trim().parse().unwrap_or(
5555+ chrono::Utc::now().timestamp() as u64,
5656+ );
5757+5858+ // validate timestamp, must be in the past (between 14 days before to present)
5959+ let now = chrono::Utc::now().timestamp() as u64;
6060+ if timestamp > now {
6161+ return Err(Error::msg("Timestamp is in the future"));
6262+ }
6363+6464+ if timestamp < now - 14 * 24 * 60 * 60 {
6565+ return Err(Error::msg("Timestamp is too old"));
6666+ }
6767+6868+ result.push(Scrobble {
6969+ artist: artist.unwrap().trim().to_string(),
7070+ track: track.unwrap().trim().to_string(),
7171+ timestamp,
7272+ album,
7373+ context,
7474+ stream_id,
7575+ chosen_by_user,
7676+ track_number,
7777+ mbid,
7878+ album_artist,
7979+ duration,
8080+ ignored: None,
8181+ });
8282+8383+ index += 1;
8484+ }
8585+8686+ Ok(result)
8787+}
8888+8989+pub async fn scrobble(pool: &Pool<Postgres>, cache: &Cache, form: &BTreeMap<String, String>) -> Result<Vec<Scrobble>, Error> {
9090+ let mut scrobbles = parse_batch(form)?;
9191+9292+ if scrobbles.is_empty() {
9393+ return Err(Error::msg("No scrobbles found"));
9494+ }
9595+9696+ let did = extract_did(pool, form).await?;
9797+9898+ let spofity_tokens = repo::spotify_token::get_spotify_tokens(pool, 100).await?;
9999+100100+ if spofity_tokens.is_empty() {
101101+ return Err(Error::msg("No Spotify tokens found"));
102102+ }
103103+104104+ let mb_client = MusicbrainzClient::new();
105105+106106+ for scrobble in &mut scrobbles {
107107+ /*
108108+ 0. check if scrobble is cached
109109+ 1. if mbid is present, check if it exists in the database
110110+ 2. if it exists, scrobble
111111+ 3. if it doesn't exist, check if it exists in Musicbrainz (using mbid)
112112+ 4. if it exists, get album art from spotify and scrobble
113113+ 5. if it doesn't exist, check if it exists in Spotify
114114+ 6. if it exists, scrobble
115115+ 7. if it doesn't exist, check if it exists in Musicbrainz (using track and artist)
116116+ 8. if it exists, scrobble
117117+ 9. if it doesn't exist, skip unknown track
118118+ */
119119+ let key = format!("{} - {}", scrobble.artist.to_lowercase(), scrobble.track.to_lowercase());
120120+ let cached = cache.get(&key)?;
121121+ if cached.is_some() {
122122+ println!("{}", format!("Cached: {}", key).yellow());
123123+ let track = serde_json::from_str::<Track>(&cached.unwrap())?;
124124+ scrobble.album = Some(track.album.clone());
125125+ rocksky::scrobble(cache, &did, track, scrobble.timestamp).await?;
126126+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
127127+ continue;
128128+ }
129129+130130+ if let Some(mbid) = &scrobble.mbid {
131131+ // let result = repo::track::get_track_by_mbid(pool, mbid).await?;
132132+ let result = mb_client.get_recording(mbid).await?;
133133+ println!("{}", "Musicbrainz (mbid)".yellow());
134134+ scrobble.album = Some(Track::from(result.clone()).album);
135135+ rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
136136+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
137137+ continue;
138138+ }
139139+140140+ let result = repo::track::get_track(pool, &scrobble.track, &scrobble.artist).await?;
141141+142142+ if let Some(track) = result {
143143+ println!("{}", "Xata (track)".yellow());
144144+ scrobble.album = Some(track.album.clone());
145145+ rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?;
146146+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
147147+ continue;
148148+ }
149149+150150+ // we need to pick a random token to avoid Spotify rate limiting
151151+ // and to avoid using the same token for all scrobbles
152152+ // this is a simple way to do it, but we can improve it later
153153+ // by using a more sophisticated algorithm
154154+ // or by using a token pool
155155+ let mut rng = rand::rng();
156156+ let random_index = rng.random_range(0..spofity_tokens.len());
157157+ let spotify_token = &spofity_tokens[random_index];
158158+159159+ let spotify_token = decrypt_aes_256_ctr(
160160+ &spotify_token.refresh_token,
161161+ &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?
162162+ )?;
163163+164164+ let spotify_token = refresh_token(&spotify_token).await?;
165165+ let spotify_client = SpotifyClient::new(&spotify_token.access_token);
166166+167167+ let result = spotify_client.search(&format!(r#"track:"{}" artist:"{}""#, scrobble.track, scrobble.artist)).await?;
168168+169169+ if let Some(track) = result.tracks.items.first() {
170170+ println!("{}", "Spotify (track)".yellow());
171171+ scrobble.album = Some(track.album.name.clone());
172172+ let mut track = track.clone();
173173+174174+ if let Some(album) = spotify_client.get_album(&track.album.id).await? {
175175+ track.album = album;
176176+ }
177177+178178+ rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?;
179179+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
180180+ continue;
181181+ }
182182+183183+ let query = format!(
184184+ r#"recording:"{}" AND artist:"{}""#,
185185+ scrobble.track, scrobble.artist
186186+ );
187187+ let result = mb_client.search(&query).await?;
188188+189189+ if let Some(recording) = result.recordings.first() {
190190+ let result = mb_client.get_recording(&recording.id).await?;
191191+ println!("{}", "Musicbrainz (recording)".yellow());
192192+ scrobble.album = Some(Track::from(result.clone()).album);
193193+ rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?;
194194+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
195195+ continue;
196196+ }
197197+198198+ println!("{} {} - {}, skipping", "Track not found: ".yellow(), scrobble.artist, scrobble.track);
199199+ scrobble.ignored = Some(true);
200200+ }
201201+202202+203203+ Ok(scrobbles.clone())
204204+}
+20-1
crates/scrobbler/src/spotify/client.rs
···11-use super::types::SearchResponse;
11+use super::types::{Album, SearchResponse};
22use anyhow::Error;
3344pub const BASE_URL: &str = "https://api.spotify.com/v1";
···2727 let result = response.json().await?;
2828 Ok(result)
2929 }
3030+3131+ pub async fn get_album(&self, id: &str) -> Result<Option<Album>, Error> {
3232+ let url = format!("{}/albums/{}", BASE_URL, id);
3333+ let client = reqwest::Client::new();
3434+ let response = client.get(&url)
3535+ .bearer_auth(&self.token)
3636+ .send().await?;
3737+3838+ let headers = response.headers().clone();
3939+ let data = response.text().await?;
4040+4141+ if data == "Too many requests" {
4242+ println!("> retry-after {}", headers.get("retry-after").unwrap().to_str().unwrap());
4343+ println!("> {} [get_album]", data);
4444+ return Ok(None);
4545+ }
4646+4747+ Ok(Some(serde_json::from_str(&data)?))
4848+ }
3049}
+30
crates/scrobbler/src/spotify/mod.rs
···11+use std::env;
22+33+use reqwest::Client;
44+use types::AccessToken;
55+use anyhow::Error;
66+17pub mod client;
28pub mod types;
99+1010+1111+pub async fn refresh_token(token: &str) -> Result<AccessToken, Error> {
1212+ if env::var("SPOTIFY_CLIENT_ID").is_err() || env::var("SPOTIFY_CLIENT_SECRET").is_err() {
1313+ panic!("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables");
1414+ }
1515+1616+ let client_id = env::var("SPOTIFY_CLIENT_ID")?;
1717+ let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
1818+1919+ let client = Client::new();
2020+2121+ let response = client.post("https://accounts.spotify.com/api/token")
2222+ .basic_auth(&client_id, Some(client_secret))
2323+ .form(&[
2424+ ("grant_type", "refresh_token"),
2525+ ("refresh_token", token),
2626+ ("client_id", &client_id)
2727+ ])
2828+ .send()
2929+ .await?;
3030+ let token = response.json::<AccessToken>().await?;
3131+ Ok(token)
3232+}
···33import { equals, SelectedPick } from "@xata.io/client";
44import { Context } from "context";
55import { createHash } from "crypto";
66+import dayjs from "dayjs";
67import * as Album from "lexicon/types/app/rocksky/album";
78import * as Artist from "lexicon/types/app/rocksky/artist";
89import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
···225226 copyrightMessage: !!track.copyrightMessage
226227 ? track.copyrightMessage
227228 : undefined,
228228- createdAt: new Date().toISOString(),
229229+ // if track.timestamp is not null, set it to the timestamp
230230+ createdAt: track.timestamp
231231+ ? dayjs.unix(track.timestamp).toISOString()
232232+ : new Date().toISOString(),
229233 };
230234231235 if (!Scrobble.validateRecord(record).success) {
···578582 album_id,
579583 artist_id,
580584 uri: scrobbleUri,
585585+ xata_createdat: track.timestamp
586586+ ? dayjs.unix(track.timestamp).toDate()
587587+ : undefined,
581588 });
582589583590 await publishScrobble(ctx, scrobble.xata_id);
···11import albums from "./albums";
22+import apiKeys from "./api-keys";
23import artists from "./artists";
34import playlistTracks from "./playlist-tracks";
45import playlists from "./playlists";
···2223 shoutReports,
2324 playlists,
2425 playlistTracks,
2626+ apiKeys,
2527};
+9
rockskyapi/rocksky-auth/src/types/apikey.ts
···11+import z from "zod";
22+33+export const apiKeySchema = z.object({
44+ name: z.string().nonempty(),
55+ description: z.string().optional().nullable(),
66+ enabled: z.boolean().optional(),
77+});
88+99+export type ApiKey = z.infer<typeof apiKeySchema>;