A decentralized music tracking and discovery platform built on AT Protocol 🎵

Merge pull request #10 from tsirysndr/feat/parquet-export

feat: add cron job for exporting parquet files every minute

authored by tsiry-sandratraina.com and committed by

GitHub ea06ea3e c00c7877

+130 -4
+2 -1
.gitignore
··· 8 8 .vscode/ 9 9 *.DS_Store 10 10 rocksky-backup.sql 11 - *.db 11 + *.db 12 + *.parquet
+24 -3
Cargo.lock
··· 1374 1374 ] 1375 1375 1376 1376 [[package]] 1377 + name = "cron" 1378 + version = "0.15.0" 1379 + source = "registry+https://github.com/rust-lang/crates.io-index" 1380 + checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" 1381 + dependencies = [ 1382 + "chrono", 1383 + "once_cell", 1384 + "winnow 0.6.26", 1385 + ] 1386 + 1387 + [[package]] 1377 1388 name = "crossbeam-channel" 1378 1389 version = "0.5.15" 1379 1390 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4846 4857 "async-nats", 4847 4858 "chrono", 4848 4859 "clap", 4860 + "cron", 4849 4861 "dotenv", 4850 4862 "duckdb", 4851 4863 "owo-colors", ··· 6776 6788 "toml_datetime 0.7.0", 6777 6789 "toml_parser", 6778 6790 "toml_writer", 6779 - "winnow", 6791 + "winnow 0.7.10", 6780 6792 ] 6781 6793 6782 6794 [[package]] ··· 6808 6820 "serde_spanned 0.6.9", 6809 6821 "toml_datetime 0.6.11", 6810 6822 "toml_write", 6811 - "winnow", 6823 + "winnow 0.7.10", 6812 6824 ] 6813 6825 6814 6826 [[package]] ··· 6817 6829 source = "registry+https://github.com/rust-lang/crates.io-index" 6818 6830 checksum = "97200572db069e74c512a14117b296ba0a80a30123fbbb5aa1f4a348f639ca30" 6819 6831 dependencies = [ 6820 - "winnow", 6832 + "winnow 0.7.10", 6821 6833 ] 6822 6834 6823 6835 [[package]] ··· 7742 7754 version = "0.53.0" 7743 7755 source = "registry+https://github.com/rust-lang/crates.io-index" 7744 7756 checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" 7757 + 7758 + [[package]] 7759 + name = "winnow" 7760 + version = "0.6.26" 7761 + source = "registry+https://github.com/rust-lang/crates.io-index" 7762 + checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" 7763 + dependencies = [ 7764 + "memchr", 7765 + ] 7745 7766 7746 7767 [[package]] 7747 7768 name = "winnow"
+1
crates/analytics/Cargo.toml
··· 29 29 actix-web = "4.9.0" 30 30 tokio-stream = { version = "0.1.17", features = ["full"] } 31 31 tracing = "0.1.41" 32 + cron = "0.15.0"
+103
crates/analytics/src/lib.rs
··· 1 1 use std::{ 2 2 env, 3 + str::FromStr, 3 4 sync::{Arc, Mutex}, 5 + thread, 4 6 }; 5 7 6 8 use anyhow::Error; ··· 22 24 create_tables(&conn).await?; 23 25 24 26 let conn = Arc::new(Mutex::new(conn)); 27 + export_parquets(conn.clone()); 25 28 cmd::serve::serve(conn).await?; 26 29 27 30 Ok(()) ··· 42 45 43 46 Ok(()) 44 47 } 48 + 49 + fn export_parquets(conn: Arc<Mutex<Connection>>) { 50 + thread::spawn(move || { 51 + // fire every 1 minute 52 + let cron_expr = "0 * * * * * *"; 53 + let schedule = cron::Schedule::from_str(cron_expr); 54 + if let Err(err) = schedule { 55 + tracing::error!("Failed to parse cron expression: {}", cron_expr); 56 + tracing::error!(error = %err); 57 + return Ok(()); 58 + } 59 + let schedule = schedule.unwrap(); 60 + loop { 61 + let now = chrono::Utc::now(); 62 + let mut upcoming = schedule.upcoming(chrono::Utc).take(1); 63 + 64 + if let Some(next) = upcoming.next() { 65 + let duration = next.signed_duration_since(now).to_std().unwrap(); 66 + thread::sleep(duration); 67 + tracing::info!("Exporting parquets ..."); 68 + 69 + let conn = conn.lock().unwrap(); 70 + conn.execute_batch( 71 + "BEGIN; 72 + COPY (SELECT * FROM scrobbles) TO 'scrobbles.parquet' (FORMAT PARQUET); 73 + COPY (SELECT * FROM artists) TO 'artists.parquet' (FORMAT PARQUET); 74 + COPY (SELECT * FROM albums) TO 'albums.parquet' (FORMAT PARQUET); 75 + COPY (SELECT * FROM tracks) TO 'tracks.parquet' (FORMAT PARQUET); 76 + COPY (SELECT * FROM users) TO 'users.parquet' (FORMAT PARQUET); 77 + COPY (SELECT * FROM album_tracks) TO 'album_tracks.parquet' (FORMAT PARQUET); 78 + COPY (SELECT * FROM artist_albums) TO 'artist_albums.parquet' (FORMAT PARQUET); 79 + COPY (SELECT * FROM artist_tracks) TO 'artist_tracks.parquet' (FORMAT PARQUET); 80 + COPY (SELECT * FROM loved_tracks) TO 'loved_tracks.parquet' (FORMAT PARQUET); 81 + COPY (SELECT * FROM user_albums) TO 'user_albums.parquet' (FORMAT PARQUET); 82 + COPY (SELECT * FROM user_artists) TO 'user_artists.parquet' (FORMAT PARQUET); 83 + COPY (SELECT * FROM user_tracks) TO 'user_tracks.parquet' (FORMAT PARQUET); 84 + COMMIT;", 85 + )?; 86 + 87 + drop(conn); 88 + 89 + if env::var("CF_ACCOUNT_ID").is_err() { 90 + tracing::warn!("CF_ACCOUNT_ID is not set, skipping upload to R2"); 91 + continue; 92 + } 93 + 94 + upload_to_r2("scrobbles.parquet"); 95 + upload_to_r2("artists.parquet"); 96 + upload_to_r2("albums.parquet"); 97 + upload_to_r2("tracks.parquet"); 98 + upload_to_r2("users.parquet"); 99 + upload_to_r2("album_tracks.parquet"); 100 + upload_to_r2("artist_albums.parquet"); 101 + upload_to_r2("artist_tracks.parquet"); 102 + upload_to_r2("loved_tracks.parquet"); 103 + upload_to_r2("user_albums.parquet"); 104 + upload_to_r2("user_artists.parquet"); 105 + upload_to_r2("user_tracks.parquet"); 106 + 107 + tracing::info!("Exported parquets successfully."); 108 + } 109 + } 110 + 111 + #[allow(unreachable_code)] 112 + Ok::<(), Error>(()) 113 + }); 114 + } 115 + 116 + fn upload_to_r2(file: &str) { 117 + let status = std::process::Command::new("aws") 118 + .arg("s3") 119 + .arg("cp") 120 + .arg(file) 121 + .arg(format!( 122 + "s3://{}", 123 + env::var("R2_BUCKET_NAME").unwrap_or("rocksky-backup".to_string()) 124 + )) 125 + .arg("--endpoint-url") 126 + .arg(&format!( 127 + "https://{}.r2.cloudflarestorage.com", 128 + env::var("CF_ACCOUNT_ID").unwrap() 129 + )) 130 + .arg("--profile") 131 + .arg("r2") 132 + .stdout(std::process::Stdio::inherit()) 133 + .stderr(std::process::Stdio::inherit()) 134 + .status(); 135 + match status { 136 + Ok(status) => { 137 + if status.success() { 138 + tracing::info!("Uploaded {} to R2 successfully.", file); 139 + } else { 140 + tracing::error!("Failed to upload {} to R2.", file); 141 + } 142 + } 143 + Err(err) => { 144 + tracing::error!("Failed to execute aws command: {}", err); 145 + } 146 + }; 147 + }