A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at feat/like-scrobble 154 lines 5.2 kB view raw
1use std::{ 2 env, 3 str::FromStr, 4 sync::{Arc, Mutex}, 5 thread, 6}; 7 8use anyhow::Error; 9use duckdb::Connection; 10use sqlx::postgres::PgPoolOptions; 11 12use crate::core::{create_tables, update_artist_genres}; 13 14pub mod cmd; 15pub mod core; 16pub mod handlers; 17pub mod subscriber; 18pub mod types; 19pub mod xata; 20 21pub async fn serve() -> Result<(), Error> { 22 let conn = Connection::open("./rocksky-analytics.ddb")?; 23 24 create_tables(&conn).await?; 25 26 let pool = PgPoolOptions::new() 27 .max_connections(5) 28 .connect(&env::var("XATA_POSTGRES_URL")?) 29 .await?; 30 31 let conn = Arc::new(Mutex::new(conn)); 32 update_artist_genres(conn.clone(), &pool).await?; 33 34 export_parquets(conn.clone()); 35 cmd::serve::serve(conn).await?; 36 37 Ok(()) 38} 39 40pub async fn sync() -> Result<(), Error> { 41 let pool = PgPoolOptions::new() 42 .max_connections(5) 43 .connect(&env::var("XATA_POSTGRES_URL")?) 44 .await?; 45 46 let conn = Connection::open("./rocksky-analytics.ddb")?; 47 create_tables(&conn).await?; 48 49 let conn = Arc::new(Mutex::new(conn)); 50 51 cmd::sync::sync(conn, &pool).await?; 52 53 Ok(()) 54} 55 56fn export_parquets(conn: Arc<Mutex<Connection>>) { 57 thread::spawn(move || { 58 // fire every 5 minutes 59 let cron_expr = "0 */5 * * * * *"; 60 let schedule = cron::Schedule::from_str(cron_expr); 61 if let Err(err) = schedule { 62 tracing::error!("Failed to parse cron expression: {}", cron_expr); 63 tracing::error!(error = %err); 64 return Ok(()); 65 } 66 let schedule = schedule.unwrap(); 67 loop { 68 let now = chrono::Utc::now(); 69 let mut upcoming = schedule.upcoming(chrono::Utc).take(1); 70 71 if let Some(next) = upcoming.next() { 72 let duration = next.signed_duration_since(now).to_std().unwrap(); 73 thread::sleep(duration); 74 tracing::info!("Exporting parquets ..."); 75 76 let conn = conn.lock().unwrap(); 77 conn.execute_batch( 78 "BEGIN; 79 COPY (SELECT * FROM scrobbles) TO 'scrobbles.parquet' (FORMAT PARQUET); 80 COPY (SELECT * FROM 
artists) TO 'artists.parquet' (FORMAT PARQUET); 81 COPY (SELECT * FROM albums) TO 'albums.parquet' (FORMAT PARQUET); 82 COPY (SELECT * FROM tracks) TO 'tracks.parquet' (FORMAT PARQUET); 83 COPY (SELECT * FROM users) TO 'users.parquet' (FORMAT PARQUET); 84 COPY (SELECT * FROM album_tracks) TO 'album_tracks.parquet' (FORMAT PARQUET); 85 COPY (SELECT * FROM artist_albums) TO 'artist_albums.parquet' (FORMAT PARQUET); 86 COPY (SELECT * FROM artist_tracks) TO 'artist_tracks.parquet' (FORMAT PARQUET); 87 COPY (SELECT * FROM loved_tracks) TO 'loved_tracks.parquet' (FORMAT PARQUET); 88 COPY (SELECT * FROM user_albums) TO 'user_albums.parquet' (FORMAT PARQUET); 89 COPY (SELECT * FROM user_artists) TO 'user_artists.parquet' (FORMAT PARQUET); 90 COPY (SELECT * FROM user_tracks) TO 'user_tracks.parquet' (FORMAT PARQUET); 91 COMMIT;", 92 )?; 93 94 drop(conn); 95 96 if env::var("CF_ACCOUNT_ID").is_err() { 97 tracing::warn!("CF_ACCOUNT_ID is not set, skipping upload to R2"); 98 continue; 99 } 100 101 upload_to_r2("scrobbles.parquet"); 102 upload_to_r2("artists.parquet"); 103 upload_to_r2("albums.parquet"); 104 upload_to_r2("tracks.parquet"); 105 upload_to_r2("users.parquet"); 106 upload_to_r2("album_tracks.parquet"); 107 upload_to_r2("artist_albums.parquet"); 108 upload_to_r2("artist_tracks.parquet"); 109 upload_to_r2("loved_tracks.parquet"); 110 upload_to_r2("user_albums.parquet"); 111 upload_to_r2("user_artists.parquet"); 112 upload_to_r2("user_tracks.parquet"); 113 114 tracing::info!("Exported parquets successfully."); 115 } 116 } 117 118 #[allow(unreachable_code)] 119 Ok::<(), Error>(()) 120 }); 121} 122 123fn upload_to_r2(file: &str) { 124 let status = std::process::Command::new("aws") 125 .arg("s3") 126 .arg("cp") 127 .arg(file) 128 .arg(format!( 129 "s3://{}", 130 env::var("R2_BUCKET_NAME").unwrap_or("rocksky-backup".to_string()) 131 )) 132 .arg("--endpoint-url") 133 .arg(&format!( 134 "https://{}.r2.cloudflarestorage.com", 135 env::var("CF_ACCOUNT_ID").unwrap() 136 )) 137 
.arg("--profile") 138 .arg("r2") 139 .stdout(std::process::Stdio::inherit()) 140 .stderr(std::process::Stdio::inherit()) 141 .status(); 142 match status { 143 Ok(status) => { 144 if status.success() { 145 tracing::info!("Uploaded {} to R2 successfully.", file); 146 } else { 147 tracing::error!("Failed to upload {} to R2.", file); 148 } 149 } 150 Err(err) => { 151 tracing::error!("Failed to execute aws command: {}", err); 152 } 153 }; 154}