forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use anyhow::Error;
2use async_nats::{connect, Client};
3use duckdb::{params, Connection};
4use owo_colors::OwoColorize;
5use std::{
6 env,
7 sync::{Arc, Mutex},
8 thread,
9};
10use tokio_stream::StreamExt;
11use types::UserPayload;
12
13pub mod types;
14
15pub async fn subscribe(conn: Arc<Mutex<Connection>>) -> Result<(), Error> {
16 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
17 let conn = conn.clone();
18 let nc = connect(&addr).await?;
19 println!("Connected to NATS server at {}", addr.bright_green());
20
21 let nc = Arc::new(Mutex::new(nc));
22 on_new_user(nc.clone(), conn.clone());
23
24 Ok(())
25}
26
27pub fn on_new_user(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
28 thread::spawn(move || {
29 let rt = tokio::runtime::Runtime::new().unwrap();
30 let conn = conn.clone();
31 let nc = nc.clone();
32 rt.block_on(async {
33 let nc = nc.lock().unwrap();
34 let mut sub = nc.subscribe("rocksky.user".to_string()).await?;
35 drop(nc);
36
37 while let Some(msg) = sub.next().await {
38 let data = String::from_utf8(msg.payload.to_vec()).unwrap();
39 match serde_json::from_str::<UserPayload>(&data) {
40 Ok(payload) => match save_user(conn.clone(), payload.clone()).await {
41 Ok(_) => println!(
42 "User saved successfully for {}{}",
43 "@".cyan(),
44 payload.handle.cyan()
45 ),
46 Err(e) => eprintln!("Error saving user: {}", e),
47 },
48 Err(e) => {
49 eprintln!("Error parsing payload: {}", e);
50 println!("{}", data);
51 }
52 }
53 }
54
55 Ok::<(), Error>(())
56 })?;
57
58 Ok::<(), Error>(())
59 });
60}
61
62pub async fn save_user(conn: Arc<Mutex<Connection>>, payload: UserPayload) -> Result<(), Error> {
63 let conn = conn.lock().unwrap();
64
65 match conn.execute(
66 "INSERT INTO users (
67 id,
68 avatar,
69 did,
70 display_name,
71 handle
72 ) VALUES (
73 ?,
74 ?,
75 ?,
76 ?,
77 ?
78 )
79 ON CONFLICT (id) DO UPDATE SET
80 avatar = EXCLUDED.avatar,
81 did = EXCLUDED.did,
82 display_name = EXCLUDED.display_name,
83 handle = EXCLUDED.handle",
84 params![
85 payload.xata_id,
86 payload.avatar,
87 payload.did,
88 payload.display_name,
89 payload.handle,
90 ],
91 ) {
92 Ok(_) => (),
93 Err(e) => {
94 if !e.to_string().contains("violates primary key constraint") {
95 println!("[users] error: {}", e);
96 return Err(e.into());
97 }
98 }
99 }
100 Ok(())
101}