forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use std::{
2 env,
3 sync::{Arc, Mutex},
4};
5
6use anyhow::Error;
7use async_nats::connect;
8use duckdb::Connection;
9use owo_colors::OwoColorize;
10use sqlx::postgres::PgPoolOptions;
11
12use crate::{
13 core::{create_tables, find_spotify_users, load_users, save_playlists},
14 spotify::get_user_playlists,
15 subscriber::subscribe,
16};
17
18pub mod core;
19pub mod crypto;
20pub mod spotify;
21pub mod subscriber;
22pub mod types;
23pub mod xata;
24
25pub async fn start() -> Result<(), Error> {
26 let conn = Connection::open("./rocksky-playlists.ddb")?;
27 let conn = Arc::new(Mutex::new(conn));
28 create_tables(conn.clone())?;
29
30 subscribe(conn.clone()).await?;
31
32 let pool = PgPoolOptions::new()
33 .max_connections(5)
34 .connect(&env::var("XATA_POSTGRES_URL")?)
35 .await?;
36 let users = find_spotify_users(&pool, 0, 100).await?;
37
38 load_users(conn.clone(), &pool).await?;
39
40 sqlx::query(r#"
41 CREATE UNIQUE INDEX IF NOT EXISTS user_playlists_unique_index ON user_playlists (user_id, playlist_id)
42 "#)
43 .execute(&pool)
44 .await?;
45 let conn = conn.clone();
46
47 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
48 let nc = connect(&addr).await?;
49 let nc = Arc::new(Mutex::new(nc));
50 println!("Connected to NATS server at {}", addr.bright_green());
51
52 for user in users {
53 let token = user.1.clone();
54 let did = user.2.clone();
55 let user_id = user.3.clone();
56 let client_id = user.4.clone();
57 let client_secret = user.5.clone();
58 let playlists = get_user_playlists(token, client_id, client_secret).await?;
59 save_playlists(&pool, conn.clone(), nc.clone(), playlists, &user_id, &did).await?;
60 }
61
62 println!("Done!");
63
64 loop {
65 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
66 }
67}