this repo has no description

feature: denylist support and admin page

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+215 -8
+1
migrations/20241115180828_denylist.down.sql
··· 1 + -- Add down migration script here
+9
migrations/20241115180828_denylist.up.sql
··· 1 + -- Add up migration script here 2 + 3 + CREATE TABLE denylist ( 4 + subject TEXT NOT NULL, 5 + reason TEXT NOT NULL, 6 + updated_at DATETIME NOT NULL DEFAULT (datetime('now')), 7 + PRIMARY KEY (subject) 8 + ); 9 +
+24 -1
src/consumer.rs
··· 16 16 use crate::storage; 17 17 use crate::storage::consumer_control_get; 18 18 use crate::storage::consumer_control_insert; 19 + use crate::storage::denylist_exists; 19 20 use crate::storage::feed_content_update; 20 21 use crate::storage::feed_content_upsert; 21 22 use crate::storage::StoragePool; ··· 178 179 } 179 180 let event_value = event_value.unwrap(); 180 181 181 - for feed_matcher in self.feed_matchers.0.iter() { 182 + // Assumption: Performing a query for each event will cost more in the 183 + // long-term than evaluating each event against all matchers and if there's a 184 + // match, then checking both the event DID and the AT-URI DID. 185 + 'matchers_loop: for feed_matcher in self.feed_matchers.0.iter() { 182 186 if let Some(Match(op, aturi)) = feed_matcher.matches(&event_value) { 183 187 tracing::debug!(feed_id = ?feed_matcher.feed, "matched event"); 188 + 189 + let aturi_did = did_from_aturi(&aturi); 190 + let dids = vec![event.did.as_str(), aturi_did.as_str()]; 191 + if denylist_exists(&self.pool, &dids).await? { 192 + break 'matchers_loop; 193 + } 194 + 184 195 let feed_content = storage::model::FeedContent{ 185 196 feed_id: feed_matcher.feed.clone(), 186 197 uri: aturi, ··· 206 217 207 218 Ok(()) 208 219 } 220 + } 221 + 222 + fn did_from_aturi(aturi: &str) -> String { 223 + let aturi_len = aturi.len(); 224 + if aturi_len < 6 { 225 + return "".to_string(); 226 + } 227 + let collection_start = aturi[5..] 228 + .find("/") 229 + .map(|value| value + 5) 230 + .unwrap_or(aturi_len); 231 + aturi[5..collection_start].to_string() 209 232 } 210 233 211 234 pub(crate) mod model {
+83
src/http/handle_admin.rs
··· 1 + use anyhow::Result; 2 + use axum::{extract::State, response::IntoResponse, Form}; 3 + use axum_extra::response::Html; 4 + use serde::Deserialize; 5 + 6 + use crate::{ 7 + errors::SupercellError, 8 + storage::{denylist_remove, denylist_upsert, feed_content_purge_aturi}, 9 + }; 10 + 11 + use super::context::WebContext; 12 + 13 + #[derive(Deserialize, Default)] 14 + pub struct AdminForm { 15 + pub action: Option<String>, 16 + pub did: Option<String>, 17 + pub reason: Option<String>, 18 + pub aturi: Option<String>, 19 + pub feed: Option<String>, 20 + } 21 + 22 + pub async fn handle_admin( 23 + State(web_context): State<WebContext>, 24 + Form(form): Form<AdminForm>, 25 + ) -> Result<impl IntoResponse, SupercellError> { 26 + if let Some(action) = form.action { 27 + match action.as_str() { 28 + "purge" => { 29 + if let Some(aturi) = form.aturi { 30 + let feed = form.feed.filter(|s| !s.is_empty()); 31 + tracing::debug!("purging at-uri: {:?} with feed: {:?}", aturi, feed); 32 + feed_content_purge_aturi(&web_context.pool, &aturi, &feed).await?; 33 + } 34 + } 35 + "deny" => { 36 + if let Some(did) = form.did { 37 + let reason = form.reason.unwrap_or("n/a".to_string()); 38 + denylist_upsert(&web_context.pool, &did, &reason).await?; 39 + } 40 + } 41 + "allow" => { 42 + if let Some(did) = form.did { 43 + denylist_remove(&web_context.pool, &did).await?; 44 + } 45 + } 46 + _ => {} 47 + } 48 + } 49 + 50 + Ok(Html( 51 + r#" 52 + <!doctype html> 53 + <html> 54 + <head><title>Supercell Admin</title></head> 55 + <body> 56 + <p>Purge AT-URI</p> 57 + <form action="/admin" method="post"> 58 + <input type="hidden" name="action" value="purge"> 59 + <label for="purge_aturi">AT-URI: <input type="text" id="purge_aturi" name="aturi" required="required"></label> 60 + <label for="purge_feed">Feed (optional): <input type="text" id="purge_feed" name="feed"></label> 61 + <input type="submit" name="submit" value="Submit"> 62 + </form> 63 + <hr/> 64 + <p>Denylist Add</p> 65 + <form action="/admin" method="post"> 66 + <input type="hidden" name="action" value="deny"> 67 + <label for="deny_did">DID: <input type="text" id="deny_did" name="did" required="required"></label> 68 + <label for="deny_reason">Reason (optional): <input type="text" id="deny_reason" name="reason"></label> 69 + <input type="submit" name="submit" value="Submit"> 70 + </form> 71 + <hr/> 72 + <p>Denylist Remove</p> 73 + <form action="/admin" method="post"> 74 + <input type="hidden" name="action" value="allow"> 75 + <label for="allow_did">DID: <input type="text" id="allow_did" name="did" required="required"></label> 76 + <input type="submit" name="submit" value="Submit"> 77 + </form> 78 + <hr/> 79 + </body> 80 + </html> 81 + "#, 82 + )) 83 + }
+1
src/http/mod.rs
··· 1 1 pub mod context; 2 + pub mod handle_admin; 2 3 pub mod handle_describe_feed_generator; 3 4 pub mod handle_get_feed_skeleton; 4 5 pub mod handle_index;
+9 -2
src/http/server.rs
··· 1 1 use super::{ 2 - context::WebContext, handle_describe_feed_generator::handle_describe_feed_generator, 2 + context::WebContext, handle_admin::handle_admin, 3 + handle_describe_feed_generator::handle_describe_feed_generator, 3 4 handle_get_feed_skeleton::handle_get_feed_skeleton, handle_index::handle_index, 4 5 handle_well_known::handle_well_known, 5 6 }; 6 - use axum::{http::HeaderValue, routing::get, Router}; 7 + use axum::{ 8 + http::HeaderValue, 9 + routing::{get, post}, 10 + Router, 11 + }; 7 12 use http::{ 8 13 header::{ACCEPT, ACCEPT_LANGUAGE}, 9 14 Method, ··· 25 30 "/xrpc/app.bsky.feed.describeFeedGenerator", 26 31 get(handle_describe_feed_generator), 27 32 ) 33 + .route("/admin", get(handle_admin)) 34 + .route("/admin", post(handle_admin)) 28 35 .layer(( 29 36 TraceLayer::new_for_http(), 30 37 TimeoutLayer::new(Duration::from_secs(10)),
+88 -5
src/storage.rs
··· 1 1 use anyhow::{Context, Result}; 2 2 use chrono::{prelude::*, Duration}; 3 - use sqlx::{Pool, Sqlite}; 3 + use sqlx::{Execute, Pool, QueryBuilder, Sqlite}; 4 4 5 5 use model::FeedContent; 6 6 7 7 pub type StoragePool = Pool<Sqlite>; 8 8 9 9 pub mod model { 10 - use chrono::{DateTime, SubsecRound}; 11 - use serde::Serialize; 12 - use sqlx::prelude::FromRow; 10 + use chrono::{DateTime, SubsecRound, Utc}; 11 + use sqlx::prelude::*; 13 12 14 - #[derive(Clone, FromRow, Serialize)] 13 + #[derive(Clone, FromRow)] 15 14 pub struct FeedContent { 16 15 pub feed_id: String, 17 16 pub uri: String, ··· 30 29 let diff_seconds = now - target; 31 30 std::cmp::max((diff_seconds / (60 * 60)) + 1, 1) 32 31 } 32 + } 33 + 34 + #[derive(Clone, FromRow)] 35 + pub struct Denylist { 36 + pub subject: String, 37 + pub reason: String, 38 + pub created_at: DateTime<Utc>, 33 39 } 34 40 } 35 41 ··· 188 194 .context("failed to delete feed content beyond mark")?; 189 195 190 196 tx.commit().await.context("failed to commit transaction") 197 + } 198 + 199 + pub async fn denylist_upsert(pool: &StoragePool, subject: &str, reason: &str) -> Result<()> { 200 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 201 + 202 + let now = Utc::now(); 203 + sqlx::query("INSERT OR REPLACE INTO denylist (subject, reason, updated_at) VALUES (?, ?, ?)") 204 + .bind(subject) 205 + .bind(reason) 206 + .bind(now) 207 + .execute(tx.as_mut()) 208 + .await 209 + .context("failed to upsert denylist record")?; 210 + 211 + tx.commit().await.context("failed to commit transaction") 212 + } 213 + 214 + pub async fn denylist_remove(pool: &StoragePool, subject: &str) -> Result<()> { 215 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 216 + 217 + sqlx::query("DELETE FROM denylist WHERE subject = ?") 218 + .bind(subject) 219 + .execute(tx.as_mut()) 220 + .await 221 + .context("failed to delete denylist record")?; 222 + 223 + tx.commit().await.context("failed to commit transaction") 224 + } 225 + 226 + pub async fn feed_content_purge_aturi( 227 + pool: &StoragePool, 228 + aturi: &str, 229 + feed: &Option<String>, 230 + ) -> Result<()> { 231 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 232 + 233 + if let Some(feed) = feed { 234 + sqlx::query("DELETE FROM feed_content WHERE feed_id = ? AND uri = ?") 235 + .bind(feed) 236 + .bind(aturi) 237 + .execute(tx.as_mut()) 238 + .await 239 + .context("failed to delete denylist record")?; 240 + } else { 241 + sqlx::query("DELETE FROM feed_content WHERE uri = ?") 242 + .bind(aturi) 243 + .execute(tx.as_mut()) 244 + .await 245 + .context("failed to delete denylist record")?; 246 + } 247 + 248 + tx.commit().await.context("failed to commit transaction") 249 + } 250 + 251 + pub async fn denylist_exists(pool: &StoragePool, subjects: &[&str]) -> Result<bool> { 252 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 253 + 254 + let mut query_builder: QueryBuilder<Sqlite> = 255 + QueryBuilder::new("SELECT COUNT(*) FROM denylist WHERE subject IN ("); 256 + let mut separated = query_builder.separated(", "); 257 + for subject in subjects { 258 + separated.push_bind(subject); 259 + } 260 + separated.push_unseparated(") "); 261 + 262 + let mut query = sqlx::query_scalar::<_, i64>(query_builder.build().sql()); 263 + for subject in subjects { 264 + query = query.bind(subject); 265 + } 266 + let count = query 267 + .fetch_one(tx.as_mut()) 268 + .await 269 + .context("failed to delete denylist record")?; 270 + 271 + tx.commit().await.context("failed to commit transaction")?; 272 + 273 + Ok(count > 0) 191 274 } 192 275 193 276 #[cfg(test)]