forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use crate::{
2 cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient, repo,
3 scrobbler::scrobble, types::ScrobbleRequest,
4};
5use actix_web::{get, post, web, HttpRequest, HttpResponse, Responder};
6use owo_colors::OwoColorize;
7use sqlx::{Pool, Postgres};
8use std::sync::Arc;
9use tokio_stream::StreamExt;
10
/// Collects every chunk produced by `$payload` into a single `Vec` and evaluates to it.
///
/// The macro repeatedly awaits `$payload.next()`; each `Ok` chunk is appended to the buffer
/// in arrival order. On the first `Err` chunk it performs an early
/// `return Err(err.into())` from the *enclosing* function, so it may only be used inside an
/// async body whose return type is a `Result` with an error type the chunk error converts
/// into.
///
/// No upper bound on the accumulated size is enforced here.
#[macro_export]
macro_rules! read_payload {
    ($payload:expr) => {{
        let mut collected = Vec::new();
        loop {
            match $payload.next().await {
                Some(Ok(piece)) => collected.extend_from_slice(&piece),
                // Propagate the stream failure out of the calling function.
                Some(Err(failure)) => return Err(failure.into()),
                // Stream exhausted: the loop evaluates to the full body.
                None => break collected,
            }
        }
    }};
}
24
25#[get("/")]
26pub async fn index() -> impl Responder {
27 HttpResponse::Ok().body(BANNER)
28}
29
30#[post("/{id}")]
31async fn handle_scrobble(
32 data: web::Data<Arc<Pool<Postgres>>>,
33 cache: web::Data<Cache>,
34 mb_client: web::Data<Arc<MusicbrainzClient>>,
35 mut payload: web::Payload,
36 req: HttpRequest,
37) -> Result<impl Responder, actix_web::Error> {
38 let id = req.match_info().get("id").unwrap();
39 tracing::info!(id = %id.bright_green(), "Received scrobble");
40
41 let pool = data.get_ref().clone();
42
43 let user = repo::user::get_user_by_webscrobbler(&pool, id)
44 .await
45 .map_err(|err| {
46 actix_web::error::ErrorInternalServerError(format!("Database error: {}", err))
47 })?;
48
49 if user.is_none() {
50 return Ok(HttpResponse::NotFound().body("There is no user with this webscrobbler ID"));
51 }
52 let user = user.unwrap();
53
54 let body = read_payload!(payload);
55 let params = serde_json::from_slice::<ScrobbleRequest>(&body).map_err(|err| {
56 let body = String::from_utf8_lossy(&body);
57 tracing::error!(body = %body, error = %err, "Failed to parse JSON");
58 actix_web::error::ErrorBadRequest(format!("Failed to parse JSON: {}", err))
59 })?;
60
61 tracing::info!(params = ?params, "Parsed scrobble request");
62
63 if params.event_name != "scrobble" {
64 tracing::info!(event_name = %params.event_name.cyan(), "Skipping non-scrobble event");
65 return Ok(HttpResponse::Ok().body("Skipping non-scrobble event"));
66 }
67
68 // Check if connector is Spotify
69 if params.data.song.connector.id == "spotify" {
70 // Skip if the user has a Spotify token
71 let spotify_token = repo::spotify_token::get_spotify_token(&pool, &user.did)
72 .await
73 .map_err(|err| {
74 actix_web::error::ErrorInternalServerError(format!(
75 "Failed to get Spotify tokens: {}",
76 err
77 ))
78 })?;
79
80 if spotify_token.is_some() {
81 tracing::info!("User has a Spotify token, skipping scrobble");
82 return Ok(HttpResponse::Ok().body("User has a Spotify token, skipping scrobble"));
83 }
84 }
85
86 let cache = cache.get_ref().clone();
87
88 if params.data.song.connector.id == "emby" {
89 let artist = params.data.song.parsed.artist.clone();
90 let track = params.data.song.parsed.track.clone();
91 let cached = cache.get(&format!(
92 "listenbrainz:emby:{}:{}:{}",
93 artist, track, user.did
94 ));
95
96 if cached.is_err() {
97 tracing::error!(artist = %artist, track = %track, error = %cached.unwrap_err(), "Failed to check cache for Emby scrobble");
98 return Ok(HttpResponse::Ok().body("Failed to check cache for Emby scrobble"));
99 }
100
101 if cached.unwrap().is_some() {
102 tracing::warn!(artist = %artist, track = %track, "Skipping duplicate scrobble for Emby");
103 return Ok(HttpResponse::Ok().body("Skipping duplicate scrobble for Emby"));
104 }
105 }
106
107 let mb_client = mb_client.get_ref().as_ref();
108 scrobble(&pool, &cache, mb_client, params, &user.did)
109 .await
110 .map_err(|err| {
111 actix_web::error::ErrorInternalServerError(format!("Failed to scrobble: {}", err))
112 })?;
113
114 Ok(HttpResponse::Ok().body("Scrobble received"))
115}