A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at main 101 lines 2.9 kB view raw
1use anyhow::Error; 2use async_nats::{connect, Client}; 3use duckdb::{params, Connection}; 4use owo_colors::OwoColorize; 5use std::{ 6 env, 7 sync::{Arc, Mutex}, 8 thread, 9}; 10use tokio_stream::StreamExt; 11use types::UserPayload; 12 13pub mod types; 14 15pub async fn subscribe(conn: Arc<Mutex<Connection>>) -> Result<(), Error> { 16 let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 17 let conn = conn.clone(); 18 let nc = connect(&addr).await?; 19 println!("Connected to NATS server at {}", addr.bright_green()); 20 21 let nc = Arc::new(Mutex::new(nc)); 22 on_new_user(nc.clone(), conn.clone()); 23 24 Ok(()) 25} 26 27pub fn on_new_user(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 28 thread::spawn(move || { 29 let rt = tokio::runtime::Runtime::new().unwrap(); 30 let conn = conn.clone(); 31 let nc = nc.clone(); 32 rt.block_on(async { 33 let nc = nc.lock().unwrap(); 34 let mut sub = nc.subscribe("rocksky.user".to_string()).await?; 35 drop(nc); 36 37 while let Some(msg) = sub.next().await { 38 let data = String::from_utf8(msg.payload.to_vec()).unwrap(); 39 match serde_json::from_str::<UserPayload>(&data) { 40 Ok(payload) => match save_user(conn.clone(), payload.clone()).await { 41 Ok(_) => println!( 42 "User saved successfully for {}{}", 43 "@".cyan(), 44 payload.handle.cyan() 45 ), 46 Err(e) => eprintln!("Error saving user: {}", e), 47 }, 48 Err(e) => { 49 eprintln!("Error parsing payload: {}", e); 50 println!("{}", data); 51 } 52 } 53 } 54 55 Ok::<(), Error>(()) 56 })?; 57 58 Ok::<(), Error>(()) 59 }); 60} 61 62pub async fn save_user(conn: Arc<Mutex<Connection>>, payload: UserPayload) -> Result<(), Error> { 63 let conn = conn.lock().unwrap(); 64 65 match conn.execute( 66 "INSERT INTO users ( 67 id, 68 avatar, 69 did, 70 display_name, 71 handle 72 ) VALUES ( 73 ?, 74 ?, 75 ?, 76 ?, 77 ? 78 ) 79 ON CONFLICT (id) DO UPDATE SET 80 avatar = EXCLUDED.avatar, 81 did = EXCLUDED.did, 82 display_name = EXCLUDED.display_name, 83 handle = EXCLUDED.handle", 84 params![ 85 payload.xata_id, 86 payload.avatar, 87 payload.did, 88 payload.display_name, 89 payload.handle, 90 ], 91 ) { 92 Ok(_) => (), 93 Err(e) => { 94 if !e.to_string().contains("violates primary key constraint") { 95 println!("[users] error: {}", e); 96 return Err(e.into()); 97 } 98 } 99 } 100 Ok(()) 101}