forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use std::sync::Arc;
2
3use anyhow::Error;
4use chrono::DateTime;
5use owo_colors::OwoColorize;
6use sqlx::{Pool, Postgres};
7use tokio::sync::Mutex;
8
9use crate::{
10 profile::did_to_profile,
11 subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID},
12 types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord},
13 webhook::discord::{
14 self,
15 model::{ScrobbleData, WebhookEnvelope},
16 },
17 webhook_worker::{push_to_queue, AppState},
18 xata::{
19 album::Album, album_track::AlbumTrack, artist::Artist, artist_album::ArtistAlbum,
20 artist_track::ArtistTrack, track::Track, user::User, user_album::UserAlbum,
21 user_artist::UserArtist, user_track::UserTrack,
22 },
23};
24
25pub async fn save_scrobble(
26 state: Arc<Mutex<AppState>>,
27 pool: Arc<Mutex<Pool<Postgres>>>,
28 did: &str,
29 commit: Commit,
30) -> Result<(), Error> {
31 // skip unknown collection
32 if !vec![SCROBBLE_NSID, ARTIST_NSID, ALBUM_NSID, SONG_NSID]
33 .contains(&commit.collection.as_str())
34 {
35 return Ok(());
36 }
37
38 let pool = pool.lock().await;
39
40 match commit.operation.as_str() {
41 "create" => {
42 if commit.collection == SCROBBLE_NSID {
43 let mut tx = pool.begin().await?;
44 let scrobble_record: ScrobbleRecord =
45 serde_json::from_value(commit.record.clone())?;
46
47 let album_id = save_album(&mut tx, scrobble_record.clone(), did).await?;
48 let artist_id = save_artist(&mut tx, scrobble_record.clone()).await?;
49 let track_id = save_track(&mut tx, scrobble_record.clone(), did).await?;
50
51 save_album_track(&mut tx, &album_id, &track_id).await?;
52 save_artist_track(&mut tx, &artist_id, &track_id).await?;
53 save_artist_album(&mut tx, &artist_id, &album_id).await?;
54
55 let uri = format!("at://{}/app.rocksky.scrobble/{}", did, commit.rkey);
56
57 let user_id = save_user(&mut tx, did).await?;
58
59 println!(
60 "Saving scrobble: {} ",
61 format!(
62 "{} - {} - {}",
63 scrobble_record.title, scrobble_record.artist, scrobble_record.album
64 )
65 .magenta()
66 );
67
68 sqlx::query(
69 r#"
70 INSERT INTO scrobbles (
71 album_id,
72 artist_id,
73 track_id,
74 uri,
75 user_id,
76 timestamp
77 ) VALUES ($1, $2, $3, $4, $5, $6)
78 "#,
79 )
80 .bind(album_id)
81 .bind(artist_id)
82 .bind(track_id)
83 .bind(uri)
84 .bind(user_id)
85 .bind(
86 DateTime::parse_from_rfc3339(&scrobble_record.created_at)
87 .unwrap()
88 .with_timezone(&chrono::Utc),
89 )
90 .execute(&mut *tx)
91 .await?;
92
93 tx.commit().await?;
94
95 let users: Vec<User> =
96 sqlx::query_as::<_, User>("SELECT * FROM users WHERE did = $1")
97 .bind(did)
98 .fetch_all(&*pool)
99 .await?;
100
101 if users.is_empty() {
102 return Err(anyhow::anyhow!(
103 "User with DID {} not found in database",
104 did
105 ));
106 }
107
108 // Push to webhook queue (Discord)
109 match push_to_queue(
110 state,
111 &WebhookEnvelope {
112 r#type: "scrobble.created".to_string(),
113 id: commit.rkey.clone(),
114 data: ScrobbleData {
115 user: discord::model::User {
116 did: did.to_string(),
117 display_name: users[0].display_name.clone(),
118 handle: users[0].handle.clone(),
119 avatar_url: users[0].avatar.clone(),
120 },
121 track: discord::model::Track {
122 title: scrobble_record.title.clone(),
123 artist: scrobble_record.artist.clone(),
124 album: scrobble_record.album.clone(),
125 duration: scrobble_record.duration,
126 artwork_url: scrobble_record.album_art.clone().map(|x| {
127 format!(
128 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}",
129 did,
130 x.r#ref.link,
131 x.mime_type.split('/').last().unwrap_or("jpeg")
132 )
133 }),
134 spotify_url: scrobble_record.spotify_link.clone(),
135 tidal_url: scrobble_record.tidal_link.clone(),
136 youtube_url: scrobble_record.youtube_link.clone(),
137 },
138 played_at: scrobble_record.created_at.clone(),
139 },
140 delivered_at: Some(chrono::Utc::now().to_rfc3339()),
141 },
142 )
143 .await
144 {
145 Ok(_) => {}
146 Err(e) => {
147 eprintln!("Failed to push to webhook queue: {}", e);
148 }
149 }
150 }
151
152 if commit.collection == ARTIST_NSID {
153 let mut tx = pool.begin().await?;
154
155 let user_id = save_user(&mut tx, did).await?;
156 let uri = format!("at://{}/app.rocksky.artist/{}", did, commit.rkey);
157
158 let artist_record: ArtistRecord = serde_json::from_value(commit.record.clone())?;
159 save_user_artist(&mut tx, &user_id, artist_record.clone(), &uri).await?;
160 update_artist_uri(&mut tx, &user_id, artist_record, &uri).await?;
161
162 tx.commit().await?;
163 }
164
165 if commit.collection == ALBUM_NSID {
166 let mut tx = pool.begin().await?;
167 let user_id = save_user(&mut tx, did).await?;
168 let uri = format!("at://{}/app.rocksky.album/{}", did, commit.rkey);
169
170 let album_record: AlbumRecord = serde_json::from_value(commit.record.clone())?;
171 save_user_album(&mut tx, &user_id, album_record.clone(), &uri).await?;
172 update_album_uri(&mut tx, &user_id, album_record, &uri).await?;
173
174 tx.commit().await?;
175 }
176
177 if commit.collection == SONG_NSID {
178 let mut tx = pool.begin().await?;
179
180 let user_id = save_user(&mut tx, did).await?;
181 let uri = format!("at://{}/app.rocksky.song/{}", did, commit.rkey);
182
183 let song_record: SongRecord = serde_json::from_value(commit.record.clone())?;
184 save_user_track(&mut tx, &user_id, song_record.clone(), &uri).await?;
185 update_track_uri(&mut tx, &user_id, song_record, &uri).await?;
186
187 tx.commit().await?;
188 }
189 }
190 _ => {
191 println!("Unsupported operation: {}", commit.operation);
192 }
193 }
194 Ok(())
195}
196
197pub async fn save_user(
198 tx: &mut sqlx::Transaction<'_, Postgres>,
199 did: &str,
200) -> Result<String, Error> {
201 let profile = did_to_profile(did).await?;
202
203 // Check if the user exists in the database
204 let mut users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE did = $1")
205 .bind(did)
206 .fetch_all(&mut **tx)
207 .await?;
208
209 // If the user does not exist, create a new user
210 if users.is_empty() {
211 let avatar = profile.avatar.map(|blob| {
212 format!(
213 "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}",
214 did,
215 blob.r#ref.link,
216 blob.mime_type.split('/').last().unwrap_or("jpeg")
217 )
218 });
219 sqlx::query(
220 "INSERT INTO users (display_name, did, handle, avatar) VALUES ($1, $2, $3, $4)",
221 )
222 .bind(profile.display_name)
223 .bind(did)
224 .bind(profile.handle)
225 .bind(avatar)
226 .execute(&mut **tx)
227 .await?;
228
229 users = sqlx::query_as("SELECT * FROM users WHERE did = $1")
230 .bind(did)
231 .fetch_all(&mut **tx)
232 .await?;
233 }
234
235 Ok(users[0].xata_id.clone())
236}
237
238pub async fn save_track(
239 tx: &mut sqlx::Transaction<'_, Postgres>,
240 scrobble_record: ScrobbleRecord,
241 did: &str,
242) -> Result<String, Error> {
243 let uri: Option<String> = None;
244 let hash = sha256::digest(
245 format!(
246 "{} - {} - {}",
247 scrobble_record.title, scrobble_record.artist, scrobble_record.album
248 )
249 .to_lowercase(),
250 );
251
252 let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1")
253 .bind(&hash)
254 .fetch_all(&mut **tx)
255 .await?;
256
257 if !tracks.is_empty() {
258 return Ok(tracks[0].xata_id.clone());
259 }
260
261 sqlx::query(
262 r#"
263 INSERT INTO tracks (
264 title,
265 artist,
266 album,
267 album_art,
268 album_artist,
269 track_number,
270 duration,
271 mb_id,
272 composer,
273 lyrics,
274 disc_number,
275 sha256,
276 copyright_message,
277 uri,
278 spotify_link,
279 apple_music_link,
280 tidal_link,
281 youtube_link,
282 label
283 ) VALUES (
284 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19
285 )
286 "#,
287 )
288 .bind(scrobble_record.title)
289 .bind(scrobble_record.artist)
290 .bind(scrobble_record.album)
291 .bind(scrobble_record.album_art.map(|x| {
292 format!(
293 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}",
294 did,
295 x.r#ref.link,
296 x.mime_type.split('/').last().unwrap_or("jpeg")
297 )
298 }))
299 .bind(scrobble_record.album_artist)
300 .bind(scrobble_record.track_number)
301 .bind(scrobble_record.duration)
302 .bind(scrobble_record.mbid)
303 .bind(scrobble_record.composer)
304 .bind(scrobble_record.lyrics)
305 .bind(scrobble_record.disc_number)
306 .bind(&hash)
307 .bind(scrobble_record.copyright_message)
308 .bind(uri)
309 .bind(scrobble_record.spotify_link)
310 .bind(scrobble_record.apple_music_link)
311 .bind(scrobble_record.tidal_link)
312 .bind(scrobble_record.youtube_link)
313 .bind(scrobble_record.label)
314 .execute(&mut **tx)
315 .await?;
316
317 let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1")
318 .bind(&hash)
319 .fetch_all(&mut **tx)
320 .await?;
321
322 Ok(tracks[0].xata_id.clone())
323}
324
325pub async fn save_album(
326 tx: &mut sqlx::Transaction<'_, Postgres>,
327 scrobble_record: ScrobbleRecord,
328 did: &str,
329) -> Result<String, Error> {
330 let hash = sha256::digest(
331 format!(
332 "{} - {}",
333 scrobble_record.album, scrobble_record.album_artist
334 )
335 .to_lowercase(),
336 );
337
338 let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1")
339 .bind(&hash)
340 .fetch_all(&mut **tx)
341 .await?;
342
343 if !albums.is_empty() {
344 println!("Album already exists: {}", albums[0].title.magenta());
345 return Ok(albums[0].xata_id.clone());
346 }
347
348 println!("Saving album: {}", scrobble_record.album.magenta());
349
350 let uri: Option<String> = None;
351 let artist_uri: Option<String> = None;
352 sqlx::query(
353 r#"
354 INSERT INTO albums (
355 title,
356 artist,
357 album_art,
358 year,
359 release_date,
360 sha256,
361 uri,
362 artist_uri
363 ) VALUES (
364 $1, $2, $3, $4, $5, $6, $7, $8
365 )
366 "#,
367 )
368 .bind(scrobble_record.album)
369 .bind(scrobble_record.album_artist)
370 .bind(scrobble_record.album_art.map(|x| {
371 format!(
372 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}",
373 did,
374 x.r#ref.link,
375 x.mime_type.split('/').last().unwrap_or("jpeg")
376 )
377 }))
378 .bind(scrobble_record.year)
379 .bind(scrobble_record.release_date)
380 .bind(&hash)
381 .bind(uri)
382 .bind(artist_uri)
383 .execute(&mut **tx)
384 .await?;
385
386 let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1")
387 .bind(&hash)
388 .fetch_all(&mut **tx)
389 .await?;
390
391 Ok(albums[0].xata_id.clone())
392}
393
394pub async fn save_artist(
395 tx: &mut sqlx::Transaction<'_, Postgres>,
396 scrobble_record: ScrobbleRecord,
397) -> Result<String, Error> {
398 let hash = sha256::digest(scrobble_record.album_artist.to_lowercase());
399 let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1")
400 .bind(&hash)
401 .fetch_all(&mut **tx)
402 .await?;
403
404 if !artists.is_empty() {
405 println!("Artist already exists: {}", artists[0].name.magenta());
406 return Ok(artists[0].xata_id.clone());
407 }
408
409 println!("Saving artist: {}", scrobble_record.album_artist.magenta());
410
411 let uri: Option<String> = None;
412 let picture = "";
413 sqlx::query(
414 r#"
415 INSERT INTO artists (
416 name,
417 sha256,
418 uri,
419 picture
420 ) VALUES (
421 $1, $2, $3, $4
422 )
423 "#,
424 )
425 .bind(scrobble_record.artist)
426 .bind(&hash)
427 .bind(uri)
428 .bind(picture)
429 .execute(&mut **tx)
430 .await?;
431
432 let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1")
433 .bind(&hash)
434 .fetch_all(&mut **tx)
435 .await?;
436
437 Ok(artists[0].xata_id.clone())
438}
439
440pub async fn save_album_track(
441 tx: &mut sqlx::Transaction<'_, Postgres>,
442 album_id: &str,
443 track_id: &str,
444) -> Result<(), Error> {
445 let album_tracks: Vec<AlbumTrack> =
446 sqlx::query_as("SELECT * FROM album_tracks WHERE album_id = $1 AND track_id = $2")
447 .bind(album_id)
448 .bind(track_id)
449 .fetch_all(&mut **tx)
450 .await?;
451
452 if !album_tracks.is_empty() {
453 println!(
454 "Album track already exists: {}",
455 format!("{} - {}", album_id, track_id).magenta()
456 );
457 return Ok(());
458 }
459
460 println!(
461 "Saving album track: {}",
462 format!("{} - {}", album_id, track_id).magenta()
463 );
464
465 sqlx::query(
466 r#"
467 INSERT INTO album_tracks (
468 album_id,
469 track_id
470 ) VALUES (
471 $1, $2
472 )
473 "#,
474 )
475 .bind(album_id)
476 .bind(track_id)
477 .execute(&mut **tx)
478 .await?;
479 Ok(())
480}
481
482pub async fn save_artist_track(
483 tx: &mut sqlx::Transaction<'_, Postgres>,
484 artist_id: &str,
485 track_id: &str,
486) -> Result<(), Error> {
487 let artist_tracks: Vec<ArtistTrack> =
488 sqlx::query_as("SELECT * FROM artist_tracks WHERE artist_id = $1 AND track_id = $2")
489 .bind(artist_id)
490 .bind(track_id)
491 .fetch_all(&mut **tx)
492 .await?;
493
494 if !artist_tracks.is_empty() {
495 println!(
496 "Artist track already exists: {}",
497 format!("{} - {}", artist_id, track_id).magenta()
498 );
499 return Ok(());
500 }
501
502 println!(
503 "Saving artist track: {}",
504 format!("{} - {}", artist_id, track_id).magenta()
505 );
506
507 sqlx::query(
508 r#"
509 INSERT INTO artist_tracks (
510 artist_id,
511 track_id
512 ) VALUES (
513 $1, $2
514 )
515 "#,
516 )
517 .bind(artist_id)
518 .bind(track_id)
519 .execute(&mut **tx)
520 .await?;
521 Ok(())
522}
523
524pub async fn save_artist_album(
525 tx: &mut sqlx::Transaction<'_, Postgres>,
526 artist_id: &str,
527 album_id: &str,
528) -> Result<(), Error> {
529 let artist_albums: Vec<ArtistAlbum> =
530 sqlx::query_as("SELECT * FROM artist_albums WHERE artist_id = $1 AND album_id = $2")
531 .bind(artist_id)
532 .bind(album_id)
533 .fetch_all(&mut **tx)
534 .await?;
535
536 if !artist_albums.is_empty() {
537 println!(
538 "Artist album already exists: {}",
539 format!("{} - {}", artist_id, album_id).magenta()
540 );
541 return Ok(());
542 }
543
544 println!(
545 "Saving artist album: {}",
546 format!("{} - {}", artist_id, album_id).magenta()
547 );
548
549 sqlx::query(
550 r#"
551 INSERT INTO artist_albums (
552 artist_id,
553 album_id
554 ) VALUES (
555 $1, $2
556 )
557 "#,
558 )
559 .bind(artist_id)
560 .bind(album_id)
561 .execute(&mut **tx)
562 .await?;
563 Ok(())
564}
565
566pub async fn save_user_artist(
567 tx: &mut sqlx::Transaction<'_, Postgres>,
568 user_id: &str,
569 record: ArtistRecord,
570 uri: &str,
571) -> Result<(), Error> {
572 let hash = sha256::digest(record.name.to_lowercase());
573
574 let mut artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1")
575 .bind(&hash)
576 .fetch_all(&mut **tx)
577 .await?;
578
579 let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1")
580 .bind(user_id)
581 .fetch_all(&mut **tx)
582 .await?;
583
584 let artist_id: &str;
585
586 match artists.is_empty() {
587 true => {
588 println!("Saving artist: {}", record.name.magenta());
589 let did = users[0].did.clone();
590 sqlx::query(
591 r#"
592 INSERT INTO artists (
593 name,
594 sha256,
595 uri,
596 picture
597 ) VALUES (
598 $1, $2, $3, $4
599 )
600 "#,
601 )
602 .bind(record.name)
603 .bind(&hash)
604 .bind(uri)
605 .bind(record.picture.map(|x| {
606 format!(
607 "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}",
608 did,
609 x.r#ref.link,
610 x.mime_type.split('/').last().unwrap_or("jpeg")
611 )
612 }))
613 .execute(&mut **tx)
614 .await?;
615
616 artists = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1")
617 .bind(&hash)
618 .fetch_all(&mut **tx)
619 .await?;
620 artist_id = &artists[0].xata_id;
621 }
622 false => {
623 artist_id = &artists[0].xata_id;
624 }
625 };
626
627 let user_artists: Vec<UserArtist> =
628 sqlx::query_as("SELECT * FROM user_artists WHERE user_id = $1 AND artist_id = $2")
629 .bind(user_id)
630 .bind(artist_id)
631 .fetch_all(&mut **tx)
632 .await?;
633
634 if !user_artists.is_empty() {
635 println!(
636 "User artist already exists: {}",
637 format!("{} - {}", user_id, artist_id).magenta()
638 );
639 sqlx::query(
640 r#"
641 UPDATE user_artists
642 SET scrobbles = scrobbles + 1,
643 uri = $3
644 WHERE user_id = $1 AND artist_id = $2
645 "#,
646 )
647 .bind(user_id)
648 .bind(artist_id)
649 .bind(uri)
650 .execute(&mut **tx)
651 .await?;
652 return Ok(());
653 }
654
655 println!(
656 "Saving user artist: {}",
657 format!("{} - {}", user_id, artist_id).magenta()
658 );
659
660 sqlx::query(
661 r#"
662 INSERT INTO user_artists (
663 user_id,
664 artist_id,
665 uri,
666 scrobbles
667 ) VALUES (
668 $1, $2, $3, $4
669 )
670 "#,
671 )
672 .bind(user_id)
673 .bind(artist_id)
674 .bind(uri)
675 .bind(1)
676 .execute(&mut **tx)
677 .await?;
678 Ok(())
679}
680
681pub async fn save_user_album(
682 tx: &mut sqlx::Transaction<'_, Postgres>,
683 user_id: &str,
684 record: AlbumRecord,
685 uri: &str,
686) -> Result<(), Error> {
687 let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1")
688 .bind(user_id)
689 .fetch_all(&mut **tx)
690 .await?;
691
692 let hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase());
693 let mut albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1")
694 .bind(&hash)
695 .fetch_all(&mut **tx)
696 .await?;
697
698 let album_id: &str;
699
700 match albums.is_empty() {
701 true => {
702 println!("Saving album: {}", record.title.magenta());
703 let did = users[0].did.clone();
704 sqlx::query(
705 r#"
706 INSERT INTO albums (
707 title,
708 artist,
709 album_art,
710 year,
711 release_date,
712 sha256,
713 uri
714 ) VALUES (
715 $1, $2, $3, $4, $5, $6, $7
716 )
717 "#,
718 )
719 .bind(record.title)
720 .bind(record.artist)
721 .bind(record.album_art.map(|x| {
722 format!(
723 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}",
724 did,
725 x.r#ref.link,
726 x.mime_type.split('/').last().unwrap_or("jpeg")
727 )
728 }))
729 .bind(record.year)
730 .bind(record.release_date)
731 .bind(&hash)
732 .bind(uri)
733 .execute(&mut **tx)
734 .await?;
735
736 albums = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1")
737 .bind(&hash)
738 .fetch_all(&mut **tx)
739 .await?;
740 album_id = &albums[0].xata_id;
741 }
742 false => {
743 album_id = &albums[0].xata_id;
744 }
745 };
746
747 let user_albums: Vec<UserAlbum> =
748 sqlx::query_as("SELECT * FROM user_albums WHERE user_id = $1 AND album_id = $2")
749 .bind(user_id)
750 .bind(album_id)
751 .fetch_all(&mut **tx)
752 .await?;
753
754 if !user_albums.is_empty() {
755 println!(
756 "User album already exists: {}",
757 format!("{} - {}", user_id, album_id).magenta()
758 );
759 sqlx::query(
760 r#"
761 UPDATE user_albums
762 SET scrobbles = scrobbles + 1,
763 uri = $3
764 WHERE user_id = $1 AND album_id = $2
765 "#,
766 )
767 .bind(user_id)
768 .bind(album_id)
769 .bind(uri)
770 .execute(&mut **tx)
771 .await?;
772 return Ok(());
773 }
774
775 println!(
776 "Saving user album: {}",
777 format!("{} - {}", user_id, album_id).magenta()
778 );
779
780 sqlx::query(
781 r#"
782 INSERT INTO user_albums (
783 user_id,
784 album_id,
785 uri,
786 scrobbles
787 ) VALUES (
788 $1, $2, $3, $4
789 )
790 "#,
791 )
792 .bind(user_id)
793 .bind(album_id)
794 .bind(uri)
795 .bind(1)
796 .execute(&mut **tx)
797 .await?;
798 Ok(())
799}
800
801pub async fn save_user_track(
802 tx: &mut sqlx::Transaction<'_, Postgres>,
803 user_id: &str,
804 record: SongRecord,
805 uri: &str,
806) -> Result<(), Error> {
807 let hash = sha256::digest(
808 format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(),
809 );
810
811 let mut tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1")
812 .bind(&hash)
813 .fetch_all(&mut **tx)
814 .await?;
815
816 let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1")
817 .bind(user_id)
818 .fetch_all(&mut **tx)
819 .await?;
820
821 let track_id: &str;
822
823 match tracks.is_empty() {
824 true => {
825 println!("Saving track: {}", record.title.magenta());
826 let did = users[0].did.clone();
827 sqlx::query(
828 r#"
829 INSERT INTO tracks (
830 title,
831 artist,
832 album,
833 album_art,
834 album_artist,
835 track_number,
836 duration,
837 mb_id,
838 composer,
839 lyrics,
840 disc_number,
841 sha256,
842 copyright_message,
843 uri,
844 spotify_link,
845 label
846 ) VALUES (
847 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
848 )
849 "#,
850 )
851 .bind(record.title)
852 .bind(record.artist)
853 .bind(record.album)
854 .bind(record.album_art.map(|x| {
855 format!(
856 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}",
857 did,
858 x.r#ref.link,
859 x.mime_type.split('/').last().unwrap_or("jpeg")
860 )
861 }))
862 .bind(record.album_artist)
863 .bind(record.track_number)
864 .bind(record.duration)
865 .bind(record.mbid)
866 .bind(record.composer)
867 .bind(record.lyrics)
868 .bind(record.disc_number)
869 .bind(&hash)
870 .bind(record.copyright_message)
871 .bind(uri)
872 .bind(record.spotify_link)
873 .bind(record.label)
874 .execute(&mut **tx)
875 .await?;
876
877 tracks = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1")
878 .bind(&hash)
879 .fetch_all(&mut **tx)
880 .await?;
881
882 track_id = &tracks[0].xata_id;
883 }
884 false => {
885 track_id = &tracks[0].xata_id;
886 }
887 }
888
889 let user_tracks: Vec<UserTrack> =
890 sqlx::query_as("SELECT * FROM user_tracks WHERE user_id = $1 AND track_id = $2")
891 .bind(user_id)
892 .bind(track_id)
893 .fetch_all(&mut **tx)
894 .await?;
895
896 if !user_tracks.is_empty() {
897 println!(
898 "User track already exists: {}",
899 format!("{} - {}", user_id, track_id).magenta()
900 );
901 sqlx::query(
902 r#"
903 UPDATE user_tracks
904 SET scrobbles = scrobbles + 1,
905 uri = $3
906 WHERE user_id = $1 AND track_id = $2
907 "#,
908 )
909 .bind(user_id)
910 .bind(track_id)
911 .bind(uri)
912 .execute(&mut **tx)
913 .await?;
914 return Ok(());
915 }
916
917 println!(
918 "Saving user track: {}",
919 format!("{} - {}", user_id, track_id).magenta()
920 );
921
922 sqlx::query(
923 r#"
924 INSERT INTO user_tracks (
925 user_id,
926 track_id,
927 uri,
928 scrobbles
929 ) VALUES (
930 $1, $2, $3, $4
931 )
932 "#,
933 )
934 .bind(user_id)
935 .bind(track_id)
936 .bind(uri)
937 .bind(1)
938 .execute(&mut **tx)
939 .await?;
940
941 Ok(())
942}
943
944pub async fn update_artist_uri(
945 tx: &mut sqlx::Transaction<'_, Postgres>,
946 user_id: &str,
947 record: ArtistRecord,
948 uri: &str,
949) -> Result<(), Error> {
950 let hash = sha256::digest(record.name.to_lowercase());
951 let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1")
952 .bind(&hash)
953 .fetch_all(&mut **tx)
954 .await?;
955
956 if artists.is_empty() {
957 println!("Artist not found: {}", record.name.magenta());
958 return Ok(());
959 }
960
961 let artist_id = &artists[0].xata_id;
962
963 sqlx::query(
964 r#"
965 UPDATE user_artists
966 SET uri = $3
967 WHERE user_id = $1 AND artist_id = $2
968 "#,
969 )
970 .bind(user_id)
971 .bind(artist_id)
972 .bind(uri)
973 .execute(&mut **tx)
974 .await?;
975
976 sqlx::query(
977 r#"
978 UPDATE tracks
979 SET artist_uri = $2
980 WHERE artist_uri IS NULL AND album_artist = $1
981 "#,
982 )
983 .bind(&record.name)
984 .bind(uri)
985 .execute(&mut **tx)
986 .await?;
987
988 sqlx::query(
989 r#"
990 UPDATE artists
991 SET uri = $2
992 WHERE sha256 = $1 AND uri IS NULL
993 "#,
994 )
995 .bind(&hash)
996 .bind(uri)
997 .execute(&mut **tx)
998 .await?;
999
1000 sqlx::query(
1001 r#"
1002 UPDATE albums
1003 SET artist_uri = $2
1004 WHERE artist_uri IS NULL AND artist = $1
1005 "#,
1006 )
1007 .bind(&record.name)
1008 .bind(uri)
1009 .execute(&mut **tx)
1010 .await?;
1011 Ok(())
1012}
1013
1014pub async fn update_album_uri(
1015 tx: &mut sqlx::Transaction<'_, Postgres>,
1016 user_id: &str,
1017 record: AlbumRecord,
1018 uri: &str,
1019) -> Result<(), Error> {
1020 let hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase());
1021 let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1")
1022 .bind(&hash)
1023 .fetch_all(&mut **tx)
1024 .await?;
1025 if albums.is_empty() {
1026 println!("Album not found: {}", record.title.magenta());
1027 return Ok(());
1028 }
1029 let album_id = &albums[0].xata_id;
1030 sqlx::query(
1031 r#"
1032 UPDATE user_albums
1033 SET uri = $3
1034 WHERE user_id = $1 AND album_id = $2
1035 "#,
1036 )
1037 .bind(user_id)
1038 .bind(album_id)
1039 .bind(uri)
1040 .execute(&mut **tx)
1041 .await?;
1042
1043 sqlx::query(
1044 r#"
1045 UPDATE tracks
1046 SET album_uri = $2
1047 WHERE album_uri IS NULL AND album = $1
1048 "#,
1049 )
1050 .bind(record.title)
1051 .bind(uri)
1052 .execute(&mut **tx)
1053 .await?;
1054
1055 sqlx::query(
1056 r#"
1057 UPDATE albums
1058 SET uri = $2
1059 WHERE sha256 = $1 AND uri IS NULL
1060 "#,
1061 )
1062 .bind(&hash)
1063 .bind(uri)
1064 .execute(&mut **tx)
1065 .await?;
1066
1067 Ok(())
1068}
1069
1070pub async fn update_track_uri(
1071 tx: &mut sqlx::Transaction<'_, Postgres>,
1072 user_id: &str,
1073 record: SongRecord,
1074 uri: &str,
1075) -> Result<(), Error> {
1076 let hash = sha256::digest(
1077 format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(),
1078 );
1079 let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1")
1080 .bind(&hash)
1081 .fetch_all(&mut **tx)
1082 .await?;
1083
1084 if tracks.is_empty() {
1085 println!("Track not found: {}", record.title.magenta());
1086 return Ok(());
1087 }
1088
1089 let track_id = &tracks[0].xata_id;
1090 sqlx::query(
1091 r#"
1092 UPDATE user_tracks
1093 SET uri = $3
1094 WHERE user_id = $1 AND track_id = $2
1095 "#,
1096 )
1097 .bind(user_id)
1098 .bind(track_id)
1099 .bind(uri)
1100 .execute(&mut **tx)
1101 .await?;
1102
1103 sqlx::query(
1104 r#"
1105 UPDATE tracks
1106 SET uri = $2
1107 WHERE sha256 = $1 AND uri IS NULL
1108 "#,
1109 )
1110 .bind(&hash)
1111 .bind(uri)
1112 .execute(&mut **tx)
1113 .await?;
1114
1115 Ok(())
1116}