forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use std::{env, sync::Arc};
2
3use anyhow::{Context, Error};
4use futures_util::StreamExt;
5use owo_colors::OwoColorize;
6use sqlx::postgres::PgPoolOptions;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::Message};
9
10use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState};
11
// Identifier strings for the `app.rocksky.*` namespace, one per item kind.
// NOTE(review): presumably these are the AT Protocol NSIDs of the matching
// record collections — confirm against the lexicon definitions.

/// Identifier for scrobble items.
pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble";
/// Identifier for artist items.
pub const ARTIST_NSID: &str = "app.rocksky.artist";
/// Identifier for album items.
pub const ALBUM_NSID: &str = "app.rocksky.album";
/// Identifier for song items.
pub const SONG_NSID: &str = "app.rocksky.song";
/// Identifier for playlist items.
pub const PLAYLIST_NSID: &str = "app.rocksky.playlist";
/// Identifier for like items.
pub const LIKE_NSID: &str = "app.rocksky.like";
/// Identifier for shout items.
pub const SHOUT_NSID: &str = "app.rocksky.shout";
19
/// Subscriber that reads events from a WebSocket service and hands each
/// received frame to the message handler.
///
/// Derives `Debug` and `Clone` so the value can be logged and duplicated;
/// its only field is an owned `String`, so both derives are trivial.
#[derive(Debug, Clone)]
pub struct ScrobbleSubscriber {
    /// URL of the WebSocket endpoint that `run` connects to.
    pub service_url: String,
}
23
24impl ScrobbleSubscriber {
25 pub fn new(service: &str) -> Self {
26 Self {
27 service_url: service.to_string(),
28 }
29 }
30
31 pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> {
32 // Get the connection string outside of the task
33 let db_url = env::var("XATA_POSTGRES_URL")
34 .context("Failed to get XATA_POSTGRES_URL environment variable")?;
35
36 let pool = PgPoolOptions::new()
37 .max_connections(5)
38 .connect(&db_url)
39 .await?;
40 let pool = Arc::new(Mutex::new(pool));
41
42 let (mut ws_stream, _) = connect_async(&self.service_url).await?;
43 tracing::info!(url = %self.service_url.bright_green(), "Connected to jetstream at");
44
45 while let Some(msg) = ws_stream.next().await {
46 match msg {
47 Ok(msg) => {
48 if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await {
49 tracing::error!(error = %e, "Error handling message");
50 }
51 }
52 Err(e) => {
53 tracing::error!(error = %e, "WebSocket error");
54 break;
55 }
56 }
57 }
58
59 Ok(())
60 }
61}
62
63async fn handle_message(
64 state: Arc<Mutex<AppState>>,
65 pool: Arc<Mutex<sqlx::PgPool>>,
66 msg: Message,
67) -> Result<(), Error> {
68 tokio::spawn(async move {
69 if let Message::Text(text) = msg {
70 let message: Root = serde_json::from_str(&text)?;
71
72 if message.kind != "commit" {
73 return Ok::<(), Error>(());
74 }
75
76 tracing::info!(message = %text, "Received message");
77 if let Some(commit) = message.commit {
78 match save_scrobble(state, pool, &message.did, commit).await {
79 Ok(_) => {
80 tracing::info!(user_id = %message.did.bright_green(), "Scrobble saved successfully");
81 }
82 Err(e) => {
83 tracing::error!(error = %e, "Error saving scrobble");
84 }
85 }
86 }
87 }
88 Ok(())
89 });
90
91 Ok(())
92}