A decentralized music tracking and discovery platform built on AT Protocol 🎵
at feat/discord-webhook 95 lines 2.9 kB view raw
1use std::{env, sync::Arc}; 2 3use anyhow::{Context, Error}; 4use futures_util::StreamExt; 5use owo_colors::OwoColorize; 6use sqlx::postgres::PgPoolOptions; 7use tokio::sync::Mutex; 8use tokio_tungstenite::{connect_async, tungstenite::Message}; 9 10use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState}; 11 12pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; 13pub const ARTIST_NSID: &str = "app.rocksky.artist"; 14pub const ALBUM_NSID: &str = "app.rocksky.album"; 15pub const SONG_NSID: &str = "app.rocksky.song"; 16pub const PLAYLIST_NSID: &str = "app.rocksky.playlist"; 17pub const LIKE_NSID: &str = "app.rocksky.like"; 18pub const SHOUT_NSID: &str = "app.rocksky.shout"; 19 20pub struct ScrobbleSubscriber { 21 pub service_url: String, 22} 23 24impl ScrobbleSubscriber { 25 pub fn new(service: &str) -> Self { 26 Self { 27 service_url: service.to_string(), 28 } 29 } 30 31 pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> { 32 // Get the connection string outside of the task 33 let db_url = env::var("XATA_POSTGRES_URL") 34 .context("Failed to get XATA_POSTGRES_URL environment variable")?; 35 36 let pool = PgPoolOptions::new() 37 .max_connections(5) 38 .connect(&db_url) 39 .await?; 40 let pool = Arc::new(Mutex::new(pool)); 41 42 let (mut ws_stream, _) = connect_async(&self.service_url).await?; 43 println!( 44 "Connected to jetstream at {}", 45 self.service_url.bright_green() 46 ); 47 48 while let Some(msg) = ws_stream.next().await { 49 match msg { 50 Ok(msg) => { 51 if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await { 52 eprintln!("Error handling message: {}", e); 53 } 54 } 55 Err(e) => { 56 eprintln!("WebSocket error: {}", e); 57 break; 58 } 59 } 60 } 61 62 Ok(()) 63 } 64} 65 66async fn handle_message( 67 state: Arc<Mutex<AppState>>, 68 pool: Arc<Mutex<sqlx::PgPool>>, 69 msg: Message, 70) -> Result<(), Error> { 71 tokio::spawn(async move { 72 if let Message::Text(text) = msg { 73 let message: Root = 
serde_json::from_str(&text)?; 74 75 if message.kind != "commit" { 76 return Ok::<(), Error>(()); 77 } 78 79 println!("Received message: {:#?}", message); 80 if let Some(commit) = message.commit { 81 match save_scrobble(state, pool, &message.did, commit).await { 82 Ok(_) => { 83 println!("Scrobble saved successfully"); 84 } 85 Err(e) => { 86 eprintln!("Error saving scrobble: {}", e); 87 } 88 } 89 } 90 } 91 Ok(()) 92 }); 93 94 Ok(()) 95}