//! tangled.org trending Bluesky account
1extern crate dotenv;
2use crate::constellation::fetch_constellation_count;
3use atrium_api::app::bsky::feed::post::RecordEmbedRefs;
4use atrium_api::app::bsky::richtext::facet::{ByteSliceData, LinkData, MainFeaturesItem};
5use atrium_api::types::string::Language;
6use atrium_api::types::{Collection, Union};
7use dotenv::dotenv;
8use logic::BotApi;
9use rocketman::endpoints::JetstreamEndpoints;
10use rocketman::{
11 connection::JetstreamConnection, handler, ingestion::LexiconIngestor,
12 options::JetstreamOptions, types::event::Operation,
13};
14use serde::Serialize;
15use slingshot::Slingshot;
16use sqlx::sqlite::{SqliteConnectOptions, SqlitePool};
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::{Arc, Mutex};
20use std::time::Duration;
21
22mod constellation;
23
/// The three components of a record URI of the form `<did>/<collection>/<rkey>`,
/// as produced by `parse_uri`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ParsedRecord {
    /// First path segment: the identifier of the repository that owns the record.
    did: String,
    /// Second path segment: the collection the record lives in.
    collection: String,
    /// Third path segment: the record key within the collection.
    rkey: String,
}
30
31struct StarIngestor {
32 pool: SqlitePool,
33 bot: Arc<BotApi>,
34 sling_shot: Arc<Slingshot>,
35 timeframe_hours: i64,
36 star_threshold: i64,
37 post_window_hours: i64,
38}
39
40#[derive(Serialize)]
41struct RepoPreview {
42 repo: String,
43 stars: u64,
44 description: String,
45}
46
47fn parse_uri(uri: &str) -> anyhow::Result<ParsedRecord> {
48 let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect();
49 if parts.len() != 3 {
50 return Err(anyhow::anyhow!("Invalid URI format"));
51 }
52 Ok(ParsedRecord {
53 did: parts[0].to_string(),
54 collection: parts[1].to_string(),
55 rkey: parts[2].to_string(),
56 })
57}
58
59#[tokio::main]
60async fn main() -> anyhow::Result<()> {
61 env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
62 dotenv().ok();
63
64 // Initialize DB pool and run migrations
65 let database_url =
66 std::env::var("DATABASE_URL").unwrap_or_else(|_| "./stitch_counter.sqlite".to_string());
67 let options = SqliteConnectOptions::new()
68 .filename(database_url)
69 .create_if_missing(true)
70 .busy_timeout(Duration::from_secs(5));
71 let pool = SqlitePool::connect_with(options).await?;
72 // Run migrations from ./migrations
73 sqlx::migrate!("./migrations").run(&pool).await?;
74
75 let jet_stream = match std::env::var("JETSTREAM_URL") {
76 Ok(url) => JetstreamEndpoints::Custom(url.into()),
77 Err(_) => JetstreamEndpoints::default(),
78 };
79 // Configure Jetstream to listen to sh.tangled.feed.star
80 let opts = JetstreamOptions::builder()
81 .ws_url(jet_stream)
82 .wanted_collections(vec![atproto_api::sh::tangled::feed::Star::NSID.to_string()])
83 .build();
84
85 let jetstream = JetstreamConnection::new(opts);
86
87 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
88
89 let msg_rx = jetstream.get_msg_rx();
90 let reconnect_tx = jetstream.get_reconnect_tx();
91
92 // Read configuration from environment
93 let timeframe_hours: i64 = std::env::var("TIMEFRAME")
94 .ok()
95 .and_then(|v| v.parse::<i64>().ok())
96 .filter(|v| *v > 0)
97 .unwrap_or(1);
98 let star_threshold: i64 = std::env::var("STAR_THRESHOLD")
99 .ok()
100 .and_then(|v| v.parse::<i64>().ok())
101 .filter(|v| *v > 0)
102 .unwrap_or(10);
103 let post_window_hours: i64 = std::env::var("POST_WINDOW_HOURS")
104 .ok()
105 .and_then(|v| v.parse::<i64>().ok())
106 .filter(|v| *v > 0)
107 .unwrap_or(24);
108
109 let bot_username = std::env::var("BOT_USERNAME").expect("BOT_USERNAME must be set");
110 let bot_password = std::env::var("BOT_PASSWORD").expect("BOT_PASSWORD must be set");
111 let bot_pds_url = std::env::var("BOT_PDS_URL").expect("BOT_PDS_URL must be set");
112
113 log::info!(
114 "Starting bot with username: {} Timeframe threshold: {} Star Threshold: {} Post Threshold: {}",
115 bot_username,
116 timeframe_hours,
117 star_threshold,
118 post_window_hours
119 );
120
121 let bot_api = BotApi::new_logged_in(bot_username, bot_password, bot_pds_url).await?;
122 let sling_shot = Arc::new(Slingshot::new("https://slingshot.microcosm.blue")?);
123
124 // Ingestor for the star collection
125 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
126 ingestors.insert(
127 atproto_api::sh::tangled::feed::Star::NSID.to_string(),
128 Box::new(StarIngestor {
129 pool: pool.clone(),
130 bot: Arc::new(bot_api),
131 sling_shot: sling_shot.clone(),
132 timeframe_hours,
133 star_threshold,
134 post_window_hours,
135 }),
136 );
137 let ingestors = Arc::new(ingestors);
138
139 // Spawn message handling loop
140 let ingestors_clone = ingestors.clone();
141 let reconnect_tx_clone = reconnect_tx.clone();
142 let cursor_clone = cursor.clone();
143 tokio::spawn(async move {
144 while let Ok(message) = msg_rx.recv_async().await {
145 if let Err(e) = handler::handle_message(
146 message,
147 &ingestors_clone,
148 reconnect_tx_clone.clone(),
149 cursor_clone.clone(),
150 )
151 .await
152 {
153 eprintln!("Error processing message: {}", e);
154 }
155 }
156 });
157
158 // Connect to jetstream
159 jetstream
160 .connect(cursor.clone())
161 .await
162 .map_err(|e| anyhow::anyhow!(e.to_string()))
163}
164
165#[async_trait::async_trait]
166impl LexiconIngestor for StarIngestor {
167 /// Asynchronously processes an incoming event message related to "stars" and performs database operations and other actions based on the event type.
168 ///
169 /// # Parameters:
170 /// - `message`: An [`Event`](rocketman::types::event::Event) containing event data, which includes operations (`Create`, `Update`, or `Delete`) and associated metadata.
171 ///
172 /// # Returns:
173 /// - An [`anyhow::Result<()>`](anyhow::Result), which is `Ok(())` if the operation succeeds, or an error if any step fails.
174 ///
175 /// # Functionality:
176 /// 1. **Create or Update Operations**:
177 /// - If the
178 async fn ingest(
179 &self,
180 message: rocketman::types::event::Event<serde_json::Value>,
181 ) -> anyhow::Result<()> {
182 if let Some(commit) = &message.commit {
183 match commit.operation {
184 Operation::Create | Operation::Update => {
185 if let Some(record) = &commit.record {
186 let rec = serde_json::from_value::<
187 atproto_api::sh::tangled::feed::star::RecordData,
188 >(record.clone())?;
189
190 let repo_subject = rec.subject;
191
192 // Insert or ignore duplicate per did+rkey
193 let result = sqlx::query(
194 "INSERT OR IGNORE INTO stars(createdAt, did, rkey, subject) VALUES(?, ?, ?, ?)"
195 )
196 .bind(rec.created_at.as_str())
197 .bind(&message.did)
198 .bind(commit.rkey.as_str())
199 .bind(&repo_subject)
200 .execute(&self.pool)
201 .await?;
202
203 // Only check threshold if we actually inserted a new row
204 if result.rows_affected() > 0 {
205 let offset = format!("-{} hours", self.timeframe_hours);
206 // Count stars in the last timeframe_hours using RFC3339 string comparison
207 let (count_in_window,): (i64,) = sqlx::query_as(
208 "SELECT COUNT(*) as cnt FROM stars WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) AND subject = ?"
209 )
210 .bind(&offset)
211 .bind(&repo_subject)
212 .fetch_one(&self.pool)
213 .await?;
214
215 if count_in_window >= self.star_threshold {
216 log::info!(
217 "Star threshold met: {} stars in the last {} hour(s) (threshold: {}), checking to see if it should be posted",
218 count_in_window,
219 self.timeframe_hours,
220 self.star_threshold
221 );
222
223 // Check if a post was made within the last post_window_hours
224 let post_offset = format!("-{} hours", self.post_window_hours);
225 let (posts_in_window,): (i64,) = sqlx::query_as(
226 "SELECT COUNT(*) as cnt FROM posts_made WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) AND subject = ?"
227 )
228 .bind(&post_offset)
229 .bind(&repo_subject)
230 .fetch_one(&self.pool)
231 .await?;
232
233 if posts_in_window == 0 {
234 // Insert a new record to mark that we've posted
235 let _ = sqlx::query(
236 "INSERT INTO posts_made(createdAt, subject) VALUES(strftime('%Y-%m-%dT%H:%M:%fZ','now'), ?)"
237 )
238 .bind(&repo_subject.clone())
239 .execute(&self.pool)
240 .await?;
241
242 let parsed = parse_uri(&repo_subject)?;
243 let cloned_repo_owner = parsed.did.clone();
244 let stars = fetch_constellation_count(&repo_subject)
245 .await
246 .unwrap_or_else(|err| {
247 log::error!("Error calling constellation: {:?}", err);
248 0
249 });
250 let repo_record = &self
251 .sling_shot
252 .get_record::<atproto_api::sh::tangled::repo::RecordData>(
253 parsed.did.as_str(),
254 parsed.collection.as_str(),
255 parsed.rkey.as_str(),
256 )
257 .await?;
258 let repo_name = repo_record.value.name.clone();
259 let handle =
260 &self.bot.get_handle(cloned_repo_owner.clone()).await?;
261 let tangled_sh_url = format!(
262 "https://tangled.sh/{cloned_repo_owner}/{repo_name}"
263 );
264 let description = match repo_record.value.description.clone() {
265 None => "".to_string(),
266 Some(desc) => format!(" {desc}"),
267 };
268
269 let handle_and_repo = format!("{handle}/{repo_name}");
270
271 let _ctx = RepoPreview {
272 repo: handle_and_repo.clone(),
273 stars: stars as u64,
274 description: description.clone(),
275 };
276
277 // Fetch the pre-rendered image from the external service instead of rendering via Chromium
278 let tangled_og_image = format!("{tangled_sh_url}/opengraph");
279 log::info!(
280 "Attempting to get the picture at: {tangled_og_image}"
281 );
282
283 let post_text =
284 format!("{handle_and_repo}{description}\n⭐️ {stars}");
285
286 let response = reqwest::get(tangled_og_image).await?;
287
288 let embed = match response.status().is_success() {
289 true => {
290 let bytes = response.bytes().await?.to_vec();
291 let blob_upload = &self
292 .bot
293 .agent
294 .api
295 .com
296 .atproto
297 .repo
298 .upload_blob(bytes)
299 .await?;
300
301 let image = atrium_api::app::bsky::embed::images::ImageData {
302 alt: format!(
303 "An image showing the same text inside of the post. {post_text}"
304 ),
305 aspect_ratio: Some(
306 atrium_api::app::bsky::embed::defs::AspectRatioData {
307 width: NonZeroU64::try_from( 1_200_u64)?,
308 height: NonZeroU64::try_from(630_u64)?,
309 }
310 .into(),
311 ),
312 //Good lord how many clones is that
313 image: blob_upload.clone().blob.clone(),
314 };
315 Some(atrium_api::types::Union::Refs(
316 RecordEmbedRefs::AppBskyEmbedImagesMain(Box::new(
317 atrium_api::app::bsky::embed::images::MainData {
318 images: vec![image.into()],
319 }
320 .into(),
321 )),
322 ))
323 }
324 false => None,
325 };
326
327 let post = atrium_api::app::bsky::feed::post::RecordData {
328 created_at: atrium_api::types::string::Datetime::now(),
329 embed,
330 entities: None,
331 facets: Some(vec![
332 atrium_api::app::bsky::richtext::facet::MainData {
333 features: vec![Union::Refs(
334 MainFeaturesItem::Link(Box::new(
335 LinkData {
336 uri: tangled_sh_url,
337 }
338 .into(),
339 )),
340 )],
341 index: ByteSliceData {
342 byte_end: handle_and_repo.chars().count(),
343 byte_start: 0,
344 }
345 .into(),
346 }
347 .into(),
348 ]),
349 labels: None,
350 //You don't see a thing. No unwraps
351 langs: Some(vec![Language::new("en".to_string()).unwrap()]),
352 reply: None,
353 tags: None,
354 text: post_text,
355 };
356 log::info!(
357 "Threshold met and allowed to be posted, total stars: {stars}"
358 );
359 match self.bot.agent.create_record(post).await {
360 Ok(_) => {
361 log::info!("NEW POST MADE")
362 }
363 Err(err) => {
364 log::error!("{err}")
365 }
366 }
367 }
368 }
369 }
370 }
371 }
372 Operation::Delete => {
373 // Delete by did and rkey
374 let _ = sqlx::query("DELETE FROM stars WHERE did = ? AND rkey = ?")
375 .bind(&message.did)
376 .bind(commit.rkey.as_str())
377 .execute(&self.pool)
378 .await?;
379 }
380 }
381 }
382 Ok(())
383 }
384}