A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at feat/pgpull 394 lines 12 kB view raw
1use std::{ 2 env, 3 sync::{Arc, Mutex}, 4}; 5 6use anyhow::Error; 7use duckdb::{params, Connection}; 8use owo_colors::OwoColorize; 9use reqwest::Client; 10use serde_json::json; 11use sha2::Digest; 12use sqlx::{Pool, Postgres}; 13 14use crate::{ 15 crypto::{decrypt_aes_256_ctr, generate_token}, 16 types::{self, spotify_token::SpotifyTokenWithEmail}, 17 xata::{self}, 18}; 19 20const ROCKSKY_API: &str = "https://api.rocksky.app"; 21 22pub fn create_tables(conn: Arc<Mutex<Connection>>) -> Result<(), Error> { 23 let conn = conn.lock().unwrap(); 24 conn.execute_batch(r#" 25 CREATE TABLE IF NOT EXISTS playlists ( 26 id TEXT PRIMARY KEY, 27 name TEXT, 28 description TEXT, 29 picture TEXT, 30 spotify_link TEXT, 31 tidal_link TEXT, 32 apple_music_link TEXT, 33 xata_createdat TIMESTAMP, 34 xata_updatedat TIMESTAMP, 35 uri TEXT, 36 created_by TEXT 37 ); 38 CREATE TABLE IF NOT EXISTS tracks ( 39 id VARCHAR PRIMARY KEY, 40 title VARCHAR, 41 artist VARCHAR, 42 album_artist VARCHAR, 43 album_art VARCHAR, 44 album VARCHAR, 45 track_number INTEGER, 46 duration INTEGER, 47 mb_id VARCHAR, 48 youtube_link VARCHAR, 49 spotify_link VARCHAR, 50 tidal_link VARCHAR, 51 apple_music_link VARCHAR, 52 sha256 VARCHAR NOT NULL, 53 lyrics TEXT, 54 composer VARCHAR, 55 genre VARCHAR, 56 disc_number INTEGER, 57 copyright_message VARCHAR, 58 label VARCHAR, 59 uri VARCHAR, 60 artist_uri VARCHAR, 61 album_uri VARCHAR, 62 created_at TIMESTAMP, 63 ); 64 CREATE TABLE IF NOT EXISTS users ( 65 id VARCHAR PRIMARY KEY, 66 display_name VARCHAR, 67 did VARCHAR, 68 handle VARCHAR, 69 avatar VARCHAR, 70 ); 71 CREATE TABLE IF NOT EXISTS playlist_tracks ( 72 id VARCHAR PRIMARY KEY, 73 playlist_id VARCHAR, 74 track_id VARCHAR, 75 added_by VARCHAR, 76 created_at TIMESTAMP, 77 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 78 FOREIGN KEY (track_id) REFERENCES tracks(id), 79 ); 80 CREATE TABLE IF NOT EXISTS user_playlists ( 81 id VARCHAR PRIMARY KEY, 82 user_id VARCHAR, 83 playlist_id VARCHAR, 84 created_at 
TIMESTAMP, 85 FOREIGN KEY (user_id) REFERENCES users(id), 86 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 87 ); 88 89 CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id); 90 "#)?; 91 Ok(()) 92} 93 94pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 95 let conn = conn.lock().unwrap(); 96 let users: Vec<xata::user::User> = sqlx::query_as( 97 r#" 98 SELECT * FROM users 99 "#, 100 ) 101 .fetch_all(pool) 102 .await?; 103 104 for (i, user) in users.clone().into_iter().enumerate() { 105 println!("user {} - {}", i, user.display_name.bright_green()); 106 match conn.execute( 107 "INSERT INTO users ( 108 id, 109 display_name, 110 did, 111 handle, 112 avatar 113 ) VALUES (?, 114 ?, 115 ?, 116 ?, 117 ?) ON CONFLICT DO NOTHING", 118 params![ 119 user.xata_id, 120 user.display_name, 121 user.did, 122 user.handle, 123 user.avatar, 124 ], 125 ) { 126 Ok(_) => (), 127 Err(e) => println!("error: {}", e), 128 } 129 } 130 131 println!("users: {:?}", users.len()); 132 Ok(()) 133} 134 135pub async fn find_spotify_users( 136 pool: &Pool<Postgres>, 137 offset: usize, 138 limit: usize, 139) -> Result<Vec<(String, String, String, String)>, Error> { 140 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 141 r#" 142 SELECT * FROM spotify_tokens 143 LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 144 LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 145 LIMIT $1 OFFSET $2 146 "#, 147 ) 148 .bind(limit as i64) 149 .bind(offset as i64) 150 .fetch_all(pool) 151 .await?; 152 153 let mut user_tokens = vec![]; 154 155 for result in &results { 156 let token = decrypt_aes_256_ctr( 157 &result.refresh_token, 158 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 159 )?; 160 user_tokens.push(( 161 result.email.clone(), 162 token, 163 result.did.clone(), 164 result.user_id.clone(), 165 )); 166 } 167 168 Ok(user_tokens) 169} 170 171pub async fn 
save_playlists( 172 pool: &Pool<Postgres>, 173 conn: Arc<Mutex<Connection>>, 174 nc: Arc<Mutex<async_nats::Client>>, 175 playlists: Vec<types::playlist::Playlist>, 176 user_id: &str, 177 did: &str, 178) -> Result<(), Error> { 179 let token = generate_token(did)?; 180 for playlist in playlists { 181 println!( 182 "Saving playlist: {} - {} tracks", 183 playlist.name.bright_green(), 184 playlist.tracks.total 185 ); 186 187 sqlx::query( 188 r#" 189 INSERT INTO playlists (name, description, picture, spotify_link, created_by) 190 VALUES ($1, $2, $3, $4, $5) 191 ON CONFLICT (spotify_link) DO UPDATE set 192 name = EXCLUDED.name, 193 description = EXCLUDED.description, 194 picture = EXCLUDED.picture, 195 spotify_link = EXCLUDED.spotify_link, 196 created_by = EXCLUDED.created_by 197 "#, 198 ) 199 .bind(playlist.name) 200 .bind(playlist.description) 201 .bind(playlist.images.first().map(|i| i.url.clone())) 202 .bind(&playlist.external_urls.spotify) 203 .bind(user_id) 204 .execute(pool) 205 .await?; 206 207 let new_playlist: Vec<xata::playlist::Playlist> = 208 sqlx::query_as(r#"SELECT * FROM playlists WHERE spotify_link = $1"#) 209 .bind(&playlist.external_urls.spotify) 210 .fetch_all(pool) 211 .await?; 212 213 let new_playlist = new_playlist.first().unwrap(); 214 215 let nc = nc.lock().unwrap(); 216 nc.publish( 217 "rocksky.playlist", 218 serde_json::to_string(&json!({ 219 "id": new_playlist.xata_id.clone(), 220 "did": did, 221 })) 222 .unwrap() 223 .into(), 224 ) 225 .await?; 226 drop(nc); 227 228 let mut tracks_to_save: Vec<(String, String)> = vec![]; 229 let mut i = 1; 230 for track in playlist.tracks.items.unwrap_or_default() { 231 println!( 232 "Saving track: {} - {}/{}", 233 track.track.name.bright_green(), 234 i, 235 playlist.tracks.total 236 ); 237 i += 1; 238 match save_track(track.track, &token).await? 
{ 239 Some(track) => { 240 println!("Saved track: {}", track.xata_id.bright_green()); 241 tracks_to_save.push((new_playlist.xata_id.clone(), track.xata_id.clone())); 242 } 243 None => { 244 println!("Failed to save track"); 245 } 246 }; 247 } 248 249 // delete all tracks from playlist 250 sqlx::query( 251 r#" 252 DELETE FROM playlist_tracks WHERE playlist_id = $1 253 "#, 254 ) 255 .bind(&new_playlist.xata_id) 256 .execute(pool) 257 .await?; 258 259 // save tracks to playlist 260 for (playlist_id, track_id) in tracks_to_save { 261 sqlx::query( 262 r#" 263 INSERT INTO playlist_tracks (playlist_id, track_id) 264 VALUES ($1, $2) 265 ON CONFLICT DO NOTHING 266 "#, 267 ) 268 .bind(&playlist_id) 269 .bind(&track_id) 270 .execute(pool) 271 .await?; 272 } 273 274 sqlx::query( 275 r#" 276 INSERT INTO user_playlists (user_id, playlist_id) 277 VALUES ($1, $2) 278 ON CONFLICT (user_id, playlist_id) DO NOTHING 279 "#, 280 ) 281 .bind(user_id) 282 .bind(&new_playlist.xata_id) 283 .execute(pool) 284 .await?; 285 286 let user_playlist: Vec<xata::user_playlist::UserPlaylist> = 287 sqlx::query_as("SELECT * FROM user_playlists WHERE user_id = $1 AND playlist_id = $2") 288 .bind(user_id) 289 .bind(&new_playlist.xata_id) 290 .fetch_all(pool) 291 .await?; 292 let user_playlist = user_playlist.first().unwrap(); 293 294 let conn = conn.lock().unwrap(); 295 conn.execute("INSERT INTO playlists (id, name, description, picture, spotify_link, uri, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING", 296 params![ 297 &new_playlist.xata_id, 298 &new_playlist.name, 299 new_playlist.description, 300 new_playlist.picture, 301 new_playlist.spotify_link, 302 new_playlist.uri, 303 user_id 304 ] 305 )?; 306 307 conn.execute( 308 "INSERT INTO user_playlists (id, user_id, playlist_id, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", 309 params![ 310 &user_playlist.xata_id, 311 user_id, 312 &new_playlist.xata_id, 313 chrono::Utc::now() 314 ] 315 )?; 316 } 317 Ok(()) 318} 319 
320pub async fn save_track( 321 track: types::playlist::Track, 322 token: &str, 323) -> Result<Option<xata::track::Track>, Error> { 324 let client = Client::new(); 325 let response = client 326 .post(&format!("{}/tracks", ROCKSKY_API)) 327 .bearer_auth(token) 328 .json(&serde_json::json!({ 329 "title": track.name, 330 "album": track.album.name, 331 "artist": track.artists.iter().map(|artist| artist.name.clone()).collect::<Vec<String>>().join(", "), 332 "albumArtist": track.album.artists.first().map(|artist| artist.name.clone()), 333 "duration": track.duration_ms, 334 "trackNumber": track.track_number, 335 "releaseDate": match track.album.release_date_precision.as_str() { 336 "day" => Some(track.album.release_date.clone()), 337 _ => None 338 }, 339 "year": match track.album.release_date_precision.as_str() { 340 "day" => Some(track.album.release_date.split('-').next().unwrap().parse::<u32>().unwrap()), 341 "year" => Some(track.album.release_date.parse::<u32>().unwrap()), 342 _ => None 343 }, 344 "discNumber": track.disc_number, 345 "albumArt": track.album.images.first().map(|image| image.url.clone()), 346 "spotifyLink": track.external_urls.spotify, 347 })) 348 .send() 349 .await?; 350 351 if !response.status().is_success() { 352 println!("Failed to save track: {}", response.text().await?); 353 return Ok(None); 354 } 355 356 // `${track.title} - ${track.artist} - ${track.album}`.toLowerCase() 357 let sha256 = format!( 358 "{:x}", 359 sha2::Sha256::digest( 360 format!( 361 "{} - {} - {}", 362 track.name, 363 track 364 .artists 365 .iter() 366 .map(|artist| artist.name.clone()) 367 .collect::<Vec<String>>() 368 .join(", "), 369 track.album.name 370 ) 371 .to_lowercase() 372 .as_bytes() 373 ) 374 ); 375 // get by sha256 376 let response = client 377 .get(&format!("{}/tracks/{}", ROCKSKY_API, sha256)) 378 .bearer_auth(token) 379 .send() 380 .await?; 381 382 // wait 6 seconds to avoid rate limiting 383 tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; 384 let 
status = response.status(); 385 let data = response.text().await?; 386 387 if !status.is_success() { 388 println!("Failed to get track: {}", data); 389 } 390 391 let track: xata::track::Track = serde_json::from_str(&data)?; 392 393 Ok(Some(track)) 394}