use miette::{IntoDiagnostic, Result}; use reqwest::Client; use std::path::Path; use tracing::{debug, info, warn}; use crate::config::Config; use crate::processor::phash; use crate::types::{BlobCheck, BlobReference, ImageJob, MatchResult}; /// Load blob checks from a JSON file pub async fn load_blob_checks(path: &Path) -> Result> { let contents = tokio::fs::read_to_string(path).await.into_diagnostic()?; let checks: Vec = serde_json::from_str(&contents).into_diagnostic()?; info!("Loaded {} blob checks from {:?}", checks.len(), path); Ok(checks) } /// Download a blob from the Bluesky CDN, falling back to PDS if necessary pub async fn download_blob( client: &Client, config: &Config, did: &str, cid: &str, ) -> Result> { // Try CDN first - attempt common image formats for format in ["jpeg", "png", "webp"] { let cdn_url = format!( "https://cdn.bsky.app/img/feed_fullsize/plain/{}/{}@{}", did, cid, format ); debug!("Trying CDN download: {}", cdn_url); match client.get(&cdn_url).send().await { Ok(response) if response.status().is_success() => { debug!("Successfully downloaded from CDN: did={}, cid={}", did, cid); let bytes = response.bytes().await.into_diagnostic()?; return Ok(bytes.to_vec()); } Ok(response) => { debug!("CDN returned status {}, trying next format", response.status()); } Err(e) => { debug!("CDN request failed: {}, trying next format", e); } } } // Fall back to PDS if CDN fails warn!("CDN failed for did={}, cid={}, falling back to PDS", did, cid); let pds_url = format!( "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", config.pds.endpoint, did, cid ); debug!("Downloading from PDS: {}", pds_url); let response = client .get(&pds_url) .send() .await .into_diagnostic()? .error_for_status() .into_diagnostic()?; let bytes = response.bytes().await.into_diagnostic()?; Ok(bytes.to_vec()) } /// Match a computed phash against blob checks pub fn match_phash( phash: &str, blob_checks: &[BlobCheck], did: &str, default_threshold: u32, ) -> Option { for check in blob_checks { if let Some(ignore_list) = &check.ignore_did { if ignore_list.iter().any(|ignored_did| ignored_did.as_str() == did) { debug!("Skipping check '{}' for ignored DID: {}", check.label, did); continue; } } let threshold = check.hamming_threshold.unwrap_or(default_threshold); for check_phash in &check.phashes { match phash::hamming_distance(phash, check_phash.as_str()) { Ok(distance) => { if distance <= threshold { info!( "Match found! label={}, distance={}, threshold={}", check.label, distance, threshold ); return Some(MatchResult { phash: phash.to_string().into(), matched_check: check.clone(), matched_phash: check_phash.clone(), hamming_distance: distance, }); } } Err(e) => { warn!("Failed to compute hamming distance: {}", e); continue; } } } } None } /// Process a single blob reference pub async fn process_blob( client: &Client, config: &Config, blob_checks: &[BlobCheck], did: &str, blob: &BlobReference, ) -> Result> { let image_bytes = download_blob(client, config, did, &blob.cid).await?; let phash = phash::compute_phash(&image_bytes)?; debug!("Computed phash for blob {}: {}", blob.cid, phash); let match_result = match_phash(&phash, blob_checks, did, config.phash.default_hamming_threshold); Ok(match_result) } /// Process an image job - check all blobs and return matches pub async fn process_image_job( client: &Client, config: &Config, blob_checks: &[BlobCheck], job: &ImageJob, ) -> Result> { info!( "Processing job: post={}, blobs={}", job.post_uri, job.blobs.len() ); let mut matches = Vec::new(); for blob in &job.blobs { match process_blob(client, config, blob_checks, &job.post_did, blob).await { Ok(Some(result)) => { matches.push(result); } Ok(None) => { debug!("No match for blob: {}", blob.cid); } Err(e) => { warn!("Error processing blob {}: {}", blob.cid, e); // Continue processing other blobs } } } if !matches.is_empty() { info!( "Found {} match(es) for post: {}", matches.len(), job.post_uri ); } Ok(matches) } #[cfg(test)] mod tests { use super::*; #[test] fn test_match_phash_exact() { let checks = vec![BlobCheck { phashes: vec!["deadbeefdeadbeef".to_string().into()], label: "test-label".to_string().into(), comment: "Test".to_string().into(), report_acct: false, label_acct: false, report_post: true, to_label: true, takedown_post: false, takedown_acct: false, hamming_threshold: Some(3), description: None, ignore_did: None, }]; let result = match_phash("deadbeefdeadbeef", &checks, "did:plc:test", 3); assert!(result.is_some()); assert_eq!(result.unwrap().hamming_distance, 0); } #[test] fn test_match_phash_within_threshold() { let checks = vec![BlobCheck { phashes: vec!["deadbeefdeadbeef".to_string().into()], label: "test-label".to_string().into(), comment: "Test".to_string().into(), report_acct: false, label_acct: false, report_post: true, to_label: true, takedown_post: false, takedown_acct: false, hamming_threshold: Some(3), description: None, ignore_did: None, }]; // deadbeefdeadbeef vs deadbeefdeadbeee = 1 bit difference in last nibble let result = match_phash("deadbeefdeadbeee", &checks, "did:plc:test", 3); assert!(result.is_some()); assert_eq!(result.unwrap().hamming_distance, 1); } #[test] fn test_match_phash_exceeds_threshold() { let checks = vec![BlobCheck { phashes: vec!["deadbeefdeadbeef".to_string().into()], label: "test-label".to_string().into(), comment: "Test".to_string().into(), report_acct: false, label_acct: false, report_post: true, to_label: true, takedown_post: false, takedown_acct: false, hamming_threshold: Some(1), description: None, ignore_did: None, }]; // More than 1 bit difference let result = match_phash("deadbeefdeadbee0", &checks, "did:plc:test", 1); assert!(result.is_none()); } #[test] fn test_match_phash_ignored_did() { use jacquard_common::types::string::Did; use jacquard_common::IntoStatic; let checks = vec![BlobCheck { phashes: vec!["deadbeefdeadbeef".to_string().into()], label: "test-label".to_string().into(), comment: "Test".to_string().into(), report_acct: false, label_acct: false, report_post: true, to_label: true, takedown_post: false, takedown_acct: false, hamming_threshold: Some(3), description: None, ignore_did: Some(vec![Did::new("did:plc:ignored").unwrap().into_static()]), }]; let result = match_phash("deadbeefdeadbeef", &checks, "did:plc:ignored", 3); assert!(result.is_none()); } #[tokio::test] async fn test_load_real_rules() { let path = std::path::Path::new("rules/blobs.json"); if !path.exists() { // Skip test if rules file doesn't exist return; } let result = load_blob_checks(path).await; assert!(result.is_ok()); let checks = result.unwrap(); assert!(!checks.is_empty()); // Verify first check has expected fields let first = &checks[0]; assert!(!first.phashes.is_empty()); assert!(!first.label.is_empty()); // Check that ignoreDID alias works if let Some(ignore_list) = &first.ignore_did { assert!(!ignore_list.is_empty()); } } }