A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at feat/pgpull 92 lines 2.9 kB view raw
1use std::{env, sync::Arc}; 2 3use anyhow::{Context, Error}; 4use futures_util::StreamExt; 5use owo_colors::OwoColorize; 6use sqlx::postgres::PgPoolOptions; 7use tokio::sync::Mutex; 8use tokio_tungstenite::{connect_async, tungstenite::Message}; 9 10use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState}; 11 12pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; 13pub const ARTIST_NSID: &str = "app.rocksky.artist"; 14pub const ALBUM_NSID: &str = "app.rocksky.album"; 15pub const SONG_NSID: &str = "app.rocksky.song"; 16pub const PLAYLIST_NSID: &str = "app.rocksky.playlist"; 17pub const LIKE_NSID: &str = "app.rocksky.like"; 18pub const SHOUT_NSID: &str = "app.rocksky.shout"; 19 20pub struct ScrobbleSubscriber { 21 pub service_url: String, 22} 23 24impl ScrobbleSubscriber { 25 pub fn new(service: &str) -> Self { 26 Self { 27 service_url: service.to_string(), 28 } 29 } 30 31 pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> { 32 // Get the connection string outside of the task 33 let db_url = env::var("XATA_POSTGRES_URL") 34 .context("Failed to get XATA_POSTGRES_URL environment variable")?; 35 36 let pool = PgPoolOptions::new() 37 .max_connections(5) 38 .connect(&db_url) 39 .await?; 40 let pool = Arc::new(Mutex::new(pool)); 41 42 let (mut ws_stream, _) = connect_async(&self.service_url).await?; 43 tracing::info!(url = %self.service_url.bright_green(), "Connected to jetstream at"); 44 45 while let Some(msg) = ws_stream.next().await { 46 match msg { 47 Ok(msg) => { 48 if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await { 49 tracing::error!(error = %e, "Error handling message"); 50 } 51 } 52 Err(e) => { 53 tracing::error!(error = %e, "WebSocket error"); 54 break; 55 } 56 } 57 } 58 59 Ok(()) 60 } 61} 62 63async fn handle_message( 64 state: Arc<Mutex<AppState>>, 65 pool: Arc<Mutex<sqlx::PgPool>>, 66 msg: Message, 67) -> Result<(), Error> { 68 tokio::spawn(async move { 69 if let Message::Text(text) = msg { 70 let message: Root = serde_json::from_str(&text)?; 71 72 if message.kind != "commit" { 73 return Ok::<(), Error>(()); 74 } 75 76 tracing::info!(message = %text, "Received message"); 77 if let Some(commit) = message.commit { 78 match save_scrobble(state, pool, &message.did, commit).await { 79 Ok(_) => { 80 tracing::info!(user_id = %message.did.bright_green(), "Scrobble saved successfully"); 81 } 82 Err(e) => { 83 tracing::error!(error = %e, "Error saving scrobble"); 84 } 85 } 86 } 87 } 88 Ok(()) 89 }); 90 91 Ok(()) 92}