this repo has no description
at master 132 lines 4.8 kB view raw
1use anyhow::Result; 2use async_trait::async_trait; 3use reqwest::Client; 4use rocketman::{ 5 connection::JetstreamConnection, handler, ingestion::LexiconIngestor, 6 options::JetstreamOptions, types::event::Event, 7}; 8use serde_json::{json, Value}; 9use std::{ 10 collections::HashMap, 11 sync::{Arc, Mutex}, 12}; 13 14mod resolve; 15 16#[tokio::main] 17async fn main() { 18 // Load environment variables from .env file 19 dotenv::dotenv().ok(); 20 21 // init the builder 22 let opts = JetstreamOptions::builder() 23 // your EXACT nsids 24 .wanted_collections(vec!["fm.teal.alpha.feed.play".to_string()]) 25 .ws_url(rocketman::endpoints::JetstreamEndpoints::Custom( 26 "wss://jetstream1.us-east.fire.hose.cam/subscribe".to_string(), 27 )) 28 .build(); 29 // create the jetstream connector 30 let jetstream = JetstreamConnection::new(opts); 31 32 // create your ingestors 33 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 34 ingestors.insert( 35 // your EXACT nsid 36 "fm.teal.alpha.feed.play".to_string(), 37 Box::new(MyCoolIngestor), 38 ); 39 40 // tracks the last message we've processed 41 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 42 43 // get channels 44 let msg_rx = jetstream.get_msg_rx(); 45 let reconnect_tx = jetstream.get_reconnect_tx(); 46 47 // spawn a task to process messages from the queue. 48 // this is a simple implementation, you can use a more complex one based on needs. 49 let c_cursor = cursor.clone(); 50 tokio::spawn(async move { 51 while let Ok(message) = msg_rx.recv_async().await { 52 if let Err(e) = 53 handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone()) 54 .await 55 { 56 eprintln!("Error processing message: {}", e); 57 }; 58 } 59 }); 60 61 // connect to jetstream 62 // retries internally, but may fail if there is an extreme error. 63 if let Err(e) = jetstream.connect(cursor.clone()).await { 64 eprintln!("Failed to connect to Jetstream: {}", e); 65 std::process::exit(1); 66 } 67} 68 69pub struct MyCoolIngestor; 70 71/// A cool ingestor implementation. Will just print the message. Does not do verification. 72#[async_trait] 73impl LexiconIngestor for MyCoolIngestor { 74 async fn ingest(&self, message: Event<Value>) -> Result<()> { 75 // Only process Create operations, ignore Delete operations 76 if let Some(commit) = &message.commit { 77 if !matches!(commit.operation, rocketman::types::event::Operation::Create) { 78 return Ok(()); 79 } 80 } else { 81 return Ok(()); 82 } 83 84 let client = Client::new(); 85 let url = std::env::var("DISCORD_WEBHOOK_URL") 86 .expect("DISCORD_WEBHOOK_URL environment variable must be set"); 87 88 // Get resolver app view URL from environment 89 let resolver_app_view = std::env::var("RESOLVER_APP_VIEW") 90 .unwrap_or_else(|_| "https://bsky.social".to_string()); 91 92 // Safely extract track name and artist from the record 93 let track_info = message 94 .commit 95 .as_ref() 96 .and_then(|commit| commit.record.as_ref()) 97 .and_then(|record| { 98 let track_name = record.get("trackName")?.as_str()?; 99 let artists = record.get("artists")?.as_array()?; 100 let artist_name = artists.first()?.get("artistName")?.as_str()?; 101 Some(format!("{} by {}", track_name, artist_name)) 102 }) 103 .unwrap_or_else(|| "unknown track".to_string()); 104 105 let submission_client_agent = message 106 .commit 107 .as_ref() 108 .and_then(|commit| commit.record.as_ref()) 109 .and_then(|record| record.get("submissionClientAgent")?.as_str()); 110 111 // Resolve the handle from the DID 112 let handle = match resolve::resolve_identity(&message.did, &resolver_app_view).await { 113 Ok(resolved) => resolved.identity, 114 Err(e) => { 115 eprintln!("Failed to resolve handle for DID {}: {}", message.did, e); 116 // Fallback to showing the DID if resolution fails 117 message.did.clone() 118 } 119 }; 120 121 let payload = json!({ 122 "content": format!("{} is listening to {} via `{}`", handle, track_info, submission_client_agent.unwrap_or("unknown client")), 123 "allowed_mentions": { "parse": [] }, 124 }); 125 let response = client.post(url).json(&payload).send().await?; 126 127 println!("{:?}", response.status()); 128 println!("{:?}", message); 129 // Process message for default lexicon. 130 Ok(()) 131 } 132}