A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 1028 lines 34 kB view raw
1use anyhow::Error; 2use async_nats::{connect, Client}; 3use duckdb::{params, Connection}; 4use owo_colors::OwoColorize; 5use std::{ 6 env, 7 sync::{Arc, Mutex}, 8 thread, 9}; 10use tokio_stream::StreamExt; 11use types::{LikePayload, NewTrackPayload, ScrobblePayload, UnlikePayload, UserPayload}; 12 13pub mod types; 14 15pub async fn subscribe(conn: Arc<Mutex<Connection>>) -> Result<(), Error> { 16 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 17 let conn = conn.clone(); 18 let nc = connect(&addr).await?; 19 tracing::info!(server = %addr.bright_green(), "Connected to NATS"); 20 21 let nc = Arc::new(Mutex::new(nc)); 22 on_scrobble(nc.clone(), conn.clone()); 23 on_new_track(nc.clone(), conn.clone()); 24 on_like(nc.clone(), conn.clone()); 25 on_unlike(nc.clone(), conn.clone()); 26 on_new_user(nc.clone(), conn.clone()); 27 on_delete_scrobble(nc, conn.clone()); 28 29 Ok(()) 30} 31 32pub fn on_scrobble(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 33 thread::spawn(move || { 34 let rt = tokio::runtime::Runtime::new().unwrap(); 35 let conn = conn.clone(); 36 let nc = nc.clone(); 37 rt.block_on(async { 38 let nc = nc.lock().unwrap(); 39 let mut sub = nc.subscribe("rocksky.scrobble".to_string()).await?; 40 drop(nc); 41 42 while let Some(msg) = sub.next().await { 43 let data = String::from_utf8(msg.payload.to_vec()).unwrap(); 44 match serde_json::from_str::<ScrobblePayload>(&data) { 45 Ok(payload) => match save_scrobble(conn.clone(), payload.clone()).await { 46 Ok(_) => tracing::info!( 47 uri = %payload.scrobble.uri.cyan(), 48 "Scrobble saved successfully", 49 ), 50 Err(e) => tracing::error!("Error saving scrobble: {}", e), 51 }, 52 Err(e) => { 53 tracing::error!("Error parsing payload: {}", e); 54 tracing::debug!("{}", data); 55 } 56 } 57 } 58 59 Ok::<(), Error>(()) 60 })?; 61 62 Ok::<(), Error>(()) 63 }); 64} 65 66pub fn on_new_track(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 67 thread::spawn(move || { 68 let rt = tokio::runtime::Runtime::new().unwrap(); 69 let conn = conn.clone(); 70 let nc = nc.clone(); 71 rt.block_on(async { 72 let nc = nc.lock().unwrap(); 73 let mut sub = nc.subscribe("rocksky.track".to_string()).await?; 74 drop(nc); 75 76 while let Some(msg) = sub.next().await { 77 let data = String::from_utf8(msg.payload.to_vec()).unwrap(); 78 match serde_json::from_str::<NewTrackPayload>(&data) { 79 Ok(payload) => match save_track(conn.clone(), payload.clone()).await { 80 Ok(_) => { 81 tracing::info!( 82 title = %payload.track.title.cyan(), 83 "Track saved successfully", 84 ) 85 } 86 Err(e) => tracing::error!("Error saving track: {}", e), 87 }, 88 Err(e) => { 89 tracing::error!("Error parsing payload: {}", e); 90 tracing::debug!("{}", data); 91 } 92 } 93 } 94 95 Ok::<(), Error>(()) 96 })?; 97 98 Ok::<(), Error>(()) 99 }); 100} 101 102pub fn on_like(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 103 thread::spawn(move || { 104 let rt = tokio::runtime::Runtime::new().unwrap(); 105 let conn = conn.clone(); 106 let nc = nc.clone(); 107 rt.block_on(async { 108 let nc = nc.lock().unwrap(); 109 let mut sub = nc.subscribe("rocksky.like".to_string()).await?; 110 drop(nc); 111 112 while let Some(msg) = sub.next().await { 113 let data = String::from_utf8(msg.payload.to_vec()).unwrap(); 114 match serde_json::from_str::<LikePayload>(&data) { 115 Ok(payload) => match like(conn.clone(), payload.clone()).await { 116 Ok(_) => tracing::info!( 117 track_id = %payload.track_id.xata_id.cyan(), 118 "Like saved successfully", 119 ), 120 Err(e) => tracing::error!("Error saving like: {}", e), 121 }, 122 Err(e) => { 123 tracing::error!("Error parsing payload: {}", e); 124 tracing::debug!("{}", data); 125 } 126 } 127 } 128 129 Ok::<(), Error>(()) 130 })?; 131 132 Ok::<(), Error>(()) 133 }); 134} 135 136pub fn on_unlike(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 137 thread::spawn(move || { 138 let rt = tokio::runtime::Runtime::new().unwrap(); 139 let conn = conn.clone(); 140 let nc = nc.clone(); 141 rt.block_on(async { 142 let nc = nc.lock().unwrap(); 143 let mut sub = nc.subscribe("rocksky.unlike".to_string()).await?; 144 drop(nc); 145 146 while let Some(msg) = sub.next().await { 147 let data = String::from_utf8(msg.payload.to_vec()).unwrap(); 148 match serde_json::from_str::<UnlikePayload>(&data) { 149 Ok(payload) => match unlike(conn.clone(), payload.clone()).await { 150 Ok(_) => tracing::info!( 151 track_id = %payload.track_id.xata_id.cyan(), 152 "Unlike saved successfully", 153 ), 154 Err(e) => tracing::error!("Error saving unlike: {}", e), 155 }, 156 Err(e) => { 157 tracing::error!("Error parsing payload: {}", e); 158 tracing::debug!("{}", data); 159 } 160 } 161 } 162 163 Ok::<(), Error>(()) 164 })?; 165 166 Ok::<(), Error>(()) 167 }); 168} 169 170pub fn on_new_user(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 171 thread::spawn(move || { 172 let rt = tokio::runtime::Runtime::new().unwrap(); 173 let conn = conn.clone(); 174 let nc = nc.clone(); 175 rt.block_on(async { 176 let nc = nc.lock().unwrap(); 177 let mut sub = nc.subscribe("rocksky.user".to_string()).await?; 178 drop(nc); 179 180 while let Some(msg) = sub.next().await { 181 let data = String::from_utf8(msg.payload.to_vec()).unwrap(); 182 match serde_json::from_str::<UserPayload>(&data) { 183 Ok(payload) => match save_user(conn.clone(), payload.clone()).await { 184 Ok(_) => tracing::info!( 185 handle = %payload.handle.cyan(), 186 "User saved successfully", 187 ), 188 Err(e) => tracing::error!("Error saving user: {}", e), 189 }, 190 Err(e) => { 191 tracing::error!("Error parsing payload: {}", e); 192 tracing::debug!("{}", data); 193 } 194 } 195 } 196 197 Ok::<(), Error>(()) 198 })?; 199 200 Ok::<(), Error>(()) 201 }); 202} 203 204pub fn on_delete_scrobble(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 205 thread::spawn(move || { 206 let rt = tokio::runtime::Runtime::new().unwrap(); 207 let conn = conn.clone(); 208 let nc = nc.clone(); 209 rt.block_on(async { 210 let nc = nc.lock().unwrap(); 211 let mut sub = nc.subscribe("rocksky.delete.scrobble".to_string()).await?; 212 drop(nc); 213 214 while let Some(msg) = sub.next().await { 215 let uri = String::from_utf8(msg.payload.to_vec()).unwrap(); 216 match delete_scrobble(conn.clone(), &uri).await { 217 Ok(_) => tracing::info!(uri = %uri.cyan(), "Scrobble deleted successfully"), 218 Err(e) => tracing::error!("Error deleting scrobble: {}", e), 219 } 220 } 221 222 Ok::<(), Error>(()) 223 })?; 224 225 Ok::<(), Error>(()) 226 }); 227} 228 229pub async fn save_scrobble( 230 conn: Arc<Mutex<Connection>>, 231 payload: ScrobblePayload, 232) -> Result<(), Error> { 233 let conn = conn.lock().unwrap(); 234 235 match conn.execute( 236 &format!( 237 "INSERT INTO artists ( 238 id, 239 name, 240 biography, 241 born, 242 born_in, 243 died, 244 picture, 245 sha256, 246 spotify_link, 247 tidal_link, 248 youtube_link, 249 apple_music_link, 250 uri, 251 genres 252 ) VALUES ( 253 ?, 254 ?, 255 ?, 256 ?, 257 ?, 258 ?, 259 ?, 260 ?, 261 ?, 262 ?, 263 ?, 264 ?, 265 ?, 266 [{}] 267 )", 268 payload 269 .scrobble 270 .artist_id 271 .genres 272 .as_ref() 273 .map(|genres| genres 274 .iter() 275 .map(|g| format!("'{}'", g.replace("'", "''"))) 276 .collect::<Vec<_>>() 277 .join(", ")) 278 .unwrap_or_default() 279 ), 280 params![ 281 payload.scrobble.artist_id.xata_id, 282 payload.scrobble.artist_id.name, 283 payload.scrobble.artist_id.biography, 284 payload.scrobble.artist_id.born, 285 payload.scrobble.artist_id.born_in, 286 payload.scrobble.artist_id.died, 287 payload.scrobble.artist_id.picture, 288 payload.scrobble.artist_id.sha256, 289 payload.scrobble.artist_id.spotify_link, 290 payload.scrobble.artist_id.tidal_link, 291 payload.scrobble.artist_id.youtube_link, 292 payload.scrobble.artist_id.apple_music_link, 293 payload.scrobble.artist_id.uri, 294 ], 295 ) { 296 Ok(_) => (), 297 Err(e) => { 298 if !e.to_string().contains("violates primary key constraint") { 299 tracing::error!("[artists] error: {}", e); 300 return Err(e.into()); 301 } 302 } 303 } 304 305 match conn.execute( 306 "INSERT INTO albums ( 307 id, 308 title, 309 artist, 310 release_date, 311 album_art, 312 year, 313 spotify_link, 314 tidal_link, 315 youtube_link, 316 apple_music_link, 317 sha256, 318 uri, 319 artist_uri 320 ) VALUES ( 321 ?, 322 ?, 323 ?, 324 ?, 325 ?, 326 ?, 327 ?, 328 ?, 329 ?, 330 ?, 331 ?, 332 ?, 333 ? 334 )", 335 params![ 336 payload.scrobble.album_id.xata_id, 337 payload.scrobble.album_id.title, 338 payload.scrobble.album_id.artist, 339 payload.scrobble.album_id.release_date, 340 payload.scrobble.album_id.album_art, 341 payload.scrobble.album_id.year, 342 payload.scrobble.album_id.spotify_link, 343 payload.scrobble.album_id.tidal_link, 344 payload.scrobble.album_id.youtube_link, 345 payload.scrobble.album_id.apple_music_link, 346 payload.scrobble.album_id.sha256, 347 payload.scrobble.album_id.uri, 348 payload.scrobble.album_id.artist_uri, 349 ], 350 ) { 351 Ok(_) => (), 352 Err(e) => { 353 if !e.to_string().contains("violates primary key constraint") { 354 tracing::error!("[albums] error: {}", e); 355 return Err(e.into()); 356 } 357 } 358 } 359 360 match conn.execute( 361 "INSERT INTO tracks ( 362 id, 363 title, 364 artist, 365 album_artist, 366 album_art, 367 album, 368 track_number, 369 duration, 370 mb_id, 371 youtube_link, 372 spotify_link, 373 tidal_link, 374 apple_music_link, 375 sha256, 376 lyrics, 377 composer, 378 genre, 379 disc_number, 380 copyright_message, 381 label, 382 uri, 383 artist_uri, 384 album_uri, 385 created_at 386 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 387 params![ 388 payload.scrobble.track_id.xata_id, 389 payload.scrobble.track_id.title, 390 payload.scrobble.track_id.artist, 391 payload.scrobble.track_id.album_artist, 392 payload.scrobble.track_id.album_art, 393 payload.scrobble.track_id.album, 394 payload.scrobble.track_id.track_number, 395 payload.scrobble.track_id.duration, 396 payload.scrobble.track_id.mb_id, 397 payload.scrobble.track_id.youtube_link, 398 payload.scrobble.track_id.spotify_link, 399 payload.scrobble.track_id.tidal_link, 400 payload.scrobble.track_id.apple_music_link, 401 payload.scrobble.track_id.sha256, 402 payload.scrobble.track_id.lyrics, 403 payload.scrobble.track_id.composer, 404 payload.scrobble.track_id.genre, 405 payload.scrobble.track_id.disc_number, 406 payload.scrobble.track_id.copyright_message, 407 payload.scrobble.track_id.label, 408 payload.scrobble.track_id.uri, 409 payload.scrobble.track_id.artist_uri, 410 payload.scrobble.track_id.album_uri, 411 payload.scrobble.track_id.xata_createdat, 412 ], 413 ) { 414 Ok(_) => (), 415 Err(e) => { 416 if !e.to_string().contains("violates primary key constraint") { 417 tracing::error!("[tracks] error: {}", e); 418 return Err(e.into()); 419 } 420 } 421 } 422 423 match conn.execute( 424 "INSERT INTO album_tracks ( 425 id, 426 album_id, 427 track_id 428 ) VALUES (?, 429 ?, 430 ?)", 431 params![ 432 payload.album_track.xata_id, 433 payload.album_track.album_id.xata_id, 434 payload.album_track.track_id.xata_id, 435 ], 436 ) { 437 Ok(_) => (), 438 Err(e) => { 439 if !e.to_string().contains("violates primary key constraint") { 440 tracing::error!("[album_tracks] error: {}", e); 441 return Err(e.into()); 442 } 443 } 444 } 445 446 match conn.execute( 447 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)", 448 params![ 449 payload.artist_track.xata_id, 450 payload.artist_track.artist_id.xata_id, 451 payload.artist_track.track_id.xata_id, 452 payload.artist_track.xata_createdat, 453 ], 454 ) { 455 Ok(_) => (), 456 Err(e) => { 457 if !e.to_string().contains("violates primary key constraint") { 458 tracing::error!("[artist_tracks] error: {}", e); 459 return Err(e.into()); 460 } 461 } 462 } 463 464 match conn.execute( 465 "INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)", 466 params![ 467 payload.artist_album.xata_id, 468 payload.artist_album.artist_id.xata_id, 469 payload.artist_album.album_id.xata_id, 470 payload.artist_album.xata_createdat, 471 ], 472 ) { 473 Ok(_) => (), 474 Err(e) => { 475 if !e.to_string().contains("violates primary key constraint") { 476 tracing::error!("[artist_albums] error: {}", e); 477 return Err(e.into()); 478 } 479 } 480 } 481 482 match conn.execute( 483 "INSERT INTO user_albums (id, user_id, album_id, created_at) VALUES (?, ?, ?, ?)", 484 params![ 485 payload.user_album.xata_id, 486 payload.user_album.user_id.xata_id, 487 payload.user_album.album_id.xata_id, 488 payload.user_album.xata_createdat, 489 ], 490 ) { 491 Ok(_) => (), 492 Err(e) => { 493 if !e.to_string().contains("violates primary key constraint") { 494 tracing::error!("[user_albums] error: {}", e); 495 return Err(e.into()); 496 } 497 } 498 } 499 500 match conn.execute( 501 "INSERT INTO user_artists (id, user_id, artist_id, created_at) VALUES (?, ?, ?, ?)", 502 params![ 503 payload.user_artist.xata_id, 504 payload.user_artist.user_id.xata_id, 505 payload.user_artist.artist_id.xata_id, 506 payload.user_artist.xata_createdat, 507 ], 508 ) { 509 Ok(_) => (), 510 Err(e) => { 511 if !e.to_string().contains("violates primary key constraint") { 512 tracing::error!("[user_artists] error: {}", e); 513 return Err(e.into()); 514 } 515 } 516 } 517 518 match conn.execute( 519 "INSERT INTO user_tracks (id, user_id, track_id, created_at) VALUES (?, ?, ?, ?)", 520 params![ 521 payload.user_track.xata_id, 522 payload.user_track.user_id.xata_id, 523 payload.user_track.track_id.xata_id, 524 payload.user_track.xata_createdat, 525 ], 526 ) { 527 Ok(_) => (), 528 Err(e) => { 529 if !e.to_string().contains("violates primary key constraint") { 530 tracing::error!("[user_tracks] error: {}", e); 531 return Err(e.into()); 532 } 533 } 534 } 535 536 match conn.execute( 537 "INSERT INTO scrobbles ( 538 id, 539 user_id, 540 track_id, 541 album_id, 542 artist_id, 543 uri, 544 created_at 545 ) VALUES ( 546 ?, 547 ?, 548 ?, 549 ?, 550 ?, 551 ?, 552 ? 553 )", 554 params![ 555 payload.scrobble.xata_id, 556 payload.scrobble.user_id.xata_id, 557 payload.scrobble.track_id.xata_id, 558 payload.scrobble.album_id.xata_id, 559 payload.scrobble.artist_id.xata_id, 560 payload.scrobble.uri, 561 payload.scrobble.timestamp, 562 ], 563 ) { 564 Ok(_) => (), 565 Err(e) => { 566 if !e.to_string().contains("violates primary key constraint") { 567 tracing::error!("[scrobbles] error: {}", e); 568 return Err(e.into()); 569 } 570 } 571 } 572 573 Ok(()) 574} 575 576pub async fn save_track( 577 conn: Arc<Mutex<Connection>>, 578 payload: NewTrackPayload, 579) -> Result<(), Error> { 580 let conn = conn.lock().unwrap(); 581 582 match conn.execute( 583 "INSERT INTO tracks ( 584 id, 585 title, 586 artist, 587 album_artist, 588 album_art, 589 album, 590 track_number, 591 duration, 592 mb_id, 593 youtube_link, 594 spotify_link, 595 tidal_link, 596 apple_music_link, 597 sha256, 598 lyrics, 599 composer, 600 genre, 601 disc_number, 602 copyright_message, 603 label, 604 uri, 605 artist_uri, 606 album_uri, 607 created_at 608 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 609 params![ 610 payload.track.xata_id, 611 payload.track.title, 612 payload.track.artist, 613 payload.track.album_artist, 614 payload.track.album_art, 615 payload.track.album, 616 payload.track.track_number, 617 payload.track.duration, 618 payload.track.mb_id, 619 payload.track.youtube_link, 620 payload.track.spotify_link, 621 payload.track.tidal_link, 622 payload.track.apple_music_link, 623 payload.track.sha256, 624 payload.track.lyrics, 625 payload.track.composer, 626 payload.track.genre, 627 payload.track.disc_number, 628 payload.track.copyright_message, 629 payload.track.label, 630 payload.track.uri, 631 payload.track.artist_uri, 632 payload.track.album_uri, 633 payload.track.xata_createdat, 634 ], 635 ) { 636 Ok(_) => (), 637 Err(e) => { 638 if !e.to_string().contains("violates primary key constraint") { 639 tracing::error!("[tracks] error: {}", e); 640 return Err(e.into()); 641 } 642 } 643 } 644 645 match conn.execute( 646 "INSERT INTO album_tracks ( 647 id, 648 album_id, 649 track_id 650 ) VALUES (?, 651 ?, 652 ?)", 653 params![ 654 payload.album_track.xata_id, 655 payload.album_track.album_id.xata_id, 656 payload.album_track.track_id.xata_id, 657 ], 658 ) { 659 Ok(_) => (), 660 Err(e) => { 661 if !e.to_string().contains("violates primary key constraint") { 662 tracing::error!("[album_tracks] error: {}", e); 663 return Err(e.into()); 664 } 665 } 666 } 667 668 match conn.execute( 669 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)", 670 params![ 671 payload.artist_track.xata_id, 672 payload.artist_track.artist_id.xata_id, 673 payload.artist_track.track_id.xata_id, 674 payload.artist_track.xata_createdat, 675 ], 676 ) { 677 Ok(_) => (), 678 Err(e) => { 679 if !e.to_string().contains("violates primary key constraint") { 680 tracing::error!("[artist_tracks] error: {}", e); 681 return Err(e.into()); 682 } 683 } 684 } 685 686 match conn.execute( 687 "INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)", 688 params![ 689 payload.artist_album.xata_id, 690 payload.artist_album.artist_id.xata_id, 691 payload.artist_album.album_id.xata_id, 692 payload.artist_album.xata_createdat, 693 ], 694 ) { 695 Ok(_) => (), 696 Err(e) => { 697 if !e.to_string().contains("violates primary key constraint") { 698 tracing::error!("[artist_albums] error: {}", e); 699 return Err(e.into()); 700 } 701 } 702 } 703 Ok(()) 704} 705 706pub async fn like(conn: Arc<Mutex<Connection>>, payload: LikePayload) -> Result<(), Error> { 707 let conn = conn.lock().unwrap(); 708 709 let exists: bool = conn.query_row( 710 "SELECT EXISTS(SELECT 1 FROM loved_tracks WHERE user_id = ? AND track_id = ?)", 711 params![payload.user_id.xata_id, payload.track_id.xata_id], 712 |row| row.get(0), 713 )?; 714 715 if exists { 716 tracing::warn!( 717 "Like already exists, user_id = {} track_id = {}", 718 payload.user_id.xata_id, 719 payload.track_id.xata_id 720 ); 721 return Ok(()); 722 } 723 724 match conn.execute( 725 "INSERT INTO loved_tracks ( 726 id, 727 user_id, 728 track_id, 729 created_at 730 ) VALUES ( 731 ?, 732 ?, 733 ?, 734 ? 735 )", 736 params![ 737 payload.xata_id, 738 payload.user_id.xata_id, 739 payload.track_id.xata_id, 740 payload.xata_createdat, 741 ], 742 ) { 743 Ok(_) => (), 744 Err(e) => { 745 if !e.to_string().contains("violates primary key constraint") { 746 tracing::error!("[likes] error: {}", e); 747 return Err(e.into()); 748 } 749 } 750 } 751 Ok(()) 752} 753 754pub async fn unlike(conn: Arc<Mutex<Connection>>, payload: UnlikePayload) -> Result<(), Error> { 755 let conn = conn.lock().unwrap(); 756 match conn.execute( 757 "DELETE FROM loved_tracks WHERE user_id = ? AND track_id = ?", 758 params![payload.user_id.xata_id, payload.track_id.xata_id,], 759 ) { 760 Ok(_) => (), 761 Err(e) => { 762 tracing::error!("[unlikes] error: {}", e); 763 return Err(e.into()); 764 } 765 } 766 Ok(()) 767} 768 769pub async fn save_user(conn: Arc<Mutex<Connection>>, payload: UserPayload) -> Result<(), Error> { 770 let conn = conn.lock().unwrap(); 771 772 match conn.execute( 773 "INSERT INTO users ( 774 id, 775 avatar, 776 did, 777 display_name, 778 handle 779 ) VALUES ( 780 ?, 781 ?, 782 ?, 783 ?, 784 ? 785 ) 786 ON CONFLICT (id) DO UPDATE SET 787 avatar = EXCLUDED.avatar, 788 did = EXCLUDED.did, 789 display_name = EXCLUDED.display_name, 790 handle = EXCLUDED.handle", 791 params![ 792 payload.xata_id, 793 payload.avatar, 794 payload.did, 795 payload.display_name, 796 payload.handle, 797 ], 798 ) { 799 Ok(_) => (), 800 Err(e) => { 801 if !e.to_string().contains("violates primary key constraint") { 802 tracing::error!("[users] error: {}", e); 803 return Err(e.into()); 804 } 805 } 806 } 807 Ok(()) 808} 809 810pub async fn delete_scrobble(conn: Arc<Mutex<Connection>>, uri: &str) -> Result<(), Error> { 811 let conn = conn.lock().unwrap(); 812 match conn.execute("DELETE FROM scrobbles WHERE uri = ?", params![uri]) { 813 Ok(_) => (), 814 Err(e) => { 815 tracing::error!("[scrobbles] error: {}", e); 816 return Err(e.into()); 817 } 818 } 819 Ok(()) 820} 821 822#[cfg(test)] 823mod tests { 824 825 use super::types; 826 827 #[test] 828 fn test_parse_scrobble() { 829 let data = r#" 830 { 831 "scrobble": { 832 "album_id": { 833 "album_art": "https://cdn.rocksky.app/covers/9e004bc175df6c338cab2a9e465b736f.jpg", 834 "artist": "Kid Ink", 835 "artist_uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k", 836 "release_date": "2012-06-26T00:00:00.000Z", 837 "sha256": "8d3f54501cf22aeb5d7ecb2a21c43b8a0b21839df3c61007ec781b278ec2806f", 838 "title": "Up & Away", 839 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.album/3lhlly5k7sk2k", 840 "xata_createdat": "2025-02-05T22:54:59.422Z", 841 "xata_id": "rec_cuhuogpo74fi003af7og", 842 "xata_updatedat": "2025-03-03T07:20:51.237Z", 843 "xata_version": 29, 844 "year": 2012, 845 "apple_music_link": null, 846 "spotify_link": null, 847 "tidal_link": null, 848 "youtube_link": null 849 }, 850 "artist_id": { 851 "name": "Kid Ink", 852 "picture": "https://i.scdn.co/image/ab6761610000e5ebf4904a817005f3b96f4e6e53", 853 "sha256": "7e9e30fecceedb10bf69e0c81dd036aeb5cf83befb0c3aeedf84684fe1ab1860", 854 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k", 855 "xata_createdat": "2025-02-05T22:40:50.310Z", 856 "xata_id": "rec_cuhuhsho74fi003af740", 857 "xata_updatedat": "2025-03-03T07:20:50.648Z", 858 "xata_version": 82, 859 "apple_music_link": null, 860 "biography": null, 861 "born": null, 862 "born_in": null, 863 "died": null, 864 "spotify_link": null, 865 "tidal_link": null, 866 "youtube_link": null 867 }, 868 "track_id": { 869 "album": "Up & Away", 870 "album_art": "https://cdn.rocksky.app/covers/9e004bc175df6c338cab2a9e465b736f.jpg", 871 "album_artist": "Kid Ink", 872 "album_uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.album/3lhlly5k7sk2k", 873 "artist": "Kid Ink", 874 "artist_uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k", 875 "composer": "The Arsenals", 876 "copyright_message": "2012 Tha Alumni", 877 "disc_number": 1, 878 "duration": 251922, 879 "lyrics": "[00:11.91] I know, they ain't know what I'm on\n[00:26.97] Sorry excuse me, how I'm feelin' right now\n[00:30.12] Soon they gon' understand that\n[00:32.80] Try to do it like me you can tell 'em\n[00:35.63] I'm a beast, I'm a dog, they let me off the leash\n[00:39.12] Now I'm comin' for 'em all\n[00:40.87] Man I need another drink, it's the last call\n[00:43.79] Just gimme a minute lemme show 'em how I ball\n[00:46.60] Then we'll roll out, let's roll out\n[00:50.31] Let's roll out, we could roll out\n[00:59.92] Live, reportin' from the cockpit\n[01:02.62] Red eyes but I'm tryna get my mind clear\n[01:05.60] Celebratin' like we just won a contest\n[01:08.80] No contest, motherfuckers couldn't digest\n[01:11.66] What I'm on, man of my home\n[01:14.46] Bands on deck, you ain't gotta blow my horn\n[01:17.54] Paint a perfect picture like frida kahlo\n[01:20.41] Red or green pill don't trip just swallow that\n[01:23.77] And gon' have the time of your life\n[01:26.21] On me, no strings up, high as a kite\n[01:29.22] Watch the molly turn a straight girl right into a dyke\n[01:31.84] Soon you'll understand by the end of the night\n[01:35.04] Tell 'em\n[01:36.01] I know, they ain't know what I'm on\n[01:38.55] Sorry excuse me, how I'm feelin' right now\n[01:41.98] Soon they gon' understand that\n[01:44.63] Try to do it like me you can tell 'em\n[01:47.16] I'm a beast, I'm a dog, they let me off the leash\n[01:51.15] Now I'm comin' for 'em all\n[01:52.79] Man I need another drink, it's the last call\n[01:55.62] Just gimme a minute lemme show 'em how I ball\n[01:58.76] Then we'll roll out, let's roll out\n[02:02.97] Let's roll out, we could roll out\n[02:11.86] Just sayin', I need to get a point across\n[02:14.77] Somebody find these niggas cuz they fuckin' lost\n[02:17.70] Tryna be the boss, couldn't pay the cost\n[02:20.77] Let my chain speak for me we ain't gotta talk\n[02:23.73] I go, til, the bottle's, hollow\n[02:27.50] Smokin' on diablo, smellin' like patron and\n[02:30.68] Marc jacob's cologne, up & away new generation\n[02:34.65] Apollo shit, so ready to roll, and rockout\n[02:38.72] These lames can't ball like the nba lockout\n[02:41.11] Hit 'em in the head, might pull a knot out\n[02:44.65] Show these motherfuckers what they not 'bout\n[02:47.11] Tell 'em\n[02:48.17] ", 880 "sha256": "0565f7815bc60c7fd96341073dd6420ca0e21ee36279d381ac5acf361fd27183", 881 "title": "Roll Out", 882 "track_number": 8, 883 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.song/3lhlly2gob22k", 884 "xata_createdat": "2025-02-05T22:54:58.062Z", 885 "xata_id": "rec_cuhuogho74fi003af7o0", 886 "xata_updatedat": "2025-03-03T07:21:04.449Z", 887 "xata_version": 16, 888 "apple_music_link": "null", 889 "genre": "null", 890 "label": "null", 891 "mb_id": "null", 892 "spotify_link": null, 893 "tidal_link": null, 894 "youtube_link": null 895 }, 896 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.scrobble/3ljhfzlkhy225", 897 "user_id": { 898 "avatar": "https://cdn.bsky.app/img/avatar/plain/did:plc:7vdlgi2bflelz7mmuxoqjfcr/bafkreiabxfnhhk72ik2vgze6yjnjzbxps37nutkzbmnoo67ffoasgyeqwm@jpeg", 899 "did": "did:plc:7vdlgi2bflelz7mmuxoqjfcr", 900 "display_name": "Tsiry Sandratraina 馃", 901 "handle": "tsiry-sandratraina.com", 902 "xata_createdat": "2025-02-03T04:39:54.139Z", 903 "xata_id": "rec_cug4h6ibhfbm7uq5dte0", 904 "xata_updatedat": "2025-02-03T04:39:54.139Z", 905 "xata_version": 0 906 }, 907 "xata_createdat": "2025-03-03T07:21:04.679Z", 908 "xata_id": "rec_cv2lgo4ddc7scqp7svv0", 909 "xata_updatedat": "2025-03-03T07:21:04.679Z", 910 "xata_version": 0 911 }, 912 "user_album": { 913 "album_id": { 914 "xata_id": "rec_cuhuogpo74fi003af7og" 915 }, 916 "scrobbles": 10, 917 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.album/3lhlly5k7sk2k", 918 "user_id": { 919 "xata_id": "rec_cug4h6ibhfbm7uq5dte0" 920 }, 921 "xata_createdat": "2025-02-09T05:27:35.019Z", 922 "xata_id": "rec_cuk3phssvaqtev3d9l60", 923 "xata_updatedat": "2025-03-03T07:21:04.220Z", 924 "xata_version": 10 925 }, 926 "user_artist": { 927 "artist_id": { 928 "xata_id": "rec_cuhuhsho74fi003af740" 929 }, 930 "scrobbles": 21, 931 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k", 932 "user_id": { 933 "xata_id": "rec_cug4h6ibhfbm7uq5dte0" 934 }, 935 "xata_createdat": "2025-02-08T21:38:11.888Z", 936 "xata_id": "rec_cujstgpdl6q579droij0", 937 "xata_updatedat": "2025-03-03T07:21:03.643Z", 938 "xata_version": 21 939 }, 940 "user_track": { 941 "scrobbles": 6, 942 "track_id": { 943 "xata_id": "rec_cuhuogho74fi003af7o0" 944 }, 945 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.song/3lhlly2gob22k", 946 "user_id": { 947 "xata_id": "rec_cug4h6ibhfbm7uq5dte0" 948 }, 949 "xata_createdat": "2025-02-09T05:27:34.172Z", 950 "xata_id": "rec_cuk3phhdl6q579drp6f0", 951 "xata_updatedat": "2025-03-03T07:21:02.405Z", 952 "xata_version": 6 953 }, 954 "album_track": { 955 "album_id": { 956 "xata_id": "rec_cuhuogpo74fi003af7og" 957 }, 958 "track_id": { 959 "xata_id": "rec_cuhuogho74fi003af7o0" 960 }, 961 "xata_createdat": "2025-02-05T22:54:59.922Z", 962 "xata_id": "rec_cuhuogpo74fi003af7p0", 963 "xata_updatedat": "2025-03-03T07:20:51.736Z", 964 "xata_version": 11 965 }, 966 "artist_track": { 967 "artist_id": { 968 "xata_id": "rec_cuhuhsho74fi003af740" 969 }, 970 "track_id": { 971 "xata_id": "rec_cuhuogho74fi003af7o0" 972 }, 973 "xata_createdat": "2025-02-05T22:55:00.706Z", 974 "xata_id": "rec_cuhuoh2e5drjqa1arhf0", 975 "xata_updatedat": "2025-03-03T07:20:52.218Z", 976 "xata_version": 11 977 }, 978 "artist_album": { 979 "album_id": { 980 "xata_id": "rec_cuhuogpo74fi003af7og" 981 }, 982 "artist_id": { 983 "xata_id": "rec_cuhuhsho74fi003af740" 984 }, 985 "xata_createdat": "2025-02-05T22:55:01.205Z", 986 "xata_id": "rec_cuhuohe7vkdf9dh0pkh0", 987 "xata_updatedat": "2025-03-03T07:20:53.007Z", 988 "xata_version": 29 989 } 990} 991 "#; 992 993 match serde_json::from_str::<types::ScrobblePayload>(data) { 994 Err(e) => { 995 tracing::error!("Error parsing payload: {}", e); 996 tracing::error!("{}", data); 997 } 998 Ok(_) => {} 999 } 1000 assert!(true); 1001 } 1002 1003 #[test] 1004 fn test_parse_like() { 1005 let data = r#"{ 1006 "uri":"at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.like/3mb6kxku6js2u", 1007 "user_id": { 1008 "xata_id": "rec_cug4h6ibhfbm7uq5dte0" 1009 }, 1010 "track_id": { 1011 "xata_id":"rec_d11h6cdqrj64hn24639g" 1012 }, 1013 "xata_createdat": "2025-12-30T04:59:55.203Z", 1014 "xata_id":"rec_d59loiod60d9sc81mc80", 1015 "xata_updatedat":"2025-12-30T04:59:55.203Z", 1016 "xata_version":0 1017 }"#; 1018 1019 match serde_json::from_str::<types::LikePayload>(data) { 1020 Err(e) => { 1021 tracing::error!("Error parsing payload: {}", e); 1022 tracing::error!("{}", data); 1023 } 1024 Ok(_) => {} 1025 } 1026 assert!(true); 1027 } 1028}