A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

feat(analytics): add genres column to artists and update artist genres functionality

+173 -15
+86 -1
crates/analytics/src/core.rs
··· 1 - use std::sync::{Arc, Mutex}; 2 3 use anyhow::Error; 4 use duckdb::{params, Connection}; ··· 180 ", 181 )?; 182 183 Ok(()) 184 } 185
··· 1 + use std::{ 2 + env, 3 + sync::{Arc, Mutex}, 4 + }; 5 6 use anyhow::Error; 7 use duckdb::{params, Connection}; ··· 183 ", 184 )?; 185 186 + match conn.execute("ALTER TABLE artists ADD COLUMN genres VARCHAR[]", []) { 187 + Ok(_) => tracing::info!("Added genres column to artists table"), 188 + Err(e) => tracing::warn!("Could not add genres column to artists table: {}", e), 189 + } 190 + 191 + Ok(()) 192 + } 193 + 194 + pub async fn update_artist_genres( 195 + conn: Arc<Mutex<Connection>>, 196 + pool: &Pool<Postgres>, 197 + ) -> Result<(), Error> { 198 + if env::var("UPDATE_ARTIST_GENRES").is_err() { 199 + tracing::info!("Skipping update_artist_genres as UPDATE_ARTIST_GENRES is not set"); 200 + return Ok(()); 201 + } 202 + 203 + let artists: Vec<xata::artist::ArtistWithoutDate> = sqlx::query_as( 204 + r#" 205 + SELECT * FROM artists 206 + "#, 207 + ) 208 + .fetch_all(pool) 209 + .await?; 210 + 211 + let conn = conn.lock().unwrap(); 212 + 213 + conn.execute_batch( 214 + " 215 + CREATE TABLE user_artists_new AS SELECT * FROM user_artists; 216 + CREATE TABLE artist_tracks_new AS SELECT * FROM artist_tracks; 217 + CREATE TABLE artist_albums_new AS SELECT * FROM artist_albums; 218 + CREATE TABLE scrobbles_new AS SELECT * FROM scrobbles; 219 + CREATE TABLE loved_tracks_new AS SELECT * FROM loved_tracks; 220 + DROP TABLE user_artists; 221 + DROP TABLE artist_tracks; 222 + DROP TABLE artist_albums; 223 + DROP TABLE scrobbles; 224 + DROP TABLE loved_tracks; 225 + ALTER TABLE user_artists_new RENAME TO user_artists; 226 + ALTER TABLE artist_tracks_new RENAME TO artist_tracks; 227 + ALTER TABLE artist_albums_new RENAME TO artist_albums; 228 + ALTER TABLE scrobbles_new RENAME TO scrobbles; 229 + ALTER TABLE loved_tracks_new RENAME TO loved_tracks; 230 + ", 231 + )?; 232 + 233 + for (i, artist) in artists.clone().into_iter().enumerate() { 234 + let genres_array = artist 235 + .genres 236 + .as_ref() 237 + .map(|tags| { 238 + tags.iter() 239 + .map(|tag| format!("'{}'", tag.replace("'", "''"))) 240 + .collect::<Vec<_>>() 241 + .join(", ") 242 + }) 243 + .unwrap_or_default() 244 + .trim() 245 + .to_string(); 246 + 247 + if genres_array.is_empty() { 248 + continue; 249 + } 250 + 251 + tracing::info!(artist = i, name = %artist.name.bright_green(), genres = %genres_array, "Updating artist genres"); 252 + 253 + match conn.execute( 254 + &format!( 255 + "UPDATE artists SET genres = [{}] WHERE id = ? AND genres IS NULL", 256 + genres_array 257 + ), 258 + params![artist.xata_id], 259 + ) { 260 + Ok(_) => (), 261 + Err(e) => { 262 + tracing::error!(error = %e, genres = %genres_array, "Error updating artist >> ") 263 + } 264 + } 265 + } 266 + 267 + tracing::info!(artists = artists.len(), "Updated artist genres"); 268 Ok(()) 269 } 270
+38 -11
crates/analytics/src/handlers/artists.rs
··· 10 }; 11 use actix_web::{web, HttpRequest, HttpResponse}; 12 use anyhow::Error; 13 - use duckdb::Connection; 14 use tokio_stream::StreamExt; 15 16 use crate::read_payload; ··· 59 let artists = stmt.query_map( 60 [&did, &did, &limit.to_string(), &offset.to_string()], 61 |row| { 62 Ok(Artist { 63 id: row.get(0)?, 64 name: row.get(1)?, ··· 73 youtube_link: row.get(10)?, 74 apple_music_link: row.get(11)?, 75 uri: row.get(12)?, 76 - play_count: row.get(13)?, 77 - unique_listeners: row.get(14)?, 78 }) 79 }, 80 )?; ··· 84 } 85 None => { 86 let artists = stmt.query_map([limit, offset], |row| { 87 Ok(Artist { 88 id: row.get(0)?, 89 name: row.get(1)?, ··· 98 youtube_link: row.get(10)?, 99 apple_music_link: row.get(11)?, 100 uri: row.get(12)?, 101 - play_count: row.get(13)?, 102 - unique_listeners: row.get(14)?, 103 }) 104 })?; 105 ··· 131 ar.picture AS picture, 132 ar.sha256 AS sha256, 133 ar.uri AS uri, 134 COUNT(*) AS play_count, 135 COUNT(DISTINCT s.user_id) AS unique_listeners 136 FROM ··· 142 WHERE 143 s.artist_id IS NOT NULL AND (u.did = ? OR u.handle = ?) 144 GROUP BY 145 - s.artist_id, ar.name, ar.uri, ar.picture, ar.sha256 146 ORDER BY 147 play_count DESC 148 OFFSET ? ··· 157 ar.picture AS picture, 158 ar.sha256 AS sha256, 159 ar.uri AS uri, 160 COUNT(*) AS play_count, 161 COUNT(DISTINCT s.user_id) AS unique_listeners 162 FROM ··· 166 WHERE 167 s.artist_id IS NOT NULL 168 GROUP BY 169 - s.artist_id, ar.name, ar.uri, ar.picture, ar.sha256 170 ORDER BY 171 play_count DESC 172 OFFSET ? ··· 180 let artists = stmt.query_map( 181 [&did, &did, &limit.to_string(), &offset.to_string()], 182 |row| { 183 Ok(Artist { 184 id: row.get(0)?, 185 name: row.get(1)?, ··· 194 youtube_link: None, 195 apple_music_link: None, 196 uri: row.get(4)?, 197 - play_count: Some(row.get(5)?), 198 - unique_listeners: Some(row.get(6)?), 199 }) 200 }, 201 )?; ··· 205 } 206 None => { 207 let artists = stmt.query_map([limit, offset], |row| { 208 Ok(Artist { 209 id: row.get(0)?, 210 name: row.get(1)?, ··· 219 youtube_link: None, 220 apple_music_link: None, 221 uri: row.get(4)?, 222 - play_count: Some(row.get(5)?), 223 - unique_listeners: Some(row.get(6)?), 224 }) 225 })?; 226 ··· 491 let listeners: Result<Vec<_>, _> = listeners.collect(); 492 Ok(HttpResponse::Ok().json(listeners?)) 493 }
··· 10 }; 11 use actix_web::{web, HttpRequest, HttpResponse}; 12 use anyhow::Error; 13 + use duckdb::{types::Value, Connection}; 14 use tokio_stream::StreamExt; 15 16 use crate::read_payload; ··· 59 let artists = stmt.query_map( 60 [&did, &did, &limit.to_string(), &offset.to_string()], 61 |row| { 62 + let genres = extract_genres_from_value(row.get(13)?); 63 Ok(Artist { 64 id: row.get(0)?, 65 name: row.get(1)?, ··· 74 youtube_link: row.get(10)?, 75 apple_music_link: row.get(11)?, 76 uri: row.get(12)?, 77 + genres, 78 + play_count: row.get(14)?, 79 + unique_listeners: row.get(15)?, 80 }) 81 }, 82 )?; ··· 86 } 87 None => { 88 let artists = stmt.query_map([limit, offset], |row| { 89 + let genres = extract_genres_from_value(row.get(13)?); 90 Ok(Artist { 91 id: row.get(0)?, 92 name: row.get(1)?, ··· 101 youtube_link: row.get(10)?, 102 apple_music_link: row.get(11)?, 103 uri: row.get(12)?, 104 + genres, 105 + play_count: row.get(14)?, 106 + unique_listeners: row.get(15)?, 107 }) 108 })?; 109 ··· 135 ar.picture AS picture, 136 ar.sha256 AS sha256, 137 ar.uri AS uri, 138 + ar.genres AS genres, 139 COUNT(*) AS play_count, 140 COUNT(DISTINCT s.user_id) AS unique_listeners 141 FROM ··· 147 WHERE 148 s.artist_id IS NOT NULL AND (u.did = ? OR u.handle = ?) 149 GROUP BY 150 + s.artist_id, ar.name, ar.uri, ar.picture, ar.sha256, ar.genres 151 ORDER BY 152 play_count DESC 153 OFFSET ? ··· 162 ar.picture AS picture, 163 ar.sha256 AS sha256, 164 ar.uri AS uri, 165 + ar.genres AS genres, 166 COUNT(*) AS play_count, 167 COUNT(DISTINCT s.user_id) AS unique_listeners 168 FROM ··· 172 WHERE 173 s.artist_id IS NOT NULL 174 GROUP BY 175 + s.artist_id, ar.name, ar.uri, ar.picture, ar.sha256, ar.genres 176 ORDER BY 177 play_count DESC 178 OFFSET ? ··· 186 let artists = stmt.query_map( 187 [&did, &did, &limit.to_string(), &offset.to_string()], 188 |row| { 189 + let genres = extract_genres_from_value(row.get(5)?); 190 Ok(Artist { 191 id: row.get(0)?, 192 name: row.get(1)?, ··· 201 youtube_link: None, 202 apple_music_link: None, 203 uri: row.get(4)?, 204 + genres, 205 + play_count: Some(row.get(6)?), 206 + unique_listeners: Some(row.get(7)?), 207 }) 208 }, 209 )?; ··· 213 } 214 None => { 215 let artists = stmt.query_map([limit, offset], |row| { 216 + let genres = extract_genres_from_value(row.get(5)?); 217 Ok(Artist { 218 id: row.get(0)?, 219 name: row.get(1)?, ··· 228 youtube_link: None, 229 apple_music_link: None, 230 uri: row.get(4)?, 231 + genres, 232 + play_count: Some(row.get(6)?), 233 + unique_listeners: Some(row.get(7)?), 234 }) 235 })?; 236 ··· 501 let listeners: Result<Vec<_>, _> = listeners.collect(); 502 Ok(HttpResponse::Ok().json(listeners?)) 503 } 504 + 505 + fn extract_genres_from_value(value: Value) -> Option<Vec<String>> { 506 + match value { 507 + Value::Null => None, 508 + Value::List(items) => { 509 + let genres: Vec<String> = items 510 + .into_iter() 511 + .filter_map(|item| match item { 512 + Value::Text(s) => Some(s), 513 + _ => None, 514 + }) 515 + .collect(); 516 + Some(genres) 517 + } 518 + _ => None, 519 + } 520 + }
+8 -1
crates/analytics/src/lib.rs
··· 9 use duckdb::Connection; 10 use sqlx::postgres::PgPoolOptions; 11 12 - use crate::core::create_tables; 13 14 pub mod cmd; 15 pub mod core; ··· 23 24 create_tables(&conn).await?; 25 26 let conn = Arc::new(Mutex::new(conn)); 27 export_parquets(conn.clone()); 28 cmd::serve::serve(conn).await?; 29
··· 9 use duckdb::Connection; 10 use sqlx::postgres::PgPoolOptions; 11 12 + use crate::core::{create_tables, update_artist_genres}; 13 14 pub mod cmd; 15 pub mod core; ··· 23 24 create_tables(&conn).await?; 25 26 + let pool = PgPoolOptions::new() 27 + .max_connections(5) 28 + .connect(&env::var("XATA_POSTGRES_URL")?) 29 + .await?; 30 + 31 let conn = Arc::new(Mutex::new(conn)); 32 + update_artist_genres(conn.clone(), &pool).await?; 33 + 34 export_parquets(conn.clone()); 35 cmd::serve::serve(conn).await?; 36
+16 -2
crates/analytics/src/subscriber/mod.rs
··· 207 let conn = conn.lock().unwrap(); 208 209 match conn.execute( 210 - "INSERT INTO artists ( 211 id, 212 name, 213 biography, ··· 220 tidal_link, 221 youtube_link, 222 apple_music_link, 223 - uri 224 ) VALUES ( 225 ?, 226 ?, ··· 236 ?, 237 ? 238 )", 239 params![ 240 payload.scrobble.artist_id.xata_id, 241 payload.scrobble.artist_id.name,
··· 207 let conn = conn.lock().unwrap(); 208 209 match conn.execute( 210 + &format!( 211 + "INSERT INTO artists ( 212 id, 213 name, 214 biography, ··· 221 tidal_link, 222 youtube_link, 223 apple_music_link, 224 + uri, 225 + [{}] 226 ) VALUES ( 227 ?, 228 ?, ··· 238 ?, 239 ? 240 )", 241 + payload 242 + .scrobble 243 + .artist_id 244 + .genres 245 + .as_ref() 246 + .map(|genres| genres 247 + .iter() 248 + .map(|g| format!("'{}'", g)) 249 + .collect::<Vec<_>>() 250 + .join(", ")) 251 + .unwrap_or_default() 252 + ), 253 params![ 254 payload.scrobble.artist_id.xata_id, 255 payload.scrobble.artist_id.name,
+2
crates/analytics/src/subscriber/types.rs
··· 151 pub tidal_link: Option<String>, 152 #[serde(skip_serializing_if = "Option::is_none")] 153 pub youtube_link: Option<String>, 154 } 155 156 #[derive(Debug, Serialize, Deserialize, Clone)]
··· 151 pub tidal_link: Option<String>, 152 #[serde(skip_serializing_if = "Option::is_none")] 153 pub youtube_link: Option<String>, 154 + #[serde(skip_serializing_if = "Option::is_none")] 155 + pub genres: Option<Vec<String>>, 156 } 157 158 #[derive(Debug, Serialize, Deserialize, Clone)]
+2
crates/analytics/src/types/artist.rs
··· 31 pub play_count: Option<i32>, 32 #[serde(skip_serializing_if = "Option::is_none")] 33 pub unique_listeners: Option<i32>, 34 } 35 36 #[derive(Debug, Serialize, Deserialize, Default)]
··· 31 pub play_count: Option<i32>, 32 #[serde(skip_serializing_if = "Option::is_none")] 33 pub unique_listeners: Option<i32>, 34 + #[serde(skip_serializing_if = "Option::is_none")] 35 + pub genres: Option<Vec<String>>, 36 } 37 38 #[derive(Debug, Serialize, Deserialize, Default)]
+21
crates/analytics/src/xata/artist.rs
··· 18 pub youtube_link: Option<String>, 19 pub apple_music_link: Option<String>, 20 pub uri: Option<String>, 21 #[serde(with = "chrono::serde::ts_seconds")] 22 pub xata_createdat: DateTime<Utc>, 23 }
··· 18 pub youtube_link: Option<String>, 19 pub apple_music_link: Option<String>, 20 pub uri: Option<String>, 21 + pub genres: Option<Vec<String>>, 22 #[serde(with = "chrono::serde::ts_seconds")] 23 pub xata_createdat: DateTime<Utc>, 24 } 25 + 26 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 27 + pub struct ArtistWithoutDate { 28 + pub xata_id: String, 29 + pub name: String, 30 + pub biography: Option<String>, 31 + #[serde(with = "chrono::serde::ts_seconds_option")] 32 + pub born: Option<DateTime<Utc>>, 33 + pub born_in: Option<String>, 34 + #[serde(with = "chrono::serde::ts_seconds_option")] 35 + pub died: Option<DateTime<Utc>>, 36 + pub picture: Option<String>, 37 + pub sha256: String, 38 + pub spotify_link: Option<String>, 39 + pub tidal_link: Option<String>, 40 + pub youtube_link: Option<String>, 41 + pub apple_music_link: Option<String>, 42 + pub uri: Option<String>, 43 + pub genres: Option<Vec<String>>, 44 + }