A decentralized music tracking and discovery platform built on AT Protocol 馃幍
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1use std::{
2 env,
3 sync::{Arc, Mutex},
4};
5
6use anyhow::Error;
7use duckdb::{params, Connection};
8use owo_colors::OwoColorize;
9use reqwest::Client;
10use serde_json::json;
11use sha2::Digest;
12use sqlx::{Pool, Postgres};
13
14use crate::{
15 crypto::{decrypt_aes_256_ctr, generate_token},
16 types::{self, spotify_token::SpotifyTokenWithEmail},
17 xata::{self},
18};
19
20const ROCKSKY_API: &str = "https://api.rocksky.app";
21
22pub fn create_tables(conn: Arc<Mutex<Connection>>) -> Result<(), Error> {
23 let conn = conn.lock().unwrap();
24 conn.execute_batch(r#"
25 CREATE TABLE IF NOT EXISTS playlists (
26 id TEXT PRIMARY KEY,
27 name TEXT,
28 description TEXT,
29 picture TEXT,
30 spotify_link TEXT,
31 tidal_link TEXT,
32 apple_music_link TEXT,
33 xata_createdat TIMESTAMP,
34 xata_updatedat TIMESTAMP,
35 uri TEXT,
36 created_by TEXT
37 );
38 CREATE TABLE IF NOT EXISTS tracks (
39 id VARCHAR PRIMARY KEY,
40 title VARCHAR,
41 artist VARCHAR,
42 album_artist VARCHAR,
43 album_art VARCHAR,
44 album VARCHAR,
45 track_number INTEGER,
46 duration INTEGER,
47 mb_id VARCHAR,
48 youtube_link VARCHAR,
49 spotify_link VARCHAR,
50 tidal_link VARCHAR,
51 apple_music_link VARCHAR,
52 sha256 VARCHAR NOT NULL,
53 lyrics TEXT,
54 composer VARCHAR,
55 genre VARCHAR,
56 disc_number INTEGER,
57 copyright_message VARCHAR,
58 label VARCHAR,
59 uri VARCHAR,
60 artist_uri VARCHAR,
61 album_uri VARCHAR,
62 created_at TIMESTAMP,
63 );
64 CREATE TABLE IF NOT EXISTS users (
65 id VARCHAR PRIMARY KEY,
66 display_name VARCHAR,
67 did VARCHAR,
68 handle VARCHAR,
69 avatar VARCHAR,
70 );
71 CREATE TABLE IF NOT EXISTS playlist_tracks (
72 id VARCHAR PRIMARY KEY,
73 playlist_id VARCHAR,
74 track_id VARCHAR,
75 added_by VARCHAR,
76 created_at TIMESTAMP,
77 FOREIGN KEY (playlist_id) REFERENCES playlists(id),
78 FOREIGN KEY (track_id) REFERENCES tracks(id),
79 );
80 CREATE TABLE IF NOT EXISTS user_playlists (
81 id VARCHAR PRIMARY KEY,
82 user_id VARCHAR,
83 playlist_id VARCHAR,
84 created_at TIMESTAMP,
85 FOREIGN KEY (user_id) REFERENCES users(id),
86 FOREIGN KEY (playlist_id) REFERENCES playlists(id),
87 );
88
89 CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id);
90 "#)?;
91 Ok(())
92}
93
94pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
95 let conn = conn.lock().unwrap();
96 let users: Vec<xata::user::User> = sqlx::query_as(
97 r#"
98 SELECT * FROM users
99 "#,
100 )
101 .fetch_all(pool)
102 .await?;
103
104 for (i, user) in users.clone().into_iter().enumerate() {
105 println!("user {} - {}", i, user.display_name.bright_green());
106 match conn.execute(
107 "INSERT INTO users (
108 id,
109 display_name,
110 did,
111 handle,
112 avatar
113 ) VALUES (?,
114 ?,
115 ?,
116 ?,
117 ?) ON CONFLICT DO NOTHING",
118 params![
119 user.xata_id,
120 user.display_name,
121 user.did,
122 user.handle,
123 user.avatar,
124 ],
125 ) {
126 Ok(_) => (),
127 Err(e) => println!("error: {}", e),
128 }
129 }
130
131 println!("users: {:?}", users.len());
132 Ok(())
133}
134
135pub async fn find_spotify_users(
136 pool: &Pool<Postgres>,
137 offset: usize,
138 limit: usize,
139) -> Result<Vec<(String, String, String, String)>, Error> {
140 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
141 r#"
142 SELECT * FROM spotify_tokens
143 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
144 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
145 LIMIT $1 OFFSET $2
146 "#,
147 )
148 .bind(limit as i64)
149 .bind(offset as i64)
150 .fetch_all(pool)
151 .await?;
152
153 let mut user_tokens = vec![];
154
155 for result in &results {
156 let token = decrypt_aes_256_ctr(
157 &result.refresh_token,
158 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
159 )?;
160 user_tokens.push((
161 result.email.clone(),
162 token,
163 result.did.clone(),
164 result.user_id.clone(),
165 ));
166 }
167
168 Ok(user_tokens)
169}
170
171pub async fn save_playlists(
172 pool: &Pool<Postgres>,
173 conn: Arc<Mutex<Connection>>,
174 nc: Arc<Mutex<async_nats::Client>>,
175 playlists: Vec<types::playlist::Playlist>,
176 user_id: &str,
177 did: &str,
178) -> Result<(), Error> {
179 let token = generate_token(did)?;
180 for playlist in playlists {
181 println!(
182 "Saving playlist: {} - {} tracks",
183 playlist.name.bright_green(),
184 playlist.tracks.total
185 );
186
187 sqlx::query(
188 r#"
189 INSERT INTO playlists (name, description, picture, spotify_link, created_by)
190 VALUES ($1, $2, $3, $4, $5)
191 ON CONFLICT (spotify_link) DO UPDATE set
192 name = EXCLUDED.name,
193 description = EXCLUDED.description,
194 picture = EXCLUDED.picture,
195 spotify_link = EXCLUDED.spotify_link,
196 created_by = EXCLUDED.created_by
197 "#,
198 )
199 .bind(playlist.name)
200 .bind(playlist.description)
201 .bind(playlist.images.first().map(|i| i.url.clone()))
202 .bind(&playlist.external_urls.spotify)
203 .bind(user_id)
204 .execute(pool)
205 .await?;
206
207 let new_playlist: Vec<xata::playlist::Playlist> =
208 sqlx::query_as(r#"SELECT * FROM playlists WHERE spotify_link = $1"#)
209 .bind(&playlist.external_urls.spotify)
210 .fetch_all(pool)
211 .await?;
212
213 let new_playlist = new_playlist.first().unwrap();
214
215 let nc = nc.lock().unwrap();
216 nc.publish(
217 "rocksky.playlist",
218 serde_json::to_string(&json!({
219 "id": new_playlist.xata_id.clone(),
220 "did": did,
221 }))
222 .unwrap()
223 .into(),
224 )
225 .await?;
226 drop(nc);
227
228 let mut tracks_to_save: Vec<(String, String)> = vec![];
229 let mut i = 1;
230 for track in playlist.tracks.items.unwrap_or_default() {
231 println!(
232 "Saving track: {} - {}/{}",
233 track.track.name.bright_green(),
234 i,
235 playlist.tracks.total
236 );
237 i += 1;
238 match save_track(track.track, &token).await? {
239 Some(track) => {
240 println!("Saved track: {}", track.xata_id.bright_green());
241 tracks_to_save.push((new_playlist.xata_id.clone(), track.xata_id.clone()));
242 }
243 None => {
244 println!("Failed to save track");
245 }
246 };
247 }
248
249 // delete all tracks from playlist
250 sqlx::query(
251 r#"
252 DELETE FROM playlist_tracks WHERE playlist_id = $1
253 "#,
254 )
255 .bind(&new_playlist.xata_id)
256 .execute(pool)
257 .await?;
258
259 // save tracks to playlist
260 for (playlist_id, track_id) in tracks_to_save {
261 sqlx::query(
262 r#"
263 INSERT INTO playlist_tracks (playlist_id, track_id)
264 VALUES ($1, $2)
265 ON CONFLICT DO NOTHING
266 "#,
267 )
268 .bind(&playlist_id)
269 .bind(&track_id)
270 .execute(pool)
271 .await?;
272 }
273
274 sqlx::query(
275 r#"
276 INSERT INTO user_playlists (user_id, playlist_id)
277 VALUES ($1, $2)
278 ON CONFLICT (user_id, playlist_id) DO NOTHING
279 "#,
280 )
281 .bind(user_id)
282 .bind(&new_playlist.xata_id)
283 .execute(pool)
284 .await?;
285
286 let user_playlist: Vec<xata::user_playlist::UserPlaylist> =
287 sqlx::query_as("SELECT * FROM user_playlists WHERE user_id = $1 AND playlist_id = $2")
288 .bind(user_id)
289 .bind(&new_playlist.xata_id)
290 .fetch_all(pool)
291 .await?;
292 let user_playlist = user_playlist.first().unwrap();
293
294 let conn = conn.lock().unwrap();
295 conn.execute("INSERT INTO playlists (id, name, description, picture, spotify_link, uri, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING",
296 params![
297 &new_playlist.xata_id,
298 &new_playlist.name,
299 new_playlist.description,
300 new_playlist.picture,
301 new_playlist.spotify_link,
302 new_playlist.uri,
303 user_id
304 ]
305 )?;
306
307 conn.execute(
308 "INSERT INTO user_playlists (id, user_id, playlist_id, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
309 params![
310 &user_playlist.xata_id,
311 user_id,
312 &new_playlist.xata_id,
313 chrono::Utc::now()
314 ]
315 )?;
316 }
317 Ok(())
318}
319
320pub async fn save_track(
321 track: types::playlist::Track,
322 token: &str,
323) -> Result<Option<xata::track::Track>, Error> {
324 let client = Client::new();
325 let response = client
326 .post(&format!("{}/tracks", ROCKSKY_API))
327 .bearer_auth(token)
328 .json(&serde_json::json!({
329 "title": track.name,
330 "album": track.album.name,
331 "artist": track.artists.iter().map(|artist| artist.name.clone()).collect::<Vec<String>>().join(", "),
332 "albumArtist": track.album.artists.first().map(|artist| artist.name.clone()),
333 "duration": track.duration_ms,
334 "trackNumber": track.track_number,
335 "releaseDate": match track.album.release_date_precision.as_str() {
336 "day" => Some(track.album.release_date.clone()),
337 _ => None
338 },
339 "year": match track.album.release_date_precision.as_str() {
340 "day" => Some(track.album.release_date.split('-').next().unwrap().parse::<u32>().unwrap()),
341 "year" => Some(track.album.release_date.parse::<u32>().unwrap()),
342 _ => None
343 },
344 "discNumber": track.disc_number,
345 "albumArt": track.album.images.first().map(|image| image.url.clone()),
346 "spotifyLink": track.external_urls.spotify,
347 }))
348 .send()
349 .await?;
350
351 if !response.status().is_success() {
352 println!("Failed to save track: {}", response.text().await?);
353 return Ok(None);
354 }
355
356 // `${track.title} - ${track.artist} - ${track.album}`.toLowerCase()
357 let sha256 = format!(
358 "{:x}",
359 sha2::Sha256::digest(
360 format!(
361 "{} - {} - {}",
362 track.name,
363 track
364 .artists
365 .iter()
366 .map(|artist| artist.name.clone())
367 .collect::<Vec<String>>()
368 .join(", "),
369 track.album.name
370 )
371 .to_lowercase()
372 .as_bytes()
373 )
374 );
375 // get by sha256
376 let response = client
377 .get(&format!("{}/tracks/{}", ROCKSKY_API, sha256))
378 .bearer_auth(token)
379 .send()
380 .await?;
381
382 // wait 6 seconds to avoid rate limiting
383 tokio::time::sleep(tokio::time::Duration::from_secs(6)).await;
384 let status = response.status();
385 let data = response.text().await?;
386
387 if !status.is_success() {
388 println!("Failed to get track: {}", data);
389 }
390
391 let track: xata::track::Track = serde_json::from_str(&data)?;
392
393 Ok(Some(track))
394}