A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at feat/pgpull 729 lines 22 kB view raw
1use std::sync::{Arc, Mutex}; 2 3use anyhow::Error; 4use duckdb::{params, Connection}; 5use owo_colors::OwoColorize; 6use sqlx::{Pool, Postgres}; 7 8use crate::xata; 9 10pub async fn create_tables(conn: &Connection) -> Result<(), Error> { 11 conn.execute_batch( 12 "BEGIN; 13 CREATE TABLE IF NOT EXISTS artists ( 14 id VARCHAR PRIMARY KEY, 15 name VARCHAR NOT NULL, 16 biography TEXT, 17 born DATE, 18 born_in VARCHAR, 19 died DATE, 20 picture VARCHAR, 21 sha256 VARCHAR NOT NULL, 22 spotify_link VARCHAR, 23 tidal_link VARCHAR, 24 youtube_link VARCHAR, 25 apple_music_link VARCHAR, 26 uri VARCHAR, 27 ); 28 CREATE TABLE IF NOT EXISTS albums ( 29 id VARCHAR PRIMARY KEY, 30 title VARCHAR NOT NULL, 31 artist VARCHAR NOT NULL, 32 release_date DATE, 33 album_art VARCHAR, 34 year INTEGER, 35 spotify_link VARCHAR, 36 tidal_link VARCHAR, 37 youtube_link VARCHAR, 38 apple_music_link VARCHAR, 39 sha256 VARCHAR NOT NULL, 40 uri VARCHAR, 41 artist_uri VARCHAR, 42 ); 43 CREATE TABLE IF NOT EXISTS tracks ( 44 id VARCHAR PRIMARY KEY, 45 title VARCHAR, 46 artist VARCHAR, 47 album_artist VARCHAR, 48 album_art VARCHAR, 49 album VARCHAR, 50 track_number INTEGER, 51 duration INTEGER, 52 mb_id VARCHAR, 53 youtube_link VARCHAR, 54 spotify_link VARCHAR, 55 tidal_link VARCHAR, 56 apple_music_link VARCHAR, 57 sha256 VARCHAR NOT NULL, 58 lyrics TEXT, 59 composer VARCHAR, 60 genre VARCHAR, 61 disc_number INTEGER, 62 copyright_message VARCHAR, 63 label VARCHAR, 64 uri VARCHAR, 65 artist_uri VARCHAR, 66 album_uri VARCHAR, 67 created_at TIMESTAMP, 68 ); 69 CREATE TABLE IF NOT EXISTS album_tracks ( 70 id VARCHAR PRIMARY KEY, 71 album_id VARCHAR, 72 track_id VARCHAR, 73 FOREIGN KEY (album_id) REFERENCES albums(id), 74 FOREIGN KEY (track_id) REFERENCES tracks(id), 75 ); 76 CREATE TABLE IF NOT EXISTS users ( 77 id VARCHAR PRIMARY KEY, 78 display_name VARCHAR, 79 did VARCHAR, 80 handle VARCHAR, 81 avatar VARCHAR, 82 ); 83 CREATE TABLE IF NOT EXISTS playlists ( 84 id VARCHAR PRIMARY KEY, 85 name VARCHAR, 86 
description TEXT, 87 picture VARCHAR, 88 created_at TIMESTAMP, 89 updated_at TIMESTAMP, 90 uri VARCHAR, 91 created_by VARCHAR NOT NULL, 92 FOREIGN KEY (created_by) REFERENCES users(id), 93 ); 94 CREATE TABLE IF NOT EXISTS playlist_tracks ( 95 id VARCHAR PRIMARY KEY, 96 playlist_id VARCHAR, 97 track_id VARCHAR, 98 added_by VARCHAR, 99 created_at TIMESTAMP, 100 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 101 FOREIGN KEY (track_id) REFERENCES tracks(id), 102 ); 103 CREATE TABLE IF NOT EXISTS user_tracks ( 104 id VARCHAR PRIMARY KEY, 105 user_id VARCHAR, 106 track_id VARCHAR, 107 created_at TIMESTAMP, 108 FOREIGN KEY (user_id) REFERENCES users(id), 109 FOREIGN KEY (track_id) REFERENCES tracks(id), 110 ); 111 CREATE TABLE IF NOT EXISTS user_albums ( 112 id VARCHAR PRIMARY KEY, 113 user_id VARCHAR, 114 album_id VARCHAR, 115 created_at TIMESTAMP, 116 FOREIGN KEY (user_id) REFERENCES users(id), 117 FOREIGN KEY (album_id) REFERENCES albums(id), 118 ); 119 CREATE TABLE IF NOT EXISTS user_artists ( 120 id VARCHAR PRIMARY KEY, 121 user_id VARCHAR, 122 artist_id VARCHAR, 123 created_at TIMESTAMP, 124 FOREIGN KEY (user_id) REFERENCES users(id), 125 FOREIGN KEY (artist_id) REFERENCES artists(id), 126 ); 127 CREATE TABLE IF NOT EXISTS user_playlists ( 128 id VARCHAR PRIMARY KEY, 129 user_id VARCHAR, 130 playlist_id VARCHAR, 131 created_at TIMESTAMP, 132 FOREIGN KEY (user_id) REFERENCES users(id), 133 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 134 ); 135 CREATE TABLE IF NOT EXISTS loved_tracks ( 136 id VARCHAR PRIMARY KEY, 137 user_id VARCHAR, 138 track_id VARCHAR, 139 created_at TIMESTAMP, 140 FOREIGN KEY (user_id) REFERENCES users(id), 141 FOREIGN KEY (track_id) REFERENCES tracks(id), 142 ); 143 CREATE TABLE IF NOT EXISTS artist_tracks ( 144 id VARCHAR PRIMARY KEY, 145 artist_id VARCHAR, 146 track_id VARCHAR, 147 created_at TIMESTAMP, 148 FOREIGN KEY (artist_id) REFERENCES artists(id), 149 FOREIGN KEY (track_id) REFERENCES tracks(id), 150 ); 151 CREATE TABLE IF 
NOT EXISTS artist_albums ( 152 id VARCHAR PRIMARY KEY, 153 artist_id VARCHAR, 154 album_id VARCHAR, 155 created_at TIMESTAMP, 156 FOREIGN KEY (artist_id) REFERENCES artists(id), 157 FOREIGN KEY (album_id) REFERENCES albums(id), 158 ); 159 CREATE TABLE IF NOT EXISTS album_tracks ( 160 id VARCHAR PRIMARY KEY, 161 album_id VARCHAR, 162 track_id VARCHAR, 163 FOREIGN KEY (album_id) REFERENCES albums(id), 164 FOREIGN KEY (track_id) REFERENCES tracks(id), 165 ); 166 CREATE TABLE IF NOT EXISTS scrobbles ( 167 id VARCHAR PRIMARY KEY, 168 user_id VARCHAR, 169 track_id VARCHAR, 170 album_id VARCHAR, 171 artist_id VARCHAR, 172 uri VARCHAR, 173 created_at TIMESTAMP, 174 FOREIGN KEY (user_id) REFERENCES users(id), 175 FOREIGN KEY (track_id) REFERENCES tracks(id), 176 FOREIGN KEY (album_id) REFERENCES albums(id), 177 FOREIGN KEY (artist_id) REFERENCES artists(id), 178 ); 179 COMMIT; 180 ", 181 )?; 182 183 Ok(()) 184} 185 186pub async fn load_tracks(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 187 let conn = conn.lock().unwrap(); 188 let tracks: Vec<xata::track::Track> = sqlx::query_as( 189 r#" 190 SELECT * FROM tracks 191 "#, 192 ) 193 .fetch_all(pool) 194 .await?; 195 196 for (i, track) in tracks.clone().into_iter().enumerate() { 197 tracing::info!(track = i, title = %track.title.bright_green(), artist = %track.artist); 198 match conn.execute( 199 "INSERT INTO tracks ( 200 id, 201 title, 202 artist, 203 album_artist, 204 album_art, 205 album, 206 track_number, 207 duration, 208 mb_id, 209 youtube_link, 210 spotify_link, 211 tidal_link, 212 apple_music_link, 213 sha256, 214 lyrics, 215 composer, 216 genre, 217 disc_number, 218 copyright_message, 219 label, 220 uri, 221 artist_uri, 222 album_uri, 223 created_at 224 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 225 params![ 226 track.xata_id, 227 track.title, 228 track.artist, 229 track.album_artist, 230 track.album_art, 231 track.album, 232 track.track_number, 
233 track.duration, 234 track.mb_id, 235 track.youtube_link, 236 track.spotify_link, 237 track.tidal_link, 238 track.apple_music_link, 239 track.sha256, 240 track.lyrics, 241 track.composer, 242 track.genre, 243 track.disc_number, 244 track.copyright_message, 245 track.label, 246 track.uri, 247 track.artist_uri, 248 track.album_uri, 249 track.xata_createdat, 250 ], 251 ) { 252 Ok(_) => (), 253 Err(e) => tracing::error!(error = %e, "Error inserting track"), 254 } 255 } 256 257 tracing::info!(tracks = tracks.len(), "Loaded tracks"); 258 Ok(()) 259} 260 261pub async fn load_artists( 262 conn: Arc<Mutex<Connection>>, 263 pool: &Pool<Postgres>, 264) -> Result<(), Error> { 265 let conn = conn.lock().unwrap(); 266 let artists: Vec<xata::artist::Artist> = sqlx::query_as( 267 r#" 268 SELECT * FROM artists 269 "#, 270 ) 271 .fetch_all(pool) 272 .await?; 273 274 for (i, artist) in artists.clone().into_iter().enumerate() { 275 tracing::info!(artist = i, name = %artist.name.bright_green()); 276 match conn.execute( 277 "INSERT INTO artists ( 278 id, 279 name, 280 biography, 281 born, 282 born_in, 283 died, 284 picture, 285 sha256, 286 spotify_link, 287 tidal_link, 288 youtube_link, 289 apple_music_link, 290 uri 291 ) VALUES (?, 292 ?, 293 ?, 294 ?, 295 ?, 296 ?, 297 ?, 298 ?, 299 ?, 300 ?, 301 ?, 302 ?, 303 ?)", 304 params![ 305 artist.xata_id, 306 artist.name, 307 artist.biography, 308 artist.born, 309 artist.born_in, 310 artist.died, 311 artist.picture, 312 artist.sha256, 313 artist.spotify_link, 314 artist.tidal_link, 315 artist.youtube_link, 316 artist.apple_music_link, 317 artist.uri, 318 ], 319 ) { 320 Ok(_) => (), 321 Err(e) => tracing::error!(error = %e, "Error inserting artist"), 322 } 323 } 324 325 tracing::info!(artists = artists.len(), "Loaded artists"); 326 Ok(()) 327} 328 329pub async fn load_albums(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 330 let conn = conn.lock().unwrap(); 331 let albums: Vec<xata::album::Album> = 
sqlx::query_as( 332 r#" 333 SELECT * FROM albums 334 "#, 335 ) 336 .fetch_all(pool) 337 .await?; 338 339 for (i, album) in albums.clone().into_iter().enumerate() { 340 tracing::info!(album = i, title = %album.title.bright_green(), artist = %album.artist); 341 match conn.execute( 342 "INSERT INTO albums ( 343 id, 344 title, 345 artist, 346 release_date, 347 album_art, 348 year, 349 spotify_link, 350 tidal_link, 351 youtube_link, 352 apple_music_link, 353 sha256, 354 uri, 355 artist_uri 356 ) VALUES (?, 357 ?, 358 ?, 359 ?, 360 ?, 361 ?, 362 ?, 363 ?, 364 ?, 365 ?, 366 ?, 367 ?, 368 ?)", 369 params![ 370 album.xata_id, 371 album.title, 372 album.artist, 373 album.release_date, 374 album.album_art, 375 album.year, 376 album.spotify_link, 377 album.tidal_link, 378 album.youtube_link, 379 album.apple_music_link, 380 album.sha256, 381 album.uri, 382 album.artist_uri, 383 ], 384 ) { 385 Ok(_) => (), 386 Err(e) => tracing::error!(error = %e, "Error inserting album"), 387 } 388 } 389 390 tracing::info!(albums = albums.len(), "Loaded albums"); 391 Ok(()) 392} 393 394pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 395 let conn = conn.lock().unwrap(); 396 let users: Vec<xata::user::User> = sqlx::query_as( 397 r#" 398 SELECT * FROM users 399 "#, 400 ) 401 .fetch_all(pool) 402 .await?; 403 404 for (i, user) in users.clone().into_iter().enumerate() { 405 tracing::info!(user = i, name = %user.display_name.bright_green()); 406 match conn.execute( 407 "INSERT INTO users ( 408 id, 409 display_name, 410 did, 411 handle, 412 avatar 413 ) VALUES (?, 414 ?, 415 ?, 416 ?, 417 ?)", 418 params![ 419 user.xata_id, 420 user.display_name, 421 user.did, 422 user.handle, 423 user.avatar, 424 ], 425 ) { 426 Ok(_) => (), 427 Err(e) => tracing::error!(error = %e, "Error inserting user"), 428 } 429 } 430 431 tracing::info!(users = users.len(), "Loaded users"); 432 Ok(()) 433} 434 435pub async fn load_scrobbles( 436 conn: Arc<Mutex<Connection>>, 437 
pool: &Pool<Postgres>, 438) -> Result<(), Error> { 439 let conn = conn.lock().unwrap(); 440 let scrobbles: Vec<xata::scrobble::Scrobble> = sqlx::query_as( 441 r#" 442 SELECT * FROM scrobbles 443 "#, 444 ) 445 .fetch_all(pool) 446 .await?; 447 448 for (i, scrobble) in scrobbles.clone().into_iter().enumerate() { 449 tracing::info!(scrobble = i, uri = %scrobble.uri.clone().unwrap_or_else(|| "None".to_string()).bright_green()); 450 match conn.execute( 451 "INSERT INTO scrobbles ( 452 id, 453 user_id, 454 track_id, 455 album_id, 456 artist_id, 457 uri, 458 created_at 459 ) VALUES ( 460 ?, 461 ?, 462 ?, 463 ?, 464 ?, 465 ?, 466 ? 467 )", 468 params![ 469 scrobble.xata_id, 470 scrobble.user_id, 471 scrobble.track_id, 472 scrobble.album_id, 473 scrobble.artist_id, 474 scrobble.uri, 475 scrobble.xata_createdat, 476 ], 477 ) { 478 Ok(_) => (), 479 Err(e) => tracing::error!(error = %e, "Error inserting scrobble"), 480 } 481 } 482 483 tracing::info!(scrobbles = scrobbles.len(), "Loaded scrobbles"); 484 Ok(()) 485} 486 487pub async fn load_album_tracks( 488 conn: Arc<Mutex<Connection>>, 489 pool: &Pool<Postgres>, 490) -> Result<(), Error> { 491 let conn = conn.lock().unwrap(); 492 let album_tracks: Vec<xata::album_track::AlbumTrack> = sqlx::query_as( 493 r#" 494 SELECT * FROM album_tracks 495 "#, 496 ) 497 .fetch_all(pool) 498 .await?; 499 500 for (i, album_track) in album_tracks.clone().into_iter().enumerate() { 501 tracing::info!(album_track = i, album_id = %album_track.album_id.bright_green(), track_id = %album_track.track_id); 502 match conn.execute( 503 "INSERT INTO album_tracks ( 504 id, 505 album_id, 506 track_id 507 ) VALUES (?, 508 ?, 509 ?)", 510 params![ 511 album_track.xata_id, 512 album_track.album_id, 513 album_track.track_id, 514 ], 515 ) { 516 Ok(_) => (), 517 Err(e) => tracing::error!(error = %e, "Error inserting album_track"), 518 } 519 } 520 521 tracing::info!(album_tracks = album_tracks.len(), "Loaded album_tracks"); 522 Ok(()) 523} 524 525pub async fn 
load_loved_tracks( 526 conn: Arc<Mutex<Connection>>, 527 pool: &Pool<Postgres>, 528) -> Result<(), Error> { 529 let conn = conn.lock().unwrap(); 530 let loved_tracks: Vec<xata::user_track::UserTrack> = sqlx::query_as( 531 r#" 532 SELECT * FROM loved_tracks 533 "#, 534 ) 535 .fetch_all(pool) 536 .await?; 537 538 for (i, loved_track) in loved_tracks.clone().into_iter().enumerate() { 539 tracing::info!(loved_track = i, user_id = %loved_track.user_id.bright_green(), track_id = %loved_track.track_id); 540 match conn.execute( 541 "INSERT INTO loved_tracks ( 542 id, 543 user_id, 544 track_id, 545 created_at 546 ) VALUES (?, 547 ?, 548 ?, 549 ?)", 550 params![ 551 loved_track.xata_id, 552 loved_track.user_id, 553 loved_track.track_id, 554 loved_track.xata_createdat, 555 ], 556 ) { 557 Ok(_) => (), 558 Err(e) => tracing::error!(error = %e, "Error inserting loved_track"), 559 } 560 } 561 562 tracing::info!(loved_tracks = loved_tracks.len(), "Loaded loved_tracks"); 563 Ok(()) 564} 565 566pub async fn load_artist_tracks( 567 conn: Arc<Mutex<Connection>>, 568 pool: &Pool<Postgres>, 569) -> Result<(), Error> { 570 let conn = conn.lock().unwrap(); 571 let artist_tracks: Vec<xata::artist_track::ArtistTrack> = sqlx::query_as( 572 r#" 573 SELECT * FROM artist_tracks 574 "#, 575 ) 576 .fetch_all(pool) 577 .await?; 578 579 for (i, artist_track) in artist_tracks.clone().into_iter().enumerate() { 580 tracing::info!(artist_track = i, artist_id = %artist_track.artist_id.bright_green(), track_id = %artist_track.track_id); 581 match conn.execute( 582 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)", 583 params![ 584 artist_track.xata_id, 585 artist_track.artist_id, 586 artist_track.track_id, 587 artist_track.xata_createdat, 588 ], 589 ) { 590 Ok(_) => (), 591 Err(e) => tracing::error!(error = %e, "Error inserting artist_track"), 592 } 593 } 594 595 tracing::info!(artist_tracks = artist_tracks.len(), "Loaded artist_tracks"); 596 Ok(()) 597} 598 599pub 
async fn load_artist_albums( 600 conn: Arc<Mutex<Connection>>, 601 pool: &Pool<Postgres>, 602) -> Result<(), Error> { 603 let conn = conn.lock().unwrap(); 604 let artist_albums: Vec<xata::artist_album::ArtistAlbum> = sqlx::query_as( 605 r#" 606 SELECT * FROM artist_albums 607 "#, 608 ) 609 .fetch_all(pool) 610 .await?; 611 612 for (i, artist_album) in artist_albums.clone().into_iter().enumerate() { 613 tracing::info!(artist_album = i, artist_id = %artist_album.artist_id.bright_green(), album_id = %artist_album.album_id); 614 match conn.execute( 615 "INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)", 616 params![ 617 artist_album.xata_id, 618 artist_album.artist_id, 619 artist_album.album_id, 620 artist_album.xata_createdat, 621 ], 622 ) { 623 Ok(_) => (), 624 Err(e) => tracing::error!(error = %e, "Error inserting artist_album"), 625 } 626 } 627 628 tracing::info!(artist_albums = artist_albums.len(), "Loaded artist_albums"); 629 Ok(()) 630} 631 632pub async fn load_user_albums( 633 conn: Arc<Mutex<Connection>>, 634 pool: &Pool<Postgres>, 635) -> Result<(), Error> { 636 let conn = conn.lock().unwrap(); 637 let user_albums: Vec<xata::user_album::UserAlbum> = sqlx::query_as( 638 r#" 639 SELECT * FROM user_albums 640 "#, 641 ) 642 .fetch_all(pool) 643 .await?; 644 645 for (i, user_album) in user_albums.clone().into_iter().enumerate() { 646 tracing::info!(user_album = i, user_id = %user_album.user_id.bright_green(), album_id = %user_album.album_id); 647 match conn.execute( 648 "INSERT INTO user_albums (id, user_id, album_id, created_at) VALUES (?, ?, ?, ?)", 649 params![ 650 user_album.xata_id, 651 user_album.user_id, 652 user_album.album_id, 653 user_album.xata_createdat, 654 ], 655 ) { 656 Ok(_) => (), 657 Err(e) => tracing::error!(error = %e, "Error inserting user_album"), 658 } 659 } 660 661 tracing::info!(user_albums = user_albums.len(), "Loaded user_albums"); 662 Ok(()) 663} 664 665pub async fn load_user_artists( 666 conn: 
Arc<Mutex<Connection>>, 667 pool: &Pool<Postgres>, 668) -> Result<(), Error> { 669 let conn = conn.lock().unwrap(); 670 let user_artists: Vec<xata::user_artist::UserArtist> = sqlx::query_as( 671 r#" 672 SELECT * FROM user_artists 673 "#, 674 ) 675 .fetch_all(pool) 676 .await?; 677 678 for (i, user_artist) in user_artists.clone().into_iter().enumerate() { 679 tracing::info!(user_artist = i, user_id = %user_artist.user_id.bright_green(), artist_id = %user_artist.artist_id); 680 match conn.execute( 681 "INSERT INTO user_artists (id, user_id, artist_id, created_at) VALUES (?, ?, ?, ?)", 682 params![ 683 user_artist.xata_id, 684 user_artist.user_id, 685 user_artist.artist_id, 686 user_artist.xata_createdat, 687 ], 688 ) { 689 Ok(_) => (), 690 Err(e) => tracing::error!(error = %e, "Error inserting user_artist"), 691 } 692 } 693 694 tracing::info!(user_artists = user_artists.len(), "Loaded user_artists"); 695 Ok(()) 696} 697 698pub async fn load_user_tracks( 699 conn: Arc<Mutex<Connection>>, 700 pool: &Pool<Postgres>, 701) -> Result<(), Error> { 702 let conn = conn.lock().unwrap(); 703 let user_tracks: Vec<xata::user_track::UserTrack> = sqlx::query_as( 704 r#" 705 SELECT * FROM user_tracks 706 "#, 707 ) 708 .fetch_all(pool) 709 .await?; 710 711 for (i, user_track) in user_tracks.clone().into_iter().enumerate() { 712 tracing::info!(user_track = i, user_id = %user_track.user_id.bright_green(), track_id = %user_track.track_id); 713 match conn.execute( 714 "INSERT INTO user_tracks (id, user_id, track_id, created_at) VALUES (?, ?, ?, ?)", 715 params![ 716 user_track.xata_id, 717 user_track.user_id, 718 user_track.track_id, 719 user_track.xata_createdat, 720 ], 721 ) { 722 Ok(_) => (), 723 Err(e) => tracing::error!(error = %e, "Error inserting user_track"), 724 } 725 } 726 727 tracing::info!(user_tracks = user_tracks.len(), "Loaded user_tracks"); 728 Ok(()) 729}