A Rust implementation of skywatch-phash
at main 298 lines 9.2 kB view raw
1use miette::{IntoDiagnostic, Result}; 2use reqwest::Client; 3use std::path::Path; 4use tracing::{debug, info, warn}; 5 6use crate::config::Config; 7use crate::processor::phash; 8use crate::types::{BlobCheck, BlobReference, ImageJob, MatchResult}; 9 10/// Load blob checks from a JSON file 11pub async fn load_blob_checks(path: &Path) -> Result<Vec<BlobCheck>> { 12 let contents = tokio::fs::read_to_string(path).await.into_diagnostic()?; 13 let checks: Vec<BlobCheck> = serde_json::from_str(&contents).into_diagnostic()?; 14 info!("Loaded {} blob checks from {:?}", checks.len(), path); 15 Ok(checks) 16} 17 18/// Download a blob from the Bluesky CDN, falling back to PDS if necessary 19pub async fn download_blob( 20 client: &Client, 21 config: &Config, 22 did: &str, 23 cid: &str, 24) -> Result<Vec<u8>> { 25 // Try CDN first - attempt common image formats 26 for format in ["jpeg", "png", "webp"] { 27 let cdn_url = format!( 28 "https://cdn.bsky.app/img/feed_fullsize/plain/{}/{}@{}", 29 did, cid, format 30 ); 31 32 debug!("Trying CDN download: {}", cdn_url); 33 34 match client.get(&cdn_url).send().await { 35 Ok(response) if response.status().is_success() => { 36 debug!("Successfully downloaded from CDN: did={}, cid={}", did, cid); 37 let bytes = response.bytes().await.into_diagnostic()?; 38 return Ok(bytes.to_vec()); 39 } 40 Ok(response) => { 41 debug!("CDN returned status {}, trying next format", response.status()); 42 } 43 Err(e) => { 44 debug!("CDN request failed: {}, trying next format", e); 45 } 46 } 47 } 48 49 // Fall back to PDS if CDN fails 50 warn!("CDN failed for did={}, cid={}, falling back to PDS", did, cid); 51 52 let pds_url = format!( 53 "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", 54 config.pds.endpoint, did, cid 55 ); 56 57 debug!("Downloading from PDS: {}", pds_url); 58 59 let response = client 60 .get(&pds_url) 61 .send() 62 .await 63 .into_diagnostic()? 
64 .error_for_status() 65 .into_diagnostic()?; 66 67 let bytes = response.bytes().await.into_diagnostic()?; 68 Ok(bytes.to_vec()) 69} 70 71/// Match a computed phash against blob checks 72pub fn match_phash( 73 phash: &str, 74 blob_checks: &[BlobCheck], 75 did: &str, 76 default_threshold: u32, 77) -> Option<MatchResult> { 78 for check in blob_checks { 79 if let Some(ignore_list) = &check.ignore_did { 80 if ignore_list.iter().any(|ignored_did| ignored_did.as_str() == did) { 81 debug!("Skipping check '{}' for ignored DID: {}", check.label, did); 82 continue; 83 } 84 } 85 86 let threshold = check.hamming_threshold.unwrap_or(default_threshold); 87 88 for check_phash in &check.phashes { 89 match phash::hamming_distance(phash, check_phash.as_str()) { 90 Ok(distance) => { 91 if distance <= threshold { 92 info!( 93 "Match found! label={}, distance={}, threshold={}", 94 check.label, distance, threshold 95 ); 96 return Some(MatchResult { 97 phash: phash.to_string().into(), 98 matched_check: check.clone(), 99 matched_phash: check_phash.clone(), 100 hamming_distance: distance, 101 }); 102 } 103 } 104 Err(e) => { 105 warn!("Failed to compute hamming distance: {}", e); 106 continue; 107 } 108 } 109 } 110 } 111 112 None 113} 114 115/// Process a single blob reference 116pub async fn process_blob( 117 client: &Client, 118 config: &Config, 119 blob_checks: &[BlobCheck], 120 did: &str, 121 blob: &BlobReference, 122) -> Result<Option<MatchResult>> { 123 let image_bytes = download_blob(client, config, did, &blob.cid).await?; 124 let phash = phash::compute_phash(&image_bytes)?; 125 debug!("Computed phash for blob {}: {}", blob.cid, phash); 126 127 let match_result = match_phash(&phash, blob_checks, did, config.phash.default_hamming_threshold); 128 129 Ok(match_result) 130} 131 132/// Process an image job - check all blobs and return matches 133pub async fn process_image_job( 134 client: &Client, 135 config: &Config, 136 blob_checks: &[BlobCheck], 137 job: &ImageJob, 138) -> 
Result<Vec<MatchResult>> { 139 info!( 140 "Processing job: post={}, blobs={}", 141 job.post_uri, 142 job.blobs.len() 143 ); 144 145 let mut matches = Vec::new(); 146 147 for blob in &job.blobs { 148 match process_blob(client, config, blob_checks, &job.post_did, blob).await { 149 Ok(Some(result)) => { 150 matches.push(result); 151 } 152 Ok(None) => { 153 debug!("No match for blob: {}", blob.cid); 154 } 155 Err(e) => { 156 warn!("Error processing blob {}: {}", blob.cid, e); 157 // Continue processing other blobs 158 } 159 } 160 } 161 162 if !matches.is_empty() { 163 info!( 164 "Found {} match(es) for post: {}", 165 matches.len(), 166 job.post_uri 167 ); 168 } 169 170 Ok(matches) 171} 172 173#[cfg(test)] 174mod tests { 175 use super::*; 176 177 #[test] 178 fn test_match_phash_exact() { 179 180 181 let checks = vec![BlobCheck { 182 phashes: vec!["deadbeefdeadbeef".to_string().into()], 183 label: "test-label".to_string().into(), 184 comment: "Test".to_string().into(), 185 report_acct: false, 186 label_acct: false, 187 report_post: true, 188 to_label: true, 189 takedown_post: false, 190 takedown_acct: false, 191 hamming_threshold: Some(3), 192 description: None, 193 ignore_did: None, 194 }]; 195 196 let result = match_phash("deadbeefdeadbeef", &checks, "did:plc:test", 3); 197 assert!(result.is_some()); 198 assert_eq!(result.unwrap().hamming_distance, 0); 199 } 200 201 #[test] 202 fn test_match_phash_within_threshold() { 203 204 205 let checks = vec![BlobCheck { 206 phashes: vec!["deadbeefdeadbeef".to_string().into()], 207 label: "test-label".to_string().into(), 208 comment: "Test".to_string().into(), 209 report_acct: false, 210 label_acct: false, 211 report_post: true, 212 to_label: true, 213 takedown_post: false, 214 takedown_acct: false, 215 hamming_threshold: Some(3), 216 description: None, 217 ignore_did: None, 218 }]; 219 220 // deadbeefdeadbeef vs deadbeefdeadbeee = 1 bit difference in last nibble 221 let result = match_phash("deadbeefdeadbeee", &checks, 
"did:plc:test", 3); 222 assert!(result.is_some()); 223 assert_eq!(result.unwrap().hamming_distance, 1); 224 } 225 226 #[test] 227 fn test_match_phash_exceeds_threshold() { 228 229 230 let checks = vec![BlobCheck { 231 phashes: vec!["deadbeefdeadbeef".to_string().into()], 232 label: "test-label".to_string().into(), 233 comment: "Test".to_string().into(), 234 report_acct: false, 235 label_acct: false, 236 report_post: true, 237 to_label: true, 238 takedown_post: false, 239 takedown_acct: false, 240 hamming_threshold: Some(1), 241 description: None, 242 ignore_did: None, 243 }]; 244 245 // More than 1 bit difference 246 let result = match_phash("deadbeefdeadbee0", &checks, "did:plc:test", 1); 247 assert!(result.is_none()); 248 } 249 250 #[test] 251 fn test_match_phash_ignored_did() { 252 use jacquard_common::types::string::Did; 253 use jacquard_common::IntoStatic; 254 255 let checks = vec![BlobCheck { 256 phashes: vec!["deadbeefdeadbeef".to_string().into()], 257 label: "test-label".to_string().into(), 258 comment: "Test".to_string().into(), 259 report_acct: false, 260 label_acct: false, 261 report_post: true, 262 to_label: true, 263 takedown_post: false, 264 takedown_acct: false, 265 hamming_threshold: Some(3), 266 description: None, 267 ignore_did: Some(vec![Did::new("did:plc:ignored").unwrap().into_static()]), 268 }]; 269 270 let result = match_phash("deadbeefdeadbeef", &checks, "did:plc:ignored", 3); 271 assert!(result.is_none()); 272 } 273 274 #[tokio::test] 275 async fn test_load_real_rules() { 276 let path = std::path::Path::new("rules/blobs.json"); 277 if !path.exists() { 278 // Skip test if rules file doesn't exist 279 return; 280 } 281 282 let result = load_blob_checks(path).await; 283 assert!(result.is_ok()); 284 285 let checks = result.unwrap(); 286 assert!(!checks.is_empty()); 287 288 // Verify first check has expected fields 289 let first = &checks[0]; 290 assert!(!first.phashes.is_empty()); 291 assert!(!first.label.is_empty()); 292 293 // Check that 
ignoreDID alias works 294 if let Some(ignore_list) = &first.ignore_did { 295 assert!(!ignore_list.is_empty()); 296 } 297 } 298}