A decentralized music tracking and discovery platform built on AT Protocol 馃幍
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1use anyhow::Error;
2use async_nats::{connect, Client};
3use duckdb::{params, Connection};
4use owo_colors::OwoColorize;
5use std::{
6 env,
7 sync::{Arc, Mutex},
8 thread,
9};
10use tokio_stream::StreamExt;
11use types::{LikePayload, NewTrackPayload, ScrobblePayload, UnlikePayload, UserPayload};
12
13pub mod types;
14
15pub async fn subscribe(conn: Arc<Mutex<Connection>>) -> Result<(), Error> {
16 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
17 let conn = conn.clone();
18 let nc = connect(&addr).await?;
19 tracing::info!(server = %addr.bright_green(), "Connected to NATS");
20
21 let nc = Arc::new(Mutex::new(nc));
22 on_scrobble(nc.clone(), conn.clone());
23 on_new_track(nc.clone(), conn.clone());
24 on_like(nc.clone(), conn.clone());
25 on_unlike(nc.clone(), conn.clone());
26 on_new_user(nc.clone(), conn.clone());
27 on_delete_scrobble(nc, conn.clone());
28
29 Ok(())
30}
31
32pub fn on_scrobble(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
33 thread::spawn(move || {
34 let rt = tokio::runtime::Runtime::new().unwrap();
35 let conn = conn.clone();
36 let nc = nc.clone();
37 rt.block_on(async {
38 let nc = nc.lock().unwrap();
39 let mut sub = nc.subscribe("rocksky.scrobble".to_string()).await?;
40 drop(nc);
41
42 while let Some(msg) = sub.next().await {
43 let data = String::from_utf8(msg.payload.to_vec()).unwrap();
44 match serde_json::from_str::<ScrobblePayload>(&data) {
45 Ok(payload) => match save_scrobble(conn.clone(), payload.clone()).await {
46 Ok(_) => tracing::info!(
47 uri = %payload.scrobble.uri.cyan(),
48 "Scrobble saved successfully",
49 ),
50 Err(e) => tracing::error!("Error saving scrobble: {}", e),
51 },
52 Err(e) => {
53 tracing::error!("Error parsing payload: {}", e);
54 tracing::debug!("{}", data);
55 }
56 }
57 }
58
59 Ok::<(), Error>(())
60 })?;
61
62 Ok::<(), Error>(())
63 });
64}
65
66pub fn on_new_track(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
67 thread::spawn(move || {
68 let rt = tokio::runtime::Runtime::new().unwrap();
69 let conn = conn.clone();
70 let nc = nc.clone();
71 rt.block_on(async {
72 let nc = nc.lock().unwrap();
73 let mut sub = nc.subscribe("rocksky.track".to_string()).await?;
74 drop(nc);
75
76 while let Some(msg) = sub.next().await {
77 let data = String::from_utf8(msg.payload.to_vec()).unwrap();
78 match serde_json::from_str::<NewTrackPayload>(&data) {
79 Ok(payload) => match save_track(conn.clone(), payload.clone()).await {
80 Ok(_) => {
81 tracing::info!(
82 title = %payload.track.title.cyan(),
83 "Track saved successfully",
84 )
85 }
86 Err(e) => tracing::error!("Error saving track: {}", e),
87 },
88 Err(e) => {
89 tracing::error!("Error parsing payload: {}", e);
90 tracing::debug!("{}", data);
91 }
92 }
93 }
94
95 Ok::<(), Error>(())
96 })?;
97
98 Ok::<(), Error>(())
99 });
100}
101
102pub fn on_like(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
103 thread::spawn(move || {
104 let rt = tokio::runtime::Runtime::new().unwrap();
105 let conn = conn.clone();
106 let nc = nc.clone();
107 rt.block_on(async {
108 let nc = nc.lock().unwrap();
109 let mut sub = nc.subscribe("rocksky.like".to_string()).await?;
110 drop(nc);
111
112 while let Some(msg) = sub.next().await {
113 let data = String::from_utf8(msg.payload.to_vec()).unwrap();
114 match serde_json::from_str::<LikePayload>(&data) {
115 Ok(payload) => match like(conn.clone(), payload.clone()).await {
116 Ok(_) => tracing::info!(
117 track_id = %payload.track_id.xata_id.cyan(),
118 "Like saved successfully",
119 ),
120 Err(e) => tracing::error!("Error saving like: {}", e),
121 },
122 Err(e) => {
123 tracing::error!("Error parsing payload: {}", e);
124 tracing::debug!("{}", data);
125 }
126 }
127 }
128
129 Ok::<(), Error>(())
130 })?;
131
132 Ok::<(), Error>(())
133 });
134}
135
136pub fn on_unlike(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
137 thread::spawn(move || {
138 let rt = tokio::runtime::Runtime::new().unwrap();
139 let conn = conn.clone();
140 let nc = nc.clone();
141 rt.block_on(async {
142 let nc = nc.lock().unwrap();
143 let mut sub = nc.subscribe("rocksky.unlike".to_string()).await?;
144 drop(nc);
145
146 while let Some(msg) = sub.next().await {
147 let data = String::from_utf8(msg.payload.to_vec()).unwrap();
148 match serde_json::from_str::<UnlikePayload>(&data) {
149 Ok(payload) => match unlike(conn.clone(), payload.clone()).await {
150 Ok(_) => tracing::info!(
151 track_id = %payload.track_id.xata_id.cyan(),
152 "Unlike saved successfully",
153 ),
154 Err(e) => tracing::error!("Error saving unlike: {}", e),
155 },
156 Err(e) => {
157 tracing::error!("Error parsing payload: {}", e);
158 tracing::debug!("{}", data);
159 }
160 }
161 }
162
163 Ok::<(), Error>(())
164 })?;
165
166 Ok::<(), Error>(())
167 });
168}
169
170pub fn on_new_user(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
171 thread::spawn(move || {
172 let rt = tokio::runtime::Runtime::new().unwrap();
173 let conn = conn.clone();
174 let nc = nc.clone();
175 rt.block_on(async {
176 let nc = nc.lock().unwrap();
177 let mut sub = nc.subscribe("rocksky.user".to_string()).await?;
178 drop(nc);
179
180 while let Some(msg) = sub.next().await {
181 let data = String::from_utf8(msg.payload.to_vec()).unwrap();
182 match serde_json::from_str::<UserPayload>(&data) {
183 Ok(payload) => match save_user(conn.clone(), payload.clone()).await {
184 Ok(_) => tracing::info!(
185 handle = %payload.handle.cyan(),
186 "User saved successfully",
187 ),
188 Err(e) => tracing::error!("Error saving user: {}", e),
189 },
190 Err(e) => {
191 tracing::error!("Error parsing payload: {}", e);
192 tracing::debug!("{}", data);
193 }
194 }
195 }
196
197 Ok::<(), Error>(())
198 })?;
199
200 Ok::<(), Error>(())
201 });
202}
203
204pub fn on_delete_scrobble(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
205 thread::spawn(move || {
206 let rt = tokio::runtime::Runtime::new().unwrap();
207 let conn = conn.clone();
208 let nc = nc.clone();
209 rt.block_on(async {
210 let nc = nc.lock().unwrap();
211 let mut sub = nc.subscribe("rocksky.delete.scrobble".to_string()).await?;
212 drop(nc);
213
214 while let Some(msg) = sub.next().await {
215 let uri = String::from_utf8(msg.payload.to_vec()).unwrap();
216 match delete_scrobble(conn.clone(), &uri).await {
217 Ok(_) => tracing::info!(uri = %uri.cyan(), "Scrobble deleted successfully"),
218 Err(e) => tracing::error!("Error deleting scrobble: {}", e),
219 }
220 }
221
222 Ok::<(), Error>(())
223 })?;
224
225 Ok::<(), Error>(())
226 });
227}
228
229pub async fn save_scrobble(
230 conn: Arc<Mutex<Connection>>,
231 payload: ScrobblePayload,
232) -> Result<(), Error> {
233 let conn = conn.lock().unwrap();
234
235 match conn.execute(
236 &format!(
237 "INSERT INTO artists (
238 id,
239 name,
240 biography,
241 born,
242 born_in,
243 died,
244 picture,
245 sha256,
246 spotify_link,
247 tidal_link,
248 youtube_link,
249 apple_music_link,
250 uri,
251 genres
252 ) VALUES (
253 ?,
254 ?,
255 ?,
256 ?,
257 ?,
258 ?,
259 ?,
260 ?,
261 ?,
262 ?,
263 ?,
264 ?,
265 ?,
266 [{}]
267 )",
268 payload
269 .scrobble
270 .artist_id
271 .genres
272 .as_ref()
273 .map(|genres| genres
274 .iter()
275 .map(|g| format!("'{}'", g.replace("'", "''")))
276 .collect::<Vec<_>>()
277 .join(", "))
278 .unwrap_or_default()
279 ),
280 params![
281 payload.scrobble.artist_id.xata_id,
282 payload.scrobble.artist_id.name,
283 payload.scrobble.artist_id.biography,
284 payload.scrobble.artist_id.born,
285 payload.scrobble.artist_id.born_in,
286 payload.scrobble.artist_id.died,
287 payload.scrobble.artist_id.picture,
288 payload.scrobble.artist_id.sha256,
289 payload.scrobble.artist_id.spotify_link,
290 payload.scrobble.artist_id.tidal_link,
291 payload.scrobble.artist_id.youtube_link,
292 payload.scrobble.artist_id.apple_music_link,
293 payload.scrobble.artist_id.uri,
294 ],
295 ) {
296 Ok(_) => (),
297 Err(e) => {
298 if !e.to_string().contains("violates primary key constraint") {
299 tracing::error!("[artists] error: {}", e);
300 return Err(e.into());
301 }
302 }
303 }
304
305 match conn.execute(
306 "INSERT INTO albums (
307 id,
308 title,
309 artist,
310 release_date,
311 album_art,
312 year,
313 spotify_link,
314 tidal_link,
315 youtube_link,
316 apple_music_link,
317 sha256,
318 uri,
319 artist_uri
320 ) VALUES (
321 ?,
322 ?,
323 ?,
324 ?,
325 ?,
326 ?,
327 ?,
328 ?,
329 ?,
330 ?,
331 ?,
332 ?,
333 ?
334 )",
335 params![
336 payload.scrobble.album_id.xata_id,
337 payload.scrobble.album_id.title,
338 payload.scrobble.album_id.artist,
339 payload.scrobble.album_id.release_date,
340 payload.scrobble.album_id.album_art,
341 payload.scrobble.album_id.year,
342 payload.scrobble.album_id.spotify_link,
343 payload.scrobble.album_id.tidal_link,
344 payload.scrobble.album_id.youtube_link,
345 payload.scrobble.album_id.apple_music_link,
346 payload.scrobble.album_id.sha256,
347 payload.scrobble.album_id.uri,
348 payload.scrobble.album_id.artist_uri,
349 ],
350 ) {
351 Ok(_) => (),
352 Err(e) => {
353 if !e.to_string().contains("violates primary key constraint") {
354 tracing::error!("[albums] error: {}", e);
355 return Err(e.into());
356 }
357 }
358 }
359
360 match conn.execute(
361 "INSERT INTO tracks (
362 id,
363 title,
364 artist,
365 album_artist,
366 album_art,
367 album,
368 track_number,
369 duration,
370 mb_id,
371 youtube_link,
372 spotify_link,
373 tidal_link,
374 apple_music_link,
375 sha256,
376 lyrics,
377 composer,
378 genre,
379 disc_number,
380 copyright_message,
381 label,
382 uri,
383 artist_uri,
384 album_uri,
385 created_at
386 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
387 params![
388 payload.scrobble.track_id.xata_id,
389 payload.scrobble.track_id.title,
390 payload.scrobble.track_id.artist,
391 payload.scrobble.track_id.album_artist,
392 payload.scrobble.track_id.album_art,
393 payload.scrobble.track_id.album,
394 payload.scrobble.track_id.track_number,
395 payload.scrobble.track_id.duration,
396 payload.scrobble.track_id.mb_id,
397 payload.scrobble.track_id.youtube_link,
398 payload.scrobble.track_id.spotify_link,
399 payload.scrobble.track_id.tidal_link,
400 payload.scrobble.track_id.apple_music_link,
401 payload.scrobble.track_id.sha256,
402 payload.scrobble.track_id.lyrics,
403 payload.scrobble.track_id.composer,
404 payload.scrobble.track_id.genre,
405 payload.scrobble.track_id.disc_number,
406 payload.scrobble.track_id.copyright_message,
407 payload.scrobble.track_id.label,
408 payload.scrobble.track_id.uri,
409 payload.scrobble.track_id.artist_uri,
410 payload.scrobble.track_id.album_uri,
411 payload.scrobble.track_id.xata_createdat,
412 ],
413 ) {
414 Ok(_) => (),
415 Err(e) => {
416 if !e.to_string().contains("violates primary key constraint") {
417 tracing::error!("[tracks] error: {}", e);
418 return Err(e.into());
419 }
420 }
421 }
422
423 match conn.execute(
424 "INSERT INTO album_tracks (
425 id,
426 album_id,
427 track_id
428 ) VALUES (?,
429 ?,
430 ?)",
431 params![
432 payload.album_track.xata_id,
433 payload.album_track.album_id.xata_id,
434 payload.album_track.track_id.xata_id,
435 ],
436 ) {
437 Ok(_) => (),
438 Err(e) => {
439 if !e.to_string().contains("violates primary key constraint") {
440 tracing::error!("[album_tracks] error: {}", e);
441 return Err(e.into());
442 }
443 }
444 }
445
446 match conn.execute(
447 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)",
448 params![
449 payload.artist_track.xata_id,
450 payload.artist_track.artist_id.xata_id,
451 payload.artist_track.track_id.xata_id,
452 payload.artist_track.xata_createdat,
453 ],
454 ) {
455 Ok(_) => (),
456 Err(e) => {
457 if !e.to_string().contains("violates primary key constraint") {
458 tracing::error!("[artist_tracks] error: {}", e);
459 return Err(e.into());
460 }
461 }
462 }
463
464 match conn.execute(
465 "INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)",
466 params![
467 payload.artist_album.xata_id,
468 payload.artist_album.artist_id.xata_id,
469 payload.artist_album.album_id.xata_id,
470 payload.artist_album.xata_createdat,
471 ],
472 ) {
473 Ok(_) => (),
474 Err(e) => {
475 if !e.to_string().contains("violates primary key constraint") {
476 tracing::error!("[artist_albums] error: {}", e);
477 return Err(e.into());
478 }
479 }
480 }
481
482 match conn.execute(
483 "INSERT INTO user_albums (id, user_id, album_id, created_at) VALUES (?, ?, ?, ?)",
484 params![
485 payload.user_album.xata_id,
486 payload.user_album.user_id.xata_id,
487 payload.user_album.album_id.xata_id,
488 payload.user_album.xata_createdat,
489 ],
490 ) {
491 Ok(_) => (),
492 Err(e) => {
493 if !e.to_string().contains("violates primary key constraint") {
494 tracing::error!("[user_albums] error: {}", e);
495 return Err(e.into());
496 }
497 }
498 }
499
500 match conn.execute(
501 "INSERT INTO user_artists (id, user_id, artist_id, created_at) VALUES (?, ?, ?, ?)",
502 params![
503 payload.user_artist.xata_id,
504 payload.user_artist.user_id.xata_id,
505 payload.user_artist.artist_id.xata_id,
506 payload.user_artist.xata_createdat,
507 ],
508 ) {
509 Ok(_) => (),
510 Err(e) => {
511 if !e.to_string().contains("violates primary key constraint") {
512 tracing::error!("[user_artists] error: {}", e);
513 return Err(e.into());
514 }
515 }
516 }
517
518 match conn.execute(
519 "INSERT INTO user_tracks (id, user_id, track_id, created_at) VALUES (?, ?, ?, ?)",
520 params![
521 payload.user_track.xata_id,
522 payload.user_track.user_id.xata_id,
523 payload.user_track.track_id.xata_id,
524 payload.user_track.xata_createdat,
525 ],
526 ) {
527 Ok(_) => (),
528 Err(e) => {
529 if !e.to_string().contains("violates primary key constraint") {
530 tracing::error!("[user_tracks] error: {}", e);
531 return Err(e.into());
532 }
533 }
534 }
535
536 match conn.execute(
537 "INSERT INTO scrobbles (
538 id,
539 user_id,
540 track_id,
541 album_id,
542 artist_id,
543 uri,
544 created_at
545 ) VALUES (
546 ?,
547 ?,
548 ?,
549 ?,
550 ?,
551 ?,
552 ?
553 )",
554 params![
555 payload.scrobble.xata_id,
556 payload.scrobble.user_id.xata_id,
557 payload.scrobble.track_id.xata_id,
558 payload.scrobble.album_id.xata_id,
559 payload.scrobble.artist_id.xata_id,
560 payload.scrobble.uri,
561 payload.scrobble.timestamp,
562 ],
563 ) {
564 Ok(_) => (),
565 Err(e) => {
566 if !e.to_string().contains("violates primary key constraint") {
567 tracing::error!("[scrobbles] error: {}", e);
568 return Err(e.into());
569 }
570 }
571 }
572
573 Ok(())
574}
575
576pub async fn save_track(
577 conn: Arc<Mutex<Connection>>,
578 payload: NewTrackPayload,
579) -> Result<(), Error> {
580 let conn = conn.lock().unwrap();
581
582 match conn.execute(
583 "INSERT INTO tracks (
584 id,
585 title,
586 artist,
587 album_artist,
588 album_art,
589 album,
590 track_number,
591 duration,
592 mb_id,
593 youtube_link,
594 spotify_link,
595 tidal_link,
596 apple_music_link,
597 sha256,
598 lyrics,
599 composer,
600 genre,
601 disc_number,
602 copyright_message,
603 label,
604 uri,
605 artist_uri,
606 album_uri,
607 created_at
608 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
609 params![
610 payload.track.xata_id,
611 payload.track.title,
612 payload.track.artist,
613 payload.track.album_artist,
614 payload.track.album_art,
615 payload.track.album,
616 payload.track.track_number,
617 payload.track.duration,
618 payload.track.mb_id,
619 payload.track.youtube_link,
620 payload.track.spotify_link,
621 payload.track.tidal_link,
622 payload.track.apple_music_link,
623 payload.track.sha256,
624 payload.track.lyrics,
625 payload.track.composer,
626 payload.track.genre,
627 payload.track.disc_number,
628 payload.track.copyright_message,
629 payload.track.label,
630 payload.track.uri,
631 payload.track.artist_uri,
632 payload.track.album_uri,
633 payload.track.xata_createdat,
634 ],
635 ) {
636 Ok(_) => (),
637 Err(e) => {
638 if !e.to_string().contains("violates primary key constraint") {
639 tracing::error!("[tracks] error: {}", e);
640 return Err(e.into());
641 }
642 }
643 }
644
645 match conn.execute(
646 "INSERT INTO album_tracks (
647 id,
648 album_id,
649 track_id
650 ) VALUES (?,
651 ?,
652 ?)",
653 params![
654 payload.album_track.xata_id,
655 payload.album_track.album_id.xata_id,
656 payload.album_track.track_id.xata_id,
657 ],
658 ) {
659 Ok(_) => (),
660 Err(e) => {
661 if !e.to_string().contains("violates primary key constraint") {
662 tracing::error!("[album_tracks] error: {}", e);
663 return Err(e.into());
664 }
665 }
666 }
667
668 match conn.execute(
669 "INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)",
670 params![
671 payload.artist_track.xata_id,
672 payload.artist_track.artist_id.xata_id,
673 payload.artist_track.track_id.xata_id,
674 payload.artist_track.xata_createdat,
675 ],
676 ) {
677 Ok(_) => (),
678 Err(e) => {
679 if !e.to_string().contains("violates primary key constraint") {
680 tracing::error!("[artist_tracks] error: {}", e);
681 return Err(e.into());
682 }
683 }
684 }
685
686 match conn.execute(
687 "INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)",
688 params![
689 payload.artist_album.xata_id,
690 payload.artist_album.artist_id.xata_id,
691 payload.artist_album.album_id.xata_id,
692 payload.artist_album.xata_createdat,
693 ],
694 ) {
695 Ok(_) => (),
696 Err(e) => {
697 if !e.to_string().contains("violates primary key constraint") {
698 tracing::error!("[artist_albums] error: {}", e);
699 return Err(e.into());
700 }
701 }
702 }
703 Ok(())
704}
705
706pub async fn like(conn: Arc<Mutex<Connection>>, payload: LikePayload) -> Result<(), Error> {
707 let conn = conn.lock().unwrap();
708
709 let exists: bool = conn.query_row(
710 "SELECT EXISTS(SELECT 1 FROM loved_tracks WHERE user_id = ? AND track_id = ?)",
711 params![payload.user_id.xata_id, payload.track_id.xata_id],
712 |row| row.get(0),
713 )?;
714
715 if exists {
716 tracing::warn!(
717 "Like already exists, user_id = {} track_id = {}",
718 payload.user_id.xata_id,
719 payload.track_id.xata_id
720 );
721 return Ok(());
722 }
723
724 match conn.execute(
725 "INSERT INTO loved_tracks (
726 id,
727 user_id,
728 track_id,
729 created_at
730 ) VALUES (
731 ?,
732 ?,
733 ?,
734 ?
735 )",
736 params![
737 payload.xata_id,
738 payload.user_id.xata_id,
739 payload.track_id.xata_id,
740 payload.xata_createdat,
741 ],
742 ) {
743 Ok(_) => (),
744 Err(e) => {
745 if !e.to_string().contains("violates primary key constraint") {
746 tracing::error!("[likes] error: {}", e);
747 return Err(e.into());
748 }
749 }
750 }
751 Ok(())
752}
753
754pub async fn unlike(conn: Arc<Mutex<Connection>>, payload: UnlikePayload) -> Result<(), Error> {
755 let conn = conn.lock().unwrap();
756 match conn.execute(
757 "DELETE FROM loved_tracks WHERE user_id = ? AND track_id = ?",
758 params![payload.user_id.xata_id, payload.track_id.xata_id,],
759 ) {
760 Ok(_) => (),
761 Err(e) => {
762 tracing::error!("[unlikes] error: {}", e);
763 return Err(e.into());
764 }
765 }
766 Ok(())
767}
768
769pub async fn save_user(conn: Arc<Mutex<Connection>>, payload: UserPayload) -> Result<(), Error> {
770 let conn = conn.lock().unwrap();
771
772 match conn.execute(
773 "INSERT INTO users (
774 id,
775 avatar,
776 did,
777 display_name,
778 handle
779 ) VALUES (
780 ?,
781 ?,
782 ?,
783 ?,
784 ?
785 )
786 ON CONFLICT (id) DO UPDATE SET
787 avatar = EXCLUDED.avatar,
788 did = EXCLUDED.did,
789 display_name = EXCLUDED.display_name,
790 handle = EXCLUDED.handle",
791 params![
792 payload.xata_id,
793 payload.avatar,
794 payload.did,
795 payload.display_name,
796 payload.handle,
797 ],
798 ) {
799 Ok(_) => (),
800 Err(e) => {
801 if !e.to_string().contains("violates primary key constraint") {
802 tracing::error!("[users] error: {}", e);
803 return Err(e.into());
804 }
805 }
806 }
807 Ok(())
808}
809
810pub async fn delete_scrobble(conn: Arc<Mutex<Connection>>, uri: &str) -> Result<(), Error> {
811 let conn = conn.lock().unwrap();
812 match conn.execute("DELETE FROM scrobbles WHERE uri = ?", params![uri]) {
813 Ok(_) => (),
814 Err(e) => {
815 tracing::error!("[scrobbles] error: {}", e);
816 return Err(e.into());
817 }
818 }
819 Ok(())
820}
821
822#[cfg(test)]
823mod tests {
824
825 use super::types;
826
827 #[test]
828 fn test_parse_scrobble() {
829 let data = r#"
830 {
831 "scrobble": {
832 "album_id": {
833 "album_art": "https://cdn.rocksky.app/covers/9e004bc175df6c338cab2a9e465b736f.jpg",
834 "artist": "Kid Ink",
835 "artist_uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k",
836 "release_date": "2012-06-26T00:00:00.000Z",
837 "sha256": "8d3f54501cf22aeb5d7ecb2a21c43b8a0b21839df3c61007ec781b278ec2806f",
838 "title": "Up & Away",
839 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.album/3lhlly5k7sk2k",
840 "xata_createdat": "2025-02-05T22:54:59.422Z",
841 "xata_id": "rec_cuhuogpo74fi003af7og",
842 "xata_updatedat": "2025-03-03T07:20:51.237Z",
843 "xata_version": 29,
844 "year": 2012,
845 "apple_music_link": null,
846 "spotify_link": null,
847 "tidal_link": null,
848 "youtube_link": null
849 },
850 "artist_id": {
851 "name": "Kid Ink",
852 "picture": "https://i.scdn.co/image/ab6761610000e5ebf4904a817005f3b96f4e6e53",
853 "sha256": "7e9e30fecceedb10bf69e0c81dd036aeb5cf83befb0c3aeedf84684fe1ab1860",
854 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k",
855 "xata_createdat": "2025-02-05T22:40:50.310Z",
856 "xata_id": "rec_cuhuhsho74fi003af740",
857 "xata_updatedat": "2025-03-03T07:20:50.648Z",
858 "xata_version": 82,
859 "apple_music_link": null,
860 "biography": null,
861 "born": null,
862 "born_in": null,
863 "died": null,
864 "spotify_link": null,
865 "tidal_link": null,
866 "youtube_link": null
867 },
868 "track_id": {
869 "album": "Up & Away",
870 "album_art": "https://cdn.rocksky.app/covers/9e004bc175df6c338cab2a9e465b736f.jpg",
871 "album_artist": "Kid Ink",
872 "album_uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.album/3lhlly5k7sk2k",
873 "artist": "Kid Ink",
874 "artist_uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k",
875 "composer": "The Arsenals",
876 "copyright_message": "2012 Tha Alumni",
877 "disc_number": 1,
878 "duration": 251922,
879 "lyrics": "[00:11.91] I know, they ain't know what I'm on\n[00:26.97] Sorry excuse me, how I'm feelin' right now\n[00:30.12] Soon they gon' understand that\n[00:32.80] Try to do it like me you can tell 'em\n[00:35.63] I'm a beast, I'm a dog, they let me off the leash\n[00:39.12] Now I'm comin' for 'em all\n[00:40.87] Man I need another drink, it's the last call\n[00:43.79] Just gimme a minute lemme show 'em how I ball\n[00:46.60] Then we'll roll out, let's roll out\n[00:50.31] Let's roll out, we could roll out\n[00:59.92] Live, reportin' from the cockpit\n[01:02.62] Red eyes but I'm tryna get my mind clear\n[01:05.60] Celebratin' like we just won a contest\n[01:08.80] No contest, motherfuckers couldn't digest\n[01:11.66] What I'm on, man of my home\n[01:14.46] Bands on deck, you ain't gotta blow my horn\n[01:17.54] Paint a perfect picture like frida kahlo\n[01:20.41] Red or green pill don't trip just swallow that\n[01:23.77] And gon' have the time of your life\n[01:26.21] On me, no strings up, high as a kite\n[01:29.22] Watch the molly turn a straight girl right into a dyke\n[01:31.84] Soon you'll understand by the end of the night\n[01:35.04] Tell 'em\n[01:36.01] I know, they ain't know what I'm on\n[01:38.55] Sorry excuse me, how I'm feelin' right now\n[01:41.98] Soon they gon' understand that\n[01:44.63] Try to do it like me you can tell 'em\n[01:47.16] I'm a beast, I'm a dog, they let me off the leash\n[01:51.15] Now I'm comin' for 'em all\n[01:52.79] Man I need another drink, it's the last call\n[01:55.62] Just gimme a minute lemme show 'em how I ball\n[01:58.76] Then we'll roll out, let's roll out\n[02:02.97] Let's roll out, we could roll out\n[02:11.86] Just sayin', I need to get a point across\n[02:14.77] Somebody find these niggas cuz they fuckin' lost\n[02:17.70] Tryna be the boss, couldn't pay the cost\n[02:20.77] Let my chain speak for me we ain't gotta talk\n[02:23.73] I go, til, the bottle's, hollow\n[02:27.50] Smokin' on diablo, smellin' like patron and\n[02:30.68] Marc jacob's cologne, up & away new generation\n[02:34.65] Apollo shit, so ready to roll, and rockout\n[02:38.72] These lames can't ball like the nba lockout\n[02:41.11] Hit 'em in the head, might pull a knot out\n[02:44.65] Show these motherfuckers what they not 'bout\n[02:47.11] Tell 'em\n[02:48.17] ",
880 "sha256": "0565f7815bc60c7fd96341073dd6420ca0e21ee36279d381ac5acf361fd27183",
881 "title": "Roll Out",
882 "track_number": 8,
883 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.song/3lhlly2gob22k",
884 "xata_createdat": "2025-02-05T22:54:58.062Z",
885 "xata_id": "rec_cuhuogho74fi003af7o0",
886 "xata_updatedat": "2025-03-03T07:21:04.449Z",
887 "xata_version": 16,
888 "apple_music_link": "null",
889 "genre": "null",
890 "label": "null",
891 "mb_id": "null",
892 "spotify_link": null,
893 "tidal_link": null,
894 "youtube_link": null
895 },
896 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.scrobble/3ljhfzlkhy225",
897 "user_id": {
898 "avatar": "https://cdn.bsky.app/img/avatar/plain/did:plc:7vdlgi2bflelz7mmuxoqjfcr/bafkreiabxfnhhk72ik2vgze6yjnjzbxps37nutkzbmnoo67ffoasgyeqwm@jpeg",
899 "did": "did:plc:7vdlgi2bflelz7mmuxoqjfcr",
900 "display_name": "Tsiry Sandratraina 馃",
901 "handle": "tsiry-sandratraina.com",
902 "xata_createdat": "2025-02-03T04:39:54.139Z",
903 "xata_id": "rec_cug4h6ibhfbm7uq5dte0",
904 "xata_updatedat": "2025-02-03T04:39:54.139Z",
905 "xata_version": 0
906 },
907 "xata_createdat": "2025-03-03T07:21:04.679Z",
908 "xata_id": "rec_cv2lgo4ddc7scqp7svv0",
909 "xata_updatedat": "2025-03-03T07:21:04.679Z",
910 "xata_version": 0
911 },
912 "user_album": {
913 "album_id": {
914 "xata_id": "rec_cuhuogpo74fi003af7og"
915 },
916 "scrobbles": 10,
917 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.album/3lhlly5k7sk2k",
918 "user_id": {
919 "xata_id": "rec_cug4h6ibhfbm7uq5dte0"
920 },
921 "xata_createdat": "2025-02-09T05:27:35.019Z",
922 "xata_id": "rec_cuk3phssvaqtev3d9l60",
923 "xata_updatedat": "2025-03-03T07:21:04.220Z",
924 "xata_version": 10
925 },
926 "user_artist": {
927 "artist_id": {
928 "xata_id": "rec_cuhuhsho74fi003af740"
929 },
930 "scrobbles": 21,
931 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.artist/3lhlly4tvws2k",
932 "user_id": {
933 "xata_id": "rec_cug4h6ibhfbm7uq5dte0"
934 },
935 "xata_createdat": "2025-02-08T21:38:11.888Z",
936 "xata_id": "rec_cujstgpdl6q579droij0",
937 "xata_updatedat": "2025-03-03T07:21:03.643Z",
938 "xata_version": 21
939 },
940 "user_track": {
941 "scrobbles": 6,
942 "track_id": {
943 "xata_id": "rec_cuhuogho74fi003af7o0"
944 },
945 "uri": "at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.song/3lhlly2gob22k",
946 "user_id": {
947 "xata_id": "rec_cug4h6ibhfbm7uq5dte0"
948 },
949 "xata_createdat": "2025-02-09T05:27:34.172Z",
950 "xata_id": "rec_cuk3phhdl6q579drp6f0",
951 "xata_updatedat": "2025-03-03T07:21:02.405Z",
952 "xata_version": 6
953 },
954 "album_track": {
955 "album_id": {
956 "xata_id": "rec_cuhuogpo74fi003af7og"
957 },
958 "track_id": {
959 "xata_id": "rec_cuhuogho74fi003af7o0"
960 },
961 "xata_createdat": "2025-02-05T22:54:59.922Z",
962 "xata_id": "rec_cuhuogpo74fi003af7p0",
963 "xata_updatedat": "2025-03-03T07:20:51.736Z",
964 "xata_version": 11
965 },
966 "artist_track": {
967 "artist_id": {
968 "xata_id": "rec_cuhuhsho74fi003af740"
969 },
970 "track_id": {
971 "xata_id": "rec_cuhuogho74fi003af7o0"
972 },
973 "xata_createdat": "2025-02-05T22:55:00.706Z",
974 "xata_id": "rec_cuhuoh2e5drjqa1arhf0",
975 "xata_updatedat": "2025-03-03T07:20:52.218Z",
976 "xata_version": 11
977 },
978 "artist_album": {
979 "album_id": {
980 "xata_id": "rec_cuhuogpo74fi003af7og"
981 },
982 "artist_id": {
983 "xata_id": "rec_cuhuhsho74fi003af740"
984 },
985 "xata_createdat": "2025-02-05T22:55:01.205Z",
986 "xata_id": "rec_cuhuohe7vkdf9dh0pkh0",
987 "xata_updatedat": "2025-03-03T07:20:53.007Z",
988 "xata_version": 29
989 }
990}
991 "#;
992
993 match serde_json::from_str::<types::ScrobblePayload>(data) {
994 Err(e) => {
995 tracing::error!("Error parsing payload: {}", e);
996 tracing::error!("{}", data);
997 }
998 Ok(_) => {}
999 }
1000 assert!(true);
1001 }
1002
1003 #[test]
1004 fn test_parse_like() {
1005 let data = r#"{
1006 "uri":"at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/app.rocksky.like/3mb6kxku6js2u",
1007 "user_id": {
1008 "xata_id": "rec_cug4h6ibhfbm7uq5dte0"
1009 },
1010 "track_id": {
1011 "xata_id":"rec_d11h6cdqrj64hn24639g"
1012 },
1013 "xata_createdat": "2025-12-30T04:59:55.203Z",
1014 "xata_id":"rec_d59loiod60d9sc81mc80",
1015 "xata_updatedat":"2025-12-30T04:59:55.203Z",
1016 "xata_version":0
1017 }"#;
1018
1019 match serde_json::from_str::<types::LikePayload>(data) {
1020 Err(e) => {
1021 tracing::error!("Error parsing payload: {}", e);
1022 tracing::error!("{}", data);
1023 }
1024 Ok(_) => {}
1025 }
1026 assert!(true);
1027 }
1028}