this repo has no description
1use anyhow::Result;
2use async_trait::async_trait;
3use reqwest::Client;
4use rocketman::{
5 connection::JetstreamConnection, handler, ingestion::LexiconIngestor,
6 options::JetstreamOptions, types::event::Event,
7};
8use serde_json::{json, Value};
9use std::{
10 collections::HashMap,
11 sync::{Arc, Mutex},
12};
13
14mod resolve;
15
16#[tokio::main]
17async fn main() {
18 // Load environment variables from .env file
19 dotenv::dotenv().ok();
20
21 // init the builder
22 let opts = JetstreamOptions::builder()
23 // your EXACT nsids
24 .wanted_collections(vec!["fm.teal.alpha.feed.play".to_string()])
25 .ws_url(rocketman::endpoints::JetstreamEndpoints::Custom(
26 "wss://jetstream1.us-east.fire.hose.cam/subscribe".to_string(),
27 ))
28 .build();
29 // create the jetstream connector
30 let jetstream = JetstreamConnection::new(opts);
31
32 // create your ingestors
33 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
34 ingestors.insert(
35 // your EXACT nsid
36 "fm.teal.alpha.feed.play".to_string(),
37 Box::new(MyCoolIngestor),
38 );
39
40 // tracks the last message we've processed
41 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
42
43 // get channels
44 let msg_rx = jetstream.get_msg_rx();
45 let reconnect_tx = jetstream.get_reconnect_tx();
46
47 // spawn a task to process messages from the queue.
48 // this is a simple implementation, you can use a more complex one based on needs.
49 let c_cursor = cursor.clone();
50 tokio::spawn(async move {
51 while let Ok(message) = msg_rx.recv_async().await {
52 if let Err(e) =
53 handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone())
54 .await
55 {
56 eprintln!("Error processing message: {}", e);
57 };
58 }
59 });
60
61 // connect to jetstream
62 // retries internally, but may fail if there is an extreme error.
63 if let Err(e) = jetstream.connect(cursor.clone()).await {
64 eprintln!("Failed to connect to Jetstream: {}", e);
65 std::process::exit(1);
66 }
67}
68
69pub struct MyCoolIngestor;
70
71/// A cool ingestor implementation. Will just print the message. Does not do verification.
72#[async_trait]
73impl LexiconIngestor for MyCoolIngestor {
74 async fn ingest(&self, message: Event<Value>) -> Result<()> {
75 // Only process Create operations, ignore Delete operations
76 if let Some(commit) = &message.commit {
77 if !matches!(commit.operation, rocketman::types::event::Operation::Create) {
78 return Ok(());
79 }
80 } else {
81 return Ok(());
82 }
83
84 let client = Client::new();
85 let url = std::env::var("DISCORD_WEBHOOK_URL")
86 .expect("DISCORD_WEBHOOK_URL environment variable must be set");
87
88 // Get resolver app view URL from environment
89 let resolver_app_view = std::env::var("RESOLVER_APP_VIEW")
90 .unwrap_or_else(|_| "https://bsky.social".to_string());
91
92 // Safely extract track name and artist from the record
93 let track_info = message
94 .commit
95 .as_ref()
96 .and_then(|commit| commit.record.as_ref())
97 .and_then(|record| {
98 let track_name = record.get("trackName")?.as_str()?;
99 let artists = record.get("artists")?.as_array()?;
100 let artist_name = artists.first()?.get("artistName")?.as_str()?;
101 Some(format!("{} by {}", track_name, artist_name))
102 })
103 .unwrap_or_else(|| "unknown track".to_string());
104
105 let submission_client_agent = message
106 .commit
107 .as_ref()
108 .and_then(|commit| commit.record.as_ref())
109 .and_then(|record| record.get("submissionClientAgent")?.as_str());
110
111 // Resolve the handle from the DID
112 let handle = match resolve::resolve_identity(&message.did, &resolver_app_view).await {
113 Ok(resolved) => resolved.identity,
114 Err(e) => {
115 eprintln!("Failed to resolve handle for DID {}: {}", message.did, e);
116 // Fallback to showing the DID if resolution fails
117 message.did.clone()
118 }
119 };
120
121 let payload = json!({
122 "content": format!("{} is listening to {} via `{}`", handle, track_info, submission_client_agent.unwrap_or("unknown client")),
123 "allowed_mentions": { "parse": [] },
124 });
125 let response = client.post(url).json(&payload).send().await?;
126
127 println!("{:?}", response.status());
128 println!("{:?}", message);
129 // Process message for default lexicon.
130 Ok(())
131 }
132}