A decentralized music tracking and discovery platform built on AT Protocol 🎵
at feat/pgpull 115 lines 4.0 kB view raw
1use crate::{ 2 cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient, repo, 3 scrobbler::scrobble, types::ScrobbleRequest, 4}; 5use actix_web::{get, post, web, HttpRequest, HttpResponse, Responder}; 6use owo_colors::OwoColorize; 7use sqlx::{Pool, Postgres}; 8use std::sync::Arc; 9use tokio_stream::StreamExt; 10 11#[macro_export] 12macro_rules! read_payload { 13 ($payload:expr) => {{ 14 let mut body = Vec::new(); 15 while let Some(chunk) = $payload.next().await { 16 match chunk { 17 Ok(bytes) => body.extend_from_slice(&bytes), 18 Err(err) => return Err(err.into()), 19 } 20 } 21 body 22 }}; 23} 24 25#[get("/")] 26pub async fn index() -> impl Responder { 27 HttpResponse::Ok().body(BANNER) 28} 29 30#[post("/{id}")] 31async fn handle_scrobble( 32 data: web::Data<Arc<Pool<Postgres>>>, 33 cache: web::Data<Cache>, 34 mb_client: web::Data<Arc<MusicbrainzClient>>, 35 mut payload: web::Payload, 36 req: HttpRequest, 37) -> Result<impl Responder, actix_web::Error> { 38 let id = req.match_info().get("id").unwrap(); 39 tracing::info!(id = %id.bright_green(), "Received scrobble"); 40 41 let pool = data.get_ref().clone(); 42 43 let user = repo::user::get_user_by_webscrobbler(&pool, id) 44 .await 45 .map_err(|err| { 46 actix_web::error::ErrorInternalServerError(format!("Database error: {}", err)) 47 })?; 48 49 if user.is_none() { 50 return Ok(HttpResponse::NotFound().body("There is no user with this webscrobbler ID")); 51 } 52 let user = user.unwrap(); 53 54 let body = read_payload!(payload); 55 let params = serde_json::from_slice::<ScrobbleRequest>(&body).map_err(|err| { 56 let body = String::from_utf8_lossy(&body); 57 tracing::error!(body = %body, error = %err, "Failed to parse JSON"); 58 actix_web::error::ErrorBadRequest(format!("Failed to parse JSON: {}", err)) 59 })?; 60 61 tracing::info!(params = ?params, "Parsed scrobble request"); 62 63 if params.event_name != "scrobble" { 64 tracing::info!(event_name = %params.event_name.cyan(), "Skipping non-scrobble 
event"); 65 return Ok(HttpResponse::Ok().body("Skipping non-scrobble event")); 66 } 67 68 // Check if connector is Spotify 69 if params.data.song.connector.id == "spotify" { 70 // Skip if the user has a Spotify token 71 let spotify_token = repo::spotify_token::get_spotify_token(&pool, &user.did) 72 .await 73 .map_err(|err| { 74 actix_web::error::ErrorInternalServerError(format!( 75 "Failed to get Spotify tokens: {}", 76 err 77 )) 78 })?; 79 80 if spotify_token.is_some() { 81 tracing::info!("User has a Spotify token, skipping scrobble"); 82 return Ok(HttpResponse::Ok().body("User has a Spotify token, skipping scrobble")); 83 } 84 } 85 86 let cache = cache.get_ref().clone(); 87 88 if params.data.song.connector.id == "emby" { 89 let artist = params.data.song.parsed.artist.clone(); 90 let track = params.data.song.parsed.track.clone(); 91 let cached = cache.get(&format!( 92 "listenbrainz:emby:{}:{}:{}", 93 artist, track, user.did 94 )); 95 96 if cached.is_err() { 97 tracing::error!(artist = %artist, track = %track, error = %cached.unwrap_err(), "Failed to check cache for Emby scrobble"); 98 return Ok(HttpResponse::Ok().body("Failed to check cache for Emby scrobble")); 99 } 100 101 if cached.unwrap().is_some() { 102 tracing::warn!(artist = %artist, track = %track, "Skipping duplicate scrobble for Emby"); 103 return Ok(HttpResponse::Ok().body("Skipping duplicate scrobble for Emby")); 104 } 105 } 106 107 let mb_client = mb_client.get_ref().as_ref(); 108 scrobble(&pool, &cache, mb_client, params, &user.did) 109 .await 110 .map_err(|err| { 111 actix_web::error::ErrorInternalServerError(format!("Failed to scrobble: {}", err)) 112 })?; 113 114 Ok(HttpResponse::Ok().body("Scrobble received")) 115}