forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use std::{
2 env,
3 sync::{Arc, Mutex},
4};
5
6use anyhow::Error;
7use duckdb::{params, Connection};
8use owo_colors::OwoColorize;
9use reqwest::Client;
10use serde_json::json;
11use sha2::Digest;
12use sqlx::{Pool, Postgres};
13
14use crate::{
15 crypto::{decrypt_aes_256_ctr, generate_token},
16 types::{self, spotify_token::SpotifyTokenWithEmail},
17 xata::{self},
18};
19
20const ROCKSKY_API: &str = "https://api.rocksky.app";
21
22pub fn create_tables(conn: Arc<Mutex<Connection>>) -> Result<(), Error> {
23 let conn = conn.lock().unwrap();
24 conn.execute_batch(r#"
25 CREATE TABLE IF NOT EXISTS playlists (
26 id TEXT PRIMARY KEY,
27 name TEXT,
28 description TEXT,
29 picture TEXT,
30 spotify_link TEXT,
31 tidal_link TEXT,
32 apple_music_link TEXT,
33 xata_createdat TIMESTAMP,
34 xata_updatedat TIMESTAMP,
35 uri TEXT,
36 created_by TEXT
37 );
38 CREATE TABLE IF NOT EXISTS tracks (
39 id VARCHAR PRIMARY KEY,
40 title VARCHAR,
41 artist VARCHAR,
42 album_artist VARCHAR,
43 album_art VARCHAR,
44 album VARCHAR,
45 track_number INTEGER,
46 duration INTEGER,
47 mb_id VARCHAR,
48 youtube_link VARCHAR,
49 spotify_link VARCHAR,
50 tidal_link VARCHAR,
51 apple_music_link VARCHAR,
52 sha256 VARCHAR NOT NULL,
53 lyrics TEXT,
54 composer VARCHAR,
55 genre VARCHAR,
56 disc_number INTEGER,
57 copyright_message VARCHAR,
58 label VARCHAR,
59 uri VARCHAR,
60 artist_uri VARCHAR,
61 album_uri VARCHAR,
62 created_at TIMESTAMP,
63 );
64 CREATE TABLE IF NOT EXISTS users (
65 id VARCHAR PRIMARY KEY,
66 display_name VARCHAR,
67 did VARCHAR,
68 handle VARCHAR,
69 avatar VARCHAR,
70 );
71 CREATE TABLE IF NOT EXISTS playlist_tracks (
72 id VARCHAR PRIMARY KEY,
73 playlist_id VARCHAR,
74 track_id VARCHAR,
75 added_by VARCHAR,
76 created_at TIMESTAMP,
77 FOREIGN KEY (playlist_id) REFERENCES playlists(id),
78 FOREIGN KEY (track_id) REFERENCES tracks(id),
79 );
80 CREATE TABLE IF NOT EXISTS user_playlists (
81 id VARCHAR PRIMARY KEY,
82 user_id VARCHAR,
83 playlist_id VARCHAR,
84 created_at TIMESTAMP,
85 FOREIGN KEY (user_id) REFERENCES users(id),
86 FOREIGN KEY (playlist_id) REFERENCES playlists(id),
87 );
88
89 CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id);
90 "#)?;
91 Ok(())
92}
93
94pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
95 let conn = conn.lock().unwrap();
96 let users: Vec<xata::user::User> = sqlx::query_as(
97 r#"
98 SELECT * FROM users
99 "#,
100 )
101 .fetch_all(pool)
102 .await?;
103
104 for (i, user) in users.clone().into_iter().enumerate() {
105 println!("user {} - {}", i, user.display_name.bright_green());
106 match conn.execute(
107 "INSERT INTO users (
108 id,
109 display_name,
110 did,
111 handle,
112 avatar
113 ) VALUES (?,
114 ?,
115 ?,
116 ?,
117 ?) ON CONFLICT DO NOTHING",
118 params![
119 user.xata_id,
120 user.display_name,
121 user.did,
122 user.handle,
123 user.avatar,
124 ],
125 ) {
126 Ok(_) => (),
127 Err(e) => println!("error: {}", e),
128 }
129 }
130
131 println!("users: {:?}", users.len());
132 Ok(())
133}
134
135pub async fn find_spotify_users(
136 pool: &Pool<Postgres>,
137 offset: usize,
138 limit: usize,
139) -> Result<Vec<(String, String, String, String, String, String)>, Error> {
140 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
141 r#"
142 SELECT * FROM spotify_tokens
143 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
144 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
145 LEFT JOIN spotify_apps ON spotify_tokens.spotify_app_id = spotify_apps.spotify_app_id
146 WHERE spotify_accounts.is_beta_user = true
147 LIMIT $1 OFFSET $2
148 "#,
149 )
150 .bind(limit as i64)
151 .bind(offset as i64)
152 .fetch_all(pool)
153 .await?;
154
155 let mut user_tokens = vec![];
156
157 for result in &results {
158 let token = decrypt_aes_256_ctr(
159 &result.refresh_token,
160 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
161 )?;
162 let spotify_secret = decrypt_aes_256_ctr(
163 &result.spotify_secret,
164 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
165 )?;
166 user_tokens.push((
167 result.email.clone(),
168 token,
169 result.did.clone(),
170 result.user_id.clone(),
171 result.spotify_app_id.clone(),
172 spotify_secret.clone(),
173 ));
174 }
175
176 Ok(user_tokens)
177}
178
179pub async fn save_playlists(
180 pool: &Pool<Postgres>,
181 conn: Arc<Mutex<Connection>>,
182 nc: Arc<Mutex<async_nats::Client>>,
183 playlists: Vec<types::playlist::Playlist>,
184 user_id: &str,
185 did: &str,
186) -> Result<(), Error> {
187 let token = generate_token(did)?;
188 for playlist in playlists {
189 println!(
190 "Saving playlist: {} - {} tracks",
191 playlist.name.bright_green(),
192 playlist.tracks.total
193 );
194
195 sqlx::query(
196 r#"
197 INSERT INTO playlists (name, description, picture, spotify_link, created_by)
198 VALUES ($1, $2, $3, $4, $5)
199 ON CONFLICT (spotify_link) DO UPDATE set
200 name = EXCLUDED.name,
201 description = EXCLUDED.description,
202 picture = EXCLUDED.picture,
203 spotify_link = EXCLUDED.spotify_link,
204 created_by = EXCLUDED.created_by
205 "#,
206 )
207 .bind(playlist.name)
208 .bind(playlist.description)
209 .bind(playlist.images.first().map(|i| i.url.clone()))
210 .bind(&playlist.external_urls.spotify)
211 .bind(user_id)
212 .execute(pool)
213 .await?;
214
215 let new_playlist: Vec<xata::playlist::Playlist> =
216 sqlx::query_as(r#"SELECT * FROM playlists WHERE spotify_link = $1"#)
217 .bind(&playlist.external_urls.spotify)
218 .fetch_all(pool)
219 .await?;
220
221 let new_playlist = new_playlist.first().unwrap();
222
223 let nc = nc.lock().unwrap();
224 nc.publish(
225 "rocksky.playlist",
226 serde_json::to_string(&json!({
227 "id": new_playlist.xata_id.clone(),
228 "did": did,
229 }))
230 .unwrap()
231 .into(),
232 )
233 .await?;
234 drop(nc);
235
236 let mut tracks_to_save: Vec<(String, String)> = vec![];
237 let mut i = 1;
238 for track in playlist.tracks.items.unwrap_or_default() {
239 println!(
240 "Saving track: {} - {}/{}",
241 track.track.name.bright_green(),
242 i,
243 playlist.tracks.total
244 );
245 i += 1;
246 match save_track(track.track, &token).await? {
247 Some(track) => {
248 println!("Saved track: {}", track.xata_id.bright_green());
249 tracks_to_save.push((new_playlist.xata_id.clone(), track.xata_id.clone()));
250 }
251 None => {
252 println!("Failed to save track");
253 }
254 };
255 }
256
257 // delete all tracks from playlist
258 sqlx::query(
259 r#"
260 DELETE FROM playlist_tracks WHERE playlist_id = $1
261 "#,
262 )
263 .bind(&new_playlist.xata_id)
264 .execute(pool)
265 .await?;
266
267 // save tracks to playlist
268 for (playlist_id, track_id) in tracks_to_save {
269 sqlx::query(
270 r#"
271 INSERT INTO playlist_tracks (playlist_id, track_id)
272 VALUES ($1, $2)
273 ON CONFLICT DO NOTHING
274 "#,
275 )
276 .bind(&playlist_id)
277 .bind(&track_id)
278 .execute(pool)
279 .await?;
280 }
281
282 sqlx::query(
283 r#"
284 INSERT INTO user_playlists (user_id, playlist_id)
285 VALUES ($1, $2)
286 ON CONFLICT (user_id, playlist_id) DO NOTHING
287 "#,
288 )
289 .bind(user_id)
290 .bind(&new_playlist.xata_id)
291 .execute(pool)
292 .await?;
293
294 let user_playlist: Vec<xata::user_playlist::UserPlaylist> =
295 sqlx::query_as("SELECT * FROM user_playlists WHERE user_id = $1 AND playlist_id = $2")
296 .bind(user_id)
297 .bind(&new_playlist.xata_id)
298 .fetch_all(pool)
299 .await?;
300 let user_playlist = user_playlist.first().unwrap();
301
302 let conn = conn.lock().unwrap();
303 conn.execute("INSERT INTO playlists (id, name, description, picture, spotify_link, uri, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING",
304 params![
305 &new_playlist.xata_id,
306 &new_playlist.name,
307 new_playlist.description,
308 new_playlist.picture,
309 new_playlist.spotify_link,
310 new_playlist.uri,
311 user_id
312 ]
313 )?;
314
315 conn.execute(
316 "INSERT INTO user_playlists (id, user_id, playlist_id, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
317 params![
318 &user_playlist.xata_id,
319 user_id,
320 &new_playlist.xata_id,
321 chrono::Utc::now()
322 ]
323 )?;
324 }
325 Ok(())
326}
327
328pub async fn save_track(
329 track: types::playlist::Track,
330 token: &str,
331) -> Result<Option<xata::track::Track>, Error> {
332 let client = Client::new();
333 let response = client
334 .post(&format!("{}/tracks", ROCKSKY_API))
335 .bearer_auth(token)
336 .json(&serde_json::json!({
337 "title": track.name,
338 "album": track.album.name,
339 "artist": track.artists.iter().map(|artist| artist.name.clone()).collect::<Vec<String>>().join(", "),
340 "albumArtist": track.album.artists.first().map(|artist| artist.name.clone()),
341 "duration": track.duration_ms,
342 "trackNumber": track.track_number,
343 "releaseDate": match track.album.release_date_precision.as_str() {
344 "day" => Some(track.album.release_date.clone()),
345 _ => None
346 },
347 "year": match track.album.release_date_precision.as_str() {
348 "day" => Some(track.album.release_date.split('-').next().unwrap().parse::<u32>().unwrap()),
349 "year" => Some(track.album.release_date.parse::<u32>().unwrap()),
350 _ => None
351 },
352 "discNumber": track.disc_number,
353 "albumArt": track.album.images.first().map(|image| image.url.clone()),
354 "spotifyLink": track.external_urls.spotify,
355 }))
356 .send()
357 .await?;
358
359 if !response.status().is_success() {
360 println!("Failed to save track: {}", response.text().await?);
361 return Ok(None);
362 }
363
364 // `${track.title} - ${track.artist} - ${track.album}`.toLowerCase()
365 let sha256 = format!(
366 "{:x}",
367 sha2::Sha256::digest(
368 format!(
369 "{} - {} - {}",
370 track.name,
371 track
372 .artists
373 .iter()
374 .map(|artist| artist.name.clone())
375 .collect::<Vec<String>>()
376 .join(", "),
377 track.album.name
378 )
379 .to_lowercase()
380 .as_bytes()
381 )
382 );
383 // get by sha256
384 let response = client
385 .get(&format!("{}/tracks/{}", ROCKSKY_API, sha256))
386 .bearer_auth(token)
387 .send()
388 .await?;
389
390 // wait 6 seconds to avoid rate limiting
391 tokio::time::sleep(tokio::time::Duration::from_secs(6)).await;
392 let status = response.status();
393 let data = response.text().await?;
394
395 if !status.is_success() {
396 println!("Failed to get track: {}", data);
397 }
398
399 let track: xata::track::Track = serde_json::from_str(&data)?;
400
401 Ok(Some(track))
402}