A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 814 lines 24 kB view raw
1use std::{ 2 env, 3 sync::{Arc, Mutex}, 4}; 5 6use anyhow::Error; 7use duckdb::{params, Connection}; 8use owo_colors::OwoColorize; 9use sqlx::{Pool, Postgres}; 10 11use crate::xata; 12 13pub async fn create_tables(conn: &Connection) -> Result<(), Error> { 14 conn.execute_batch( 15 "BEGIN; 16 CREATE TABLE IF NOT EXISTS artists ( 17 id VARCHAR PRIMARY KEY, 18 name VARCHAR NOT NULL, 19 biography TEXT, 20 born DATE, 21 born_in VARCHAR, 22 died DATE, 23 picture VARCHAR, 24 sha256 VARCHAR NOT NULL, 25 spotify_link VARCHAR, 26 tidal_link VARCHAR, 27 youtube_link VARCHAR, 28 apple_music_link VARCHAR, 29 uri VARCHAR, 30 ); 31 CREATE TABLE IF NOT EXISTS albums ( 32 id VARCHAR PRIMARY KEY, 33 title VARCHAR NOT NULL, 34 artist VARCHAR NOT NULL, 35 release_date DATE, 36 album_art VARCHAR, 37 year INTEGER, 38 spotify_link VARCHAR, 39 tidal_link VARCHAR, 40 youtube_link VARCHAR, 41 apple_music_link VARCHAR, 42 sha256 VARCHAR NOT NULL, 43 uri VARCHAR, 44 artist_uri VARCHAR, 45 ); 46 CREATE TABLE IF NOT EXISTS tracks ( 47 id VARCHAR PRIMARY KEY, 48 title VARCHAR, 49 artist VARCHAR, 50 album_artist VARCHAR, 51 album_art VARCHAR, 52 album VARCHAR, 53 track_number INTEGER, 54 duration INTEGER, 55 mb_id VARCHAR, 56 youtube_link VARCHAR, 57 spotify_link VARCHAR, 58 tidal_link VARCHAR, 59 apple_music_link VARCHAR, 60 sha256 VARCHAR NOT NULL, 61 lyrics TEXT, 62 composer VARCHAR, 63 genre VARCHAR, 64 disc_number INTEGER, 65 copyright_message VARCHAR, 66 label VARCHAR, 67 uri VARCHAR, 68 artist_uri VARCHAR, 69 album_uri VARCHAR, 70 created_at TIMESTAMP, 71 ); 72 CREATE TABLE IF NOT EXISTS album_tracks ( 73 id VARCHAR PRIMARY KEY, 74 album_id VARCHAR, 75 track_id VARCHAR, 76 FOREIGN KEY (album_id) REFERENCES albums(id), 77 FOREIGN KEY (track_id) REFERENCES tracks(id), 78 ); 79 CREATE TABLE IF NOT EXISTS users ( 80 id VARCHAR PRIMARY KEY, 81 display_name VARCHAR, 82 did VARCHAR, 83 handle VARCHAR, 84 avatar VARCHAR, 85 ); 86 CREATE TABLE IF NOT EXISTS playlists ( 87 id VARCHAR PRIMARY KEY, 
88 name VARCHAR, 89 description TEXT, 90 picture VARCHAR, 91 created_at TIMESTAMP, 92 updated_at TIMESTAMP, 93 uri VARCHAR, 94 created_by VARCHAR NOT NULL, 95 FOREIGN KEY (created_by) REFERENCES users(id), 96 ); 97 CREATE TABLE IF NOT EXISTS playlist_tracks ( 98 id VARCHAR PRIMARY KEY, 99 playlist_id VARCHAR, 100 track_id VARCHAR, 101 added_by VARCHAR, 102 created_at TIMESTAMP, 103 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 104 FOREIGN KEY (track_id) REFERENCES tracks(id), 105 ); 106 CREATE TABLE IF NOT EXISTS user_tracks ( 107 id VARCHAR PRIMARY KEY, 108 user_id VARCHAR, 109 track_id VARCHAR, 110 created_at TIMESTAMP, 111 FOREIGN KEY (user_id) REFERENCES users(id), 112 FOREIGN KEY (track_id) REFERENCES tracks(id), 113 ); 114 CREATE TABLE IF NOT EXISTS user_albums ( 115 id VARCHAR PRIMARY KEY, 116 user_id VARCHAR, 117 album_id VARCHAR, 118 created_at TIMESTAMP, 119 FOREIGN KEY (user_id) REFERENCES users(id), 120 FOREIGN KEY (album_id) REFERENCES albums(id), 121 ); 122 CREATE TABLE IF NOT EXISTS user_artists ( 123 id VARCHAR PRIMARY KEY, 124 user_id VARCHAR, 125 artist_id VARCHAR, 126 created_at TIMESTAMP, 127 FOREIGN KEY (user_id) REFERENCES users(id), 128 FOREIGN KEY (artist_id) REFERENCES artists(id), 129 ); 130 CREATE TABLE IF NOT EXISTS user_playlists ( 131 id VARCHAR PRIMARY KEY, 132 user_id VARCHAR, 133 playlist_id VARCHAR, 134 created_at TIMESTAMP, 135 FOREIGN KEY (user_id) REFERENCES users(id), 136 FOREIGN KEY (playlist_id) REFERENCES playlists(id), 137 ); 138 CREATE TABLE IF NOT EXISTS loved_tracks ( 139 id VARCHAR PRIMARY KEY, 140 user_id VARCHAR, 141 track_id VARCHAR, 142 created_at TIMESTAMP, 143 FOREIGN KEY (user_id) REFERENCES users(id), 144 FOREIGN KEY (track_id) REFERENCES tracks(id), 145 ); 146 CREATE TABLE IF NOT EXISTS artist_tracks ( 147 id VARCHAR PRIMARY KEY, 148 artist_id VARCHAR, 149 track_id VARCHAR, 150 created_at TIMESTAMP, 151 FOREIGN KEY (artist_id) REFERENCES artists(id), 152 FOREIGN KEY (track_id) REFERENCES tracks(id), 153 
); 154 CREATE TABLE IF NOT EXISTS artist_albums ( 155 id VARCHAR PRIMARY KEY, 156 artist_id VARCHAR, 157 album_id VARCHAR, 158 created_at TIMESTAMP, 159 FOREIGN KEY (artist_id) REFERENCES artists(id), 160 FOREIGN KEY (album_id) REFERENCES albums(id), 161 ); 162 CREATE TABLE IF NOT EXISTS album_tracks ( 163 id VARCHAR PRIMARY KEY, 164 album_id VARCHAR, 165 track_id VARCHAR, 166 FOREIGN KEY (album_id) REFERENCES albums(id), 167 FOREIGN KEY (track_id) REFERENCES tracks(id), 168 ); 169 CREATE TABLE IF NOT EXISTS scrobbles ( 170 id VARCHAR PRIMARY KEY, 171 user_id VARCHAR, 172 track_id VARCHAR, 173 album_id VARCHAR, 174 artist_id VARCHAR, 175 uri VARCHAR, 176 created_at TIMESTAMP, 177 FOREIGN KEY (user_id) REFERENCES users(id), 178 FOREIGN KEY (track_id) REFERENCES tracks(id), 179 FOREIGN KEY (album_id) REFERENCES albums(id), 180 FOREIGN KEY (artist_id) REFERENCES artists(id), 181 ); 182 COMMIT; 183 ", 184 )?; 185 186 match conn.execute("ALTER TABLE artists ADD COLUMN genres VARCHAR[]", []) { 187 Ok(_) => tracing::info!("Added genres column to artists table"), 188 Err(e) => tracing::warn!("Could not add genres column to artists table: {}", e), 189 } 190 191 Ok(()) 192} 193 194pub async fn update_artist_genres( 195 conn: Arc<Mutex<Connection>>, 196 pool: &Pool<Postgres>, 197) -> Result<(), Error> { 198 if env::var("UPDATE_ARTIST_GENRES").is_err() { 199 tracing::info!("Skipping update_artist_genres as UPDATE_ARTIST_GENRES is not set"); 200 return Ok(()); 201 } 202 203 let artists: Vec<xata::artist::ArtistWithoutDate> = sqlx::query_as( 204 r#" 205 SELECT * FROM artists 206 "#, 207 ) 208 .fetch_all(pool) 209 .await?; 210 211 let conn = conn.lock().unwrap(); 212 213 conn.execute_batch( 214 " 215 CREATE TABLE user_artists_new AS SELECT * FROM user_artists; 216 CREATE TABLE artist_tracks_new AS SELECT * FROM artist_tracks; 217 CREATE TABLE artist_albums_new AS SELECT * FROM artist_albums; 218 CREATE TABLE scrobbles_new AS SELECT * FROM scrobbles; 219 CREATE TABLE 
loved_tracks_new AS SELECT * FROM loved_tracks; 220 DROP TABLE user_artists; 221 DROP TABLE artist_tracks; 222 DROP TABLE artist_albums; 223 DROP TABLE scrobbles; 224 DROP TABLE loved_tracks; 225 ALTER TABLE user_artists_new RENAME TO user_artists; 226 ALTER TABLE artist_tracks_new RENAME TO artist_tracks; 227 ALTER TABLE artist_albums_new RENAME TO artist_albums; 228 ALTER TABLE scrobbles_new RENAME TO scrobbles; 229 ALTER TABLE loved_tracks_new RENAME TO loved_tracks; 230 ", 231 )?; 232 233 for (i, artist) in artists.clone().into_iter().enumerate() { 234 let genres_array = artist 235 .genres 236 .as_ref() 237 .map(|tags| { 238 tags.iter() 239 .map(|tag| format!("'{}'", tag.replace("'", "''"))) 240 .collect::<Vec<_>>() 241 .join(", ") 242 }) 243 .unwrap_or_default() 244 .trim() 245 .to_string(); 246 247 if genres_array.is_empty() { 248 continue; 249 } 250 251 tracing::info!(artist = i, name = %artist.name.bright_green(), genres = %genres_array, "Updating artist genres"); 252 253 match conn.execute( 254 &format!( 255 "UPDATE artists SET genres = [{}] WHERE id = ? 
AND genres IS NULL", 256 genres_array 257 ), 258 params![artist.xata_id], 259 ) { 260 Ok(_) => (), 261 Err(e) => { 262 tracing::error!(error = %e, genres = %genres_array, "Error updating artist >> ") 263 } 264 } 265 } 266 267 tracing::info!(artists = artists.len(), "Updated artist genres"); 268 Ok(()) 269} 270 271pub async fn load_tracks(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 272 let conn = conn.lock().unwrap(); 273 let tracks: Vec<xata::track::Track> = sqlx::query_as( 274 r#" 275 SELECT * FROM tracks 276 "#, 277 ) 278 .fetch_all(pool) 279 .await?; 280 281 for (i, track) in tracks.clone().into_iter().enumerate() { 282 tracing::info!(track = i, title = %track.title.bright_green(), artist = %track.artist); 283 match conn.execute( 284 "INSERT INTO tracks ( 285 id, 286 title, 287 artist, 288 album_artist, 289 album_art, 290 album, 291 track_number, 292 duration, 293 mb_id, 294 youtube_link, 295 spotify_link, 296 tidal_link, 297 apple_music_link, 298 sha256, 299 lyrics, 300 composer, 301 genre, 302 disc_number, 303 copyright_message, 304 label, 305 uri, 306 artist_uri, 307 album_uri, 308 created_at 309 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 310 params![ 311 track.xata_id, 312 track.title, 313 track.artist, 314 track.album_artist, 315 track.album_art, 316 track.album, 317 track.track_number, 318 track.duration, 319 track.mb_id, 320 track.youtube_link, 321 track.spotify_link, 322 track.tidal_link, 323 track.apple_music_link, 324 track.sha256, 325 track.lyrics, 326 track.composer, 327 track.genre, 328 track.disc_number, 329 track.copyright_message, 330 track.label, 331 track.uri, 332 track.artist_uri, 333 track.album_uri, 334 track.xata_createdat, 335 ], 336 ) { 337 Ok(_) => (), 338 Err(e) => tracing::error!(error = %e, "Error inserting track"), 339 } 340 } 341 342 tracing::info!(tracks = tracks.len(), "Loaded tracks"); 343 Ok(()) 344} 345 346pub async fn load_artists( 347 conn: 
Arc<Mutex<Connection>>, 348 pool: &Pool<Postgres>, 349) -> Result<(), Error> { 350 let conn = conn.lock().unwrap(); 351 let artists: Vec<xata::artist::Artist> = sqlx::query_as( 352 r#" 353 SELECT * FROM artists 354 "#, 355 ) 356 .fetch_all(pool) 357 .await?; 358 359 for (i, artist) in artists.clone().into_iter().enumerate() { 360 tracing::info!(artist = i, name = %artist.name.bright_green()); 361 match conn.execute( 362 "INSERT INTO artists ( 363 id, 364 name, 365 biography, 366 born, 367 born_in, 368 died, 369 picture, 370 sha256, 371 spotify_link, 372 tidal_link, 373 youtube_link, 374 apple_music_link, 375 uri 376 ) VALUES (?, 377 ?, 378 ?, 379 ?, 380 ?, 381 ?, 382 ?, 383 ?, 384 ?, 385 ?, 386 ?, 387 ?, 388 ?)", 389 params![ 390 artist.xata_id, 391 artist.name, 392 artist.biography, 393 artist.born, 394 artist.born_in, 395 artist.died, 396 artist.picture, 397 artist.sha256, 398 artist.spotify_link, 399 artist.tidal_link, 400 artist.youtube_link, 401 artist.apple_music_link, 402 artist.uri, 403 ], 404 ) { 405 Ok(_) => (), 406 Err(e) => tracing::error!(error = %e, "Error inserting artist"), 407 } 408 } 409 410 tracing::info!(artists = artists.len(), "Loaded artists"); 411 Ok(()) 412} 413 414pub async fn load_albums(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 415 let conn = conn.lock().unwrap(); 416 let albums: Vec<xata::album::Album> = sqlx::query_as( 417 r#" 418 SELECT * FROM albums 419 "#, 420 ) 421 .fetch_all(pool) 422 .await?; 423 424 for (i, album) in albums.clone().into_iter().enumerate() { 425 tracing::info!(album = i, title = %album.title.bright_green(), artist = %album.artist); 426 match conn.execute( 427 "INSERT INTO albums ( 428 id, 429 title, 430 artist, 431 release_date, 432 album_art, 433 year, 434 spotify_link, 435 tidal_link, 436 youtube_link, 437 apple_music_link, 438 sha256, 439 uri, 440 artist_uri 441 ) VALUES (?, 442 ?, 443 ?, 444 ?, 445 ?, 446 ?, 447 ?, 448 ?, 449 ?, 450 ?, 451 ?, 452 ?, 453 ?)", 454 params![ 455 
album.xata_id, 456 album.title, 457 album.artist, 458 album.release_date, 459 album.album_art, 460 album.year, 461 album.spotify_link, 462 album.tidal_link, 463 album.youtube_link, 464 album.apple_music_link, 465 album.sha256, 466 album.uri, 467 album.artist_uri, 468 ], 469 ) { 470 Ok(_) => (), 471 Err(e) => tracing::error!(error = %e, "Error inserting album"), 472 } 473 } 474 475 tracing::info!(albums = albums.len(), "Loaded albums"); 476 Ok(()) 477} 478 479pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> { 480 let conn = conn.lock().unwrap(); 481 let users: Vec<xata::user::User> = sqlx::query_as( 482 r#" 483 SELECT * FROM users 484 "#, 485 ) 486 .fetch_all(pool) 487 .await?; 488 489 for (i, user) in users.clone().into_iter().enumerate() { 490 tracing::info!(user = i, name = %user.display_name.bright_green()); 491 match conn.execute( 492 "INSERT INTO users ( 493 id, 494 display_name, 495 did, 496 handle, 497 avatar 498 ) VALUES (?, 499 ?, 500 ?, 501 ?, 502 ?)", 503 params![ 504 user.xata_id, 505 user.display_name, 506 user.did, 507 user.handle, 508 user.avatar, 509 ], 510 ) { 511 Ok(_) => (), 512 Err(e) => tracing::error!(error = %e, "Error inserting user"), 513 } 514 } 515 516 tracing::info!(users = users.len(), "Loaded users"); 517 Ok(()) 518} 519 520pub async fn load_scrobbles( 521 conn: Arc<Mutex<Connection>>, 522 pool: &Pool<Postgres>, 523) -> Result<(), Error> { 524 let conn = conn.lock().unwrap(); 525 let scrobbles: Vec<xata::scrobble::Scrobble> = sqlx::query_as( 526 r#" 527 SELECT * FROM scrobbles 528 "#, 529 ) 530 .fetch_all(pool) 531 .await?; 532 533 for (i, scrobble) in scrobbles.clone().into_iter().enumerate() { 534 tracing::info!(scrobble = i, uri = %scrobble.uri.clone().unwrap_or_else(|| "None".to_string()).bright_green()); 535 match conn.execute( 536 "INSERT INTO scrobbles ( 537 id, 538 user_id, 539 track_id, 540 album_id, 541 artist_id, 542 uri, 543 created_at 544 ) VALUES ( 545 ?, 546 ?, 547 ?, 548 
?, 549 ?, 550 ?, 551 ? 552 )", 553 params![ 554 scrobble.xata_id, 555 scrobble.user_id, 556 scrobble.track_id, 557 scrobble.album_id, 558 scrobble.artist_id, 559 scrobble.uri, 560 scrobble.xata_createdat, 561 ], 562 ) { 563 Ok(_) => (), 564 Err(e) => tracing::error!(error = %e, "Error inserting scrobble"), 565 } 566 } 567 568 tracing::info!(scrobbles = scrobbles.len(), "Loaded scrobbles"); 569 Ok(()) 570} 571 572pub async fn load_album_tracks( 573 conn: Arc<Mutex<Connection>>, 574 pool: &Pool<Postgres>, 575) -> Result<(), Error> { 576 let conn = conn.lock().unwrap(); 577 let album_tracks: Vec<xata::album_track::AlbumTrack> = sqlx::query_as( 578 r#" 579 SELECT * FROM album_tracks 580 "#, 581 ) 582 .fetch_all(pool) 583 .await?; 584 585 for (i, album_track) in album_tracks.clone().into_iter().enumerate() { 586 tracing::info!(album_track = i, album_id = %album_track.album_id.bright_green(), track_id = %album_track.track_id); 587 match conn.execute( 588 "INSERT INTO album_tracks ( 589 id, 590 album_id, 591 track_id 592 ) VALUES (?, 593 ?, 594 ?)", 595 params![ 596 album_track.xata_id, 597 album_track.album_id, 598 album_track.track_id, 599 ], 600 ) { 601 Ok(_) => (), 602 Err(e) => tracing::error!(error = %e, "Error inserting album_track"), 603 } 604 } 605 606 tracing::info!(album_tracks = album_tracks.len(), "Loaded album_tracks"); 607 Ok(()) 608} 609 610pub async fn load_loved_tracks( 611 conn: Arc<Mutex<Connection>>, 612 pool: &Pool<Postgres>, 613) -> Result<(), Error> { 614 let conn = conn.lock().unwrap(); 615 let loved_tracks: Vec<xata::user_track::UserTrack> = sqlx::query_as( 616 r#" 617 SELECT * FROM loved_tracks 618 "#, 619 ) 620 .fetch_all(pool) 621 .await?; 622 623 for (i, loved_track) in loved_tracks.clone().into_iter().enumerate() { 624 tracing::info!(loved_track = i, user_id = %loved_track.user_id.bright_green(), track_id = %loved_track.track_id); 625 match conn.execute( 626 "INSERT INTO loved_tracks ( 627 id, 628 user_id, 629 track_id, 630 created_at 631 ) 
VALUES (?, 632 ?, 633 ?, 634 ?)", 635 params![ 636 loved_track.xata_id, 637 loved_track.user_id, 638 loved_track.track_id, 639 loved_track.xata_createdat, 640 ], 641 ) { 642 Ok(_) => (), 643 Err(e) => tracing::error!(error = %e, "Error inserting loved_track"), 644 } 645 } 646 647 tracing::info!(loved_tracks = loved_tracks.len(), "Loaded loved_tracks"); 648 Ok(()) 649} 650 651pub async fn load_artist_tracks( 652 conn: Arc<Mutex<Connection>>, 653 pool: &Pool<Postgres>, 654) -> Result<(), Error> { 655 let conn = conn.lock().unwrap(); 656 let artist_tracks: Vec<xata::artist_track::ArtistTrack> = sqlx::query_as( 657 r#" 658 SELECT * FROM artist_tracks 659 "#, 660 ) 661 .fetch_all(pool) 662 .await?; 663 664 for (i, artist_track) in artist_tracks.clone().into_iter().enumerate() { 665 tracing::info!(artist_track = i, artist_id = %artist_track.artist_id.bright_green(), track_id = %artist_track.track_id); 666 match conn.execute( 667 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)", 668 params![ 669 artist_track.xata_id, 670 artist_track.artist_id, 671 artist_track.track_id, 672 artist_track.xata_createdat, 673 ], 674 ) { 675 Ok(_) => (), 676 Err(e) => tracing::error!(error = %e, "Error inserting artist_track"), 677 } 678 } 679 680 tracing::info!(artist_tracks = artist_tracks.len(), "Loaded artist_tracks"); 681 Ok(()) 682} 683 684pub async fn load_artist_albums( 685 conn: Arc<Mutex<Connection>>, 686 pool: &Pool<Postgres>, 687) -> Result<(), Error> { 688 let conn = conn.lock().unwrap(); 689 let artist_albums: Vec<xata::artist_album::ArtistAlbum> = sqlx::query_as( 690 r#" 691 SELECT * FROM artist_albums 692 "#, 693 ) 694 .fetch_all(pool) 695 .await?; 696 697 for (i, artist_album) in artist_albums.clone().into_iter().enumerate() { 698 tracing::info!(artist_album = i, artist_id = %artist_album.artist_id.bright_green(), album_id = %artist_album.album_id); 699 match conn.execute( 700 "INSERT INTO artist_albums (id, artist_id, album_id, 
created_at) VALUES (?, ?, ?, ?)", 701 params![ 702 artist_album.xata_id, 703 artist_album.artist_id, 704 artist_album.album_id, 705 artist_album.xata_createdat, 706 ], 707 ) { 708 Ok(_) => (), 709 Err(e) => tracing::error!(error = %e, "Error inserting artist_album"), 710 } 711 } 712 713 tracing::info!(artist_albums = artist_albums.len(), "Loaded artist_albums"); 714 Ok(()) 715} 716 717pub async fn load_user_albums( 718 conn: Arc<Mutex<Connection>>, 719 pool: &Pool<Postgres>, 720) -> Result<(), Error> { 721 let conn = conn.lock().unwrap(); 722 let user_albums: Vec<xata::user_album::UserAlbum> = sqlx::query_as( 723 r#" 724 SELECT * FROM user_albums 725 "#, 726 ) 727 .fetch_all(pool) 728 .await?; 729 730 for (i, user_album) in user_albums.clone().into_iter().enumerate() { 731 tracing::info!(user_album = i, user_id = %user_album.user_id.bright_green(), album_id = %user_album.album_id); 732 match conn.execute( 733 "INSERT INTO user_albums (id, user_id, album_id, created_at) VALUES (?, ?, ?, ?)", 734 params![ 735 user_album.xata_id, 736 user_album.user_id, 737 user_album.album_id, 738 user_album.xata_createdat, 739 ], 740 ) { 741 Ok(_) => (), 742 Err(e) => tracing::error!(error = %e, "Error inserting user_album"), 743 } 744 } 745 746 tracing::info!(user_albums = user_albums.len(), "Loaded user_albums"); 747 Ok(()) 748} 749 750pub async fn load_user_artists( 751 conn: Arc<Mutex<Connection>>, 752 pool: &Pool<Postgres>, 753) -> Result<(), Error> { 754 let conn = conn.lock().unwrap(); 755 let user_artists: Vec<xata::user_artist::UserArtist> = sqlx::query_as( 756 r#" 757 SELECT * FROM user_artists 758 "#, 759 ) 760 .fetch_all(pool) 761 .await?; 762 763 for (i, user_artist) in user_artists.clone().into_iter().enumerate() { 764 tracing::info!(user_artist = i, user_id = %user_artist.user_id.bright_green(), artist_id = %user_artist.artist_id); 765 match conn.execute( 766 "INSERT INTO user_artists (id, user_id, artist_id, created_at) VALUES (?, ?, ?, ?)", 767 params![ 768 
user_artist.xata_id, 769 user_artist.user_id, 770 user_artist.artist_id, 771 user_artist.xata_createdat, 772 ], 773 ) { 774 Ok(_) => (), 775 Err(e) => tracing::error!(error = %e, "Error inserting user_artist"), 776 } 777 } 778 779 tracing::info!(user_artists = user_artists.len(), "Loaded user_artists"); 780 Ok(()) 781} 782 783pub async fn load_user_tracks( 784 conn: Arc<Mutex<Connection>>, 785 pool: &Pool<Postgres>, 786) -> Result<(), Error> { 787 let conn = conn.lock().unwrap(); 788 let user_tracks: Vec<xata::user_track::UserTrack> = sqlx::query_as( 789 r#" 790 SELECT * FROM user_tracks 791 "#, 792 ) 793 .fetch_all(pool) 794 .await?; 795 796 for (i, user_track) in user_tracks.clone().into_iter().enumerate() { 797 tracing::info!(user_track = i, user_id = %user_track.user_id.bright_green(), track_id = %user_track.track_id); 798 match conn.execute( 799 "INSERT INTO user_tracks (id, user_id, track_id, created_at) VALUES (?, ?, ?, ?)", 800 params![ 801 user_track.xata_id, 802 user_track.user_id, 803 user_track.track_id, 804 user_track.xata_createdat, 805 ], 806 ) { 807 Ok(_) => (), 808 Err(e) => tracing::error!(error = %e, "Error inserting user_track"), 809 } 810 } 811 812 tracing::info!(user_tracks = user_tracks.len(), "Loaded user_tracks"); 813 Ok(()) 814}