A Rust implementation of skywatch-phash
1use miette::{IntoDiagnostic, Result};
2use reqwest::Client;
3use std::path::Path;
4use tracing::{debug, info, warn};
5
6use crate::config::Config;
7use crate::processor::phash;
8use crate::types::{BlobCheck, BlobReference, ImageJob, MatchResult};
9
10/// Load blob checks from a JSON file
11pub async fn load_blob_checks(path: &Path) -> Result<Vec<BlobCheck>> {
12 let contents = tokio::fs::read_to_string(path).await.into_diagnostic()?;
13 let checks: Vec<BlobCheck> = serde_json::from_str(&contents).into_diagnostic()?;
14 info!("Loaded {} blob checks from {:?}", checks.len(), path);
15 Ok(checks)
16}
17
18/// Download a blob from the Bluesky CDN, falling back to PDS if necessary
19pub async fn download_blob(
20 client: &Client,
21 config: &Config,
22 did: &str,
23 cid: &str,
24) -> Result<Vec<u8>> {
25 // Try CDN first - attempt common image formats
26 for format in ["jpeg", "png", "webp"] {
27 let cdn_url = format!(
28 "https://cdn.bsky.app/img/feed_fullsize/plain/{}/{}@{}",
29 did, cid, format
30 );
31
32 debug!("Trying CDN download: {}", cdn_url);
33
34 match client.get(&cdn_url).send().await {
35 Ok(response) if response.status().is_success() => {
36 debug!("Successfully downloaded from CDN: did={}, cid={}", did, cid);
37 let bytes = response.bytes().await.into_diagnostic()?;
38 return Ok(bytes.to_vec());
39 }
40 Ok(response) => {
41 debug!("CDN returned status {}, trying next format", response.status());
42 }
43 Err(e) => {
44 debug!("CDN request failed: {}, trying next format", e);
45 }
46 }
47 }
48
49 // Fall back to PDS if CDN fails
50 warn!("CDN failed for did={}, cid={}, falling back to PDS", did, cid);
51
52 let pds_url = format!(
53 "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}",
54 config.pds.endpoint, did, cid
55 );
56
57 debug!("Downloading from PDS: {}", pds_url);
58
59 let response = client
60 .get(&pds_url)
61 .send()
62 .await
63 .into_diagnostic()?
64 .error_for_status()
65 .into_diagnostic()?;
66
67 let bytes = response.bytes().await.into_diagnostic()?;
68 Ok(bytes.to_vec())
69}
70
71/// Match a computed phash against blob checks
72pub fn match_phash(
73 phash: &str,
74 blob_checks: &[BlobCheck],
75 did: &str,
76 default_threshold: u32,
77) -> Option<MatchResult> {
78 for check in blob_checks {
79 if let Some(ignore_list) = &check.ignore_did {
80 if ignore_list.iter().any(|ignored_did| ignored_did.as_str() == did) {
81 debug!("Skipping check '{}' for ignored DID: {}", check.label, did);
82 continue;
83 }
84 }
85
86 let threshold = check.hamming_threshold.unwrap_or(default_threshold);
87
88 for check_phash in &check.phashes {
89 match phash::hamming_distance(phash, check_phash.as_str()) {
90 Ok(distance) => {
91 if distance <= threshold {
92 info!(
93 "Match found! label={}, distance={}, threshold={}",
94 check.label, distance, threshold
95 );
96 return Some(MatchResult {
97 phash: phash.to_string().into(),
98 matched_check: check.clone(),
99 matched_phash: check_phash.clone(),
100 hamming_distance: distance,
101 });
102 }
103 }
104 Err(e) => {
105 warn!("Failed to compute hamming distance: {}", e);
106 continue;
107 }
108 }
109 }
110 }
111
112 None
113}
114
115/// Process a single blob reference
116pub async fn process_blob(
117 client: &Client,
118 config: &Config,
119 blob_checks: &[BlobCheck],
120 did: &str,
121 blob: &BlobReference,
122) -> Result<Option<MatchResult>> {
123 let image_bytes = download_blob(client, config, did, &blob.cid).await?;
124 let phash = phash::compute_phash(&image_bytes)?;
125 debug!("Computed phash for blob {}: {}", blob.cid, phash);
126
127 let match_result = match_phash(&phash, blob_checks, did, config.phash.default_hamming_threshold);
128
129 Ok(match_result)
130}
131
132/// Process an image job - check all blobs and return matches
133pub async fn process_image_job(
134 client: &Client,
135 config: &Config,
136 blob_checks: &[BlobCheck],
137 job: &ImageJob,
138) -> Result<Vec<MatchResult>> {
139 info!(
140 "Processing job: post={}, blobs={}",
141 job.post_uri,
142 job.blobs.len()
143 );
144
145 let mut matches = Vec::new();
146
147 for blob in &job.blobs {
148 match process_blob(client, config, blob_checks, &job.post_did, blob).await {
149 Ok(Some(result)) => {
150 matches.push(result);
151 }
152 Ok(None) => {
153 debug!("No match for blob: {}", blob.cid);
154 }
155 Err(e) => {
156 warn!("Error processing blob {}: {}", blob.cid, e);
157 // Continue processing other blobs
158 }
159 }
160 }
161
162 if !matches.is_empty() {
163 info!(
164 "Found {} match(es) for post: {}",
165 matches.len(),
166 job.post_uri
167 );
168 }
169
170 Ok(matches)
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the fixture shared by the matching tests: a single check holding
    /// the hash `deadbeefdeadbeef`, with the given per-check threshold and no
    /// ignored DIDs.
    fn sample_check(threshold: u32) -> BlobCheck {
        BlobCheck {
            phashes: vec!["deadbeefdeadbeef".to_string().into()],
            label: "test-label".to_string().into(),
            comment: "Test".to_string().into(),
            report_acct: false,
            label_acct: false,
            report_post: true,
            to_label: true,
            takedown_post: false,
            takedown_acct: false,
            hamming_threshold: Some(threshold),
            description: None,
            ignore_did: None,
        }
    }

    #[test]
    fn test_match_phash_exact() {
        let checks = vec![sample_check(3)];

        let outcome = match_phash("deadbeefdeadbeef", &checks, "did:plc:test", 3);
        assert_eq!(outcome.map(|m| m.hamming_distance), Some(0));
    }

    #[test]
    fn test_match_phash_within_threshold() {
        let checks = vec![sample_check(3)];

        // deadbeefdeadbeef vs deadbeefdeadbeee = 1 bit difference in last nibble
        let outcome = match_phash("deadbeefdeadbeee", &checks, "did:plc:test", 3);
        assert_eq!(outcome.map(|m| m.hamming_distance), Some(1));
    }

    #[test]
    fn test_match_phash_exceeds_threshold() {
        let checks = vec![sample_check(1)];

        // Last nibble f vs 0 differs in more than 1 bit
        let outcome = match_phash("deadbeefdeadbee0", &checks, "did:plc:test", 1);
        assert!(outcome.is_none());
    }

    #[test]
    fn test_match_phash_ignored_did() {
        use jacquard_common::types::string::Did;
        use jacquard_common::IntoStatic;

        // Same fixture, but with the queried DID on the ignore list.
        let checks = vec![BlobCheck {
            ignore_did: Some(vec![Did::new("did:plc:ignored").unwrap().into_static()]),
            ..sample_check(3)
        }];

        let outcome = match_phash("deadbeefdeadbeef", &checks, "did:plc:ignored", 3);
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn test_load_real_rules() {
        let rules_path = std::path::Path::new("rules/blobs.json");
        if !rules_path.exists() {
            // Skip test if rules file doesn't exist
            return;
        }

        let loaded = load_blob_checks(rules_path).await;
        assert!(loaded.is_ok());

        let checks = loaded.unwrap();
        assert!(!checks.is_empty());

        // Verify first check has expected fields
        let first = &checks[0];
        assert!(!first.phashes.is_empty());
        assert!(!first.label.is_empty());

        // When the first check carries an ignore list, it must not be empty
        if let Some(ignore_list) = &first.ignore_did {
            assert!(!ignore_list.is_empty());
        }
    }
}