extern crate dotenv; use crate::constellation::fetch_constellation_count; use atrium_api::app::bsky::feed::post::RecordEmbedRefs; use atrium_api::app::bsky::richtext::facet::{ByteSliceData, LinkData, MainFeaturesItem}; use atrium_api::types::string::Language; use atrium_api::types::{Collection, Union}; use dotenv::dotenv; use logic::BotApi; use rocketman::endpoints::JetstreamEndpoints; use rocketman::{ connection::JetstreamConnection, handler, ingestion::LexiconIngestor, options::JetstreamOptions, types::event::Operation, }; use serde::Serialize; use slingshot::Slingshot; use sqlx::sqlite::{SqliteConnectOptions, SqlitePool}; use std::collections::HashMap; use std::num::NonZeroU64; use std::sync::{Arc, Mutex}; use std::time::Duration; mod constellation; #[derive(Debug)] struct ParsedRecord { did: String, collection: String, rkey: String, } struct StarIngestor { pool: SqlitePool, bot: Arc, sling_shot: Arc, timeframe_hours: i64, star_threshold: i64, post_window_hours: i64, } #[derive(Serialize)] struct RepoPreview { repo: String, stars: u64, description: String, } fn parse_uri(uri: &str) -> anyhow::Result { let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); if parts.len() != 3 { return Err(anyhow::anyhow!("Invalid URI format")); } Ok(ParsedRecord { did: parts[0].to_string(), collection: parts[1].to_string(), rkey: parts[2].to_string(), }) } #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); dotenv().ok(); // Initialize DB pool and run migrations let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| "./stitch_counter.sqlite".to_string()); let options = SqliteConnectOptions::new() .filename(database_url) .create_if_missing(true) .busy_timeout(Duration::from_secs(5)); let pool = SqlitePool::connect_with(options).await?; // Run migrations from ./migrations sqlx::migrate!("./migrations").run(&pool).await?; let jet_stream = match std::env::var("JETSTREAM_URL") { Ok(url) => JetstreamEndpoints::Custom(url.into()), Err(_) => JetstreamEndpoints::default(), }; // Configure Jetstream to listen to sh.tangled.feed.star let opts = JetstreamOptions::builder() .ws_url(jet_stream) .wanted_collections(vec![atproto_api::sh::tangled::feed::Star::NSID.to_string()]) .build(); let jetstream = JetstreamConnection::new(opts); let cursor: Arc>> = Arc::new(Mutex::new(None)); let msg_rx = jetstream.get_msg_rx(); let reconnect_tx = jetstream.get_reconnect_tx(); // Read configuration from environment let timeframe_hours: i64 = std::env::var("TIMEFRAME") .ok() .and_then(|v| v.parse::().ok()) .filter(|v| *v > 0) .unwrap_or(1); let star_threshold: i64 = std::env::var("STAR_THRESHOLD") .ok() .and_then(|v| v.parse::().ok()) .filter(|v| *v > 0) .unwrap_or(10); let post_window_hours: i64 = std::env::var("POST_WINDOW_HOURS") .ok() .and_then(|v| v.parse::().ok()) .filter(|v| *v > 0) .unwrap_or(24); let bot_username = std::env::var("BOT_USERNAME").expect("BOT_USERNAME must be set"); let bot_password = std::env::var("BOT_PASSWORD").expect("BOT_PASSWORD must be set"); let bot_pds_url = std::env::var("BOT_PDS_URL").expect("BOT_PDS_URL must be set"); log::info!( "Starting bot with username: {} Timeframe threshold: {} Star Threshold: {} Post Threshold: {}", bot_username, timeframe_hours, star_threshold, post_window_hours ); let bot_api = BotApi::new_logged_in(bot_username, bot_password, bot_pds_url).await?; let sling_shot = Arc::new(Slingshot::new("https://slingshot.microcosm.blue")?); // Ingestor for the star collection let mut ingestors: HashMap> = HashMap::new(); ingestors.insert( atproto_api::sh::tangled::feed::Star::NSID.to_string(), Box::new(StarIngestor { pool: pool.clone(), bot: Arc::new(bot_api), sling_shot: sling_shot.clone(), timeframe_hours, star_threshold, post_window_hours, }), ); let ingestors = Arc::new(ingestors); // Spawn message handling loop let ingestors_clone = ingestors.clone(); let reconnect_tx_clone = reconnect_tx.clone(); let cursor_clone = cursor.clone(); tokio::spawn(async move { while let Ok(message) = msg_rx.recv_async().await { if let Err(e) = handler::handle_message( message, &ingestors_clone, reconnect_tx_clone.clone(), cursor_clone.clone(), ) .await { eprintln!("Error processing message: {}", e); } } }); // Connect to jetstream jetstream .connect(cursor.clone()) .await .map_err(|e| anyhow::anyhow!(e.to_string())) } #[async_trait::async_trait] impl LexiconIngestor for StarIngestor { /// Asynchronously processes an incoming event message related to "stars" and performs database operations and other actions based on the event type. /// /// # Parameters: /// - `message`: An [`Event`](rocketman::types::event::Event) containing event data, which includes operations (`Create`, `Update`, or `Delete`) and associated metadata. /// /// # Returns: /// - An [`anyhow::Result<()>`](anyhow::Result), which is `Ok(())` if the operation succeeds, or an error if any step fails. /// /// # Functionality: /// 1. **Create or Update Operations**: /// - If the async fn ingest( &self, message: rocketman::types::event::Event, ) -> anyhow::Result<()> { if let Some(commit) = &message.commit { match commit.operation { Operation::Create | Operation::Update => { if let Some(record) = &commit.record { let rec = serde_json::from_value::< atproto_api::sh::tangled::feed::star::RecordData, >(record.clone())?; let repo_subject = rec.subject; // Insert or ignore duplicate per did+rkey let result = sqlx::query( "INSERT OR IGNORE INTO stars(createdAt, did, rkey, subject) VALUES(?, ?, ?, ?)" ) .bind(rec.created_at.as_str()) .bind(&message.did) .bind(commit.rkey.as_str()) .bind(&repo_subject) .execute(&self.pool) .await?; // Only check threshold if we actually inserted a new row if result.rows_affected() > 0 { let offset = format!("-{} hours", self.timeframe_hours); // Count stars in the last timeframe_hours using RFC3339 string comparison let (count_in_window,): (i64,) = sqlx::query_as( "SELECT COUNT(*) as cnt FROM stars WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) AND subject = ?" ) .bind(&offset) .bind(&repo_subject) .fetch_one(&self.pool) .await?; if count_in_window >= self.star_threshold { log::info!( "Star threshold met: {} stars in the last {} hour(s) (threshold: {}), checking to see if it should be posted", count_in_window, self.timeframe_hours, self.star_threshold ); // Check if a post was made within the last post_window_hours let post_offset = format!("-{} hours", self.post_window_hours); let (posts_in_window,): (i64,) = sqlx::query_as( "SELECT COUNT(*) as cnt FROM posts_made WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) AND subject = ?" ) .bind(&post_offset) .bind(&repo_subject) .fetch_one(&self.pool) .await?; if posts_in_window == 0 { // Insert a new record to mark that we've posted let _ = sqlx::query( "INSERT INTO posts_made(createdAt, subject) VALUES(strftime('%Y-%m-%dT%H:%M:%fZ','now'), ?)" ) .bind(&repo_subject.clone()) .execute(&self.pool) .await?; let parsed = parse_uri(&repo_subject)?; let cloned_repo_owner = parsed.did.clone(); let stars = fetch_constellation_count(&repo_subject) .await .unwrap_or_else(|err| { log::error!("Error calling constellation: {:?}", err); 0 }); let repo_record = &self .sling_shot .get_record::( parsed.did.as_str(), parsed.collection.as_str(), parsed.rkey.as_str(), ) .await?; let repo_name = repo_record.value.name.clone(); let handle = &self.bot.get_handle(cloned_repo_owner.clone()).await?; let tangled_sh_url = format!( "https://tangled.sh/{cloned_repo_owner}/{repo_name}" ); let description = match repo_record.value.description.clone() { None => "".to_string(), Some(desc) => format!(" {desc}"), }; let handle_and_repo = format!("{handle}/{repo_name}"); let _ctx = RepoPreview { repo: handle_and_repo.clone(), stars: stars as u64, description: description.clone(), }; // Fetch the pre-rendered image from the external service instead of rendering via Chromium let tangled_og_image = format!("{tangled_sh_url}/opengraph"); log::info!( "Attempting to get the picture at: {tangled_og_image}" ); let post_text = format!("{handle_and_repo}{description}\n⭐️ {stars}"); let response = reqwest::get(tangled_og_image).await?; let embed = match response.status().is_success() { true => { let bytes = response.bytes().await?.to_vec(); let blob_upload = &self .bot .agent .api .com .atproto .repo .upload_blob(bytes) .await?; let image = atrium_api::app::bsky::embed::images::ImageData { alt: format!( "An image showing the same text inside of the post. {post_text}" ), aspect_ratio: Some( atrium_api::app::bsky::embed::defs::AspectRatioData { width: NonZeroU64::try_from( 1_200_u64)?, height: NonZeroU64::try_from(630_u64)?, } .into(), ), //Good lord how many clones is that image: blob_upload.clone().blob.clone(), }; Some(atrium_api::types::Union::Refs( RecordEmbedRefs::AppBskyEmbedImagesMain(Box::new( atrium_api::app::bsky::embed::images::MainData { images: vec![image.into()], } .into(), )), )) } false => None, }; let post = atrium_api::app::bsky::feed::post::RecordData { created_at: atrium_api::types::string::Datetime::now(), embed, entities: None, facets: Some(vec![ atrium_api::app::bsky::richtext::facet::MainData { features: vec![Union::Refs( MainFeaturesItem::Link(Box::new( LinkData { uri: tangled_sh_url, } .into(), )), )], index: ByteSliceData { byte_end: handle_and_repo.chars().count(), byte_start: 0, } .into(), } .into(), ]), labels: None, //You don't see a thing. No unwraps langs: Some(vec![Language::new("en".to_string()).unwrap()]), reply: None, tags: None, text: post_text, }; log::info!( "Threshold met and allowed to be posted, total stars: {stars}" ); match self.bot.agent.create_record(post).await { Ok(_) => { log::info!("NEW POST MADE") } Err(err) => { log::error!("{err}") } } } } } } } Operation::Delete => { // Delete by did and rkey let _ = sqlx::query("DELETE FROM stars WHERE did = ? AND rkey = ?") .bind(&message.did) .bind(commit.rkey.as_str()) .execute(&self.pool) .await?; } } } Ok(()) } }