forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use std::{env, sync::Arc};
2
3use anyhow::{Context, Error};
4use futures_util::StreamExt;
5use owo_colors::OwoColorize;
6use sqlx::postgres::PgPoolOptions;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::Message};
9
10use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState};
11
12pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble";
13pub const ARTIST_NSID: &str = "app.rocksky.artist";
14pub const ALBUM_NSID: &str = "app.rocksky.album";
15pub const SONG_NSID: &str = "app.rocksky.song";
16pub const PLAYLIST_NSID: &str = "app.rocksky.playlist";
17pub const LIKE_NSID: &str = "app.rocksky.like";
18pub const SHOUT_NSID: &str = "app.rocksky.shout";
19
20pub struct ScrobbleSubscriber {
21 pub service_url: String,
22}
23
24impl ScrobbleSubscriber {
25 pub fn new(service: &str) -> Self {
26 Self {
27 service_url: service.to_string(),
28 }
29 }
30
31 pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> {
32 // Get the connection string outside of the task
33 let db_url = env::var("XATA_POSTGRES_URL")
34 .context("Failed to get XATA_POSTGRES_URL environment variable")?;
35
36 let pool = PgPoolOptions::new()
37 .max_connections(5)
38 .connect(&db_url)
39 .await?;
40 let pool = Arc::new(Mutex::new(pool));
41
42 let (mut ws_stream, _) = connect_async(&self.service_url).await?;
43 println!(
44 "Connected to jetstream at {}",
45 self.service_url.bright_green()
46 );
47
48 while let Some(msg) = ws_stream.next().await {
49 match msg {
50 Ok(msg) => {
51 if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await {
52 eprintln!("Error handling message: {}", e);
53 }
54 }
55 Err(e) => {
56 eprintln!("WebSocket error: {}", e);
57 break;
58 }
59 }
60 }
61
62 Ok(())
63 }
64}
65
66async fn handle_message(
67 state: Arc<Mutex<AppState>>,
68 pool: Arc<Mutex<sqlx::PgPool>>,
69 msg: Message,
70) -> Result<(), Error> {
71 tokio::spawn(async move {
72 if let Message::Text(text) = msg {
73 let message: Root = serde_json::from_str(&text)?;
74
75 if message.kind != "commit" {
76 return Ok::<(), Error>(());
77 }
78
79 println!("Received message: {:#?}", message);
80 if let Some(commit) = message.commit {
81 match save_scrobble(state, pool, &message.did, commit).await {
82 Ok(_) => {
83 println!("Scrobble saved successfully");
84 }
85 Err(e) => {
86 eprintln!("Error saving scrobble: {}", e);
87 }
88 }
89 }
90 }
91 Ok(())
92 });
93
94 Ok(())
95}