A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at main 402 lines 12 kB view raw
1use std::{ 2 env, 3 sync::{Arc, Mutex}, 4}; 5 6use anyhow::Error; 7use duckdb::{params, Connection}; 8use owo_colors::OwoColorize; 9use reqwest::Client; 10use serde_json::json; 11use sha2::Digest; 12use sqlx::{Pool, Postgres}; 13 14use crate::{ 15 crypto::{decrypt_aes_256_ctr, generate_token}, 16 types::{self, spotify_token::SpotifyTokenWithEmail}, 17 xata::{self}, 18}; 19 20const ROCKSKY_API: &str = "https://api.rocksky.app"; 21 22pub fn create_tables(conn: Arc<Mutex<Connection>>) -> Result<(), Error> { 23 let conn = conn.lock().unwrap(); 24 conn.execute_batch(r#" 25 CREATE TABLE IF NOT EXISTS playlists ( 26 id TEXT PRIMARY KEY, 27 name TEXT, 28 description TEXT, 29 picture TEXT, 30 spotify_link TEXT, 31 tidal_link TEXT, 32 apple_music_link TEXT, 33 xata_createdat TIMESTAMP, 34 xata_updatedat TIMESTAMP, 35 uri TEXT, 36 created_by TEXT 37 ); 38 CREATE TABLE IF NOT EXISTS tracks ( 39 id VARCHAR PRIMARY KEY, 40 title VARCHAR, 41 artist VARCHAR, 42 album_artist VARCHAR, 43 album_art VARCHAR, 44 album VARCHAR, 45 track_number INTEGER, 46 duration INTEGER, 47 mb_id VARCHAR, 48 youtube_link VARCHAR, 49 spotify_link VARCHAR, 50 tidal_link VARCHAR, 51 apple_music_link VARCHAR, 52 sha256 VARCHAR NOT NULL, 53 lyrics TEXT, 54 composer VARCHAR, 55 genre VARCHAR, 56 disc_number INTEGER, 57 copyright_message VARCHAR, 58 label VARCHAR, 59 uri VARCHAR, 60 artist_uri VARCHAR, 61 album_uri VARCHAR, 62 created_at TIMESTAMP, 63 ); 64 CREATE TABLE IF NOT EXISTS users ( 65 id VARCHAR PRIMARY KEY, 66 display_name VARCHAR, 67 did VARCHAR, 68 handle VARCHAR, 69 avatar VARCHAR, 70 ); 71 CREATE TABLE IF NOT EXISTS playlist_tracks ( 72 id VARCHAR PRIMARY KEY, 73 playlist_id VARCHAR, 74 track_id VARCHAR, 75 added_by VARCHAR, 76 created_at TIMESTAMP, 77 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 78 FOREIGN KEY (track_id) REFERENCES tracks(id), 79 ); 80 CREATE TABLE IF NOT EXISTS user_playlists ( 81 id VARCHAR PRIMARY KEY, 82 user_id VARCHAR, 83 playlist_id VARCHAR, 84 created_at TIMESTAMP, 85 FOREIGN KEY (user_id) REFERENCES users(id), 86 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 87 ); 88 89 CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id); 90 "#)?; 91 Ok(()) 92} 93 94pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 95 let conn = conn.lock().unwrap(); 96 let users: Vec<xata::user::User> = sqlx::query_as( 97 r#" 98 SELECT * FROM users 99 "#, 100 ) 101 .fetch_all(pool) 102 .await?; 103 104 for (i, user) in users.clone().into_iter().enumerate() { 105 println!("user {} - {}", i, user.display_name.bright_green()); 106 match conn.execute( 107 "INSERT INTO users ( 108 id, 109 display_name, 110 did, 111 handle, 112 avatar 113 ) VALUES (?, 114 ?, 115 ?, 116 ?, 117 ?) ON CONFLICT DO NOTHING", 118 params![ 119 user.xata_id, 120 user.display_name, 121 user.did, 122 user.handle, 123 user.avatar, 124 ], 125 ) { 126 Ok(_) => (), 127 Err(e) => println!("error: {}", e), 128 } 129 } 130 131 println!("users: {:?}", users.len()); 132 Ok(()) 133} 134 135pub async fn find_spotify_users( 136 pool: &Pool<Postgres>, 137 offset: usize, 138 limit: usize, 139) -> Result<Vec<(String, String, String, String, String, String)>, Error> { 140 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 141 r#" 142 SELECT * FROM spotify_tokens 143 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 144 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 145 LEFT JOIN spotify_apps ON spotify_tokens.spotify_app_id = spotify_apps.spotify_app_id 146 WHERE spotify_accounts.is_beta_user = true 147 LIMIT $1 OFFSET $2 148 "#, 149 ) 150 .bind(limit as i64) 151 .bind(offset as i64) 152 .fetch_all(pool) 153 .await?; 154 155 let mut user_tokens = vec![]; 156 157 for result in &results { 158 let token = decrypt_aes_256_ctr( 159 &result.refresh_token, 160 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 161 )?; 162 let spotify_secret = decrypt_aes_256_ctr( 163 &result.spotify_secret, 164 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 165 )?; 166 user_tokens.push(( 167 result.email.clone(), 168 token, 169 result.did.clone(), 170 result.user_id.clone(), 171 result.spotify_app_id.clone(), 172 spotify_secret.clone(), 173 )); 174 } 175 176 Ok(user_tokens) 177} 178 179pub async fn save_playlists( 180 pool: &Pool<Postgres>, 181 conn: Arc<Mutex<Connection>>, 182 nc: Arc<Mutex<async_nats::Client>>, 183 playlists: Vec<types::playlist::Playlist>, 184 user_id: &str, 185 did: &str, 186) -> Result<(), Error> { 187 let token = generate_token(did)?; 188 for playlist in playlists { 189 println!( 190 "Saving playlist: {} - {} tracks", 191 playlist.name.bright_green(), 192 playlist.tracks.total 193 ); 194 195 sqlx::query( 196 r#" 197 INSERT INTO playlists (name, description, picture, spotify_link, created_by) 198 VALUES ($1, $2, $3, $4, $5) 199 ON CONFLICT (spotify_link) DO UPDATE set 200 name = EXCLUDED.name, 201 description = EXCLUDED.description, 202 picture = EXCLUDED.picture, 203 spotify_link = EXCLUDED.spotify_link, 204 created_by = EXCLUDED.created_by 205 "#, 206 ) 207 .bind(playlist.name) 208 .bind(playlist.description) 209 .bind(playlist.images.first().map(|i| i.url.clone())) 210 .bind(&playlist.external_urls.spotify) 211 .bind(user_id) 212 .execute(pool) 213 .await?; 214 215 let new_playlist: Vec<xata::playlist::Playlist> = 216 sqlx::query_as(r#"SELECT * FROM playlists WHERE spotify_link = $1"#) 217 .bind(&playlist.external_urls.spotify) 218 .fetch_all(pool) 219 .await?; 220 221 let new_playlist = new_playlist.first().unwrap(); 222 223 let nc = nc.lock().unwrap(); 224 nc.publish( 225 "rocksky.playlist", 226 serde_json::to_string(&json!({ 227 "id": new_playlist.xata_id.clone(), 228 "did": did, 229 })) 230 .unwrap() 231 .into(), 232 ) 233 .await?; 234 drop(nc); 235 236 let mut tracks_to_save: Vec<(String, String)> = vec![]; 237 let mut i = 1; 238 for track in playlist.tracks.items.unwrap_or_default() { 239 println!( 240 "Saving track: {} - {}/{}", 241 track.track.name.bright_green(), 242 i, 243 playlist.tracks.total 244 ); 245 i += 1; 246 match save_track(track.track, &token).await? { 247 Some(track) => { 248 println!("Saved track: {}", track.xata_id.bright_green()); 249 tracks_to_save.push((new_playlist.xata_id.clone(), track.xata_id.clone())); 250 } 251 None => { 252 println!("Failed to save track"); 253 } 254 }; 255 } 256 257 // delete all tracks from playlist 258 sqlx::query( 259 r#" 260 DELETE FROM playlist_tracks WHERE playlist_id = $1 261 "#, 262 ) 263 .bind(&new_playlist.xata_id) 264 .execute(pool) 265 .await?; 266 267 // save tracks to playlist 268 for (playlist_id, track_id) in tracks_to_save { 269 sqlx::query( 270 r#" 271 INSERT INTO playlist_tracks (playlist_id, track_id) 272 VALUES ($1, $2) 273 ON CONFLICT DO NOTHING 274 "#, 275 ) 276 .bind(&playlist_id) 277 .bind(&track_id) 278 .execute(pool) 279 .await?; 280 } 281 282 sqlx::query( 283 r#" 284 INSERT INTO user_playlists (user_id, playlist_id) 285 VALUES ($1, $2) 286 ON CONFLICT (user_id, playlist_id) DO NOTHING 287 "#, 288 ) 289 .bind(user_id) 290 .bind(&new_playlist.xata_id) 291 .execute(pool) 292 .await?; 293 294 let user_playlist: Vec<xata::user_playlist::UserPlaylist> = 295 sqlx::query_as("SELECT * FROM user_playlists WHERE user_id = $1 AND playlist_id = $2") 296 .bind(user_id) 297 .bind(&new_playlist.xata_id) 298 .fetch_all(pool) 299 .await?; 300 let user_playlist = user_playlist.first().unwrap(); 301 302 let conn = conn.lock().unwrap(); 303 conn.execute("INSERT INTO playlists (id, name, description, picture, spotify_link, uri, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING", 304 params![ 305 &new_playlist.xata_id, 306 &new_playlist.name, 307 new_playlist.description, 308 new_playlist.picture, 309 new_playlist.spotify_link, 310 new_playlist.uri, 311 user_id 312 ] 313 )?; 314 315 conn.execute( 316 "INSERT INTO user_playlists (id, user_id, playlist_id, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", 317 params![ 318 &user_playlist.xata_id, 319 user_id, 320 &new_playlist.xata_id, 321 chrono::Utc::now() 322 ] 323 )?; 324 } 325 Ok(()) 326} 327 328pub async fn save_track( 329 track: types::playlist::Track, 330 token: &str, 331) -> Result<Option<xata::track::Track>, Error> { 332 let client = Client::new(); 333 let response = client 334 .post(&format!("{}/tracks", ROCKSKY_API)) 335 .bearer_auth(token) 336 .json(&serde_json::json!({ 337 "title": track.name, 338 "album": track.album.name, 339 "artist": track.artists.iter().map(|artist| artist.name.clone()).collect::<Vec<String>>().join(", "), 340 "albumArtist": track.album.artists.first().map(|artist| artist.name.clone()), 341 "duration": track.duration_ms, 342 "trackNumber": track.track_number, 343 "releaseDate": match track.album.release_date_precision.as_str() { 344 "day" => Some(track.album.release_date.clone()), 345 _ => None 346 }, 347 "year": match track.album.release_date_precision.as_str() { 348 "day" => Some(track.album.release_date.split('-').next().unwrap().parse::<u32>().unwrap()), 349 "year" => Some(track.album.release_date.parse::<u32>().unwrap()), 350 _ => None 351 }, 352 "discNumber": track.disc_number, 353 "albumArt": track.album.images.first().map(|image| image.url.clone()), 354 "spotifyLink": track.external_urls.spotify, 355 })) 356 .send() 357 .await?; 358 359 if !response.status().is_success() { 360 println!("Failed to save track: {}", response.text().await?); 361 return Ok(None); 362 } 363 364 // `${track.title} - ${track.artist} - ${track.album}`.toLowerCase() 365 let sha256 = format!( 366 "{:x}", 367 sha2::Sha256::digest( 368 format!( 369 "{} - {} - {}", 370 track.name, 371 track 372 .artists 373 .iter() 374 .map(|artist| artist.name.clone()) 375 .collect::<Vec<String>>() 376 .join(", "), 377 track.album.name 378 ) 379 .to_lowercase() 380 .as_bytes() 381 ) 382 ); 383 // get by sha256 384 let response = client 385 .get(&format!("{}/tracks/{}", ROCKSKY_API, sha256)) 386 .bearer_auth(token) 387 .send() 388 .await?; 389 390 // wait 6 seconds to avoid rate limiting 391 tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; 392 let status = response.status(); 393 let data = response.text().await?; 394 395 if !status.is_success() { 396 println!("Failed to get track: {}", data); 397 } 398 399 let track: xata::track::Track = serde_json::from_str(&data)?; 400 401 Ok(Some(track)) 402}