this repo has no description
at docker 105 lines 3.6 kB view raw
1use anyhow::Result; 2use async_trait::async_trait; 3use reqwest::Client; 4use rocketman::{ 5 connection::JetstreamConnection, handler, ingestion::LexiconIngestor, 6 options::JetstreamOptions, types::event::Event, 7}; 8use serde_json::{Value, json}; 9use std::{ 10 collections::HashMap, 11 sync::{Arc, Mutex}, 12}; 13 14#[tokio::main] 15async fn main() { 16 // Load environment variables from .env file 17 dotenv::dotenv().ok(); 18 19 // init the builder 20 let opts = JetstreamOptions::builder() 21 // your EXACT nsids 22 .wanted_collections(vec!["fm.teal.alpha.feed.play".to_string()]) 23 .build(); 24 // create the jetstream connector 25 let jetstream = JetstreamConnection::new(opts); 26 27 // create your ingestors 28 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 29 ingestors.insert( 30 // your EXACT nsid 31 "fm.teal.alpha.feed.play".to_string(), 32 Box::new(MyCoolIngestor), 33 ); 34 35 // tracks the last message we've processed 36 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 37 38 // get channels 39 let msg_rx = jetstream.get_msg_rx(); 40 let reconnect_tx = jetstream.get_reconnect_tx(); 41 42 // spawn a task to process messages from the queue. 43 // this is a simple implementation, you can use a more complex one based on needs. 44 let c_cursor = cursor.clone(); 45 tokio::spawn(async move { 46 while let Ok(message) = msg_rx.recv_async().await { 47 if let Err(e) = 48 handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone()) 49 .await 50 { 51 eprintln!("Error processing message: {}", e); 52 }; 53 } 54 }); 55 56 // connect to jetstream 57 // retries internally, but may fail if there is an extreme error. 58 if let Err(e) = jetstream.connect(cursor.clone()).await { 59 eprintln!("Failed to connect to Jetstream: {}", e); 60 std::process::exit(1); 61 } 62} 63 64pub struct MyCoolIngestor; 65 66/// A cool ingestor implementation. Will just print the message. Does not do verification. 67#[async_trait] 68impl LexiconIngestor for MyCoolIngestor { 69 async fn ingest(&self, message: Event<Value>) -> Result<()> { 70 // Only process Create operations, ignore Delete operations 71 if let Some(commit) = &message.commit { 72 if !matches!(commit.operation, rocketman::types::event::Operation::Create) { 73 return Ok(()); 74 } 75 } else { 76 return Ok(()); 77 } 78 79 let client = Client::new(); 80 let url = std::env::var("DISCORD_WEBHOOK_URL") 81 .expect("DISCORD_WEBHOOK_URL environment variable must be set"); 82 // Safely extract track name and artist from the record 83 let track_info = message 84 .commit 85 .as_ref() 86 .and_then(|commit| commit.record.as_ref()) 87 .and_then(|record| { 88 let track_name = record.get("trackName")?.as_str()?; 89 let artists = record.get("artists")?.as_array()?; 90 let artist_name = artists.first()?.get("artistName")?.as_str()?; 91 Some(format!("{} by {}", track_name, artist_name)) 92 }) 93 .unwrap_or_else(|| "unknown track".to_string()); 94 95 let payload = json!({ 96 "content": format!("{} is listening to {}", message.did, track_info) 97 }); 98 let response = client.post(url).json(&payload).send().await?; 99 100 println!("{:?}", response.status()); 101 println!("{:?}", message); 102 // Process message for default lexicon. 103 Ok(()) 104 } 105}