//! A Rust implementation of skywatch-phash.
1use jacquard::client::{Agent, MemoryCredentialSession};
2use jacquard_api::com_atproto::moderation::ReasonType;
3use jacquard_common::types::string::Did;
4use miette::{IntoDiagnostic, Result};
5use reqwest::Client;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::time::sleep;
9use tracing::{debug, error, info};
10
11use crate::agent::AgentSession;
12use crate::cache::PhashCache;
13use crate::config::Config;
14use crate::metrics::Metrics;
15use crate::moderation::{account, claims, post, rate_limiter::RateLimiter};
16use crate::processor::matcher;
17use crate::queue::redis_queue::JobQueue;
18use crate::types::{BlobCheck, ImageJob, MatchResult};
19
20/// Macro to handle moderation actions with claim checking
macro_rules! moderation_action {
    // Report pattern: claim_X_report -> action
    //
    // `$check_field` gates the whole action (config flag on the check).
    // `$claim_fn` is an already-awaited bool: true means this worker won the
    // claim and must perform `$action`; false means another worker already
    // reported this subject, so only the skip metric is bumped.
    // `$action` is an un-awaited future; it is awaited (and `?`-propagated)
    // here, so a failure aborts the caller before the done-metric fires.
    (report: $check_field:expr, $action_name:expr, $claim_fn:expr, $action:expr, $metrics_done:expr, $metrics_skip:expr, $subject:expr) => {
        if $check_field {
            if $claim_fn {
                $action.await?;
                $metrics_done;
                info!(concat!($action_name, " completed for: {}"), $subject);
            } else {
                $metrics_skip;
                info!(
                    concat!($action_name, " already done, skipping: {}"),
                    $subject
                );
            }
        }
    };

    // Label pattern: !has_label -> action -> set_label
    //
    // Unlike the report arm, the guard is inverted (`!$has_label_fn`: act only
    // when the label is NOT yet present) and a second awaited step
    // `$set_label_fn` records the label AFTER the action succeeds — so a
    // failed `$action` leaves the label unset and the job retryable.
    // NOTE(review): the done-metric fires between `$action` and
    // `$set_label_fn`; if `$set_label_fn` fails, the metric has already been
    // counted — presumably acceptable, confirm with metrics consumers.
    (label: $check_field:expr, $action_name:expr, $has_label_fn:expr, $action:expr, $set_label_fn:expr, $metrics_done:expr, $metrics_skip:expr, $subject:expr) => {
        if $check_field {
            if !$has_label_fn {
                $action.await?;
                $metrics_done;
                $set_label_fn.await?;
                info!(concat!($action_name, " completed for: {}"), $subject);
            } else {
                $metrics_skip;
                info!(
                    concat!($action_name, " already done, skipping: {}"),
                    $subject
                );
            }
        }
    };
}
57
/// Worker pool for processing image jobs
pub struct WorkerPool {
    /// Application configuration (Redis URL, processing/moderation settings).
    config: Config,
    /// HTTP client used to download blobs.
    client: Client,
    /// Authenticated ATProto agent session used for moderation actions.
    agent: AgentSession,
    /// The set of phash checks every blob is matched against.
    blob_checks: Vec<BlobCheck>,
    /// Counters for jobs, blobs, cache hits, and moderation outcomes.
    metrics: Metrics,
    /// Rate limiter shared by all moderation actions from this pool.
    rate_limiter: RateLimiter,
}
67
68impl WorkerPool {
69 /// Create a new worker pool
70 pub fn new(
71 config: Config,
72 client: Client,
73 agent: AgentSession,
74 blob_checks: Vec<BlobCheck>,
75 metrics: Metrics,
76 ) -> Self {
77 let rate_limiter = RateLimiter::new(config.moderation.rate_limit);
78
79 Self {
80 config,
81 client,
82 agent,
83 blob_checks,
84 metrics,
85 rate_limiter,
86 }
87 }
88
89 /// Start the worker pool - processes jobs sequentially
90 /// Concurrency is achieved by running multiple instances of this concurrently
91 pub async fn start(
92 &self,
93 mut queue: JobQueue,
94 mut cache: PhashCache,
95 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
96 ) -> Result<()> {
97 loop {
98 tokio::select! {
99 _ = shutdown_rx.recv() => {
100 info!("Worker shutting down");
101 break;
102 }
103
104 job_result = queue.pop(1) => {
105 match job_result {
106 Ok(Some(job)) => {
107 debug!("Worker popped job from queue: {}", job.post_uri);
108 let redis_client = match redis::Client::open(self.config.redis.url.as_str()) {
109 Ok(c) => c,
110 Err(e) => {
111 error!("Failed to create Redis client: {}", e);
112 continue;
113 }
114 };
115
116 let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
117 Ok(conn) => conn,
118 Err(e) => {
119 error!("Failed to connect to Redis: {}", e);
120 continue;
121 }
122 };
123
124 let job_clone = job.clone();
125 if let Err(e) = Self::process_job(
126 &self.config,
127 &self.client,
128 self.agent.agent(),
129 &self.blob_checks,
130 &self.metrics,
131 &self.rate_limiter,
132 &mut cache,
133 &mut redis_conn,
134 job,
135 self.agent.did(),
136 )
137 .await
138 {
139 error!("Worker task failed: {}", e);
140 self.metrics.inc_jobs_failed();
141 if let Err(retry_err) = queue.retry(job_clone).await {
142 error!("Failed to retry job: {}", retry_err);
143 }
144 }
145 }
146 Ok(None) => {
147 // Timeout, continue loop
148 }
149 Err(e) => {
150 error!("Error popping job from queue: {}", e);
151 sleep(Duration::from_millis(self.config.processing.retry_delay)).await;
152 }
153 }
154 }
155 }
156 }
157
158 Ok(())
159 }
160
161 /// Process a single job
162 async fn process_job(
163 config: &Config,
164 client: &Client,
165 agent: &Arc<Agent<MemoryCredentialSession>>,
166 blob_checks: &[BlobCheck],
167 metrics: &Metrics,
168 rate_limiter: &RateLimiter,
169 cache: &mut PhashCache,
170 redis_conn: &mut redis::aio::MultiplexedConnection,
171 job: ImageJob,
172 created_by: &str,
173 ) -> Result<()> {
174 debug!("Processing job: {}", job.post_uri);
175
176 let matches =
177 Self::process_job_blobs(config, client, blob_checks, metrics, cache, &job).await?;
178
179 if matches.is_empty() {
180 debug!("No matches found for job: {}", job.post_uri);
181 metrics.inc_jobs_processed();
182 return Ok(());
183 }
184
185 // Take moderation actions for each match
186 for match_result in matches {
187 if let Err(e) = Self::take_moderation_action(
188 config,
189 agent,
190 metrics,
191 rate_limiter,
192 redis_conn,
193 &job,
194 &match_result,
195 created_by,
196 )
197 .await
198 {
199 error!("Failed to take moderation action: {}", e);
200 // Don't retry here - worker will handle it
201 return Err(e);
202 }
203 }
204
205 debug!("Successfully processed job: {}", job.post_uri);
206 metrics.inc_jobs_processed();
207
208 Ok(())
209 }
210
211 /// Process all blobs in a job and return matches
212 async fn process_job_blobs(
213 config: &Config,
214 client: &Client,
215 blob_checks: &[BlobCheck],
216 metrics: &Metrics,
217 cache: &mut PhashCache,
218 job: &ImageJob,
219 ) -> Result<Vec<MatchResult>> {
220 let mut matches = Vec::new();
221
222 for blob in &job.blobs {
223 metrics.inc_blobs_processed();
224
225 // Check cache first
226 let cache_result = cache.get(&blob.cid).await?;
227 let phash = if let Some(cached_phash) = cache_result {
228 metrics.inc_cache_hits();
229 cached_phash
230 } else {
231 metrics.inc_cache_misses();
232 metrics.inc_blobs_downloaded();
233
234 // Download and compute
235 let image_bytes =
236 matcher::download_blob(client, config, &job.post_did, &blob.cid).await?;
237 let computed_phash = crate::processor::phash::compute_phash(&image_bytes)?;
238
239 // Store in cache
240 cache.set(&blob.cid, &computed_phash).await?;
241 computed_phash
242 };
243
244 // Check for matches
245 if let Some(match_result) = matcher::match_phash(
246 &phash,
247 blob_checks,
248 &job.post_did,
249 config.phash.default_hamming_threshold,
250 ) {
251 metrics.inc_matches_found();
252 matches.push(match_result);
253 }
254 }
255
256 Ok(matches)
257 }
258
259 /// Take moderation action based on match result
260 async fn take_moderation_action(
261 config: &Config,
262 agent: &Arc<Agent<MemoryCredentialSession>>,
263 metrics: &Metrics,
264 rate_limiter: &RateLimiter,
265 redis_conn: &mut redis::aio::MultiplexedConnection,
266 job: &ImageJob,
267 match_result: &MatchResult,
268 created_by: &str,
269 ) -> Result<()> {
270 let check = &match_result.matched_check;
271 let created_by_did = Did::new(created_by).into_diagnostic()?;
272
273 info!(
274 "Taking moderation action for label '{}' on post: {}",
275 check.label, job.post_uri
276 );
277
278 moderation_action!(
279 report: check.report_post,
280 "Report post",
281 claims::claim_post_report(redis_conn, &job.post_uri, &check.label).await?,
282 post::report_post(
283 agent.as_ref(),
284 config,
285 rate_limiter,
286 &job.post_uri,
287 &job.post_cid,
288 &job.post_did,
289 ReasonType::ComAtprotoModerationDefsReasonSpam,
290 &check.comment,
291 &match_result.phash,
292 &created_by_did,
293 ),
294 metrics.inc_posts_reported(),
295 metrics.inc_posts_already_reported(),
296 job.post_uri
297 );
298
299 moderation_action!(
300 label: check.to_label,
301 "Label post",
302 claims::has_label(redis_conn, &job.post_uri, &check.label).await?,
303 post::label_post(
304 agent.as_ref(),
305 config,
306 rate_limiter,
307 &job.post_uri,
308 &job.post_cid,
309 &check.label,
310 &check.comment,
311 &match_result.phash,
312 &created_by_did,
313 ),
314 claims::set_label(redis_conn, &job.post_uri, &check.label, None),
315 metrics.inc_posts_labeled(),
316 metrics.inc_posts_already_labeled(),
317 job.post_uri
318 );
319
320 moderation_action!(
321 report: check.report_acct,
322 "Report account",
323 claims::claim_account_report(redis_conn, &job.post_did, &check.label).await?,
324 account::report_account(
325 agent.as_ref(),
326 config,
327 rate_limiter,
328 &job.post_did,
329 ReasonType::ComAtprotoModerationDefsReasonSpam,
330 &check.comment,
331 &job.post_uri,
332 &match_result.phash,
333 &created_by_did,
334 ),
335 metrics.inc_accounts_reported(),
336 metrics.inc_accounts_already_reported(),
337 job.post_did
338 );
339
340 moderation_action!(
341 label: check.label_acct,
342 "Label account",
343 claims::has_label(redis_conn, &job.post_did, &check.label).await?,
344 account::label_account(
345 agent.as_ref(),
346 config,
347 rate_limiter,
348 &job.post_did,
349 &check.label,
350 &check.comment,
351 &job.post_uri,
352 &match_result.phash,
353 &created_by_did,
354 ),
355 claims::set_label(redis_conn, &job.post_did, &check.label, None),
356 metrics.inc_accounts_labeled(),
357 metrics.inc_accounts_already_labeled(),
358 job.post_did
359 );
360
361 Ok(())
362 }
363}
364
365// Manual Clone implementation
366impl Clone for WorkerPool {
367 fn clone(&self) -> Self {
368 Self {
369 config: self.config.clone(),
370 client: self.client.clone(),
371 agent: self.agent.clone(),
372 blob_checks: self.blob_checks.clone(),
373 metrics: self.metrics.clone(),
374 rate_limiter: self.rate_limiter.clone(),
375 }
376 }
377}
378
#[cfg(test)]
mod tests {
    // Note: Worker tests require integration testing with Redis
    // (JobQueue, PhashCache, and the claim/label dedup paths all talk to a
    // live Redis instance), so no unit tests are defined in this module.
}