A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at feat/feed-generator 698 lines 26 kB view raw
1use std::env; 2 3mod repo; 4mod xata; 5 6use anyhow::{Context, Error}; 7use owo_colors::OwoColorize; 8use sqlx::{postgres::PgPoolOptions, PgPool}; 9 10const MAX_CONNECTIONS: u32 = 5; 11const BATCH_SIZE: usize = 1000; 12const BACKUP_URL: &str = "https://backup.rocksky.app/rocksky-backup.sql"; 13const BACKUP_PATH: &str = "/tmp/rocksky-backup.sql"; 14#[derive(Clone)] 15pub struct DatabasePools { 16 pub source: PgPool, 17 pub destination: PgPool, 18} 19 20pub async fn pull_data() -> Result<(), Error> { 21 if env::var("SOURCE_POSTGRES_URL").is_err() { 22 tracing::info!( 23 backup = %BACKUP_URL.magenta(), 24 "SOURCE_POSTGRES_URL not set, downloading backup from Rocksky" 25 ); 26 download_backup().await?; 27 return Ok(()); 28 } 29 30 let pools = setup_database_pools().await?; 31 32 // Sync core entities first 33 let album_sync = tokio::spawn({ 34 let pools = pools.clone(); 35 async move { sync_albums(&pools).await } 36 }); 37 38 let artist_sync = tokio::spawn({ 39 let pools = pools.clone(); 40 async move { sync_artists(&pools).await } 41 }); 42 43 let track_sync = tokio::spawn({ 44 let pools = pools.clone(); 45 async move { sync_tracks(&pools).await } 46 }); 47 48 let user_sync = tokio::spawn({ 49 let pools = pools.clone(); 50 async move { sync_users(&pools).await } 51 }); 52 53 let (album_sync, artist_sync, track_sync, user_sync) = 54 tokio::join!(album_sync, artist_sync, track_sync, user_sync); 55 56 album_sync.context("Album sync task failed")??; 57 artist_sync.context("Artist sync task failed")??; 58 track_sync.context("Track sync task failed")??; 59 user_sync.context("User sync task failed")??; 60 61 // Sync relationship entities 62 let playlist_sync = tokio::spawn({ 63 let pools = pools.clone(); 64 async move { sync_playlists(&pools).await } 65 }); 66 67 let loved_track_sync = tokio::spawn({ 68 let pools = pools.clone(); 69 async move { sync_loved_tracks(&pools).await } 70 }); 71 72 let scrobble_sync = tokio::spawn({ 73 let pools = pools.clone(); 74 async move { 
sync_scrobbles(&pools).await } 75 }); 76 77 let (loved_track_sync, playlist_sync, scrobble_sync) = 78 tokio::join!(loved_track_sync, playlist_sync, scrobble_sync); 79 loved_track_sync.context("Loved track sync task failed")??; 80 playlist_sync.context("Playlist sync task failed")??; 81 scrobble_sync.context("Scrobble sync task failed")??; 82 83 // Sync junction tables 84 let album_track_sync = tokio::spawn({ 85 let pools = pools.clone(); 86 async move { sync_album_tracks(&pools).await } 87 }); 88 89 let artist_album_sync = tokio::spawn({ 90 let pools = pools.clone(); 91 async move { sync_artist_albums(&pools).await } 92 }); 93 94 let artist_track_sync = tokio::spawn({ 95 let pools = pools.clone(); 96 async move { sync_artist_tracks(&pools).await } 97 }); 98 99 let playlist_track_sync = tokio::spawn({ 100 let pools = pools.clone(); 101 async move { sync_playlist_tracks(&pools).await } 102 }); 103 104 let user_album_sync = tokio::spawn({ 105 let pools = pools.clone(); 106 async move { sync_user_albums(&pools).await } 107 }); 108 109 let user_artist_sync = tokio::spawn({ 110 let pools = pools.clone(); 111 async move { sync_user_artists(&pools).await } 112 }); 113 114 let user_track_sync = tokio::spawn({ 115 let pools = pools.clone(); 116 async move { sync_user_tracks(&pools).await } 117 }); 118 119 let user_playlist_sync = tokio::spawn({ 120 let pools = pools.clone(); 121 async move { sync_user_playlists(&pools).await } 122 }); 123 124 let ( 125 album_track_sync, 126 artist_album_sync, 127 artist_track_sync, 128 playlist_track_sync, 129 user_album_sync, 130 user_artist_sync, 131 user_track_sync, 132 user_playlist_sync, 133 ) = tokio::join!( 134 album_track_sync, 135 artist_album_sync, 136 artist_track_sync, 137 playlist_track_sync, 138 user_album_sync, 139 user_artist_sync, 140 user_track_sync, 141 user_playlist_sync 142 ); 143 144 album_track_sync.context("Album track sync task failed")??; 145 artist_album_sync.context("Artist album sync task failed")??; 146 
artist_track_sync.context("Artist track sync task failed")??; 147 playlist_track_sync.context("Playlist track sync task failed")??; 148 user_album_sync.context("User album sync task failed")??; 149 user_artist_sync.context("User artist sync task failed")??; 150 user_track_sync.context("User track sync task failed")??; 151 user_playlist_sync.context("User playlist sync task failed")??; 152 153 Ok(()) 154} 155 156async fn download_backup() -> Result<(), Error> { 157 let _ = tokio::fs::remove_file(BACKUP_PATH).await; 158 159 tokio::process::Command::new("curl") 160 .arg("-o") 161 .arg(BACKUP_PATH) 162 .arg(BACKUP_URL) 163 .stderr(std::process::Stdio::inherit()) 164 .stdout(std::process::Stdio::inherit()) 165 .spawn()? 166 .wait() 167 .await?; 168 169 tokio::process::Command::new("psql") 170 .arg(&env::var("XATA_POSTGRES_URL")?) 171 .arg("-f") 172 .arg(BACKUP_PATH) 173 .stderr(std::process::Stdio::inherit()) 174 .stdout(std::process::Stdio::inherit()) 175 .spawn()? 176 .wait() 177 .await?; 178 Ok(()) 179} 180 181async fn setup_database_pools() -> Result<DatabasePools, Error> { 182 let source = PgPoolOptions::new() 183 .max_connections(MAX_CONNECTIONS) 184 .connect(&env::var("SOURCE_POSTGRES_URL")?) 185 .await?; 186 187 let destination = PgPoolOptions::new() 188 .max_connections(MAX_CONNECTIONS) 189 .connect(&env::var("XATA_POSTGRES_URL")?) 
190 .await?; 191 192 Ok(DatabasePools { 193 source, 194 destination, 195 }) 196} 197 198async fn sync_albums(pools: &DatabasePools) -> Result<(), Error> { 199 let total_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM albums") 200 .fetch_one(&pools.source) 201 .await?; 202 let total_albums = total_albums.0; 203 tracing::info!(total = %total_albums.magenta(), "Total albums to sync"); 204 205 let start = 0; 206 let mut i = 1; 207 208 for offset in (start..total_albums).step_by(BATCH_SIZE) { 209 let albums = 210 repo::album::get_albums(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 211 tracing::info!( 212 offset = %offset.magenta(), 213 end = %((offset + albums.len() as i64).min(total_albums)).magenta(), 214 total = %total_albums.magenta(), 215 "Fetched albums" 216 ); 217 for album in &albums { 218 tracing::info!(title = %album.title.cyan(), i = %i.magenta(), total = %total_albums.magenta(), "Inserting album"); 219 repo::album::insert_album(&pools.destination, album).await?; 220 i += 1; 221 } 222 } 223 Ok(()) 224} 225 226async fn sync_artists(pools: &DatabasePools) -> Result<(), Error> { 227 let total_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artists") 228 .fetch_one(&pools.source) 229 .await?; 230 let total_artists = total_artists.0; 231 tracing::info!(total = %total_artists.magenta(), "Total artists to sync"); 232 233 let start = 0; 234 let mut i = 1; 235 236 for offset in (start..total_artists).step_by(BATCH_SIZE) { 237 let artists = 238 repo::artist::get_artists(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 239 tracing::info!( 240 offset = %offset.magenta(), 241 end = %((offset + artists.len() as i64).min(total_artists)).magenta(), 242 total = %total_artists.magenta(), 243 "Fetched artists" 244 ); 245 for artist in &artists { 246 tracing::info!(name = %artist.name.cyan(), i = %i.magenta(), total = %total_artists.magenta(), "Inserting artist"); 247 repo::artist::insert_artist(&pools.destination, artist).await?; 248 i += 1; 249 
} 250 } 251 Ok(()) 252} 253 254async fn sync_tracks(pools: &DatabasePools) -> Result<(), Error> { 255 let total_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tracks") 256 .fetch_one(&pools.source) 257 .await?; 258 let total_tracks = total_tracks.0; 259 tracing::info!(total = %total_tracks.magenta(), "Total tracks to sync"); 260 261 let start = 0; 262 let mut i = 1; 263 264 for offset in (start..total_tracks).step_by(BATCH_SIZE) { 265 let tracks = 266 repo::track::get_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 267 tracing::info!( 268 offset = %offset.magenta(), 269 end = %((offset + tracks.len() as i64).min(total_tracks)).magenta(), 270 total = %total_tracks.magenta(), 271 "Fetched tracks" 272 ); 273 274 for track in &tracks { 275 tracing::info!(title = %track.title.cyan(), i = %i.magenta(), total = %total_tracks.magenta(), "Inserting track"); 276 match repo::track::insert_track(&pools.destination, track).await { 277 Ok(_) => {} 278 Err(e) => { 279 tracing::error!(error = %e, "Failed to insert track"); 280 } 281 } 282 i += 1; 283 } 284 } 285 Ok(()) 286} 287 288async fn sync_users(pools: &DatabasePools) -> Result<(), Error> { 289 let total_users: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users") 290 .fetch_one(&pools.source) 291 .await?; 292 let total_users = total_users.0; 293 tracing::info!(total = %total_users.magenta(), "Total users to sync"); 294 295 let start = 0; 296 let mut i = 1; 297 298 for offset in (start..total_users).step_by(BATCH_SIZE) { 299 let users = repo::user::get_users(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 300 tracing::info!( 301 offset = %offset.magenta(), 302 end = %((offset + users.len() as i64).min(total_users)).magenta(), 303 total = %total_users.magenta(), 304 "Fetched users" 305 ); 306 307 for user in &users { 308 tracing::info!(handle = %user.handle.cyan(), i = %i.magenta(), total = %total_users.magenta(), "Inserting user"); 309 match repo::user::insert_user(&pools.destination, 
user).await { 310 Ok(_) => {} 311 Err(e) => { 312 tracing::error!(error = %e, "Failed to insert user"); 313 } 314 } 315 i += 1; 316 } 317 } 318 Ok(()) 319} 320 321async fn sync_playlists(pools: &DatabasePools) -> Result<(), Error> { 322 let total_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlists") 323 .fetch_one(&pools.source) 324 .await?; 325 let total_playlists = total_playlists.0; 326 tracing::info!(total = %total_playlists.magenta(), "Total playlists to sync"); 327 328 let start = 0; 329 let mut i = 1; 330 331 for offset in (start..total_playlists).step_by(BATCH_SIZE) { 332 let playlists = 333 repo::playlist::get_playlists(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 334 tracing::info!( 335 offset = %offset.magenta(), 336 end = %((offset + playlists.len() as i64).min(total_playlists)).magenta(), 337 total = %total_playlists.magenta(), 338 "Fetched playlists" 339 ); 340 341 for playlist in &playlists { 342 tracing::info!(name = %playlist.name.cyan(), i = %i.magenta(), total = %total_playlists.magenta(), "Inserting playlist"); 343 match repo::playlist::insert_playlist(&pools.destination, playlist).await { 344 Ok(_) => {} 345 Err(e) => { 346 tracing::error!(error = %e, "Failed to insert playlist"); 347 } 348 } 349 i += 1; 350 } 351 } 352 Ok(()) 353} 354 355async fn sync_loved_tracks(pools: &DatabasePools) -> Result<(), Error> { 356 let total_loved_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM loved_tracks") 357 .fetch_one(&pools.source) 358 .await?; 359 let total_loved_tracks = total_loved_tracks.0; 360 tracing::info!(total = %total_loved_tracks.magenta(), "Total loved tracks to sync"); 361 362 let start = 0; 363 let mut i = 1; 364 365 for offset in (start..total_loved_tracks).step_by(BATCH_SIZE) { 366 let loved_tracks = 367 repo::loved_track::get_loved_tracks(&pools.source, offset as i64, BATCH_SIZE as i64) 368 .await?; 369 tracing::info!( 370 offset = %offset.magenta(), 371 end = %((offset + loved_tracks.len() as 
i64).min(total_loved_tracks)).magenta(), 372 total = %total_loved_tracks.magenta(), 373 "Fetched loved tracks" 374 ); 375 376 for loved_track in &loved_tracks { 377 tracing::info!(user_id = %loved_track.user_id.cyan(), track_id = %loved_track.track_id.magenta(), i = %i.magenta(), total = %total_loved_tracks.magenta(), "Inserting loved track"); 378 match repo::loved_track::insert_loved_track(&pools.destination, loved_track).await { 379 Ok(_) => {} 380 Err(e) => { 381 tracing::error!(error = %e, "Failed to insert loved track"); 382 } 383 } 384 i += 1; 385 } 386 } 387 Ok(()) 388} 389 390async fn sync_scrobbles(pools: &DatabasePools) -> Result<(), Error> { 391 let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 392 .fetch_one(&pools.source) 393 .await?; 394 let total_scrobbles = total_scrobbles.0; 395 tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync"); 396 397 let start = 0; 398 let mut i = 1; 399 400 for offset in (start..total_scrobbles).step_by(BATCH_SIZE) { 401 let scrobbles = 402 repo::scrobble::get_scrobbles(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 403 tracing::info!( 404 offset = %offset.magenta(), 405 end = %((offset + scrobbles.len() as i64).min(total_scrobbles)).magenta(), 406 total = %total_scrobbles.magenta(), 407 "Fetched scrobbles" 408 ); 409 410 for scrobble in &scrobbles { 411 tracing::info!(user_id = %scrobble.user_id.cyan(), track_id = %scrobble.track_id.magenta(), i = %i.magenta(), total = %total_scrobbles.magenta(), "Inserting scrobble"); 412 match repo::scrobble::insert_scrobble(&pools.destination, scrobble).await { 413 Ok(_) => {} 414 Err(e) => { 415 tracing::error!(error = %e, "Failed to insert scrobble"); 416 } 417 } 418 i += 1; 419 } 420 } 421 Ok(()) 422} 423 424async fn sync_album_tracks(pools: &DatabasePools) -> Result<(), Error> { 425 let total_album_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM album_tracks") 426 .fetch_one(&pools.source) 427 .await?; 428 let 
total_album_tracks = total_album_tracks.0; 429 tracing::info!(total = %total_album_tracks.magenta(), "Total album tracks to sync"); 430 431 let start = 0; 432 let mut i = 1; 433 434 for offset in (start..total_album_tracks).step_by(BATCH_SIZE) { 435 let album_tracks = 436 repo::album::get_album_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 437 tracing::info!( 438 offset = %offset.magenta(), 439 end = %((offset + album_tracks.len() as i64).min(total_album_tracks)).magenta(), 440 total = %total_album_tracks.magenta(), 441 "Fetched album tracks" 442 ); 443 444 for album_track in &album_tracks { 445 tracing::info!(album_id = %album_track.album_id.cyan(), track_id = %album_track.track_id.magenta(), i = %i.magenta(), total = %total_album_tracks.magenta(), "Inserting album track"); 446 match repo::album::insert_album_track(&pools.destination, album_track).await { 447 Ok(_) => {} 448 Err(e) => { 449 tracing::error!(error = %e, "Failed to insert album track"); 450 } 451 } 452 i += 1; 453 } 454 } 455 Ok(()) 456} 457 458async fn sync_artist_albums(pools: &DatabasePools) -> Result<(), Error> { 459 let total_artist_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_albums") 460 .fetch_one(&pools.source) 461 .await?; 462 let total_artist_albums = total_artist_albums.0; 463 tracing::info!(total = %total_artist_albums.magenta(), "Total artist albums to sync"); 464 465 let start = 0; 466 let mut i = 1; 467 468 for offset in (start..total_artist_albums).step_by(BATCH_SIZE) { 469 let artist_albums = 470 repo::artist::get_artist_albums(&pools.source, offset as i64, BATCH_SIZE as i64) 471 .await?; 472 tracing::info!( 473 offset = %offset.magenta(), 474 end = %((offset + artist_albums.len() as i64).min(total_artist_albums)).magenta(), 475 total = %total_artist_albums.magenta(), 476 "Fetched artist albums" 477 ); 478 479 for artist_album in &artist_albums { 480 tracing::info!(artist_id = %artist_album.artist_id.cyan(), album_id = 
%artist_album.album_id.magenta(), i = %i.magenta(), total = %total_artist_albums.magenta(), "Inserting artist album"); 481 match repo::artist::insert_artist_album(&pools.destination, artist_album).await { 482 Ok(_) => {} 483 Err(e) => { 484 tracing::error!(error = %e, "Failed to insert artist album"); 485 } 486 } 487 i += 1; 488 } 489 } 490 Ok(()) 491} 492 493async fn sync_artist_tracks(pools: &DatabasePools) -> Result<(), Error> { 494 let total_artist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_tracks") 495 .fetch_one(&pools.source) 496 .await?; 497 let total_artist_tracks = total_artist_tracks.0; 498 tracing::info!(total = %total_artist_tracks.magenta(), "Total artist tracks to sync"); 499 500 let start = 0; 501 let mut i = 1; 502 503 for offset in (start..total_artist_tracks).step_by(BATCH_SIZE) { 504 let artist_tracks = 505 repo::artist::get_artist_tracks(&pools.source, offset as i64, BATCH_SIZE as i64) 506 .await?; 507 tracing::info!( 508 offset = %offset.magenta(), 509 end = %((offset + artist_tracks.len() as i64).min(total_artist_tracks)).magenta(), 510 total = %total_artist_tracks.magenta(), 511 "Fetched artist tracks" 512 ); 513 514 for artist_track in &artist_tracks { 515 tracing::info!(artist_id = %artist_track.artist_id.cyan(), track_id = %artist_track.track_id.magenta(), i = %i.magenta(), total = %total_artist_tracks.magenta(), "Inserting artist track"); 516 match repo::artist::insert_artist_track(&pools.destination, artist_track).await { 517 Ok(_) => {} 518 Err(e) => { 519 tracing::error!(error = %e, "Failed to insert artist track"); 520 } 521 } 522 i += 1; 523 } 524 } 525 Ok(()) 526} 527 528async fn sync_playlist_tracks(pools: &DatabasePools) -> Result<(), Error> { 529 let total_playlist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlist_tracks") 530 .fetch_one(&pools.source) 531 .await?; 532 let total_playlist_tracks = total_playlist_tracks.0; 533 tracing::info!(total = %total_playlist_tracks.magenta(), "Total playlist 
tracks to sync"); 534 535 let start = 0; 536 let mut i = 1; 537 538 for offset in (start..total_playlist_tracks).step_by(BATCH_SIZE) { 539 let playlist_tracks = 540 repo::playlist::get_playlist_tracks(&pools.source, offset as i64, BATCH_SIZE as i64) 541 .await?; 542 tracing::info!( 543 offset = %offset.magenta(), 544 end = %((offset + playlist_tracks.len() as i64).min(total_playlist_tracks)).magenta(), 545 total = %total_playlist_tracks.magenta(), 546 "Fetched playlist tracks" 547 ); 548 549 for playlist_track in &playlist_tracks { 550 tracing::info!(playlist_id = %playlist_track.playlist_id.cyan(), track_id = %playlist_track.track_id.magenta(), i = %i.magenta(), total = %total_playlist_tracks.magenta(), "Inserting playlist track"); 551 match repo::playlist::insert_playlist_track(&pools.destination, playlist_track).await { 552 Ok(_) => {} 553 Err(e) => { 554 tracing::error!(error = %e, "Failed to insert playlist track"); 555 } 556 } 557 i += 1; 558 } 559 } 560 Ok(()) 561} 562 563async fn sync_user_albums(pools: &DatabasePools) -> Result<(), Error> { 564 let total_user_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_albums") 565 .fetch_one(&pools.source) 566 .await?; 567 let total_user_albums = total_user_albums.0; 568 tracing::info!(total = %total_user_albums.magenta(), "Total user albums to sync"); 569 570 let start = 0; 571 let mut i = 1; 572 573 for offset in (start..total_user_albums).step_by(BATCH_SIZE) { 574 let user_albums = 575 repo::album::get_user_albums(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 576 tracing::info!( 577 offset = %offset.magenta(), 578 end = %((offset + user_albums.len() as i64).min(total_user_albums)).magenta(), 579 total = %total_user_albums.magenta(), 580 "Fetched user albums" 581 ); 582 583 for user_album in &user_albums { 584 tracing::info!(user_id = %user_album.user_id.cyan(), album_id = %user_album.album_id.magenta(), i = %i.magenta(), total = %total_user_albums.magenta(), "Inserting user album"); 585 match 
repo::album::insert_user_album(&pools.destination, user_album).await { 586 Ok(_) => {} 587 Err(e) => { 588 tracing::error!(error = %e, "Failed to insert user album"); 589 } 590 } 591 i += 1; 592 } 593 } 594 Ok(()) 595} 596 597async fn sync_user_artists(pools: &DatabasePools) -> Result<(), Error> { 598 let total_user_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_artists") 599 .fetch_one(&pools.source) 600 .await?; 601 let total_user_artists = total_user_artists.0; 602 tracing::info!(total = %total_user_artists.magenta(), "Total user artists to sync"); 603 604 let start = 0; 605 let mut i = 1; 606 607 for offset in (start..total_user_artists).step_by(BATCH_SIZE) { 608 let user_artists = 609 repo::artist::get_user_artists(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 610 tracing::info!( 611 offset = %offset.magenta(), 612 end = %((offset + user_artists.len() as i64).min(total_user_artists)).magenta(), 613 total = %total_user_artists.magenta(), 614 "Fetched user artists" 615 ); 616 617 for user_artist in &user_artists { 618 tracing::info!(user_id = %user_artist.user_id.cyan(), artist_id = %user_artist.artist_id.magenta(), i = %i.magenta(), total = %total_user_artists.magenta(), "Inserting user artist"); 619 match repo::artist::insert_user_artist(&pools.destination, user_artist).await { 620 Ok(_) => {} 621 Err(e) => { 622 tracing::error!(error = %e, "Failed to insert user artist"); 623 } 624 } 625 i += 1; 626 } 627 } 628 Ok(()) 629} 630 631async fn sync_user_tracks(pools: &DatabasePools) -> Result<(), Error> { 632 let total_user_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_tracks") 633 .fetch_one(&pools.source) 634 .await?; 635 let total_user_tracks = total_user_tracks.0; 636 tracing::info!(total = %total_user_tracks.magenta(), "Total user tracks to sync"); 637 638 let start = 0; 639 let mut i = 1; 640 641 for offset in (start..total_user_tracks).step_by(BATCH_SIZE) { 642 let user_tracks = 643 
repo::track::get_user_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 644 tracing::info!( 645 offset = %offset.magenta(), 646 end = %((offset + user_tracks.len() as i64).min(total_user_tracks)).magenta(), 647 total = %total_user_tracks.magenta(), 648 "Fetched user tracks" 649 ); 650 651 for user_track in &user_tracks { 652 tracing::info!(user_id = %user_track.user_id.cyan(), track_id = %user_track.track_id.magenta(), i = %i.magenta(), total = %total_user_tracks.magenta(), "Inserting user track"); 653 match repo::track::insert_user_track(&pools.destination, user_track).await { 654 Ok(_) => {} 655 Err(e) => { 656 tracing::error!(error = %e, "Failed to insert user track"); 657 } 658 } 659 i += 1; 660 } 661 } 662 Ok(()) 663} 664 665async fn sync_user_playlists(pools: &DatabasePools) -> Result<(), Error> { 666 let total_user_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_playlists") 667 .fetch_one(&pools.source) 668 .await?; 669 let total_user_playlists = total_user_playlists.0; 670 tracing::info!(total = %total_user_playlists.magenta(), "Total user playlists to sync"); 671 672 let start = 0; 673 let mut i = 1; 674 675 for offset in (start..total_user_playlists).step_by(BATCH_SIZE) { 676 let user_playlists = 677 repo::playlist::get_user_playlists(&pools.source, offset as i64, BATCH_SIZE as i64) 678 .await?; 679 tracing::info!( 680 offset = %offset.magenta(), 681 end = %((offset + user_playlists.len() as i64).min(total_user_playlists)).magenta(), 682 total = %total_user_playlists.magenta(), 683 "Fetched user playlists" 684 ); 685 686 for user_playlist in &user_playlists { 687 tracing::info!(user_id = %user_playlist.user_id.cyan(), playlist_id = %user_playlist.playlist_id.magenta(), i = %i.magenta(), total = %total_user_playlists.magenta(), "Inserting user playlist"); 688 match repo::playlist::insert_user_playlist(&pools.destination, user_playlist).await { 689 Ok(_) => {} 690 Err(e) => { 691 tracing::error!(error = %e, "Failed to insert user 
playlist"); 692 } 693 } 694 i += 1; 695 } 696 } 697 Ok(()) 698}