A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

feat: enhance pull_data function to include backup download and sync all entities

+160 -132
+160 -132
crates/pgpull/src/lib.rs
··· 9 9 10 10 const MAX_CONNECTIONS: u32 = 5; 11 11 const BATCH_SIZE: usize = 1000; 12 - 12 + const BACKUP_URL: &str = "https://backup.rocksky.app/rocksky-backup.sql"; 13 + const BACKUP_PATH: &str = "/tmp/rocksky-backup.sql"; 13 14 #[derive(Clone)] 14 15 pub struct DatabasePools { 15 16 pub source: PgPool, 16 17 pub destination: PgPool, 17 18 } 18 19 19 - async fn setup_database_pools() -> Result<DatabasePools, Error> { 20 + pub async fn pull_data() -> Result<(), Error> { 20 21 if env::var("SOURCE_POSTGRES_URL").is_err() { 21 - tracing::error!( 22 - "SOURCE_POSTGRES_URL is not set. Please set it to your PostgreSQL connection string." 22 + tracing::info!( 23 + backup = %BACKUP_URL.magenta(), 24 + "SOURCE_POSTGRES_URL not set, downloading backup from Rocksky" 23 25 ); 24 - std::process::exit(1); 26 + download_backup().await?; 27 + return Ok(()); 25 28 } 26 29 30 + let pools = setup_database_pools().await?; 31 + 32 + // Sync core entities first 33 + let album_sync = tokio::spawn({ 34 + let pools = pools.clone(); 35 + async move { sync_albums(&pools).await } 36 + }); 37 + 38 + let artist_sync = tokio::spawn({ 39 + let pools = pools.clone(); 40 + async move { sync_artists(&pools).await } 41 + }); 42 + 43 + let track_sync = tokio::spawn({ 44 + let pools = pools.clone(); 45 + async move { sync_tracks(&pools).await } 46 + }); 47 + 48 + let user_sync = tokio::spawn({ 49 + let pools = pools.clone(); 50 + async move { sync_users(&pools).await } 51 + }); 52 + 53 + let (album_sync, artist_sync, track_sync, user_sync) = 54 + tokio::join!(album_sync, artist_sync, track_sync, user_sync); 55 + 56 + album_sync.context("Album sync task failed")??; 57 + artist_sync.context("Artist sync task failed")??; 58 + track_sync.context("Track sync task failed")??; 59 + user_sync.context("User sync task failed")??; 60 + 61 + // Sync relationship entities 62 + let playlist_sync = tokio::spawn({ 63 + let pools = pools.clone(); 64 + async move { sync_playlists(&pools).await } 65 + }); 66 + 67 + let loved_track_sync = tokio::spawn({ 68 + let pools = pools.clone(); 69 + async move { sync_loved_tracks(&pools).await } 70 + }); 71 + 72 + let scrobble_sync = tokio::spawn({ 73 + let pools = pools.clone(); 74 + async move { sync_scrobbles(&pools).await } 75 + }); 76 + 77 + let (loved_track_sync, playlist_sync, scrobble_sync) = 78 + tokio::join!(loved_track_sync, playlist_sync, scrobble_sync); 79 + loved_track_sync.context("Loved track sync task failed")??; 80 + playlist_sync.context("Playlist sync task failed")??; 81 + scrobble_sync.context("Scrobble sync task failed")??; 82 + 83 + // Sync junction tables 84 + let album_track_sync = tokio::spawn({ 85 + let pools = pools.clone(); 86 + async move { sync_album_tracks(&pools).await } 87 + }); 88 + 89 + let artist_album_sync = tokio::spawn({ 90 + let pools = pools.clone(); 91 + async move { sync_artist_albums(&pools).await } 92 + }); 93 + 94 + let artist_track_sync = tokio::spawn({ 95 + let pools = pools.clone(); 96 + async move { sync_artist_tracks(&pools).await } 97 + }); 98 + 99 + let playlist_track_sync = tokio::spawn({ 100 + let pools = pools.clone(); 101 + async move { sync_playlist_tracks(&pools).await } 102 + }); 103 + 104 + let user_album_sync = tokio::spawn({ 105 + let pools = pools.clone(); 106 + async move { sync_user_albums(&pools).await } 107 + }); 108 + 109 + let user_artist_sync = tokio::spawn({ 110 + let pools = pools.clone(); 111 + async move { sync_user_artists(&pools).await } 112 + }); 113 + 114 + let user_track_sync = tokio::spawn({ 115 + let pools = pools.clone(); 116 + async move { sync_user_tracks(&pools).await } 117 + }); 118 + 119 + let user_playlist_sync = tokio::spawn({ 120 + let pools = pools.clone(); 121 + async move { sync_user_playlists(&pools).await } 122 + }); 123 + 124 + let ( 125 + album_track_sync, 126 + artist_album_sync, 127 + artist_track_sync, 128 + playlist_track_sync, 129 + user_album_sync, 130 + user_artist_sync, 131 + user_track_sync, 132 + user_playlist_sync, 133 + ) = tokio::join!( 134 + album_track_sync, 135 + artist_album_sync, 136 + artist_track_sync, 137 + playlist_track_sync, 138 + user_album_sync, 139 + user_artist_sync, 140 + user_track_sync, 141 + user_playlist_sync 142 + ); 143 + 144 + album_track_sync.context("Album track sync task failed")??; 145 + artist_album_sync.context("Artist album sync task failed")??; 146 + artist_track_sync.context("Artist track sync task failed")??; 147 + playlist_track_sync.context("Playlist track sync task failed")??; 148 + user_album_sync.context("User album sync task failed")??; 149 + user_artist_sync.context("User artist sync task failed")??; 150 + user_track_sync.context("User track sync task failed")??; 151 + user_playlist_sync.context("User playlist sync task failed")??; 152 + 153 + Ok(()) 154 + } 155 + 156 + async fn download_backup() -> Result<(), Error> { 157 + let _ = tokio::fs::remove_file(BACKUP_PATH).await; 158 + 159 + tokio::process::Command::new("curl") 160 + .arg("-o") 161 + .arg(BACKUP_PATH) 162 + .arg(BACKUP_URL) 163 + .stderr(std::process::Stdio::inherit()) 164 + .stdout(std::process::Stdio::inherit()) 165 + .spawn()? 166 + .wait() 167 + .await?; 168 + 169 + tokio::process::Command::new("psql") 170 + .arg(&env::var("XATA_POSTGRES_URL")?) 171 + .arg("-f") 172 + .arg(BACKUP_PATH) 173 + .stderr(std::process::Stdio::inherit()) 174 + .stdout(std::process::Stdio::inherit()) 175 + .spawn()? 176 + .wait() 177 + .await?; 178 + Ok(()) 179 + } 180 + 181 + async fn setup_database_pools() -> Result<DatabasePools, Error> { 27 182 let source = PgPoolOptions::new() 28 183 .max_connections(MAX_CONNECTIONS) 29 184 .connect(&env::var("SOURCE_POSTGRES_URL")?) ··· 541 696 } 542 697 Ok(()) 543 698 } 544 - 545 - pub async fn pull_data() -> Result<(), Error> { 546 - let pools = setup_database_pools().await?; 547 - 548 - // Sync core entities first 549 - let album_sync = tokio::spawn({ 550 - let pools = pools.clone(); 551 - async move { sync_albums(&pools).await } 552 - }); 553 - 554 - let artist_sync = tokio::spawn({ 555 - let pools = pools.clone(); 556 - async move { sync_artists(&pools).await } 557 - }); 558 - 559 - let track_sync = tokio::spawn({ 560 - let pools = pools.clone(); 561 - async move { sync_tracks(&pools).await } 562 - }); 563 - 564 - let user_sync = tokio::spawn({ 565 - let pools = pools.clone(); 566 - async move { sync_users(&pools).await } 567 - }); 568 - 569 - let (album_sync, artist_sync, track_sync, user_sync) = 570 - tokio::join!(album_sync, artist_sync, track_sync, user_sync); 571 - 572 - album_sync.context("Album sync task failed")??; 573 - artist_sync.context("Artist sync task failed")??; 574 - track_sync.context("Track sync task failed")??; 575 - user_sync.context("User sync task failed")??; 576 - 577 - // Sync relationship entities 578 - let playlist_sync = tokio::spawn({ 579 - let pools = pools.clone(); 580 - async move { sync_playlists(&pools).await } 581 - }); 582 - 583 - let loved_track_sync = tokio::spawn({ 584 - let pools = pools.clone(); 585 - async move { sync_loved_tracks(&pools).await } 586 - }); 587 - 588 - let scrobble_sync = tokio::spawn({ 589 - let pools = pools.clone(); 590 - async move { sync_scrobbles(&pools).await } 591 - }); 592 - 593 - let (loved_track_sync, playlist_sync, scrobble_sync) = 594 - tokio::join!(loved_track_sync, playlist_sync, scrobble_sync); 595 - loved_track_sync.context("Loved track sync task failed")??; 596 - playlist_sync.context("Playlist sync task failed")??; 597 - scrobble_sync.context("Scrobble sync task failed")??; 598 - 599 - // Sync junction tables 600 - let album_track_sync = tokio::spawn({ 601 - let pools = pools.clone(); 602 - async move { sync_album_tracks(&pools).await } 603 - }); 604 - 605 - let artist_album_sync = tokio::spawn({ 606 - let pools = pools.clone(); 607 - async move { sync_artist_albums(&pools).await } 608 - }); 609 - 610 - let artist_track_sync = tokio::spawn({ 611 - let pools = pools.clone(); 612 - async move { sync_artist_tracks(&pools).await } 613 - }); 614 - 615 - let playlist_track_sync = tokio::spawn({ 616 - let pools = pools.clone(); 617 - async move { sync_playlist_tracks(&pools).await } 618 - }); 619 - 620 - let user_album_sync = tokio::spawn({ 621 - let pools = pools.clone(); 622 - async move { sync_user_albums(&pools).await } 623 - }); 624 - 625 - let user_artist_sync = tokio::spawn({ 626 - let pools = pools.clone(); 627 - async move { sync_user_artists(&pools).await } 628 - }); 629 - 630 - let user_track_sync = tokio::spawn({ 631 - let pools = pools.clone(); 632 - async move { sync_user_tracks(&pools).await } 633 - }); 634 - 635 - let user_playlist_sync = tokio::spawn({ 636 - let pools = pools.clone(); 637 - async move { sync_user_playlists(&pools).await } 638 - }); 639 - 640 - let ( 641 - album_track_sync, 642 - artist_album_sync, 643 - artist_track_sync, 644 - playlist_track_sync, 645 - user_album_sync, 646 - user_artist_sync, 647 - user_track_sync, 648 - user_playlist_sync, 649 - ) = tokio::join!( 650 - album_track_sync, 651 - artist_album_sync, 652 - artist_track_sync, 653 - playlist_track_sync, 654 - user_album_sync, 655 - user_artist_sync, 656 - user_track_sync, 657 - user_playlist_sync 658 - ); 659 - 660 - album_track_sync.context("Album track sync task failed")??; 661 - artist_album_sync.context("Artist album sync task failed")??; 662 - artist_track_sync.context("Artist track sync task failed")??; 663 - playlist_track_sync.context("Playlist track sync task failed")??; 664 - user_album_sync.context("User album sync task failed")??; 665 - user_artist_sync.context("User artist sync task failed")??; 666 - user_track_sync.context("User track sync task failed")??; 667 - user_playlist_sync.context("User playlist sync task failed")??; 668 - 669 - Ok(()) 670 - }