A Rust implementation of skywatch-phash
at main 382 lines 13 kB view raw
1use jacquard::client::{Agent, MemoryCredentialSession}; 2use jacquard_api::com_atproto::moderation::ReasonType; 3use jacquard_common::types::string::Did; 4use miette::{IntoDiagnostic, Result}; 5use reqwest::Client; 6use std::sync::Arc; 7use std::time::Duration; 8use tokio::time::sleep; 9use tracing::{debug, error, info}; 10 11use crate::agent::AgentSession; 12use crate::cache::PhashCache; 13use crate::config::Config; 14use crate::metrics::Metrics; 15use crate::moderation::{account, claims, post, rate_limiter::RateLimiter}; 16use crate::processor::matcher; 17use crate::queue::redis_queue::JobQueue; 18use crate::types::{BlobCheck, ImageJob, MatchResult}; 19 20/// Macro to handle moderation actions with claim checking 21macro_rules! moderation_action { 22 // Report pattern: claim_X_report -> action 23 (report: $check_field:expr, $action_name:expr, $claim_fn:expr, $action:expr, $metrics_done:expr, $metrics_skip:expr, $subject:expr) => { 24 if $check_field { 25 if $claim_fn { 26 $action.await?; 27 $metrics_done; 28 info!(concat!($action_name, " completed for: {}"), $subject); 29 } else { 30 $metrics_skip; 31 info!( 32 concat!($action_name, " already done, skipping: {}"), 33 $subject 34 ); 35 } 36 } 37 }; 38 39 // Label pattern: !has_label -> action -> set_label 40 (label: $check_field:expr, $action_name:expr, $has_label_fn:expr, $action:expr, $set_label_fn:expr, $metrics_done:expr, $metrics_skip:expr, $subject:expr) => { 41 if $check_field { 42 if !$has_label_fn { 43 $action.await?; 44 $metrics_done; 45 $set_label_fn.await?; 46 info!(concat!($action_name, " completed for: {}"), $subject); 47 } else { 48 $metrics_skip; 49 info!( 50 concat!($action_name, " already done, skipping: {}"), 51 $subject 52 ); 53 } 54 } 55 }; 56} 57 58/// Worker pool for processing image jobs 59pub struct WorkerPool { 60 config: Config, 61 client: Client, 62 agent: AgentSession, 63 blob_checks: Vec<BlobCheck>, 64 metrics: Metrics, 65 rate_limiter: RateLimiter, 66} 67 68impl WorkerPool { 69 /// 
Create a new worker pool 70 pub fn new( 71 config: Config, 72 client: Client, 73 agent: AgentSession, 74 blob_checks: Vec<BlobCheck>, 75 metrics: Metrics, 76 ) -> Self { 77 let rate_limiter = RateLimiter::new(config.moderation.rate_limit); 78 79 Self { 80 config, 81 client, 82 agent, 83 blob_checks, 84 metrics, 85 rate_limiter, 86 } 87 } 88 89 /// Start the worker pool - processes jobs sequentially 90 /// Concurrency is achieved by running multiple instances of this concurrently 91 pub async fn start( 92 &self, 93 mut queue: JobQueue, 94 mut cache: PhashCache, 95 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, 96 ) -> Result<()> { 97 loop { 98 tokio::select! { 99 _ = shutdown_rx.recv() => { 100 info!("Worker shutting down"); 101 break; 102 } 103 104 job_result = queue.pop(1) => { 105 match job_result { 106 Ok(Some(job)) => { 107 debug!("Worker popped job from queue: {}", job.post_uri); 108 let redis_client = match redis::Client::open(self.config.redis.url.as_str()) { 109 Ok(c) => c, 110 Err(e) => { 111 error!("Failed to create Redis client: {}", e); 112 continue; 113 } 114 }; 115 116 let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { 117 Ok(conn) => conn, 118 Err(e) => { 119 error!("Failed to connect to Redis: {}", e); 120 continue; 121 } 122 }; 123 124 let job_clone = job.clone(); 125 if let Err(e) = Self::process_job( 126 &self.config, 127 &self.client, 128 self.agent.agent(), 129 &self.blob_checks, 130 &self.metrics, 131 &self.rate_limiter, 132 &mut cache, 133 &mut redis_conn, 134 job, 135 self.agent.did(), 136 ) 137 .await 138 { 139 error!("Worker task failed: {}", e); 140 self.metrics.inc_jobs_failed(); 141 if let Err(retry_err) = queue.retry(job_clone).await { 142 error!("Failed to retry job: {}", retry_err); 143 } 144 } 145 } 146 Ok(None) => { 147 // Timeout, continue loop 148 } 149 Err(e) => { 150 error!("Error popping job from queue: {}", e); 151 sleep(Duration::from_millis(self.config.processing.retry_delay)).await; 
152 } 153 } 154 } 155 } 156 } 157 158 Ok(()) 159 } 160 161 /// Process a single job 162 async fn process_job( 163 config: &Config, 164 client: &Client, 165 agent: &Arc<Agent<MemoryCredentialSession>>, 166 blob_checks: &[BlobCheck], 167 metrics: &Metrics, 168 rate_limiter: &RateLimiter, 169 cache: &mut PhashCache, 170 redis_conn: &mut redis::aio::MultiplexedConnection, 171 job: ImageJob, 172 created_by: &str, 173 ) -> Result<()> { 174 debug!("Processing job: {}", job.post_uri); 175 176 let matches = 177 Self::process_job_blobs(config, client, blob_checks, metrics, cache, &job).await?; 178 179 if matches.is_empty() { 180 debug!("No matches found for job: {}", job.post_uri); 181 metrics.inc_jobs_processed(); 182 return Ok(()); 183 } 184 185 // Take moderation actions for each match 186 for match_result in matches { 187 if let Err(e) = Self::take_moderation_action( 188 config, 189 agent, 190 metrics, 191 rate_limiter, 192 redis_conn, 193 &job, 194 &match_result, 195 created_by, 196 ) 197 .await 198 { 199 error!("Failed to take moderation action: {}", e); 200 // Don't retry here - worker will handle it 201 return Err(e); 202 } 203 } 204 205 debug!("Successfully processed job: {}", job.post_uri); 206 metrics.inc_jobs_processed(); 207 208 Ok(()) 209 } 210 211 /// Process all blobs in a job and return matches 212 async fn process_job_blobs( 213 config: &Config, 214 client: &Client, 215 blob_checks: &[BlobCheck], 216 metrics: &Metrics, 217 cache: &mut PhashCache, 218 job: &ImageJob, 219 ) -> Result<Vec<MatchResult>> { 220 let mut matches = Vec::new(); 221 222 for blob in &job.blobs { 223 metrics.inc_blobs_processed(); 224 225 // Check cache first 226 let cache_result = cache.get(&blob.cid).await?; 227 let phash = if let Some(cached_phash) = cache_result { 228 metrics.inc_cache_hits(); 229 cached_phash 230 } else { 231 metrics.inc_cache_misses(); 232 metrics.inc_blobs_downloaded(); 233 234 // Download and compute 235 let image_bytes = 236 matcher::download_blob(client, 
config, &job.post_did, &blob.cid).await?; 237 let computed_phash = crate::processor::phash::compute_phash(&image_bytes)?; 238 239 // Store in cache 240 cache.set(&blob.cid, &computed_phash).await?; 241 computed_phash 242 }; 243 244 // Check for matches 245 if let Some(match_result) = matcher::match_phash( 246 &phash, 247 blob_checks, 248 &job.post_did, 249 config.phash.default_hamming_threshold, 250 ) { 251 metrics.inc_matches_found(); 252 matches.push(match_result); 253 } 254 } 255 256 Ok(matches) 257 } 258 259 /// Take moderation action based on match result 260 async fn take_moderation_action( 261 config: &Config, 262 agent: &Arc<Agent<MemoryCredentialSession>>, 263 metrics: &Metrics, 264 rate_limiter: &RateLimiter, 265 redis_conn: &mut redis::aio::MultiplexedConnection, 266 job: &ImageJob, 267 match_result: &MatchResult, 268 created_by: &str, 269 ) -> Result<()> { 270 let check = &match_result.matched_check; 271 let created_by_did = Did::new(created_by).into_diagnostic()?; 272 273 info!( 274 "Taking moderation action for label '{}' on post: {}", 275 check.label, job.post_uri 276 ); 277 278 moderation_action!( 279 report: check.report_post, 280 "Report post", 281 claims::claim_post_report(redis_conn, &job.post_uri, &check.label).await?, 282 post::report_post( 283 agent.as_ref(), 284 config, 285 rate_limiter, 286 &job.post_uri, 287 &job.post_cid, 288 &job.post_did, 289 ReasonType::ComAtprotoModerationDefsReasonSpam, 290 &check.comment, 291 &match_result.phash, 292 &created_by_did, 293 ), 294 metrics.inc_posts_reported(), 295 metrics.inc_posts_already_reported(), 296 job.post_uri 297 ); 298 299 moderation_action!( 300 label: check.to_label, 301 "Label post", 302 claims::has_label(redis_conn, &job.post_uri, &check.label).await?, 303 post::label_post( 304 agent.as_ref(), 305 config, 306 rate_limiter, 307 &job.post_uri, 308 &job.post_cid, 309 &check.label, 310 &check.comment, 311 &match_result.phash, 312 &created_by_did, 313 ), 314 claims::set_label(redis_conn, 
&job.post_uri, &check.label, None), 315 metrics.inc_posts_labeled(), 316 metrics.inc_posts_already_labeled(), 317 job.post_uri 318 ); 319 320 moderation_action!( 321 report: check.report_acct, 322 "Report account", 323 claims::claim_account_report(redis_conn, &job.post_did, &check.label).await?, 324 account::report_account( 325 agent.as_ref(), 326 config, 327 rate_limiter, 328 &job.post_did, 329 ReasonType::ComAtprotoModerationDefsReasonSpam, 330 &check.comment, 331 &job.post_uri, 332 &match_result.phash, 333 &created_by_did, 334 ), 335 metrics.inc_accounts_reported(), 336 metrics.inc_accounts_already_reported(), 337 job.post_did 338 ); 339 340 moderation_action!( 341 label: check.label_acct, 342 "Label account", 343 claims::has_label(redis_conn, &job.post_did, &check.label).await?, 344 account::label_account( 345 agent.as_ref(), 346 config, 347 rate_limiter, 348 &job.post_did, 349 &check.label, 350 &check.comment, 351 &job.post_uri, 352 &match_result.phash, 353 &created_by_did, 354 ), 355 claims::set_label(redis_conn, &job.post_did, &check.label, None), 356 metrics.inc_accounts_labeled(), 357 metrics.inc_accounts_already_labeled(), 358 job.post_did 359 ); 360 361 Ok(()) 362 } 363} 364 365// Manual Clone implementation 366impl Clone for WorkerPool { 367 fn clone(&self) -> Self { 368 Self { 369 config: self.config.clone(), 370 client: self.client.clone(), 371 agent: self.agent.clone(), 372 blob_checks: self.blob_checks.clone(), 373 metrics: self.metrics.clone(), 374 rate_limiter: self.rate_limiter.clone(), 375 } 376 } 377} 378 379#[cfg(test)] 380mod tests { 381 // Note: Worker tests require integration testing with Redis 382}