···11+-- Create the stars table
22+CREATE TABLE IF NOT EXISTS stars (
33+ createdAt TEXT NOT NULL, -- ISO8601 timestamp string (from record.created_at)
44+ did TEXT NOT NULL,
55+ rkey TEXT NOT NULL,
66+ subject TEXT NOT NULL
77+);
88+99+-- Indexes for faster lookup
1010+CREATE INDEX IF NOT EXISTS idx_stars_rkey ON stars(rkey);
1111+CREATE INDEX IF NOT EXISTS idx_stars_subject ON stars(subject);
1212+1313+-- Optional composite unique constraint to prevent duplicate entries per DID/rkey
1414+CREATE UNIQUE INDEX IF NOT EXISTS uq_stars_did_rkey ON stars(did, rkey);
+35-2
bot/src/main.rs
···3535 let msg_rx = jetstream.get_msg_rx();
3636 let reconnect_tx = jetstream.get_reconnect_tx();
37373838+ // Read configuration from environment
3939+ let timeframe_hours: i64 = std::env::var("TIMEFRAME")
4040+ .ok()
4141+ .and_then(|v| v.parse::<i64>().ok())
4242+ .filter(|v| *v > 0)
4343+ .unwrap_or(1);
4444+ let star_threshold: i64 = std::env::var("STAR_THRESHOLD")
4545+ .ok()
4646+ .and_then(|v| v.parse::<i64>().ok())
4747+ .filter(|v| *v > 0)
4848+ .unwrap_or(10);
4949+3850 // Ingestor for the star collection
3951 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
4052 ingestors.insert(
4153 atproto_api::sh::tangled::feed::Star::NSID.to_string(),
4242- Box::new(StarIngestor { pool: pool.clone() }),
5454+ Box::new(StarIngestor { pool: pool.clone(), timeframe_hours, star_threshold }),
4355 );
4456 let ingestors = Arc::new(ingestors);
4557···61736274struct StarIngestor {
6375 pool: SqlitePool,
7676+ timeframe_hours: i64,
7777+ star_threshold: i64,
6478}
65796680#[async_trait::async_trait]
···7286 if let Some(record) = &commit.record {
7387 let rec = serde_json::from_value::<atproto_api::sh::tangled::feed::star::RecordData>(record.clone())?;
7488 // Insert or ignore duplicate per did+rkey
7575- let _ = sqlx::query(
8989+ let result = sqlx::query(
7690 "INSERT OR IGNORE INTO stars(createdAt, did, rkey, subject) VALUES(?, ?, ?, ?)"
7791 )
7892 .bind(rec.created_at.as_str())
···8195 .bind(&rec.subject)
8296 .execute(&self.pool)
8397 .await?;
9898+9999+ // Only check threshold if we actually inserted a new row
100100+ if result.rows_affected() > 0 {
101101+ let offset = format!("-{} hours", self.timeframe_hours);
102102+ // Count stars in the last timeframe_hours using RFC3339 string comparison
103103+ let (count_in_window,): (i64,) = sqlx::query_as(
104104+ "SELECT COUNT(*) as cnt FROM stars WHERE createdAt >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)"
105105+ )
106106+ .bind(&offset)
107107+ .fetch_one(&self.pool)
108108+ .await?;
109109+110110+ if count_in_window >= self.star_threshold {
111111+ println!(
112112+ "Star threshold met: {} stars in the last {} hour(s) (threshold: {})",
113113+ count_in_window, self.timeframe_hours, self.star_threshold
114114+ );
115115+ }
116116+ }
84117 }
85118 }
86119 Operation::Delete => {