tangled.org trending bluesky account
at main 384 lines 18 kB view raw
1extern crate dotenv; 2use crate::constellation::fetch_constellation_count; 3use atrium_api::app::bsky::feed::post::RecordEmbedRefs; 4use atrium_api::app::bsky::richtext::facet::{ByteSliceData, LinkData, MainFeaturesItem}; 5use atrium_api::types::string::Language; 6use atrium_api::types::{Collection, Union}; 7use dotenv::dotenv; 8use logic::BotApi; 9use rocketman::endpoints::JetstreamEndpoints; 10use rocketman::{ 11 connection::JetstreamConnection, handler, ingestion::LexiconIngestor, 12 options::JetstreamOptions, types::event::Operation, 13}; 14use serde::Serialize; 15use slingshot::Slingshot; 16use sqlx::sqlite::{SqliteConnectOptions, SqlitePool}; 17use std::collections::HashMap; 18use std::num::NonZeroU64; 19use std::sync::{Arc, Mutex}; 20use std::time::Duration; 21 22mod constellation; 23 24#[derive(Debug)] 25struct ParsedRecord { 26 did: String, 27 collection: String, 28 rkey: String, 29} 30 31struct StarIngestor { 32 pool: SqlitePool, 33 bot: Arc<BotApi>, 34 sling_shot: Arc<Slingshot>, 35 timeframe_hours: i64, 36 star_threshold: i64, 37 post_window_hours: i64, 38} 39 40#[derive(Serialize)] 41struct RepoPreview { 42 repo: String, 43 stars: u64, 44 description: String, 45} 46 47fn parse_uri(uri: &str) -> anyhow::Result<ParsedRecord> { 48 let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); 49 if parts.len() != 3 { 50 return Err(anyhow::anyhow!("Invalid URI format")); 51 } 52 Ok(ParsedRecord { 53 did: parts[0].to_string(), 54 collection: parts[1].to_string(), 55 rkey: parts[2].to_string(), 56 }) 57} 58 59#[tokio::main] 60async fn main() -> anyhow::Result<()> { 61 env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); 62 dotenv().ok(); 63 64 // Initialize DB pool and run migrations 65 let database_url = 66 std::env::var("DATABASE_URL").unwrap_or_else(|_| "./stitch_counter.sqlite".to_string()); 67 let options = SqliteConnectOptions::new() 68 .filename(database_url) 69 .create_if_missing(true) 70 .busy_timeout(Duration::from_secs(5)); 71 let pool = SqlitePool::connect_with(options).await?; 72 // Run migrations from ./migrations 73 sqlx::migrate!("./migrations").run(&pool).await?; 74 75 let jet_stream = match std::env::var("JETSTREAM_URL") { 76 Ok(url) => JetstreamEndpoints::Custom(url.into()), 77 Err(_) => JetstreamEndpoints::default(), 78 }; 79 // Configure Jetstream to listen to sh.tangled.feed.star 80 let opts = JetstreamOptions::builder() 81 .ws_url(jet_stream) 82 .wanted_collections(vec![atproto_api::sh::tangled::feed::Star::NSID.to_string()]) 83 .build(); 84 85 let jetstream = JetstreamConnection::new(opts); 86 87 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 88 89 let msg_rx = jetstream.get_msg_rx(); 90 let reconnect_tx = jetstream.get_reconnect_tx(); 91 92 // Read configuration from environment 93 let timeframe_hours: i64 = std::env::var("TIMEFRAME") 94 .ok() 95 .and_then(|v| v.parse::<i64>().ok()) 96 .filter(|v| *v > 0) 97 .unwrap_or(1); 98 let star_threshold: i64 = std::env::var("STAR_THRESHOLD") 99 .ok() 100 .and_then(|v| v.parse::<i64>().ok()) 101 .filter(|v| *v > 0) 102 .unwrap_or(10); 103 let post_window_hours: i64 = std::env::var("POST_WINDOW_HOURS") 104 .ok() 105 .and_then(|v| v.parse::<i64>().ok()) 106 .filter(|v| *v > 0) 107 .unwrap_or(24); 108 109 let bot_username = std::env::var("BOT_USERNAME").expect("BOT_USERNAME must be set"); 110 let bot_password = std::env::var("BOT_PASSWORD").expect("BOT_PASSWORD must be set"); 111 let bot_pds_url = std::env::var("BOT_PDS_URL").expect("BOT_PDS_URL must be set"); 112 113 log::info!( 114 "Starting bot with username: {} Timeframe threshold: {} Star Threshold: {} Post Threshold: {}", 115 bot_username, 116 timeframe_hours, 117 star_threshold, 118 post_window_hours 119 ); 120 121 let bot_api = BotApi::new_logged_in(bot_username, bot_password, bot_pds_url).await?; 122 let sling_shot = Arc::new(Slingshot::new("https://slingshot.microcosm.blue")?); 123 124 // Ingestor for the star collection 125 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 126 ingestors.insert( 127 atproto_api::sh::tangled::feed::Star::NSID.to_string(), 128 Box::new(StarIngestor { 129 pool: pool.clone(), 130 bot: Arc::new(bot_api), 131 sling_shot: sling_shot.clone(), 132 timeframe_hours, 133 star_threshold, 134 post_window_hours, 135 }), 136 ); 137 let ingestors = Arc::new(ingestors); 138 139 // Spawn message handling loop 140 let ingestors_clone = ingestors.clone(); 141 let reconnect_tx_clone = reconnect_tx.clone(); 142 let cursor_clone = cursor.clone(); 143 tokio::spawn(async move { 144 while let Ok(message) = msg_rx.recv_async().await { 145 if let Err(e) = handler::handle_message( 146 message, 147 &ingestors_clone, 148 reconnect_tx_clone.clone(), 149 cursor_clone.clone(), 150 ) 151 .await 152 { 153 eprintln!("Error processing message: {}", e); 154 } 155 } 156 }); 157 158 // Connect to jetstream 159 jetstream 160 .connect(cursor.clone()) 161 .await 162 .map_err(|e| anyhow::anyhow!(e.to_string())) 163} 164 165#[async_trait::async_trait] 166impl LexiconIngestor for StarIngestor { 167 /// Asynchronously processes an incoming event message related to "stars" and performs database operations and other actions based on the event type. 168 /// 169 /// # Parameters: 170 /// - `message`: An [`Event`](rocketman::types::event::Event) containing event data, which includes operations (`Create`, `Update`, or `Delete`) and associated metadata. 171 /// 172 /// # Returns: 173 /// - An [`anyhow::Result<()>`](anyhow::Result), which is `Ok(())` if the operation succeeds, or an error if any step fails. 174 /// 175 /// # Functionality: 176 /// 1. **Create or Update Operations**: 177 /// - If the 178 async fn ingest( 179 &self, 180 message: rocketman::types::event::Event<serde_json::Value>, 181 ) -> anyhow::Result<()> { 182 if let Some(commit) = &message.commit { 183 match commit.operation { 184 Operation::Create | Operation::Update => { 185 if let Some(record) = &commit.record { 186 let rec = serde_json::from_value::< 187 atproto_api::sh::tangled::feed::star::RecordData, 188 >(record.clone())?; 189 190 let repo_subject = rec.subject; 191 192 // Insert or ignore duplicate per did+rkey 193 let result = sqlx::query( 194 "INSERT OR IGNORE INTO stars(createdAt, did, rkey, subject) VALUES(?, ?, ?, ?)" 195 ) 196 .bind(rec.created_at.as_str()) 197 .bind(&message.did) 198 .bind(commit.rkey.as_str()) 199 .bind(&repo_subject) 200 .execute(&self.pool) 201 .await?; 202 203 // Only check threshold if we actually inserted a new row 204 if result.rows_affected() > 0 { 205 let offset = format!("-{} hours", self.timeframe_hours); 206 // Count stars in the last timeframe_hours using RFC3339 string comparison 207 let (count_in_window,): (i64,) = sqlx::query_as( 208 "SELECT COUNT(*) as cnt FROM stars WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) AND subject = ?" 209 ) 210 .bind(&offset) 211 .bind(&repo_subject) 212 .fetch_one(&self.pool) 213 .await?; 214 215 if count_in_window >= self.star_threshold { 216 log::info!( 217 "Star threshold met: {} stars in the last {} hour(s) (threshold: {}), checking to see if it should be posted", 218 count_in_window, 219 self.timeframe_hours, 220 self.star_threshold 221 ); 222 223 // Check if a post was made within the last post_window_hours 224 let post_offset = format!("-{} hours", self.post_window_hours); 225 let (posts_in_window,): (i64,) = sqlx::query_as( 226 "SELECT COUNT(*) as cnt FROM posts_made WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) AND subject = ?" 227 ) 228 .bind(&post_offset) 229 .bind(&repo_subject) 230 .fetch_one(&self.pool) 231 .await?; 232 233 if posts_in_window == 0 { 234 // Insert a new record to mark that we've posted 235 let _ = sqlx::query( 236 "INSERT INTO posts_made(createdAt, subject) VALUES(strftime('%Y-%m-%dT%H:%M:%fZ','now'), ?)" 237 ) 238 .bind(&repo_subject.clone()) 239 .execute(&self.pool) 240 .await?; 241 242 let parsed = parse_uri(&repo_subject)?; 243 let cloned_repo_owner = parsed.did.clone(); 244 let stars = fetch_constellation_count(&repo_subject) 245 .await 246 .unwrap_or_else(|err| { 247 log::error!("Error calling constellation: {:?}", err); 248 0 249 }); 250 let repo_record = &self 251 .sling_shot 252 .get_record::<atproto_api::sh::tangled::repo::RecordData>( 253 parsed.did.as_str(), 254 parsed.collection.as_str(), 255 parsed.rkey.as_str(), 256 ) 257 .await?; 258 let repo_name = repo_record.value.name.clone(); 259 let handle = 260 &self.bot.get_handle(cloned_repo_owner.clone()).await?; 261 let tangled_sh_url = format!( 262 "https://tangled.sh/{cloned_repo_owner}/{repo_name}" 263 ); 264 let description = match repo_record.value.description.clone() { 265 None => "".to_string(), 266 Some(desc) => format!(" {desc}"), 267 }; 268 269 let handle_and_repo = format!("{handle}/{repo_name}"); 270 271 let _ctx = RepoPreview { 272 repo: handle_and_repo.clone(), 273 stars: stars as u64, 274 description: description.clone(), 275 }; 276 277 // Fetch the pre-rendered image from the external service instead of rendering via Chromium 278 let tangled_og_image = format!("{tangled_sh_url}/opengraph"); 279 log::info!( 280 "Attempting to get the picture at: {tangled_og_image}" 281 ); 282 283 let post_text = 284 format!("{handle_and_repo}{description}\n⭐️ {stars}"); 285 286 let response = reqwest::get(tangled_og_image).await?; 287 288 let embed = match response.status().is_success() { 289 true => { 290 let bytes = response.bytes().await?.to_vec(); 291 let blob_upload = &self 292 .bot 293 .agent 294 .api 295 .com 296 .atproto 297 .repo 298 .upload_blob(bytes) 299 .await?; 300 301 let image = atrium_api::app::bsky::embed::images::ImageData { 302 alt: format!( 303 "An image showing the same text inside of the post. {post_text}" 304 ), 305 aspect_ratio: Some( 306 atrium_api::app::bsky::embed::defs::AspectRatioData { 307 width: NonZeroU64::try_from( 1_200_u64)?, 308 height: NonZeroU64::try_from(630_u64)?, 309 } 310 .into(), 311 ), 312 //Good lord how many clones is that 313 image: blob_upload.clone().blob.clone(), 314 }; 315 Some(atrium_api::types::Union::Refs( 316 RecordEmbedRefs::AppBskyEmbedImagesMain(Box::new( 317 atrium_api::app::bsky::embed::images::MainData { 318 images: vec![image.into()], 319 } 320 .into(), 321 )), 322 )) 323 } 324 false => None, 325 }; 326 327 let post = atrium_api::app::bsky::feed::post::RecordData { 328 created_at: atrium_api::types::string::Datetime::now(), 329 embed, 330 entities: None, 331 facets: Some(vec![ 332 atrium_api::app::bsky::richtext::facet::MainData { 333 features: vec![Union::Refs( 334 MainFeaturesItem::Link(Box::new( 335 LinkData { 336 uri: tangled_sh_url, 337 } 338 .into(), 339 )), 340 )], 341 index: ByteSliceData { 342 byte_end: handle_and_repo.chars().count(), 343 byte_start: 0, 344 } 345 .into(), 346 } 347 .into(), 348 ]), 349 labels: None, 350 //You don't see a thing. No unwraps 351 langs: Some(vec![Language::new("en".to_string()).unwrap()]), 352 reply: None, 353 tags: None, 354 text: post_text, 355 }; 356 log::info!( 357 "Threshold met and allowed to be posted, total stars: {stars}" 358 ); 359 match self.bot.agent.create_record(post).await { 360 Ok(_) => { 361 log::info!("NEW POST MADE") 362 } 363 Err(err) => { 364 log::error!("{err}") 365 } 366 } 367 } 368 } 369 } 370 } 371 } 372 Operation::Delete => { 373 // Delete by did and rkey 374 let _ = sqlx::query("DELETE FROM stars WHERE did = ? AND rkey = ?") 375 .bind(&message.did) 376 .bind(commit.rkey.as_str()) 377 .execute(&self.pool) 378 .await?; 379 } 380 } 381 } 382 Ok(()) 383 } 384}