···11-use async_trait::async_trait;
22-use rocketman::{
33- connection::JetstreamConnection,
44- handler,
55- ingestion::LexiconIngestor,
66- options::JetstreamOptions,
77- types::event::{Commit, Event},
88-};
99-use serde_json::Value;
1010-use std::{collections::HashMap, sync::Arc, sync::Mutex};
1111-1212-#[tokio::main]
1313-async fn main() {
1414- // init the builder
1515- let opts = JetstreamOptions::builder()
1616- // your EXACT nsids
1717- .wanted_collections(vec!["app.bsky.feed.post".to_string()])
1818- .build();
1919- // create the jetstream connector
2020- let jetstream = JetstreamConnection::new(opts);
2121-2222- // create your ingestors
2323- let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
2424- ingestors.insert(
2525- // your EXACT nsid
2626- "app.bsky.feed.post".to_string(),
2727- Box::new(MyCoolIngestor),
2828- );
2929-3030- // tracks the last message we've processed
3131- let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
3232-3333- // get channels
3434- let msg_rx = jetstream.get_msg_rx();
3535- let reconnect_tx = jetstream.get_reconnect_tx();
3636-3737- // spawn a task to process messages from the queue.
3838- // this is a simple implementation, you can use a more complex one based on needs.
3939- let c_cursor = cursor.clone();
4040- tokio::spawn(async move {
4141- while let Ok(message) = msg_rx.recv_async().await {
4242- if let Err(e) =
4343- handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone())
4444- .await
4545- {
4646- eprintln!("Error processing message: {}", e);
4747- };
4848- }
4949- });
5050-5151- // connect to jetstream
5252- // retries internally, but may fail if there is an extreme error.
5353- if let Err(e) = jetstream.connect(cursor.clone()).await {
5454- eprintln!("Failed to connect to Jetstream: {}", e);
5555- std::process::exit(1);
5656- }
5757-}
5858-5959-pub struct MyCoolIngestor;
6060-6161-/// A cool ingestor implementation. Will just print the message. Does not do verification.
6262-#[async_trait]
6363-impl LexiconIngestor for MyCoolIngestor {
6464- async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> {
6565- if let Some(Commit {
6666- record: Some(record),
6767- ..
6868- }) = message.commit
6969- {
7070- if let Some(Value::String(text)) = record.get("text") {
7171- println!("{text:?}");
7272- }
7373- }
7474- Ok(())
7575- }
7676-}
···11-## Rocketman
22-33-A modular(ish) jetstream consumer. Backed by Tungstenite.
44-55-66-### Installation
77-```toml
88-[dependencies]
99-rocketman = "latest" # put the latest version here
1010-tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
1111-```
1212-### Usage
1313-```rs
1414-#[tokio::main]
1515-async fn main() {
1616- // init the builder
1717- let opts = JetstreamOptions::builder()
1818- // your EXACT nsids
1919- .wanted_collections(vec!["com.example.cool.nsid".to_string()])
2020- .build();
2121- // create the jetstream connector
2222- let jetstream = JetstreamConnection::new(opts);
2323-2424- // create your ingestors
2525- let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
2626- ingestors.insert(
2727- // your EXACT nsid
2828- "com.example.cool.nsid".to_string(),
2929- Box::new(MyCoolIngestor),
3030- );
3131-3232-3333- // tracks the last message we've processed
3434- let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
3535-3636- // get channels
3737- let msg_rx = jetstream.get_msg_rx();
3838- let reconnect_tx = jetstream.get_reconnect_tx();
3939-4040- // spawn a task to process messages from the queue.
4141- // this is a simple implementation, you can use a more complex one based on needs.
4242- let c_cursor = cursor.clone();
4343- tokio::spawn(async move {
4444- while let Ok(message) = msg_rx.recv_async().await {
4545- if let Err(e) =
4646- handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone())
4747- .await
4848- {
4949- error!("Error processing message: {}", e);
5050- };
5151- }
5252- });
5353-5454- // connect to jetstream
5555- // retries internally, but may fail if there is an extreme error.
5656- if let Err(e) = jetstream.connect(cursor.clone()).await {
5757- error!("Failed to connect to Jetstream: {}", e);
5858- std::process::exit(1);
5959- }
6060-}
6161-6262-pub struct MyCoolIngestor;
6363-6464-/// A cool ingestor implementation. Will just print the message. Does not do verification.
6565-impl LexiconIngestor for MyCoolIngestor {
6666- async fn ingest(&self, message: Event<Value>) -> Result<()> {
6767- info!("{:?}", message);
6868- // Process message for default lexicon.
6969- Ok(())
7070- }
7171-}
7272-```
7373-### gratz
7474-Based heavily on [phil's jetstream consumer on atcosm constellation.](https://github.com/atcosm/links/blob/main/constellation/src/consumer/jetstream.rs)
-335
services/rocketman/src/connection.rs
···11-use flume::{Receiver, Sender};
22-use futures_util::StreamExt;
33-use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
44-use std::cmp::{max, min};
55-use std::sync::{Arc, Mutex};
66-use std::time::Instant;
77-use tokio::time::{sleep, Duration};
88-use tokio_tungstenite::{connect_async, tungstenite::Message};
99-use tracing::{error, info};
1010-use url::Url;
1111-1212-use crate::options::JetstreamOptions;
1313-use crate::time::system_time::SystemTimeProvider;
1414-use crate::time::TimeProvider;
1515-1616-pub struct JetstreamConnection {
1717- pub opts: JetstreamOptions,
1818- reconnect_tx: flume::Sender<()>,
1919- reconnect_rx: flume::Receiver<()>,
2020- msg_tx: flume::Sender<Message>,
2121- msg_rx: flume::Receiver<Message>,
2222-}
2323-2424-impl JetstreamConnection {
2525- pub fn new(opts: JetstreamOptions) -> Self {
2626- let (reconnect_tx, reconnect_rx) = flume::bounded(opts.bound);
2727- let (msg_tx, msg_rx) = flume::bounded(opts.bound);
2828- Self {
2929- opts,
3030- reconnect_tx,
3131- reconnect_rx,
3232- msg_tx,
3333- msg_rx,
3434- }
3535- }
3636-3737- pub fn get_reconnect_tx(&self) -> Sender<()> {
3838- self.reconnect_tx.clone()
3939- }
4040-4141- pub fn get_msg_rx(&self) -> Receiver<Message> {
4242- self.msg_rx.clone()
4343- }
4444-4545- fn build_ws_url(&self, cursor: Arc<Mutex<Option<u64>>>) -> String {
4646- let mut url = Url::parse(&self.opts.ws_url.to_string()).unwrap();
4747-4848- // Append query params
4949- if let Some(ref cols) = self.opts.wanted_collections {
5050- for col in cols {
5151- url.query_pairs_mut().append_pair("wantedCollections", col);
5252- }
5353- }
5454- if let Some(ref dids) = self.opts.wanted_dids {
5555- for did in dids {
5656- url.query_pairs_mut().append_pair("wantedDids", did);
5757- }
5858- }
5959- if let Some(cursor) = cursor.lock().unwrap().as_ref() {
6060- url.query_pairs_mut()
6161- .append_pair("cursor", &cursor.to_string());
6262- }
6363- #[cfg(feature = "zstd")]
6464- if self.opts.compress {
6565- url.query_pairs_mut().append_pair("compress", "true");
6666- }
6767-6868- url.to_string()
6969- }
7070-7171- pub async fn connect(
7272- &self,
7373- cursor: Arc<Mutex<Option<u64>>>,
7474- ) -> Result<(), Box<dyn std::error::Error>> {
7575- describe_counter!(
7676- "jetstream.connection.attempt",
7777- Unit::Count,
7878- "attempts to connect to jetstream service"
7979- );
8080- describe_counter!(
8181- "jetstream.connection.error",
8282- Unit::Count,
8383- "errors connecting to jetstream service"
8484- );
8585- describe_histogram!(
8686- "jetstream.connection.duration",
8787- Unit::Seconds,
8888- "Time connected to jetstream service"
8989- );
9090- describe_counter!(
9191- "jetstream.connection.reconnect",
9292- Unit::Count,
9393- "reconnects to jetstream service"
9494- );
9595- let mut retry_interval = 1;
9696-9797- let time_provider = SystemTimeProvider::new();
9898-9999- let mut start_time = time_provider.now();
100100-101101- loop {
102102- counter!("jetstream.connection.attempt").increment(1);
103103- info!("Connecting to {}", self.opts.ws_url);
104104- let start = Instant::now();
105105-106106- let ws_url = self.build_ws_url(cursor.clone());
107107-108108- match connect_async(ws_url).await {
109109- Ok((ws_stream, response)) => {
110110- let elapsed = start.elapsed();
111111- info!("Connected. HTTP status: {}", response.status());
112112-113113- let (_, mut read) = ws_stream.split();
114114-115115- loop {
116116- // Inner loop to handle messages, reconnect signals, and receive timeout
117117- let receive_timeout =
118118- sleep(Duration::from_secs(self.opts.timeout_time_sec as u64));
119119- tokio::pin!(receive_timeout);
120120-121121- loop {
122122- tokio::select! {
123123- message_result = read.next() => {
124124- match message_result {
125125- Some(message) => {
126126- // Reset timeout on message received
127127- receive_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.opts.timeout_time_sec as u64));
128128-129129- histogram!("jetstream.connection.duration").record(elapsed.as_secs_f64());
130130- match message {
131131- Ok(message) => {
132132- if let Err(err) = self.msg_tx.send_async(message).await {
133133- counter!("jetstream.error").increment(1);
134134- error!("Failed to queue message: {}", err);
135135- }
136136- }
137137- Err(e) => {
138138- counter!("jetstream.error").increment(1);
139139- error!("Error: {}", e);
140140- }
141141- }
142142- }
143143- None => {
144144- info!("Stream closed by server.");
145145- counter!("jetstream.connection.reconnect").increment(1);
146146- break; // Stream ended, break inner loop to reconnect
147147- }
148148- }
149149- }
150150- _ = self.reconnect_rx.recv_async() => {
151151- info!("Reconnect signal received.");
152152- counter!("jetstream.connection.reconnect").increment(1);
153153- break;
154154- }
155155- _ = &mut receive_timeout => {
156156- // last final poll, just in case
157157- match read.next().await {
158158- Some(Ok(message)) => {
159159- if let Err(err) = self.msg_tx.send_async(message).await {
160160- counter!("jetstream.error").increment(1);
161161- error!("Failed to queue message: {}", err);
162162- }
163163- // Reset timeout to continue
164164- receive_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.opts.timeout_time_sec as u64));
165165- }
166166- Some(Err(e)) => {
167167- counter!("jetstream.error").increment(1);
168168- error!("Error receiving message during final poll: {}", e);
169169- counter!("jetstream.connection.reconnect").increment(1);
170170- break;
171171- }
172172- None => {
173173- info!("No commits received in {} seconds, reconnecting.", self.opts.timeout_time_sec);
174174- counter!("jetstream.connection.reconnect").increment(1);
175175- break;
176176- }
177177- }
178178- }
179179- }
180180- }
181181- }
182182- }
183183- Err(e) => {
184184- let elapsed_time = time_provider.elapsed(start_time);
185185- // reset if time connected > the time we set
186186- if elapsed_time.as_secs() > self.opts.max_retry_interval_seconds {
187187- retry_interval = 0;
188188- start_time = time_provider.now();
189189- }
190190- counter!("jetstream.connection.error").increment(1);
191191- error!("Connection error: {}", e);
192192- }
193193- }
194194-195195- let sleep_time = max(1, min(self.opts.max_retry_interval_seconds, retry_interval));
196196- info!("Reconnecting in {} seconds...", sleep_time);
197197- sleep(Duration::from_secs(sleep_time)).await;
198198-199199- if retry_interval > self.opts.max_retry_interval_seconds {
200200- retry_interval = self.opts.max_retry_interval_seconds;
201201- } else {
202202- retry_interval *= 2;
203203- }
204204- }
205205- }
206206-207207- pub fn force_reconnect(&self) -> Result<(), flume::SendError<()>> {
208208- info!("Force reconnect requested.");
209209- self.reconnect_tx.send(()) // Send a reconnect signal
210210- }
211211-}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use tokio::task;
    use tokio::time::{timeout, Duration};
    use tokio_tungstenite::tungstenite::Message;

    /// The built URL carries the cursor and every wanted collection/DID.
    #[test]
    fn test_build_ws_url() {
        let opts = JetstreamOptions {
            wanted_collections: Some(vec!["col1".to_string(), "col2".to_string()]),
            wanted_dids: Some(vec!["did1".to_string()]),
            ..Default::default()
        };
        let connection = JetstreamConnection::new(opts);

        let test = Arc::new(Mutex::new(Some(8373)));

        let url = connection.build_ws_url(test);

        assert!(url.starts_with("wss://"));
        assert!(url.contains("cursor=8373"));
        assert!(url.contains("wantedCollections=col1"));
        assert!(url.contains("wantedCollections=col2"));
        assert!(url.contains("wantedDids=did1"));
    }

    /// `force_reconnect` delivers a signal on the reconnect channel.
    #[tokio::test]
    async fn test_force_reconnect() {
        let opts = JetstreamOptions::default();
        let connection = JetstreamConnection::new(opts);

        // Spawn a task to listen for the reconnect signal
        let reconnect_rx = connection.reconnect_rx.clone();
        let recv_task = task::spawn(async move {
            reconnect_rx
                .recv_async()
                .await
                .expect("Failed to receive reconnect signal");
        });

        connection
            .force_reconnect()
            .expect("Failed to send reconnect signal");

        // Ensure reconnect signal was received
        assert!(recv_task.await.is_ok());
    }

    /// A message pushed onto the queue comes out of the public receiver unchanged.
    #[tokio::test]
    async fn test_message_queue() {
        let opts = JetstreamOptions::default();
        let connection = JetstreamConnection::new(opts);

        let msg_rx = connection.get_msg_rx();
        let msg = Message::Text("test message".into());

        // Send a message to the queue
        connection
            .msg_tx
            .send_async(msg.clone())
            .await
            .expect("Failed to send message");

        // Receive and verify the message
        let received = msg_rx
            .recv_async()
            .await
            .expect("Failed to receive message");
        assert_eq!(received, msg);
    }

    /// `connect` keeps retrying instead of returning when the endpoint is unreachable.
    #[tokio::test]
    async fn test_connection_retries_on_failure() {
        // Reserve an ephemeral local port and release it again, so connection
        // attempts are refused without touching the network. (Another process
        // could grab the port in between, but that is very unlikely.)
        let addr = {
            let listener =
                std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind");
            listener.local_addr().expect("Failed to read local address")
        };

        let opts = JetstreamOptions {
            ws_url: crate::endpoints::JetstreamEndpoints::Custom(format!("ws://{addr}")),
            max_retry_interval_seconds: 1,
            ..Default::default()
        };
        let connection = JetstreamConnection::new(opts);

        let cursor = Arc::new(Mutex::new(None));

        // Timeout to prevent infinite loop
        let result = timeout(Duration::from_secs(3), connection.connect(cursor)).await;

        assert!(result.is_err(), "Expected timeout due to retry logic");
    }

    /// Connecting to a server that never sends anything must neither panic nor
    /// return early; the client is cut off by the 5 second test timeout.
    #[tokio::test]
    async fn test_reconnect_after_receive_timeout() {
        use tokio::net::TcpListener;
        use tokio_tungstenite::accept_async;

        // set up dummy "websocket" on an ephemeral port to avoid collisions
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("Failed to bind");
        let addr = listener.local_addr().expect("Failed to read local address");

        let opts = JetstreamOptions {
            ws_url: crate::endpoints::JetstreamEndpoints::Custom(format!("ws://{addr}")),
            bound: 5,
            max_retry_interval_seconds: 1,
            ..Default::default()
        };
        let connection = JetstreamConnection::new(opts);
        let cursor = Arc::new(Mutex::new(None));

        let server_handle = tokio::spawn(async move {
            if let Ok((stream, _)) = listener.accept().await {
                let ws_stream = accept_async(stream).await.expect("Failed to accept");
                // send nothing
                tokio::time::sleep(Duration::from_secs(6)).await;
                drop(ws_stream);
            }
        });

        // run the client against the silent server for 5 seconds
        let connect_handle = tokio::spawn(async move {
            timeout(Duration::from_secs(5), connection.connect(cursor))
                .await
                .ok();
        });

        let (server_result, connect_result) = tokio::join!(server_handle, connect_handle);
        assert!(server_result.is_ok(), "server task panicked");
        assert!(connect_result.is_ok(), "client task panicked");
    }
}
···11-// lib.rs
22-pub mod connection;
33-pub mod endpoints;
44-pub mod handler;
55-pub mod ingestion;
66-pub mod options;
77-pub mod time;
88-pub mod types;