A decentralized music tracking and discovery platform built on AT Protocol 🎵
at feat/pgpull 65 lines 1.7 kB view raw
1use std::{ 2 env, 3 sync::{Arc, Mutex}, 4}; 5 6use anyhow::Error; 7use async_nats::connect; 8use duckdb::Connection; 9use owo_colors::OwoColorize; 10use sqlx::postgres::PgPoolOptions; 11 12use crate::{ 13 core::{create_tables, find_spotify_users, load_users, save_playlists}, 14 spotify::get_user_playlists, 15 subscriber::subscribe, 16}; 17 18pub mod core; 19pub mod crypto; 20pub mod spotify; 21pub mod subscriber; 22pub mod types; 23pub mod xata; 24 25pub async fn start() -> Result<(), Error> { 26 let conn = Connection::open("./rocksky-playlists.ddb")?; 27 let conn = Arc::new(Mutex::new(conn)); 28 create_tables(conn.clone())?; 29 30 subscribe(conn.clone()).await?; 31 32 let pool = PgPoolOptions::new() 33 .max_connections(5) 34 .connect(&env::var("XATA_POSTGRES_URL")?) 35 .await?; 36 let users = find_spotify_users(&pool, 0, 100).await?; 37 38 load_users(conn.clone(), &pool).await?; 39 40 sqlx::query(r#" 41 CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id) 42 "#) 43 .execute(&pool) 44 .await?; 45 let conn = conn.clone(); 46 47 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 48 let nc = connect(&addr).await?; 49 let nc = Arc::new(Mutex::new(nc)); 50 println!("Connected to NATS server at {}", addr.bright_green()); 51 52 for user in users { 53 let token = user.1.clone(); 54 let did = user.2.clone(); 55 let user_id = user.3.clone(); 56 let playlists = get_user_playlists(token).await?; 57 save_playlists(&pool, conn.clone(), nc.clone(), playlists, &user_id, &did).await?; 58 } 59 60 println!("Done!"); 61 62 loop { 63 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; 64 } 65}