A decentralized music tracking and discovery platform built on AT Protocol 馃幍
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1use std::sync::{Arc, Mutex};
2
3use anyhow::Error;
4use duckdb::{params, Connection};
5use owo_colors::OwoColorize;
6use sqlx::{Pool, Postgres};
7
8use crate::xata;
9
10pub async fn create_tables(conn: &Connection) -> Result<(), Error> {
11 conn.execute_batch(
12 "BEGIN;
13 CREATE TABLE IF NOT EXISTS artists (
14 id VARCHAR PRIMARY KEY,
15 name VARCHAR NOT NULL,
16 biography TEXT,
17 born DATE,
18 born_in VARCHAR,
19 died DATE,
20 picture VARCHAR,
21 sha256 VARCHAR NOT NULL,
22 spotify_link VARCHAR,
23 tidal_link VARCHAR,
24 youtube_link VARCHAR,
25 apple_music_link VARCHAR,
26 uri VARCHAR,
27 );
28 CREATE TABLE IF NOT EXISTS albums (
29 id VARCHAR PRIMARY KEY,
30 title VARCHAR NOT NULL,
31 artist VARCHAR NOT NULL,
32 release_date DATE,
33 album_art VARCHAR,
34 year INTEGER,
35 spotify_link VARCHAR,
36 tidal_link VARCHAR,
37 youtube_link VARCHAR,
38 apple_music_link VARCHAR,
39 sha256 VARCHAR NOT NULL,
40 uri VARCHAR,
41 artist_uri VARCHAR,
42 );
43 CREATE TABLE IF NOT EXISTS tracks (
44 id VARCHAR PRIMARY KEY,
45 title VARCHAR,
46 artist VARCHAR,
47 album_artist VARCHAR,
48 album_art VARCHAR,
49 album VARCHAR,
50 track_number INTEGER,
51 duration INTEGER,
52 mb_id VARCHAR,
53 youtube_link VARCHAR,
54 spotify_link VARCHAR,
55 tidal_link VARCHAR,
56 apple_music_link VARCHAR,
57 sha256 VARCHAR NOT NULL,
58 lyrics TEXT,
59 composer VARCHAR,
60 genre VARCHAR,
61 disc_number INTEGER,
62 copyright_message VARCHAR,
63 label VARCHAR,
64 uri VARCHAR,
65 artist_uri VARCHAR,
66 album_uri VARCHAR,
67 created_at TIMESTAMP,
68 );
69 CREATE TABLE IF NOT EXISTS album_tracks (
70 id VARCHAR PRIMARY KEY,
71 album_id VARCHAR,
72 track_id VARCHAR,
73 FOREIGN KEY (album_id) REFERENCES albums(id),
74 FOREIGN KEY (track_id) REFERENCES tracks(id),
75 );
76 CREATE TABLE IF NOT EXISTS users (
77 id VARCHAR PRIMARY KEY,
78 display_name VARCHAR,
79 did VARCHAR,
80 handle VARCHAR,
81 avatar VARCHAR,
82 );
83 CREATE TABLE IF NOT EXISTS playlists (
84 id VARCHAR PRIMARY KEY,
85 name VARCHAR,
86 description TEXT,
87 picture VARCHAR,
88 created_at TIMESTAMP,
89 updated_at TIMESTAMP,
90 uri VARCHAR,
91 created_by VARCHAR NOT NULL,
92 FOREIGN KEY (created_by) REFERENCES users(id),
93 );
94 CREATE TABLE IF NOT EXISTS playlist_tracks (
95 id VARCHAR PRIMARY KEY,
96 playlist_id VARCHAR,
97 track_id VARCHAR,
98 added_by VARCHAR,
99 created_at TIMESTAMP,
100 FOREIGN KEY (playlist_id) REFERENCES playlists(id),
101 FOREIGN KEY (track_id) REFERENCES tracks(id),
102 );
103 CREATE TABLE IF NOT EXISTS user_tracks (
104 id VARCHAR PRIMARY KEY,
105 user_id VARCHAR,
106 track_id VARCHAR,
107 created_at TIMESTAMP,
108 FOREIGN KEY (user_id) REFERENCES users(id),
109 FOREIGN KEY (track_id) REFERENCES tracks(id),
110 );
111 CREATE TABLE IF NOT EXISTS user_albums (
112 id VARCHAR PRIMARY KEY,
113 user_id VARCHAR,
114 album_id VARCHAR,
115 created_at TIMESTAMP,
116 FOREIGN KEY (user_id) REFERENCES users(id),
117 FOREIGN KEY (album_id) REFERENCES albums(id),
118 );
119 CREATE TABLE IF NOT EXISTS user_artists (
120 id VARCHAR PRIMARY KEY,
121 user_id VARCHAR,
122 artist_id VARCHAR,
123 created_at TIMESTAMP,
124 FOREIGN KEY (user_id) REFERENCES users(id),
125 FOREIGN KEY (artist_id) REFERENCES artists(id),
126 );
127 CREATE TABLE IF NOT EXISTS user_playlists (
128 id VARCHAR PRIMARY KEY,
129 user_id VARCHAR,
130 playlist_id VARCHAR,
131 created_at TIMESTAMP,
132 FOREIGN KEY (user_id) REFERENCES users(id),
133 FOREIGN KEY (playlist_id) REFERENCES playlists(id),
134 );
135 CREATE TABLE IF NOT EXISTS loved_tracks (
136 id VARCHAR PRIMARY KEY,
137 user_id VARCHAR,
138 track_id VARCHAR,
139 created_at TIMESTAMP,
140 FOREIGN KEY (user_id) REFERENCES users(id),
141 FOREIGN KEY (track_id) REFERENCES tracks(id),
142 );
143 CREATE TABLE IF NOT EXISTS artist_tracks (
144 id VARCHAR PRIMARY KEY,
145 artist_id VARCHAR,
146 track_id VARCHAR,
147 created_at TIMESTAMP,
148 FOREIGN KEY (artist_id) REFERENCES artists(id),
149 FOREIGN KEY (track_id) REFERENCES tracks(id),
150 );
151 CREATE TABLE IF NOT EXISTS artist_albums (
152 id VARCHAR PRIMARY KEY,
153 artist_id VARCHAR,
154 album_id VARCHAR,
155 created_at TIMESTAMP,
156 FOREIGN KEY (artist_id) REFERENCES artists(id),
157 FOREIGN KEY (album_id) REFERENCES albums(id),
158 );
159 CREATE TABLE IF NOT EXISTS album_tracks (
160 id VARCHAR PRIMARY KEY,
161 album_id VARCHAR,
162 track_id VARCHAR,
163 FOREIGN KEY (album_id) REFERENCES albums(id),
164 FOREIGN KEY (track_id) REFERENCES tracks(id),
165 );
166 CREATE TABLE IF NOT EXISTS scrobbles (
167 id VARCHAR PRIMARY KEY,
168 user_id VARCHAR,
169 track_id VARCHAR,
170 album_id VARCHAR,
171 artist_id VARCHAR,
172 uri VARCHAR,
173 created_at TIMESTAMP,
174 FOREIGN KEY (user_id) REFERENCES users(id),
175 FOREIGN KEY (track_id) REFERENCES tracks(id),
176 FOREIGN KEY (album_id) REFERENCES albums(id),
177 FOREIGN KEY (artist_id) REFERENCES artists(id),
178 );
179 COMMIT;
180 ",
181 )?;
182
183 Ok(())
184}
185
186pub async fn load_tracks(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
187 let conn = conn.lock().unwrap();
188 let tracks: Vec<xata::track::Track> = sqlx::query_as(
189 r#"
190 SELECT * FROM tracks
191 "#,
192 )
193 .fetch_all(pool)
194 .await?;
195
196 for (i, track) in tracks.clone().into_iter().enumerate() {
197 tracing::info!(track = i, title = %track.title.bright_green(), artist = %track.artist);
198 match conn.execute(
199 "INSERT INTO tracks (
200 id,
201 title,
202 artist,
203 album_artist,
204 album_art,
205 album,
206 track_number,
207 duration,
208 mb_id,
209 youtube_link,
210 spotify_link,
211 tidal_link,
212 apple_music_link,
213 sha256,
214 lyrics,
215 composer,
216 genre,
217 disc_number,
218 copyright_message,
219 label,
220 uri,
221 artist_uri,
222 album_uri,
223 created_at
224 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
225 params![
226 track.xata_id,
227 track.title,
228 track.artist,
229 track.album_artist,
230 track.album_art,
231 track.album,
232 track.track_number,
233 track.duration,
234 track.mb_id,
235 track.youtube_link,
236 track.spotify_link,
237 track.tidal_link,
238 track.apple_music_link,
239 track.sha256,
240 track.lyrics,
241 track.composer,
242 track.genre,
243 track.disc_number,
244 track.copyright_message,
245 track.label,
246 track.uri,
247 track.artist_uri,
248 track.album_uri,
249 track.xata_createdat,
250 ],
251 ) {
252 Ok(_) => (),
253 Err(e) => tracing::error!(error = %e, "Error inserting track"),
254 }
255 }
256
257 tracing::info!(tracks = tracks.len(), "Loaded tracks");
258 Ok(())
259}
260
261pub async fn load_artists(
262 conn: Arc<Mutex<Connection>>,
263 pool: &Pool<Postgres>,
264) -> Result<(), Error> {
265 let conn = conn.lock().unwrap();
266 let artists: Vec<xata::artist::Artist> = sqlx::query_as(
267 r#"
268 SELECT * FROM artists
269 "#,
270 )
271 .fetch_all(pool)
272 .await?;
273
274 for (i, artist) in artists.clone().into_iter().enumerate() {
275 tracing::info!(artist = i, name = %artist.name.bright_green());
276 match conn.execute(
277 "INSERT INTO artists (
278 id,
279 name,
280 biography,
281 born,
282 born_in,
283 died,
284 picture,
285 sha256,
286 spotify_link,
287 tidal_link,
288 youtube_link,
289 apple_music_link,
290 uri
291 ) VALUES (?,
292 ?,
293 ?,
294 ?,
295 ?,
296 ?,
297 ?,
298 ?,
299 ?,
300 ?,
301 ?,
302 ?,
303 ?)",
304 params![
305 artist.xata_id,
306 artist.name,
307 artist.biography,
308 artist.born,
309 artist.born_in,
310 artist.died,
311 artist.picture,
312 artist.sha256,
313 artist.spotify_link,
314 artist.tidal_link,
315 artist.youtube_link,
316 artist.apple_music_link,
317 artist.uri,
318 ],
319 ) {
320 Ok(_) => (),
321 Err(e) => tracing::error!(error = %e, "Error inserting artist"),
322 }
323 }
324
325 tracing::info!(artists = artists.len(), "Loaded artists");
326 Ok(())
327}
328
329pub async fn load_albums(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
330 let conn = conn.lock().unwrap();
331 let albums: Vec<xata::album::Album> = sqlx::query_as(
332 r#"
333 SELECT * FROM albums
334 "#,
335 )
336 .fetch_all(pool)
337 .await?;
338
339 for (i, album) in albums.clone().into_iter().enumerate() {
340 tracing::info!(album = i, title = %album.title.bright_green(), artist = %album.artist);
341 match conn.execute(
342 "INSERT INTO albums (
343 id,
344 title,
345 artist,
346 release_date,
347 album_art,
348 year,
349 spotify_link,
350 tidal_link,
351 youtube_link,
352 apple_music_link,
353 sha256,
354 uri,
355 artist_uri
356 ) VALUES (?,
357 ?,
358 ?,
359 ?,
360 ?,
361 ?,
362 ?,
363 ?,
364 ?,
365 ?,
366 ?,
367 ?,
368 ?)",
369 params![
370 album.xata_id,
371 album.title,
372 album.artist,
373 album.release_date,
374 album.album_art,
375 album.year,
376 album.spotify_link,
377 album.tidal_link,
378 album.youtube_link,
379 album.apple_music_link,
380 album.sha256,
381 album.uri,
382 album.artist_uri,
383 ],
384 ) {
385 Ok(_) => (),
386 Err(e) => tracing::error!(error = %e, "Error inserting album"),
387 }
388 }
389
390 tracing::info!(albums = albums.len(), "Loaded albums");
391 Ok(())
392}
393
394pub async fn load_users(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
395 let conn = conn.lock().unwrap();
396 let users: Vec<xata::user::User> = sqlx::query_as(
397 r#"
398 SELECT * FROM users
399 "#,
400 )
401 .fetch_all(pool)
402 .await?;
403
404 for (i, user) in users.clone().into_iter().enumerate() {
405 tracing::info!(user = i, name = %user.display_name.bright_green());
406 match conn.execute(
407 "INSERT INTO users (
408 id,
409 display_name,
410 did,
411 handle,
412 avatar
413 ) VALUES (?,
414 ?,
415 ?,
416 ?,
417 ?)",
418 params![
419 user.xata_id,
420 user.display_name,
421 user.did,
422 user.handle,
423 user.avatar,
424 ],
425 ) {
426 Ok(_) => (),
427 Err(e) => tracing::error!(error = %e, "Error inserting user"),
428 }
429 }
430
431 tracing::info!(users = users.len(), "Loaded users");
432 Ok(())
433}
434
435pub async fn load_scrobbles(
436 conn: Arc<Mutex<Connection>>,
437 pool: &Pool<Postgres>,
438) -> Result<(), Error> {
439 let conn = conn.lock().unwrap();
440 let scrobbles: Vec<xata::scrobble::Scrobble> = sqlx::query_as(
441 r#"
442 SELECT * FROM scrobbles
443 "#,
444 )
445 .fetch_all(pool)
446 .await?;
447
448 for (i, scrobble) in scrobbles.clone().into_iter().enumerate() {
449 tracing::info!(scrobble = i, uri = %scrobble.uri.clone().unwrap_or_else(|| "None".to_string()).bright_green());
450 match conn.execute(
451 "INSERT INTO scrobbles (
452 id,
453 user_id,
454 track_id,
455 album_id,
456 artist_id,
457 uri,
458 created_at
459 ) VALUES (
460 ?,
461 ?,
462 ?,
463 ?,
464 ?,
465 ?,
466 ?
467 )",
468 params![
469 scrobble.xata_id,
470 scrobble.user_id,
471 scrobble.track_id,
472 scrobble.album_id,
473 scrobble.artist_id,
474 scrobble.uri,
475 scrobble.xata_createdat,
476 ],
477 ) {
478 Ok(_) => (),
479 Err(e) => tracing::error!(error = %e, "Error inserting scrobble"),
480 }
481 }
482
483 tracing::info!(scrobbles = scrobbles.len(), "Loaded scrobbles");
484 Ok(())
485}
486
487pub async fn load_album_tracks(
488 conn: Arc<Mutex<Connection>>,
489 pool: &Pool<Postgres>,
490) -> Result<(), Error> {
491 let conn = conn.lock().unwrap();
492 let album_tracks: Vec<xata::album_track::AlbumTrack> = sqlx::query_as(
493 r#"
494 SELECT * FROM album_tracks
495 "#,
496 )
497 .fetch_all(pool)
498 .await?;
499
500 for (i, album_track) in album_tracks.clone().into_iter().enumerate() {
501 tracing::info!(album_track = i, album_id = %album_track.album_id.bright_green(), track_id = %album_track.track_id);
502 match conn.execute(
503 "INSERT INTO album_tracks (
504 id,
505 album_id,
506 track_id
507 ) VALUES (?,
508 ?,
509 ?)",
510 params![
511 album_track.xata_id,
512 album_track.album_id,
513 album_track.track_id,
514 ],
515 ) {
516 Ok(_) => (),
517 Err(e) => tracing::error!(error = %e, "Error inserting album_track"),
518 }
519 }
520
521 tracing::info!(album_tracks = album_tracks.len(), "Loaded album_tracks");
522 Ok(())
523}
524
525pub async fn load_loved_tracks(
526 conn: Arc<Mutex<Connection>>,
527 pool: &Pool<Postgres>,
528) -> Result<(), Error> {
529 let conn = conn.lock().unwrap();
530 let loved_tracks: Vec<xata::user_track::UserTrack> = sqlx::query_as(
531 r#"
532 SELECT * FROM loved_tracks
533 "#,
534 )
535 .fetch_all(pool)
536 .await?;
537
538 for (i, loved_track) in loved_tracks.clone().into_iter().enumerate() {
539 tracing::info!(loved_track = i, user_id = %loved_track.user_id.bright_green(), track_id = %loved_track.track_id);
540 match conn.execute(
541 "INSERT INTO loved_tracks (
542 id,
543 user_id,
544 track_id,
545 created_at
546 ) VALUES (?,
547 ?,
548 ?,
549 ?)",
550 params![
551 loved_track.xata_id,
552 loved_track.user_id,
553 loved_track.track_id,
554 loved_track.xata_createdat,
555 ],
556 ) {
557 Ok(_) => (),
558 Err(e) => tracing::error!(error = %e, "Error inserting loved_track"),
559 }
560 }
561
562 tracing::info!(loved_tracks = loved_tracks.len(), "Loaded loved_tracks");
563 Ok(())
564}
565
566pub async fn load_artist_tracks(
567 conn: Arc<Mutex<Connection>>,
568 pool: &Pool<Postgres>,
569) -> Result<(), Error> {
570 let conn = conn.lock().unwrap();
571 let artist_tracks: Vec<xata::artist_track::ArtistTrack> = sqlx::query_as(
572 r#"
573 SELECT * FROM artist_tracks
574 "#,
575 )
576 .fetch_all(pool)
577 .await?;
578
579 for (i, artist_track) in artist_tracks.clone().into_iter().enumerate() {
580 tracing::info!(artist_track = i, artist_id = %artist_track.artist_id.bright_green(), track_id = %artist_track.track_id);
581 match conn.execute(
582 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)",
583 params![
584 artist_track.xata_id,
585 artist_track.artist_id,
586 artist_track.track_id,
587 artist_track.xata_createdat,
588 ],
589 ) {
590 Ok(_) => (),
591 Err(e) => tracing::error!(error = %e, "Error inserting artist_track"),
592 }
593 }
594
595 tracing::info!(artist_tracks = artist_tracks.len(), "Loaded artist_tracks");
596 Ok(())
597}
598
599pub async fn load_artist_albums(
600 conn: Arc<Mutex<Connection>>,
601 pool: &Pool<Postgres>,
602) -> Result<(), Error> {
603 let conn = conn.lock().unwrap();
604 let artist_albums: Vec<xata::artist_album::ArtistAlbum> = sqlx::query_as(
605 r#"
606 SELECT * FROM artist_albums
607 "#,
608 )
609 .fetch_all(pool)
610 .await?;
611
612 for (i, artist_album) in artist_albums.clone().into_iter().enumerate() {
613 tracing::info!(artist_album = i, artist_id = %artist_album.artist_id.bright_green(), album_id = %artist_album.album_id);
614 match conn.execute(
615 "INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)",
616 params![
617 artist_album.xata_id,
618 artist_album.artist_id,
619 artist_album.album_id,
620 artist_album.xata_createdat,
621 ],
622 ) {
623 Ok(_) => (),
624 Err(e) => tracing::error!(error = %e, "Error inserting artist_album"),
625 }
626 }
627
628 tracing::info!(artist_albums = artist_albums.len(), "Loaded artist_albums");
629 Ok(())
630}
631
632pub async fn load_user_albums(
633 conn: Arc<Mutex<Connection>>,
634 pool: &Pool<Postgres>,
635) -> Result<(), Error> {
636 let conn = conn.lock().unwrap();
637 let user_albums: Vec<xata::user_album::UserAlbum> = sqlx::query_as(
638 r#"
639 SELECT * FROM user_albums
640 "#,
641 )
642 .fetch_all(pool)
643 .await?;
644
645 for (i, user_album) in user_albums.clone().into_iter().enumerate() {
646 tracing::info!(user_album = i, user_id = %user_album.user_id.bright_green(), album_id = %user_album.album_id);
647 match conn.execute(
648 "INSERT INTO user_albums (id, user_id, album_id, created_at) VALUES (?, ?, ?, ?)",
649 params![
650 user_album.xata_id,
651 user_album.user_id,
652 user_album.album_id,
653 user_album.xata_createdat,
654 ],
655 ) {
656 Ok(_) => (),
657 Err(e) => tracing::error!(error = %e, "Error inserting user_album"),
658 }
659 }
660
661 tracing::info!(user_albums = user_albums.len(), "Loaded user_albums");
662 Ok(())
663}
664
665pub async fn load_user_artists(
666 conn: Arc<Mutex<Connection>>,
667 pool: &Pool<Postgres>,
668) -> Result<(), Error> {
669 let conn = conn.lock().unwrap();
670 let user_artists: Vec<xata::user_artist::UserArtist> = sqlx::query_as(
671 r#"
672 SELECT * FROM user_artists
673 "#,
674 )
675 .fetch_all(pool)
676 .await?;
677
678 for (i, user_artist) in user_artists.clone().into_iter().enumerate() {
679 tracing::info!(user_artist = i, user_id = %user_artist.user_id.bright_green(), artist_id = %user_artist.artist_id);
680 match conn.execute(
681 "INSERT INTO user_artists (id, user_id, artist_id, created_at) VALUES (?, ?, ?, ?)",
682 params![
683 user_artist.xata_id,
684 user_artist.user_id,
685 user_artist.artist_id,
686 user_artist.xata_createdat,
687 ],
688 ) {
689 Ok(_) => (),
690 Err(e) => tracing::error!(error = %e, "Error inserting user_artist"),
691 }
692 }
693
694 tracing::info!(user_artists = user_artists.len(), "Loaded user_artists");
695 Ok(())
696}
697
698pub async fn load_user_tracks(
699 conn: Arc<Mutex<Connection>>,
700 pool: &Pool<Postgres>,
701) -> Result<(), Error> {
702 let conn = conn.lock().unwrap();
703 let user_tracks: Vec<xata::user_track::UserTrack> = sqlx::query_as(
704 r#"
705 SELECT * FROM user_tracks
706 "#,
707 )
708 .fetch_all(pool)
709 .await?;
710
711 for (i, user_track) in user_tracks.clone().into_iter().enumerate() {
712 tracing::info!(user_track = i, user_id = %user_track.user_id.bright_green(), track_id = %user_track.track_id);
713 match conn.execute(
714 "INSERT INTO user_tracks (id, user_id, track_id, created_at) VALUES (?, ?, ?, ?)",
715 params![
716 user_track.xata_id,
717 user_track.user_id,
718 user_track.track_id,
719 user_track.xata_createdat,
720 ],
721 ) {
722 Ok(_) => (),
723 Err(e) => tracing::error!(error = %e, "Error inserting user_track"),
724 }
725 }
726
727 tracing::info!(user_tracks = user_tracks.len(), "Loaded user_tracks");
728 Ok(())
729}