this repo has no description
1use anyhow::Result;
2use async_trait::async_trait;
3use reqwest::Client;
4use rocketman::{
5 connection::JetstreamConnection, handler, ingestion::LexiconIngestor,
6 options::JetstreamOptions, types::event::Event,
7};
8use serde_json::{Value, json};
9use std::{
10 collections::HashMap,
11 sync::{Arc, Mutex},
12};
13
14#[tokio::main]
15async fn main() {
16 // Load environment variables from .env file
17 dotenv::dotenv().ok();
18
19 // init the builder
20 let opts = JetstreamOptions::builder()
21 // your EXACT nsids
22 .wanted_collections(vec!["fm.teal.alpha.feed.play".to_string()])
23 .build();
24 // create the jetstream connector
25 let jetstream = JetstreamConnection::new(opts);
26
27 // create your ingestors
28 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
29 ingestors.insert(
30 // your EXACT nsid
31 "fm.teal.alpha.feed.play".to_string(),
32 Box::new(MyCoolIngestor),
33 );
34
35 // tracks the last message we've processed
36 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
37
38 // get channels
39 let msg_rx = jetstream.get_msg_rx();
40 let reconnect_tx = jetstream.get_reconnect_tx();
41
42 // spawn a task to process messages from the queue.
43 // this is a simple implementation, you can use a more complex one based on needs.
44 let c_cursor = cursor.clone();
45 tokio::spawn(async move {
46 while let Ok(message) = msg_rx.recv_async().await {
47 if let Err(e) =
48 handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone())
49 .await
50 {
51 eprintln!("Error processing message: {}", e);
52 };
53 }
54 });
55
56 // connect to jetstream
57 // retries internally, but may fail if there is an extreme error.
58 if let Err(e) = jetstream.connect(cursor.clone()).await {
59 eprintln!("Failed to connect to Jetstream: {}", e);
60 std::process::exit(1);
61 }
62}
63
/// Stateless ingestor registered for the `fm.teal.alpha.feed.play` collection.
///
/// It carries no data, so it is free to copy and can be created with
/// [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct MyCoolIngestor;
65
66/// A cool ingestor implementation. Will just print the message. Does not do verification.
67#[async_trait]
68impl LexiconIngestor for MyCoolIngestor {
69 async fn ingest(&self, message: Event<Value>) -> Result<()> {
70 // Only process Create operations, ignore Delete operations
71 if let Some(commit) = &message.commit {
72 if !matches!(commit.operation, rocketman::types::event::Operation::Create) {
73 return Ok(());
74 }
75 } else {
76 return Ok(());
77 }
78
79 let client = Client::new();
80 let url = std::env::var("DISCORD_WEBHOOK_URL")
81 .expect("DISCORD_WEBHOOK_URL environment variable must be set");
82 // Safely extract track name and artist from the record
83 let track_info = message
84 .commit
85 .as_ref()
86 .and_then(|commit| commit.record.as_ref())
87 .and_then(|record| {
88 let track_name = record.get("trackName")?.as_str()?;
89 let artists = record.get("artists")?.as_array()?;
90 let artist_name = artists.first()?.get("artistName")?.as_str()?;
91 Some(format!("{} by {}", track_name, artist_name))
92 })
93 .unwrap_or_else(|| "unknown track".to_string());
94
95 let payload = json!({
96 "content": format!("{} is listening to {}", message.did, track_info)
97 });
98 let response = client.post(url).json(&payload).send().await?;
99
100 println!("{:?}", response.status());
101 println!("{:?}", message);
102 // Process message for default lexicon.
103 Ok(())
104 }
105}