A decentralized music tracking and discovery platform built on AT Protocol 馃幍
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1use std::env;
2
3mod repo;
4mod xata;
5
6use anyhow::{Context, Error};
7use owo_colors::OwoColorize;
8use sqlx::{postgres::PgPoolOptions, PgPool};
9
10const MAX_CONNECTIONS: u32 = 5;
11const BATCH_SIZE: usize = 1000;
12const BACKUP_URL: &str = "https://backup.rocksky.app/rocksky-backup.sql";
13const BACKUP_PATH: &str = "/tmp/rocksky-backup.sql";
14#[derive(Clone)]
15pub struct DatabasePools {
16 pub source: PgPool,
17 pub destination: PgPool,
18}
19
20pub async fn pull_data() -> Result<(), Error> {
21 if env::var("SOURCE_POSTGRES_URL").is_err() {
22 tracing::info!(
23 backup = %BACKUP_URL.magenta(),
24 "SOURCE_POSTGRES_URL not set, downloading backup from Rocksky"
25 );
26 download_backup().await?;
27 return Ok(());
28 }
29
30 let pools = setup_database_pools().await?;
31
32 // Sync core entities first
33 let album_sync = tokio::spawn({
34 let pools = pools.clone();
35 async move { sync_albums(&pools).await }
36 });
37
38 let artist_sync = tokio::spawn({
39 let pools = pools.clone();
40 async move { sync_artists(&pools).await }
41 });
42
43 let track_sync = tokio::spawn({
44 let pools = pools.clone();
45 async move { sync_tracks(&pools).await }
46 });
47
48 let user_sync = tokio::spawn({
49 let pools = pools.clone();
50 async move { sync_users(&pools).await }
51 });
52
53 let (album_sync, artist_sync, track_sync, user_sync) =
54 tokio::join!(album_sync, artist_sync, track_sync, user_sync);
55
56 album_sync.context("Album sync task failed")??;
57 artist_sync.context("Artist sync task failed")??;
58 track_sync.context("Track sync task failed")??;
59 user_sync.context("User sync task failed")??;
60
61 // Sync relationship entities
62 let playlist_sync = tokio::spawn({
63 let pools = pools.clone();
64 async move { sync_playlists(&pools).await }
65 });
66
67 let loved_track_sync = tokio::spawn({
68 let pools = pools.clone();
69 async move { sync_loved_tracks(&pools).await }
70 });
71
72 let scrobble_sync = tokio::spawn({
73 let pools = pools.clone();
74 async move { sync_scrobbles(&pools).await }
75 });
76
77 let (loved_track_sync, playlist_sync, scrobble_sync) =
78 tokio::join!(loved_track_sync, playlist_sync, scrobble_sync);
79 loved_track_sync.context("Loved track sync task failed")??;
80 playlist_sync.context("Playlist sync task failed")??;
81 scrobble_sync.context("Scrobble sync task failed")??;
82
83 // Sync junction tables
84 let album_track_sync = tokio::spawn({
85 let pools = pools.clone();
86 async move { sync_album_tracks(&pools).await }
87 });
88
89 let artist_album_sync = tokio::spawn({
90 let pools = pools.clone();
91 async move { sync_artist_albums(&pools).await }
92 });
93
94 let artist_track_sync = tokio::spawn({
95 let pools = pools.clone();
96 async move { sync_artist_tracks(&pools).await }
97 });
98
99 let playlist_track_sync = tokio::spawn({
100 let pools = pools.clone();
101 async move { sync_playlist_tracks(&pools).await }
102 });
103
104 let user_album_sync = tokio::spawn({
105 let pools = pools.clone();
106 async move { sync_user_albums(&pools).await }
107 });
108
109 let user_artist_sync = tokio::spawn({
110 let pools = pools.clone();
111 async move { sync_user_artists(&pools).await }
112 });
113
114 let user_track_sync = tokio::spawn({
115 let pools = pools.clone();
116 async move { sync_user_tracks(&pools).await }
117 });
118
119 let user_playlist_sync = tokio::spawn({
120 let pools = pools.clone();
121 async move { sync_user_playlists(&pools).await }
122 });
123
124 let (
125 album_track_sync,
126 artist_album_sync,
127 artist_track_sync,
128 playlist_track_sync,
129 user_album_sync,
130 user_artist_sync,
131 user_track_sync,
132 user_playlist_sync,
133 ) = tokio::join!(
134 album_track_sync,
135 artist_album_sync,
136 artist_track_sync,
137 playlist_track_sync,
138 user_album_sync,
139 user_artist_sync,
140 user_track_sync,
141 user_playlist_sync
142 );
143
144 album_track_sync.context("Album track sync task failed")??;
145 artist_album_sync.context("Artist album sync task failed")??;
146 artist_track_sync.context("Artist track sync task failed")??;
147 playlist_track_sync.context("Playlist track sync task failed")??;
148 user_album_sync.context("User album sync task failed")??;
149 user_artist_sync.context("User artist sync task failed")??;
150 user_track_sync.context("User track sync task failed")??;
151 user_playlist_sync.context("User playlist sync task failed")??;
152
153 Ok(())
154}
155
156async fn download_backup() -> Result<(), Error> {
157 let _ = tokio::fs::remove_file(BACKUP_PATH).await;
158
159 tokio::process::Command::new("curl")
160 .arg("-o")
161 .arg(BACKUP_PATH)
162 .arg(BACKUP_URL)
163 .stderr(std::process::Stdio::inherit())
164 .stdout(std::process::Stdio::inherit())
165 .spawn()?
166 .wait()
167 .await?;
168
169 tokio::process::Command::new("psql")
170 .arg(&env::var("XATA_POSTGRES_URL")?)
171 .arg("-f")
172 .arg(BACKUP_PATH)
173 .stderr(std::process::Stdio::inherit())
174 .stdout(std::process::Stdio::inherit())
175 .spawn()?
176 .wait()
177 .await?;
178 Ok(())
179}
180
181async fn setup_database_pools() -> Result<DatabasePools, Error> {
182 let source = PgPoolOptions::new()
183 .max_connections(MAX_CONNECTIONS)
184 .connect(&env::var("SOURCE_POSTGRES_URL")?)
185 .await?;
186
187 let destination = PgPoolOptions::new()
188 .max_connections(MAX_CONNECTIONS)
189 .connect(&env::var("XATA_POSTGRES_URL")?)
190 .await?;
191
192 Ok(DatabasePools {
193 source,
194 destination,
195 })
196}
197
198async fn sync_albums(pools: &DatabasePools) -> Result<(), Error> {
199 let total_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM albums")
200 .fetch_one(&pools.source)
201 .await?;
202 let total_albums = total_albums.0;
203 tracing::info!(total = %total_albums.magenta(), "Total albums to sync");
204
205 let start = 0;
206 let mut i = 1;
207
208 for offset in (start..total_albums).step_by(BATCH_SIZE) {
209 let albums =
210 repo::album::get_albums(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
211 tracing::info!(
212 offset = %offset.magenta(),
213 end = %((offset + albums.len() as i64).min(total_albums)).magenta(),
214 total = %total_albums.magenta(),
215 "Fetched albums"
216 );
217 for album in &albums {
218 tracing::info!(title = %album.title.cyan(), i = %i.magenta(), total = %total_albums.magenta(), "Inserting album");
219 repo::album::insert_album(&pools.destination, album).await?;
220 i += 1;
221 }
222 }
223 Ok(())
224}
225
226async fn sync_artists(pools: &DatabasePools) -> Result<(), Error> {
227 let total_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artists")
228 .fetch_one(&pools.source)
229 .await?;
230 let total_artists = total_artists.0;
231 tracing::info!(total = %total_artists.magenta(), "Total artists to sync");
232
233 let start = 0;
234 let mut i = 1;
235
236 for offset in (start..total_artists).step_by(BATCH_SIZE) {
237 let artists =
238 repo::artist::get_artists(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
239 tracing::info!(
240 offset = %offset.magenta(),
241 end = %((offset + artists.len() as i64).min(total_artists)).magenta(),
242 total = %total_artists.magenta(),
243 "Fetched artists"
244 );
245 for artist in &artists {
246 tracing::info!(name = %artist.name.cyan(), i = %i.magenta(), total = %total_artists.magenta(), "Inserting artist");
247 repo::artist::insert_artist(&pools.destination, artist).await?;
248 i += 1;
249 }
250 }
251 Ok(())
252}
253
254async fn sync_tracks(pools: &DatabasePools) -> Result<(), Error> {
255 let total_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tracks")
256 .fetch_one(&pools.source)
257 .await?;
258 let total_tracks = total_tracks.0;
259 tracing::info!(total = %total_tracks.magenta(), "Total tracks to sync");
260
261 let start = 0;
262 let mut i = 1;
263
264 for offset in (start..total_tracks).step_by(BATCH_SIZE) {
265 let tracks =
266 repo::track::get_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
267 tracing::info!(
268 offset = %offset.magenta(),
269 end = %((offset + tracks.len() as i64).min(total_tracks)).magenta(),
270 total = %total_tracks.magenta(),
271 "Fetched tracks"
272 );
273
274 for track in &tracks {
275 tracing::info!(title = %track.title.cyan(), i = %i.magenta(), total = %total_tracks.magenta(), "Inserting track");
276 match repo::track::insert_track(&pools.destination, track).await {
277 Ok(_) => {}
278 Err(e) => {
279 tracing::error!(error = %e, "Failed to insert track");
280 }
281 }
282 i += 1;
283 }
284 }
285 Ok(())
286}
287
288async fn sync_users(pools: &DatabasePools) -> Result<(), Error> {
289 let total_users: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
290 .fetch_one(&pools.source)
291 .await?;
292 let total_users = total_users.0;
293 tracing::info!(total = %total_users.magenta(), "Total users to sync");
294
295 let start = 0;
296 let mut i = 1;
297
298 for offset in (start..total_users).step_by(BATCH_SIZE) {
299 let users = repo::user::get_users(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
300 tracing::info!(
301 offset = %offset.magenta(),
302 end = %((offset + users.len() as i64).min(total_users)).magenta(),
303 total = %total_users.magenta(),
304 "Fetched users"
305 );
306
307 for user in &users {
308 tracing::info!(handle = %user.handle.cyan(), i = %i.magenta(), total = %total_users.magenta(), "Inserting user");
309 match repo::user::insert_user(&pools.destination, user).await {
310 Ok(_) => {}
311 Err(e) => {
312 tracing::error!(error = %e, "Failed to insert user");
313 }
314 }
315 i += 1;
316 }
317 }
318 Ok(())
319}
320
321async fn sync_playlists(pools: &DatabasePools) -> Result<(), Error> {
322 let total_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlists")
323 .fetch_one(&pools.source)
324 .await?;
325 let total_playlists = total_playlists.0;
326 tracing::info!(total = %total_playlists.magenta(), "Total playlists to sync");
327
328 let start = 0;
329 let mut i = 1;
330
331 for offset in (start..total_playlists).step_by(BATCH_SIZE) {
332 let playlists =
333 repo::playlist::get_playlists(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
334 tracing::info!(
335 offset = %offset.magenta(),
336 end = %((offset + playlists.len() as i64).min(total_playlists)).magenta(),
337 total = %total_playlists.magenta(),
338 "Fetched playlists"
339 );
340
341 for playlist in &playlists {
342 tracing::info!(name = %playlist.name.cyan(), i = %i.magenta(), total = %total_playlists.magenta(), "Inserting playlist");
343 match repo::playlist::insert_playlist(&pools.destination, playlist).await {
344 Ok(_) => {}
345 Err(e) => {
346 tracing::error!(error = %e, "Failed to insert playlist");
347 }
348 }
349 i += 1;
350 }
351 }
352 Ok(())
353}
354
355async fn sync_loved_tracks(pools: &DatabasePools) -> Result<(), Error> {
356 let total_loved_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM loved_tracks")
357 .fetch_one(&pools.source)
358 .await?;
359 let total_loved_tracks = total_loved_tracks.0;
360 tracing::info!(total = %total_loved_tracks.magenta(), "Total loved tracks to sync");
361
362 let start = 0;
363 let mut i = 1;
364
365 for offset in (start..total_loved_tracks).step_by(BATCH_SIZE) {
366 let loved_tracks =
367 repo::loved_track::get_loved_tracks(&pools.source, offset as i64, BATCH_SIZE as i64)
368 .await?;
369 tracing::info!(
370 offset = %offset.magenta(),
371 end = %((offset + loved_tracks.len() as i64).min(total_loved_tracks)).magenta(),
372 total = %total_loved_tracks.magenta(),
373 "Fetched loved tracks"
374 );
375
376 for loved_track in &loved_tracks {
377 tracing::info!(user_id = %loved_track.user_id.cyan(), track_id = %loved_track.track_id.magenta(), i = %i.magenta(), total = %total_loved_tracks.magenta(), "Inserting loved track");
378 match repo::loved_track::insert_loved_track(&pools.destination, loved_track).await {
379 Ok(_) => {}
380 Err(e) => {
381 tracing::error!(error = %e, "Failed to insert loved track");
382 }
383 }
384 i += 1;
385 }
386 }
387 Ok(())
388}
389
390async fn sync_scrobbles(pools: &DatabasePools) -> Result<(), Error> {
391 let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles")
392 .fetch_one(&pools.source)
393 .await?;
394 let total_scrobbles = total_scrobbles.0;
395 tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync");
396
397 let start = 0;
398 let mut i = 1;
399
400 for offset in (start..total_scrobbles).step_by(BATCH_SIZE) {
401 let scrobbles =
402 repo::scrobble::get_scrobbles(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
403 tracing::info!(
404 offset = %offset.magenta(),
405 end = %((offset + scrobbles.len() as i64).min(total_scrobbles)).magenta(),
406 total = %total_scrobbles.magenta(),
407 "Fetched scrobbles"
408 );
409
410 for scrobble in &scrobbles {
411 tracing::info!(user_id = %scrobble.user_id.cyan(), track_id = %scrobble.track_id.magenta(), i = %i.magenta(), total = %total_scrobbles.magenta(), "Inserting scrobble");
412 match repo::scrobble::insert_scrobble(&pools.destination, scrobble).await {
413 Ok(_) => {}
414 Err(e) => {
415 tracing::error!(error = %e, "Failed to insert scrobble");
416 }
417 }
418 i += 1;
419 }
420 }
421 Ok(())
422}
423
424async fn sync_album_tracks(pools: &DatabasePools) -> Result<(), Error> {
425 let total_album_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM album_tracks")
426 .fetch_one(&pools.source)
427 .await?;
428 let total_album_tracks = total_album_tracks.0;
429 tracing::info!(total = %total_album_tracks.magenta(), "Total album tracks to sync");
430
431 let start = 0;
432 let mut i = 1;
433
434 for offset in (start..total_album_tracks).step_by(BATCH_SIZE) {
435 let album_tracks =
436 repo::album::get_album_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
437 tracing::info!(
438 offset = %offset.magenta(),
439 end = %((offset + album_tracks.len() as i64).min(total_album_tracks)).magenta(),
440 total = %total_album_tracks.magenta(),
441 "Fetched album tracks"
442 );
443
444 for album_track in &album_tracks {
445 tracing::info!(album_id = %album_track.album_id.cyan(), track_id = %album_track.track_id.magenta(), i = %i.magenta(), total = %total_album_tracks.magenta(), "Inserting album track");
446 match repo::album::insert_album_track(&pools.destination, album_track).await {
447 Ok(_) => {}
448 Err(e) => {
449 tracing::error!(error = %e, "Failed to insert album track");
450 }
451 }
452 i += 1;
453 }
454 }
455 Ok(())
456}
457
458async fn sync_artist_albums(pools: &DatabasePools) -> Result<(), Error> {
459 let total_artist_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_albums")
460 .fetch_one(&pools.source)
461 .await?;
462 let total_artist_albums = total_artist_albums.0;
463 tracing::info!(total = %total_artist_albums.magenta(), "Total artist albums to sync");
464
465 let start = 0;
466 let mut i = 1;
467
468 for offset in (start..total_artist_albums).step_by(BATCH_SIZE) {
469 let artist_albums =
470 repo::artist::get_artist_albums(&pools.source, offset as i64, BATCH_SIZE as i64)
471 .await?;
472 tracing::info!(
473 offset = %offset.magenta(),
474 end = %((offset + artist_albums.len() as i64).min(total_artist_albums)).magenta(),
475 total = %total_artist_albums.magenta(),
476 "Fetched artist albums"
477 );
478
479 for artist_album in &artist_albums {
480 tracing::info!(artist_id = %artist_album.artist_id.cyan(), album_id = %artist_album.album_id.magenta(), i = %i.magenta(), total = %total_artist_albums.magenta(), "Inserting artist album");
481 match repo::artist::insert_artist_album(&pools.destination, artist_album).await {
482 Ok(_) => {}
483 Err(e) => {
484 tracing::error!(error = %e, "Failed to insert artist album");
485 }
486 }
487 i += 1;
488 }
489 }
490 Ok(())
491}
492
493async fn sync_artist_tracks(pools: &DatabasePools) -> Result<(), Error> {
494 let total_artist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_tracks")
495 .fetch_one(&pools.source)
496 .await?;
497 let total_artist_tracks = total_artist_tracks.0;
498 tracing::info!(total = %total_artist_tracks.magenta(), "Total artist tracks to sync");
499
500 let start = 0;
501 let mut i = 1;
502
503 for offset in (start..total_artist_tracks).step_by(BATCH_SIZE) {
504 let artist_tracks =
505 repo::artist::get_artist_tracks(&pools.source, offset as i64, BATCH_SIZE as i64)
506 .await?;
507 tracing::info!(
508 offset = %offset.magenta(),
509 end = %((offset + artist_tracks.len() as i64).min(total_artist_tracks)).magenta(),
510 total = %total_artist_tracks.magenta(),
511 "Fetched artist tracks"
512 );
513
514 for artist_track in &artist_tracks {
515 tracing::info!(artist_id = %artist_track.artist_id.cyan(), track_id = %artist_track.track_id.magenta(), i = %i.magenta(), total = %total_artist_tracks.magenta(), "Inserting artist track");
516 match repo::artist::insert_artist_track(&pools.destination, artist_track).await {
517 Ok(_) => {}
518 Err(e) => {
519 tracing::error!(error = %e, "Failed to insert artist track");
520 }
521 }
522 i += 1;
523 }
524 }
525 Ok(())
526}
527
528async fn sync_playlist_tracks(pools: &DatabasePools) -> Result<(), Error> {
529 let total_playlist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlist_tracks")
530 .fetch_one(&pools.source)
531 .await?;
532 let total_playlist_tracks = total_playlist_tracks.0;
533 tracing::info!(total = %total_playlist_tracks.magenta(), "Total playlist tracks to sync");
534
535 let start = 0;
536 let mut i = 1;
537
538 for offset in (start..total_playlist_tracks).step_by(BATCH_SIZE) {
539 let playlist_tracks =
540 repo::playlist::get_playlist_tracks(&pools.source, offset as i64, BATCH_SIZE as i64)
541 .await?;
542 tracing::info!(
543 offset = %offset.magenta(),
544 end = %((offset + playlist_tracks.len() as i64).min(total_playlist_tracks)).magenta(),
545 total = %total_playlist_tracks.magenta(),
546 "Fetched playlist tracks"
547 );
548
549 for playlist_track in &playlist_tracks {
550 tracing::info!(playlist_id = %playlist_track.playlist_id.cyan(), track_id = %playlist_track.track_id.magenta(), i = %i.magenta(), total = %total_playlist_tracks.magenta(), "Inserting playlist track");
551 match repo::playlist::insert_playlist_track(&pools.destination, playlist_track).await {
552 Ok(_) => {}
553 Err(e) => {
554 tracing::error!(error = %e, "Failed to insert playlist track");
555 }
556 }
557 i += 1;
558 }
559 }
560 Ok(())
561}
562
563async fn sync_user_albums(pools: &DatabasePools) -> Result<(), Error> {
564 let total_user_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_albums")
565 .fetch_one(&pools.source)
566 .await?;
567 let total_user_albums = total_user_albums.0;
568 tracing::info!(total = %total_user_albums.magenta(), "Total user albums to sync");
569
570 let start = 0;
571 let mut i = 1;
572
573 for offset in (start..total_user_albums).step_by(BATCH_SIZE) {
574 let user_albums =
575 repo::album::get_user_albums(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
576 tracing::info!(
577 offset = %offset.magenta(),
578 end = %((offset + user_albums.len() as i64).min(total_user_albums)).magenta(),
579 total = %total_user_albums.magenta(),
580 "Fetched user albums"
581 );
582
583 for user_album in &user_albums {
584 tracing::info!(user_id = %user_album.user_id.cyan(), album_id = %user_album.album_id.magenta(), i = %i.magenta(), total = %total_user_albums.magenta(), "Inserting user album");
585 match repo::album::insert_user_album(&pools.destination, user_album).await {
586 Ok(_) => {}
587 Err(e) => {
588 tracing::error!(error = %e, "Failed to insert user album");
589 }
590 }
591 i += 1;
592 }
593 }
594 Ok(())
595}
596
597async fn sync_user_artists(pools: &DatabasePools) -> Result<(), Error> {
598 let total_user_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_artists")
599 .fetch_one(&pools.source)
600 .await?;
601 let total_user_artists = total_user_artists.0;
602 tracing::info!(total = %total_user_artists.magenta(), "Total user artists to sync");
603
604 let start = 0;
605 let mut i = 1;
606
607 for offset in (start..total_user_artists).step_by(BATCH_SIZE) {
608 let user_artists =
609 repo::artist::get_user_artists(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
610 tracing::info!(
611 offset = %offset.magenta(),
612 end = %((offset + user_artists.len() as i64).min(total_user_artists)).magenta(),
613 total = %total_user_artists.magenta(),
614 "Fetched user artists"
615 );
616
617 for user_artist in &user_artists {
618 tracing::info!(user_id = %user_artist.user_id.cyan(), artist_id = %user_artist.artist_id.magenta(), i = %i.magenta(), total = %total_user_artists.magenta(), "Inserting user artist");
619 match repo::artist::insert_user_artist(&pools.destination, user_artist).await {
620 Ok(_) => {}
621 Err(e) => {
622 tracing::error!(error = %e, "Failed to insert user artist");
623 }
624 }
625 i += 1;
626 }
627 }
628 Ok(())
629}
630
631async fn sync_user_tracks(pools: &DatabasePools) -> Result<(), Error> {
632 let total_user_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_tracks")
633 .fetch_one(&pools.source)
634 .await?;
635 let total_user_tracks = total_user_tracks.0;
636 tracing::info!(total = %total_user_tracks.magenta(), "Total user tracks to sync");
637
638 let start = 0;
639 let mut i = 1;
640
641 for offset in (start..total_user_tracks).step_by(BATCH_SIZE) {
642 let user_tracks =
643 repo::track::get_user_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?;
644 tracing::info!(
645 offset = %offset.magenta(),
646 end = %((offset + user_tracks.len() as i64).min(total_user_tracks)).magenta(),
647 total = %total_user_tracks.magenta(),
648 "Fetched user tracks"
649 );
650
651 for user_track in &user_tracks {
652 tracing::info!(user_id = %user_track.user_id.cyan(), track_id = %user_track.track_id.magenta(), i = %i.magenta(), total = %total_user_tracks.magenta(), "Inserting user track");
653 match repo::track::insert_user_track(&pools.destination, user_track).await {
654 Ok(_) => {}
655 Err(e) => {
656 tracing::error!(error = %e, "Failed to insert user track");
657 }
658 }
659 i += 1;
660 }
661 }
662 Ok(())
663}
664
665async fn sync_user_playlists(pools: &DatabasePools) -> Result<(), Error> {
666 let total_user_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_playlists")
667 .fetch_one(&pools.source)
668 .await?;
669 let total_user_playlists = total_user_playlists.0;
670 tracing::info!(total = %total_user_playlists.magenta(), "Total user playlists to sync");
671
672 let start = 0;
673 let mut i = 1;
674
675 for offset in (start..total_user_playlists).step_by(BATCH_SIZE) {
676 let user_playlists =
677 repo::playlist::get_user_playlists(&pools.source, offset as i64, BATCH_SIZE as i64)
678 .await?;
679 tracing::info!(
680 offset = %offset.magenta(),
681 end = %((offset + user_playlists.len() as i64).min(total_user_playlists)).magenta(),
682 total = %total_user_playlists.magenta(),
683 "Fetched user playlists"
684 );
685
686 for user_playlist in &user_playlists {
687 tracing::info!(user_id = %user_playlist.user_id.cyan(), playlist_id = %user_playlist.playlist_id.magenta(), i = %i.magenta(), total = %total_user_playlists.magenta(), "Inserting user playlist");
688 match repo::playlist::insert_user_playlist(&pools.destination, user_playlist).await {
689 Ok(_) => {}
690 Err(e) => {
691 tracing::error!(error = %e, "Failed to insert user playlist");
692 }
693 }
694 i += 1;
695 }
696 }
697 Ok(())
698}