A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at fix/spotify 1116 lines 30 kB view raw
1use std::sync::Arc; 2 3use anyhow::Error; 4use chrono::DateTime; 5use owo_colors::OwoColorize; 6use sqlx::{Pool, Postgres}; 7use tokio::sync::Mutex; 8 9use crate::{ 10 profile::did_to_profile, 11 subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}, 12 types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord}, 13 webhook::discord::{ 14 self, 15 model::{ScrobbleData, WebhookEnvelope}, 16 }, 17 webhook_worker::{push_to_queue, AppState}, 18 xata::{ 19 album::Album, album_track::AlbumTrack, artist::Artist, artist_album::ArtistAlbum, 20 artist_track::ArtistTrack, track::Track, user::User, user_album::UserAlbum, 21 user_artist::UserArtist, user_track::UserTrack, 22 }, 23}; 24 25pub async fn save_scrobble( 26 state: Arc<Mutex<AppState>>, 27 pool: Arc<Mutex<Pool<Postgres>>>, 28 did: &str, 29 commit: Commit, 30) -> Result<(), Error> { 31 // skip unknown collection 32 if !vec![SCROBBLE_NSID, ARTIST_NSID, ALBUM_NSID, SONG_NSID] 33 .contains(&commit.collection.as_str()) 34 { 35 return Ok(()); 36 } 37 38 let pool = pool.lock().await; 39 40 match commit.operation.as_str() { 41 "create" => { 42 if commit.collection == SCROBBLE_NSID { 43 let mut tx = pool.begin().await?; 44 let scrobble_record: ScrobbleRecord = 45 serde_json::from_value(commit.record.clone())?; 46 47 let album_id = save_album(&mut tx, scrobble_record.clone(), did).await?; 48 let artist_id = save_artist(&mut tx, scrobble_record.clone()).await?; 49 let track_id = save_track(&mut tx, scrobble_record.clone(), did).await?; 50 51 save_album_track(&mut tx, &album_id, &track_id).await?; 52 save_artist_track(&mut tx, &artist_id, &track_id).await?; 53 save_artist_album(&mut tx, &artist_id, &album_id).await?; 54 55 let uri = format!("at://{}/app.rocksky.scrobble/{}", did, commit.rkey); 56 57 let user_id = save_user(&mut tx, did).await?; 58 59 println!( 60 "Saving scrobble: {} ", 61 format!( 62 "{} - {} - {}", 63 scrobble_record.title, scrobble_record.artist, scrobble_record.album 64 ) 65 .magenta() 66 ); 67 68 sqlx::query( 69 r#" 70 INSERT INTO scrobbles ( 71 album_id, 72 artist_id, 73 track_id, 74 uri, 75 user_id, 76 timestamp 77 ) VALUES ($1, $2, $3, $4, $5, $6) 78 "#, 79 ) 80 .bind(album_id) 81 .bind(artist_id) 82 .bind(track_id) 83 .bind(uri) 84 .bind(user_id) 85 .bind( 86 DateTime::parse_from_rfc3339(&scrobble_record.created_at) 87 .unwrap() 88 .with_timezone(&chrono::Utc), 89 ) 90 .execute(&mut *tx) 91 .await?; 92 93 tx.commit().await?; 94 95 let users: Vec<User> = 96 sqlx::query_as::<_, User>("SELECT * FROM users WHERE did = $1") 97 .bind(did) 98 .fetch_all(&*pool) 99 .await?; 100 101 if users.is_empty() { 102 return Err(anyhow::anyhow!( 103 "User with DID {} not found in database", 104 did 105 )); 106 } 107 108 // Push to webhook queue (Discord) 109 match push_to_queue( 110 state, 111 &WebhookEnvelope { 112 r#type: "scrobble.created".to_string(), 113 id: commit.rkey.clone(), 114 data: ScrobbleData { 115 user: discord::model::User { 116 did: did.to_string(), 117 display_name: users[0].display_name.clone(), 118 handle: users[0].handle.clone(), 119 avatar_url: users[0].avatar.clone(), 120 }, 121 track: discord::model::Track { 122 title: scrobble_record.title.clone(), 123 artist: scrobble_record.artist.clone(), 124 album: scrobble_record.album.clone(), 125 duration: scrobble_record.duration, 126 artwork_url: scrobble_record.album_art.clone().map(|x| { 127 format!( 128 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", 129 did, 130 x.r#ref.link, 131 x.mime_type.split('/').last().unwrap_or("jpeg") 132 ) 133 }), 134 spotify_url: scrobble_record.spotify_link.clone(), 135 tidal_url: scrobble_record.tidal_link.clone(), 136 youtube_url: scrobble_record.youtube_link.clone(), 137 }, 138 played_at: scrobble_record.created_at.clone(), 139 }, 140 delivered_at: Some(chrono::Utc::now().to_rfc3339()), 141 }, 142 ) 143 .await 144 { 145 Ok(_) => {} 146 Err(e) => { 147 eprintln!("Failed to push to webhook queue: {}", e); 148 } 149 } 150 } 151 152 if commit.collection == ARTIST_NSID { 153 let mut tx = pool.begin().await?; 154 155 let user_id = save_user(&mut tx, did).await?; 156 let uri = format!("at://{}/app.rocksky.artist/{}", did, commit.rkey); 157 158 let artist_record: ArtistRecord = serde_json::from_value(commit.record.clone())?; 159 save_user_artist(&mut tx, &user_id, artist_record.clone(), &uri).await?; 160 update_artist_uri(&mut tx, &user_id, artist_record, &uri).await?; 161 162 tx.commit().await?; 163 } 164 165 if commit.collection == ALBUM_NSID { 166 let mut tx = pool.begin().await?; 167 let user_id = save_user(&mut tx, did).await?; 168 let uri = format!("at://{}/app.rocksky.album/{}", did, commit.rkey); 169 170 let album_record: AlbumRecord = serde_json::from_value(commit.record.clone())?; 171 save_user_album(&mut tx, &user_id, album_record.clone(), &uri).await?; 172 update_album_uri(&mut tx, &user_id, album_record, &uri).await?; 173 174 tx.commit().await?; 175 } 176 177 if commit.collection == SONG_NSID { 178 let mut tx = pool.begin().await?; 179 180 let user_id = save_user(&mut tx, did).await?; 181 let uri = format!("at://{}/app.rocksky.song/{}", did, commit.rkey); 182 183 let song_record: SongRecord = serde_json::from_value(commit.record.clone())?; 184 save_user_track(&mut tx, &user_id, song_record.clone(), &uri).await?; 185 update_track_uri(&mut tx, &user_id, song_record, &uri).await?; 186 187 tx.commit().await?; 188 } 189 } 190 _ => { 191 println!("Unsupported operation: {}", commit.operation); 192 } 193 } 194 Ok(()) 195} 196 197pub async fn save_user( 198 tx: &mut sqlx::Transaction<'_, Postgres>, 199 did: &str, 200) -> Result<String, Error> { 201 let profile = did_to_profile(did).await?; 202 203 // Check if the user exists in the database 204 let mut users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE did = $1") 205 .bind(did) 206 .fetch_all(&mut **tx) 207 .await?; 208 209 // If the user does not exist, create a new user 210 if users.is_empty() { 211 let avatar = profile.avatar.map(|blob| { 212 format!( 213 "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", 214 did, 215 blob.r#ref.link, 216 blob.mime_type.split('/').last().unwrap_or("jpeg") 217 ) 218 }); 219 sqlx::query( 220 "INSERT INTO users (display_name, did, handle, avatar) VALUES ($1, $2, $3, $4)", 221 ) 222 .bind(profile.display_name) 223 .bind(did) 224 .bind(profile.handle) 225 .bind(avatar) 226 .execute(&mut **tx) 227 .await?; 228 229 users = sqlx::query_as("SELECT * FROM users WHERE did = $1") 230 .bind(did) 231 .fetch_all(&mut **tx) 232 .await?; 233 } 234 235 Ok(users[0].xata_id.clone()) 236} 237 238pub async fn save_track( 239 tx: &mut sqlx::Transaction<'_, Postgres>, 240 scrobble_record: ScrobbleRecord, 241 did: &str, 242) -> Result<String, Error> { 243 let uri: Option<String> = None; 244 let hash = sha256::digest( 245 format!( 246 "{} - {} - {}", 247 scrobble_record.title, scrobble_record.artist, scrobble_record.album 248 ) 249 .to_lowercase(), 250 ); 251 252 let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 253 .bind(&hash) 254 .fetch_all(&mut **tx) 255 .await?; 256 257 if !tracks.is_empty() { 258 return Ok(tracks[0].xata_id.clone()); 259 } 260 261 sqlx::query( 262 r#" 263 INSERT INTO tracks ( 264 title, 265 artist, 266 album, 267 album_art, 268 album_artist, 269 track_number, 270 duration, 271 mb_id, 272 composer, 273 lyrics, 274 disc_number, 275 sha256, 276 copyright_message, 277 uri, 278 spotify_link, 279 apple_music_link, 280 tidal_link, 281 youtube_link, 282 label 283 ) VALUES ( 284 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19 285 ) 286 "#, 287 ) 288 .bind(scrobble_record.title) 289 .bind(scrobble_record.artist) 290 .bind(scrobble_record.album) 291 .bind(scrobble_record.album_art.map(|x| { 292 format!( 293 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", 294 did, 295 x.r#ref.link, 296 x.mime_type.split('/').last().unwrap_or("jpeg") 297 ) 298 })) 299 .bind(scrobble_record.album_artist) 300 .bind(scrobble_record.track_number) 301 .bind(scrobble_record.duration) 302 .bind(scrobble_record.mbid) 303 .bind(scrobble_record.composer) 304 .bind(scrobble_record.lyrics) 305 .bind(scrobble_record.disc_number) 306 .bind(&hash) 307 .bind(scrobble_record.copyright_message) 308 .bind(uri) 309 .bind(scrobble_record.spotify_link) 310 .bind(scrobble_record.apple_music_link) 311 .bind(scrobble_record.tidal_link) 312 .bind(scrobble_record.youtube_link) 313 .bind(scrobble_record.label) 314 .execute(&mut **tx) 315 .await?; 316 317 let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 318 .bind(&hash) 319 .fetch_all(&mut **tx) 320 .await?; 321 322 Ok(tracks[0].xata_id.clone()) 323} 324 325pub async fn save_album( 326 tx: &mut sqlx::Transaction<'_, Postgres>, 327 scrobble_record: ScrobbleRecord, 328 did: &str, 329) -> Result<String, Error> { 330 let hash = sha256::digest( 331 format!( 332 "{} - {}", 333 scrobble_record.album, scrobble_record.album_artist 334 ) 335 .to_lowercase(), 336 ); 337 338 let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 339 .bind(&hash) 340 .fetch_all(&mut **tx) 341 .await?; 342 343 if !albums.is_empty() { 344 println!("Album already exists: {}", albums[0].title.magenta()); 345 return Ok(albums[0].xata_id.clone()); 346 } 347 348 println!("Saving album: {}", scrobble_record.album.magenta()); 349 350 let uri: Option<String> = None; 351 let artist_uri: Option<String> = None; 352 sqlx::query( 353 r#" 354 INSERT INTO albums ( 355 title, 356 artist, 357 album_art, 358 year, 359 release_date, 360 sha256, 361 uri, 362 artist_uri 363 ) VALUES ( 364 $1, $2, $3, $4, $5, $6, $7, $8 365 ) 366 "#, 367 ) 368 .bind(scrobble_record.album) 369 .bind(scrobble_record.album_artist) 370 .bind(scrobble_record.album_art.map(|x| { 371 format!( 372 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", 373 did, 374 x.r#ref.link, 375 x.mime_type.split('/').last().unwrap_or("jpeg") 376 ) 377 })) 378 .bind(scrobble_record.year) 379 .bind(scrobble_record.release_date) 380 .bind(&hash) 381 .bind(uri) 382 .bind(artist_uri) 383 .execute(&mut **tx) 384 .await?; 385 386 let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 387 .bind(&hash) 388 .fetch_all(&mut **tx) 389 .await?; 390 391 Ok(albums[0].xata_id.clone()) 392} 393 394pub async fn save_artist( 395 tx: &mut sqlx::Transaction<'_, Postgres>, 396 scrobble_record: ScrobbleRecord, 397) -> Result<String, Error> { 398 let hash = sha256::digest(scrobble_record.album_artist.to_lowercase()); 399 let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 400 .bind(&hash) 401 .fetch_all(&mut **tx) 402 .await?; 403 404 if !artists.is_empty() { 405 println!("Artist already exists: {}", artists[0].name.magenta()); 406 return Ok(artists[0].xata_id.clone()); 407 } 408 409 println!("Saving artist: {}", scrobble_record.album_artist.magenta()); 410 411 let uri: Option<String> = None; 412 let picture = ""; 413 sqlx::query( 414 r#" 415 INSERT INTO artists ( 416 name, 417 sha256, 418 uri, 419 picture 420 ) VALUES ( 421 $1, $2, $3, $4 422 ) 423 "#, 424 ) 425 .bind(scrobble_record.artist) 426 .bind(&hash) 427 .bind(uri) 428 .bind(picture) 429 .execute(&mut **tx) 430 .await?; 431 432 let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 433 .bind(&hash) 434 .fetch_all(&mut **tx) 435 .await?; 436 437 Ok(artists[0].xata_id.clone()) 438} 439 440pub async fn save_album_track( 441 tx: &mut sqlx::Transaction<'_, Postgres>, 442 album_id: &str, 443 track_id: &str, 444) -> Result<(), Error> { 445 let album_tracks: Vec<AlbumTrack> = 446 sqlx::query_as("SELECT * FROM album_tracks WHERE album_id = $1 AND track_id = $2") 447 .bind(album_id) 448 .bind(track_id) 449 .fetch_all(&mut **tx) 450 .await?; 451 452 if !album_tracks.is_empty() { 453 println!( 454 "Album track already exists: {}", 455 format!("{} - {}", album_id, track_id).magenta() 456 ); 457 return Ok(()); 458 } 459 460 println!( 461 "Saving album track: {}", 462 format!("{} - {}", album_id, track_id).magenta() 463 ); 464 465 sqlx::query( 466 r#" 467 INSERT INTO album_tracks ( 468 album_id, 469 track_id 470 ) VALUES ( 471 $1, $2 472 ) 473 "#, 474 ) 475 .bind(album_id) 476 .bind(track_id) 477 .execute(&mut **tx) 478 .await?; 479 Ok(()) 480} 481 482pub async fn save_artist_track( 483 tx: &mut sqlx::Transaction<'_, Postgres>, 484 artist_id: &str, 485 track_id: &str, 486) -> Result<(), Error> { 487 let artist_tracks: Vec<ArtistTrack> = 488 sqlx::query_as("SELECT * FROM artist_tracks WHERE artist_id = $1 AND track_id = $2") 489 .bind(artist_id) 490 .bind(track_id) 491 .fetch_all(&mut **tx) 492 .await?; 493 494 if !artist_tracks.is_empty() { 495 println!( 496 "Artist track already exists: {}", 497 format!("{} - {}", artist_id, track_id).magenta() 498 ); 499 return Ok(()); 500 } 501 502 println!( 503 "Saving artist track: {}", 504 format!("{} - {}", artist_id, track_id).magenta() 505 ); 506 507 sqlx::query( 508 r#" 509 INSERT INTO artist_tracks ( 510 artist_id, 511 track_id 512 ) VALUES ( 513 $1, $2 514 ) 515 "#, 516 ) 517 .bind(artist_id) 518 .bind(track_id) 519 .execute(&mut **tx) 520 .await?; 521 Ok(()) 522} 523 524pub async fn save_artist_album( 525 tx: &mut sqlx::Transaction<'_, Postgres>, 526 artist_id: &str, 527 album_id: &str, 528) -> Result<(), Error> { 529 let artist_albums: Vec<ArtistAlbum> = 530 sqlx::query_as("SELECT * FROM artist_albums WHERE artist_id = $1 AND album_id = $2") 531 .bind(artist_id) 532 .bind(album_id) 533 .fetch_all(&mut **tx) 534 .await?; 535 536 if !artist_albums.is_empty() { 537 println!( 538 "Artist album already exists: {}", 539 format!("{} - {}", artist_id, album_id).magenta() 540 ); 541 return Ok(()); 542 } 543 544 println!( 545 "Saving artist album: {}", 546 format!("{} - {}", artist_id, album_id).magenta() 547 ); 548 549 sqlx::query( 550 r#" 551 INSERT INTO artist_albums ( 552 artist_id, 553 album_id 554 ) VALUES ( 555 $1, $2 556 ) 557 "#, 558 ) 559 .bind(artist_id) 560 .bind(album_id) 561 .execute(&mut **tx) 562 .await?; 563 Ok(()) 564} 565 566pub async fn save_user_artist( 567 tx: &mut sqlx::Transaction<'_, Postgres>, 568 user_id: &str, 569 record: ArtistRecord, 570 uri: &str, 571) -> Result<(), Error> { 572 let hash = sha256::digest(record.name.to_lowercase()); 573 574 let mut artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 575 .bind(&hash) 576 .fetch_all(&mut **tx) 577 .await?; 578 579 let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1") 580 .bind(user_id) 581 .fetch_all(&mut **tx) 582 .await?; 583 584 let artist_id: &str; 585 586 match artists.is_empty() { 587 true => { 588 println!("Saving artist: {}", record.name.magenta()); 589 let did = users[0].did.clone(); 590 sqlx::query( 591 r#" 592 INSERT INTO artists ( 593 name, 594 sha256, 595 uri, 596 picture 597 ) VALUES ( 598 $1, $2, $3, $4 599 ) 600 "#, 601 ) 602 .bind(record.name) 603 .bind(&hash) 604 .bind(uri) 605 .bind(record.picture.map(|x| { 606 format!( 607 "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", 608 did, 609 x.r#ref.link, 610 x.mime_type.split('/').last().unwrap_or("jpeg") 611 ) 612 })) 613 .execute(&mut **tx) 614 .await?; 615 616 artists = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 617 .bind(&hash) 618 .fetch_all(&mut **tx) 619 .await?; 620 artist_id = &artists[0].xata_id; 621 } 622 false => { 623 artist_id = &artists[0].xata_id; 624 } 625 }; 626 627 let user_artists: Vec<UserArtist> = 628 sqlx::query_as("SELECT * FROM user_artists WHERE user_id = $1 AND artist_id = $2") 629 .bind(user_id) 630 .bind(artist_id) 631 .fetch_all(&mut **tx) 632 .await?; 633 634 if !user_artists.is_empty() { 635 println!( 636 "User artist already exists: {}", 637 format!("{} - {}", user_id, artist_id).magenta() 638 ); 639 sqlx::query( 640 r#" 641 UPDATE user_artists 642 SET scrobbles = scrobbles + 1, 643 uri = $3 644 WHERE user_id = $1 AND artist_id = $2 645 "#, 646 ) 647 .bind(user_id) 648 .bind(artist_id) 649 .bind(uri) 650 .execute(&mut **tx) 651 .await?; 652 return Ok(()); 653 } 654 655 println!( 656 "Saving user artist: {}", 657 format!("{} - {}", user_id, artist_id).magenta() 658 ); 659 660 sqlx::query( 661 r#" 662 INSERT INTO user_artists ( 663 user_id, 664 artist_id, 665 uri, 666 scrobbles 667 ) VALUES ( 668 $1, $2, $3, $4 669 ) 670 "#, 671 ) 672 .bind(user_id) 673 .bind(artist_id) 674 .bind(uri) 675 .bind(1) 676 .execute(&mut **tx) 677 .await?; 678 Ok(()) 679} 680 681pub async fn save_user_album( 682 tx: &mut sqlx::Transaction<'_, Postgres>, 683 user_id: &str, 684 record: AlbumRecord, 685 uri: &str, 686) -> Result<(), Error> { 687 let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1") 688 .bind(user_id) 689 .fetch_all(&mut **tx) 690 .await?; 691 692 let hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 693 let mut albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 694 .bind(&hash) 695 .fetch_all(&mut **tx) 696 .await?; 697 698 let album_id: &str; 699 700 match albums.is_empty() { 701 true => { 702 println!("Saving album: {}", record.title.magenta()); 703 let did = users[0].did.clone(); 704 sqlx::query( 705 r#" 706 INSERT INTO albums ( 707 title, 708 artist, 709 album_art, 710 year, 711 release_date, 712 sha256, 713 uri 714 ) VALUES ( 715 $1, $2, $3, $4, $5, $6, $7 716 ) 717 "#, 718 ) 719 .bind(record.title) 720 .bind(record.artist) 721 .bind(record.album_art.map(|x| { 722 format!( 723 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", 724 did, 725 x.r#ref.link, 726 x.mime_type.split('/').last().unwrap_or("jpeg") 727 ) 728 })) 729 .bind(record.year) 730 .bind(record.release_date) 731 .bind(&hash) 732 .bind(uri) 733 .execute(&mut **tx) 734 .await?; 735 736 albums = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 737 .bind(&hash) 738 .fetch_all(&mut **tx) 739 .await?; 740 album_id = &albums[0].xata_id; 741 } 742 false => { 743 album_id = &albums[0].xata_id; 744 } 745 }; 746 747 let user_albums: Vec<UserAlbum> = 748 sqlx::query_as("SELECT * FROM user_albums WHERE user_id = $1 AND album_id = $2") 749 .bind(user_id) 750 .bind(album_id) 751 .fetch_all(&mut **tx) 752 .await?; 753 754 if !user_albums.is_empty() { 755 println!( 756 "User album already exists: {}", 757 format!("{} - {}", user_id, album_id).magenta() 758 ); 759 sqlx::query( 760 r#" 761 UPDATE user_albums 762 SET scrobbles = scrobbles + 1, 763 uri = $3 764 WHERE user_id = $1 AND album_id = $2 765 "#, 766 ) 767 .bind(user_id) 768 .bind(album_id) 769 .bind(uri) 770 .execute(&mut **tx) 771 .await?; 772 return Ok(()); 773 } 774 775 println!( 776 "Saving user album: {}", 777 format!("{} - {}", user_id, album_id).magenta() 778 ); 779 780 sqlx::query( 781 r#" 782 INSERT INTO user_albums ( 783 user_id, 784 album_id, 785 uri, 786 scrobbles 787 ) VALUES ( 788 $1, $2, $3, $4 789 ) 790 "#, 791 ) 792 .bind(user_id) 793 .bind(album_id) 794 .bind(uri) 795 .bind(1) 796 .execute(&mut **tx) 797 .await?; 798 Ok(()) 799} 800 801pub async fn save_user_track( 802 tx: &mut sqlx::Transaction<'_, Postgres>, 803 user_id: &str, 804 record: SongRecord, 805 uri: &str, 806) -> Result<(), Error> { 807 let hash = sha256::digest( 808 format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 809 ); 810 811 let mut tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 812 .bind(&hash) 813 .fetch_all(&mut **tx) 814 .await?; 815 816 let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1") 817 .bind(user_id) 818 .fetch_all(&mut **tx) 819 .await?; 820 821 let track_id: &str; 822 823 match tracks.is_empty() { 824 true => { 825 println!("Saving track: {}", record.title.magenta()); 826 let did = users[0].did.clone(); 827 sqlx::query( 828 r#" 829 INSERT INTO tracks ( 830 title, 831 artist, 832 album, 833 album_art, 834 album_artist, 835 track_number, 836 duration, 837 mb_id, 838 composer, 839 lyrics, 840 disc_number, 841 sha256, 842 copyright_message, 843 uri, 844 spotify_link, 845 label 846 ) VALUES ( 847 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16 848 ) 849 "#, 850 ) 851 .bind(record.title) 852 .bind(record.artist) 853 .bind(record.album) 854 .bind(record.album_art.map(|x| { 855 format!( 856 "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", 857 did, 858 x.r#ref.link, 859 x.mime_type.split('/').last().unwrap_or("jpeg") 860 ) 861 })) 862 .bind(record.album_artist) 863 .bind(record.track_number) 864 .bind(record.duration) 865 .bind(record.mbid) 866 .bind(record.composer) 867 .bind(record.lyrics) 868 .bind(record.disc_number) 869 .bind(&hash) 870 .bind(record.copyright_message) 871 .bind(uri) 872 .bind(record.spotify_link) 873 .bind(record.label) 874 .execute(&mut **tx) 875 .await?; 876 877 tracks = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 878 .bind(&hash) 879 .fetch_all(&mut **tx) 880 .await?; 881 882 track_id = &tracks[0].xata_id; 883 } 884 false => { 885 track_id = &tracks[0].xata_id; 886 } 887 } 888 889 let user_tracks: Vec<UserTrack> = 890 sqlx::query_as("SELECT * FROM user_tracks WHERE user_id = $1 AND track_id = $2") 891 .bind(user_id) 892 .bind(track_id) 893 .fetch_all(&mut **tx) 894 .await?; 895 896 if !user_tracks.is_empty() { 897 println!( 898 "User track already exists: {}", 899 format!("{} - {}", user_id, track_id).magenta() 900 ); 901 sqlx::query( 902 r#" 903 UPDATE user_tracks 904 SET scrobbles = scrobbles + 1, 905 uri = $3 906 WHERE user_id = $1 AND track_id = $2 907 "#, 908 ) 909 .bind(user_id) 910 .bind(track_id) 911 .bind(uri) 912 .execute(&mut **tx) 913 .await?; 914 return Ok(()); 915 } 916 917 println!( 918 "Saving user track: {}", 919 format!("{} - {}", user_id, track_id).magenta() 920 ); 921 922 sqlx::query( 923 r#" 924 INSERT INTO user_tracks ( 925 user_id, 926 track_id, 927 uri, 928 scrobbles 929 ) VALUES ( 930 $1, $2, $3, $4 931 ) 932 "#, 933 ) 934 .bind(user_id) 935 .bind(track_id) 936 .bind(uri) 937 .bind(1) 938 .execute(&mut **tx) 939 .await?; 940 941 Ok(()) 942} 943 944pub async fn update_artist_uri( 945 tx: &mut sqlx::Transaction<'_, Postgres>, 946 user_id: &str, 947 record: ArtistRecord, 948 uri: &str, 949) -> Result<(), Error> { 950 let hash = sha256::digest(record.name.to_lowercase()); 951 let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 952 .bind(&hash) 953 .fetch_all(&mut **tx) 954 .await?; 955 956 if artists.is_empty() { 957 println!("Artist not found: {}", record.name.magenta()); 958 return Ok(()); 959 } 960 961 let artist_id = &artists[0].xata_id; 962 963 sqlx::query( 964 r#" 965 UPDATE user_artists 966 SET uri = $3 967 WHERE user_id = $1 AND artist_id = $2 968 "#, 969 ) 970 .bind(user_id) 971 .bind(artist_id) 972 .bind(uri) 973 .execute(&mut **tx) 974 .await?; 975 976 sqlx::query( 977 r#" 978 UPDATE tracks 979 SET artist_uri = $2 980 WHERE artist_uri IS NULL AND album_artist = $1 981 "#, 982 ) 983 .bind(&record.name) 984 .bind(uri) 985 .execute(&mut **tx) 986 .await?; 987 988 sqlx::query( 989 r#" 990 UPDATE artists 991 SET uri = $2 992 WHERE sha256 = $1 AND uri IS NULL 993 "#, 994 ) 995 .bind(&hash) 996 .bind(uri) 997 .execute(&mut **tx) 998 .await?; 999 1000 sqlx::query( 1001 r#" 1002 UPDATE albums 1003 SET artist_uri = $2 1004 WHERE artist_uri IS NULL AND artist = $1 1005 "#, 1006 ) 1007 .bind(&record.name) 1008 .bind(uri) 1009 .execute(&mut **tx) 1010 .await?; 1011 Ok(()) 1012} 1013 1014pub async fn update_album_uri( 1015 tx: &mut sqlx::Transaction<'_, Postgres>, 1016 user_id: &str, 1017 record: AlbumRecord, 1018 uri: &str, 1019) -> Result<(), Error> { 1020 let hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 1021 let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 1022 .bind(&hash) 1023 .fetch_all(&mut **tx) 1024 .await?; 1025 if albums.is_empty() { 1026 println!("Album not found: {}", record.title.magenta()); 1027 return Ok(()); 1028 } 1029 let album_id = &albums[0].xata_id; 1030 sqlx::query( 1031 r#" 1032 UPDATE user_albums 1033 SET uri = $3 1034 WHERE user_id = $1 AND album_id = $2 1035 "#, 1036 ) 1037 .bind(user_id) 1038 .bind(album_id) 1039 .bind(uri) 1040 .execute(&mut **tx) 1041 .await?; 1042 1043 sqlx::query( 1044 r#" 1045 UPDATE tracks 1046 SET album_uri = $2 1047 WHERE album_uri IS NULL AND album = $1 1048 "#, 1049 ) 1050 .bind(record.title) 1051 .bind(uri) 1052 .execute(&mut **tx) 1053 .await?; 1054 1055 sqlx::query( 1056 r#" 1057 UPDATE albums 1058 SET uri = $2 1059 WHERE sha256 = $1 AND uri IS NULL 1060 "#, 1061 ) 1062 .bind(&hash) 1063 .bind(uri) 1064 .execute(&mut **tx) 1065 .await?; 1066 1067 Ok(()) 1068} 1069 1070pub async fn update_track_uri( 1071 tx: &mut sqlx::Transaction<'_, Postgres>, 1072 user_id: &str, 1073 record: SongRecord, 1074 uri: &str, 1075) -> Result<(), Error> { 1076 let hash = sha256::digest( 1077 format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 1078 ); 1079 let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 1080 .bind(&hash) 1081 .fetch_all(&mut **tx) 1082 .await?; 1083 1084 if tracks.is_empty() { 1085 println!("Track not found: {}", record.title.magenta()); 1086 return Ok(()); 1087 } 1088 1089 let track_id = &tracks[0].xata_id; 1090 sqlx::query( 1091 r#" 1092 UPDATE user_tracks 1093 SET uri = $3 1094 WHERE user_id = $1 AND track_id = $2 1095 "#, 1096 ) 1097 .bind(user_id) 1098 .bind(track_id) 1099 .bind(uri) 1100 .execute(&mut **tx) 1101 .await?; 1102 1103 sqlx::query( 1104 r#" 1105 UPDATE tracks 1106 SET uri = $2 1107 WHERE sha256 = $1 AND uri IS NULL 1108 "#, 1109 ) 1110 .bind(&hash) 1111 .bind(uri) 1112 .execute(&mut **tx) 1113 .await?; 1114 1115 Ok(()) 1116}