//! Badge processing logic with signature validation and image handling. //! //! Processes badge award events by validating cryptographic signatures, //! downloading and processing badge images, and storing validated awards. use std::str::FromStr; use std::sync::Arc; use crate::errors::{Result, ShowcaseError}; use atproto_client::com::atproto::repo::{get_blob, get_record}; use atproto_identity::{ key::{KeyData, identify_key, validate}, model::Document, resolve::IdentityResolver, storage::DidDocumentStorage, }; use atproto_record::aturi::ATURI; use chrono::Utc; use image::{GenericImageView, ImageFormat}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use tracing::{error, info, warn}; use crate::{ config::Config, consumer::{AwardEvent, BadgeEventReceiver}, storage::{Award, Badge, FileStorage, Storage}, }; /// Badge award record structure from AT Protocol #[derive(Debug, Deserialize, Serialize)] struct AwardRecord { pub did: String, pub badge: StrongRef, pub issued: String, pub signatures: Vec, } /// Strong reference to another record #[derive(Debug, Deserialize, Serialize)] struct StrongRef { #[serde(rename = "$type")] pub type_: String, pub uri: String, pub cid: String, } /// Signature from badge issuer #[derive(Debug, Deserialize, Serialize)] struct Signature { pub issuer: String, #[serde(rename = "issuedAt")] pub issued_at: String, pub signature: String, } /// Badge definition record #[derive(Debug, Deserialize, Serialize)] struct BadgeRecord { pub name: String, pub description: String, pub image: Option, } /// Blob reference for badge images #[derive(Debug, Deserialize, Serialize)] struct BlobRef { #[serde(rename = "$type")] pub type_: String, #[serde(rename = "ref")] pub ref_: Option, #[serde(rename = "mimeType")] pub mime_type: String, pub size: u64, } /// Link reference in blob #[derive(Debug, Deserialize, Serialize)] struct LinkRef { #[serde(rename = "$link")] pub link: String, } impl BlobRef { fn get_ref(&self) -> Option { self.ref_.as_ref().map(|r| r.link.clone()) } } /// Background processor for badge events pub struct BadgeProcessor { storage: Arc, config: Arc, identity_resolver: IdentityResolver, document_storage: Arc, http_client: reqwest::Client, file_storage: Arc, } impl BadgeProcessor { /// Create a new badge processor with the required dependencies. pub fn new( storage: Arc, config: Arc, identity_resolver: IdentityResolver, document_storage: Arc, http_client: reqwest::Client, file_storage: Arc, ) -> Self { Self { storage, config, identity_resolver, document_storage, http_client, file_storage, } } /// Start processing badge events from the queue pub async fn start_processing(&self, mut event_receiver: BadgeEventReceiver) -> Result<()> { info!("Badge processor started"); while let Some(event) = event_receiver.recv().await { match &event { AwardEvent::Commit { did, cid, record, rkey, .. } => { if let Err(e) = self.handle_commit(did, rkey, cid, record).await { error!( "error-showcase-process-6 Failed to process create event: {}", e ); } } AwardEvent::Delete { did, rkey, .. } => { if let Err(e) = self.handle_delete(did, rkey).await { error!( "error-showcase-process-8 Failed to process delete event: {}", e ); } } } } info!("Badge processor finished"); Ok(()) } async fn handle_commit(&self, did: &str, rkey: &str, cid: &str, record: &Value) -> Result<()> { info!("Processing award: {} for {}", rkey, did); let aturi = format!("at://{did}/community.lexicon.badge.award/{rkey}"); // Parse the award record let award_record: AwardRecord = serde_json::from_value(record.clone())?; tracing::debug!(?award_record, "processing award"); let badge_from_issuer = self.config.badge_issuers.iter().any(|value| { award_record .badge .uri .starts_with(&format!("at://{value}/")) }); if !badge_from_issuer { return Ok(()); } // Ensure identity is stored let document = self.ensure_identity_stored(did).await?; // Get or create badge let badge = self.get_or_create_badge(&award_record.badge).await?; let badge_aturi = ATURI::from_str(&award_record.badge.uri)?; let badge_isser_document = self.ensure_identity_stored(&badge_aturi.authority).await?; let issuer_pds_endpoint = { badge_isser_document .pds_endpoints() .first() .ok_or_else(|| ShowcaseError::ProcessIdentityResolutionFailed { did: did.to_string(), details: "No PDS endpoint found in DID document".to_string(), }) .cloned()? }; self.download_badge_image(issuer_pds_endpoint, &badge_isser_document.id, &badge) .await?; // Validate signatures let validated_issuers = self .validate_signatures(record, &document.id, "community.lexicon.badge.award") .await?; // Create award record let award = Award { aturi: aturi.to_string(), cid: cid.to_string(), did: document.id.clone(), badge: award_record.badge.uri.clone(), badge_cid: award_record.badge.cid.clone(), badge_name: badge.name, validated_issuers: serde_json::to_value(&validated_issuers)?, created_at: Utc::now(), updated_at: Utc::now(), record: record.clone(), }; // Store award let is_new = self.storage.upsert_award(&award).await?; // Update badge count if this is a new award if is_new { self.storage .increment_badge_count(&award.badge, &award.badge_cid) .await?; } // Trim awards for this DID to max 100 self.storage.trim_awards_for_did(did, 100).await?; info!("Successfully processed award: {}", aturi); Ok(()) } async fn handle_delete(&self, did: &str, rkey: &str) -> Result<()> { let aturi = format!("at://{did}/community.lexicon.badge.award/{rkey}"); if let Some(award) = self.storage.delete_award(&aturi).await? { // Decrement badge count self.storage .decrement_badge_count(&award.badge, &award.badge_cid) .await?; info!("Successfully deleted award: {}", aturi); } Ok(()) } async fn ensure_identity_stored(&self, did: &str) -> Result { // Check if we already have this identity if let Some(document) = self.document_storage.get_document_by_did(did).await? { return Ok(document); } let document = self.identity_resolver.resolve(did).await?; self.document_storage .store_document(document.clone()) .await?; Ok(document) } async fn get_or_create_badge(&self, badge_ref: &StrongRef) -> Result { // Check if we already have this badge if let Some(existing) = self .storage .get_badge(&badge_ref.uri, &badge_ref.cid) .await? { let badge_record = serde_json::from_value::(existing.record.clone())?; return Ok(badge_record); } // Parse AT-URI to get DID and record key let parts: Vec<&str> = badge_ref .uri .strip_prefix("at://") .unwrap_or(&badge_ref.uri) .split('/') .collect(); if parts.len() != 3 { return Err(ShowcaseError::ProcessInvalidAturi { uri: badge_ref.uri.clone(), }); } let repo = parts[0]; let rkey = parts[2]; // Get the DID document to find PDS endpoint let document = self.ensure_identity_stored(repo).await?; let pds_endpoints = document.pds_endpoints(); let pds_endpoint = pds_endpoints .first() .ok_or_else(|| ShowcaseError::ProcessBadgeFetchFailed { uri: format!("No PDS endpoint found for DID: {}", repo), })?; let badge_record = self .fetch_badge_record(pds_endpoint, repo, rkey, &badge_ref.cid) .await?; let badge = Badge { aturi: badge_ref.uri.clone(), cid: badge_ref.cid.clone(), name: badge_record.name.clone(), image: badge_record.image.as_ref().and_then(|img| img.get_ref()), created_at: Utc::now(), updated_at: Utc::now(), count: 0, record: serde_json::to_value(&badge_record)?, }; self.storage.upsert_badge(&badge).await?; Ok(badge_record) } async fn fetch_badge_record( &self, pds: &str, did: &str, rkey: &str, cid: &str, ) -> Result { let get_record_response = get_record( &self.http_client, None, pds, did, "community.lexicon.badge.definition", rkey, Some(cid), ) .await?; match get_record_response { atproto_client::com::atproto::repo::GetRecordResponse::Record { value, .. } => { serde_json::from_value(value.clone()).map_err(|e| { ShowcaseError::ProcessBadgeRecordFetchFailed { uri: format!("at://{}/community.lexicon.badge.definition/{}", did, rkey), details: format!("Failed to deserialize record: {}", e), } }) } atproto_client::com::atproto::repo::GetRecordResponse::Error(simple_error) => { Err(ShowcaseError::ProcessBadgeRecordFetchFailed { uri: format!("at://{}/community.lexicon.badge.definition/{}", did, rkey), details: format!("Get record returned an error: {}", simple_error.error_message()), }) } } } async fn download_badge_image(&self, pds: &str, did: &str, badge: &BadgeRecord) -> Result<()> { if let Some(ref image_blob) = badge.image { let image_ref = match image_blob.get_ref() { Some(ref_val) => ref_val, None => return Ok(()), // No image reference }; let image_path = format!("{}.png", image_ref); // Check if image already exists if self.file_storage.file_exists(&image_path).await? { return Ok(()); } // Download and process image match self .download_and_process_image(pds, did, &image_ref, &image_path) .await { Ok(()) => { info!("Downloaded badge image: {}", image_ref); } Err(e) => { warn!( "error-showcase-process-11 Failed to download badge image {}: {}", image_ref, e ); } } } Ok(()) } async fn download_and_process_image( &self, pds: &str, did: &str, blob_ref: &str, output_path: &str, ) -> Result<()> { // Fetch the blob from PDS let image_bytes = get_blob(&self.http_client, pds, did, blob_ref).await?; // 1. Check size limit (3MB = 3 * 1024 * 1024 bytes) const MAX_SIZE: usize = 3 * 1024 * 1024; if image_bytes.len() > MAX_SIZE { return Err(ShowcaseError::ProcessImageTooLarge { size: image_bytes.len(), }); } // 2. Try to load and detect image format let img = image::load_from_memory(&image_bytes).map_err(|e| { ShowcaseError::ProcessImageDecodeFailed { details: e.to_string(), } })?; // Check if format is supported (JPG, PNG, WebP) let format = image::guess_format(&image_bytes).map_err(|e| { ShowcaseError::ProcessImageDecodeFailed { details: format!("Could not detect image format: {}", e), } })?; match format { ImageFormat::Jpeg | ImageFormat::Png | ImageFormat::WebP => { // Supported formats } _ => { return Err(ShowcaseError::ProcessUnsupportedImageFormat { format: format!("{:?}", format), }); } } // 3. Check original dimensions (minimum 512x512) let (original_width, original_height) = img.dimensions(); if original_width < 512 || original_height < 512 { return Err(ShowcaseError::ProcessImageTooSmall { width: original_width, height: original_height, }); } // 4. Resize to height 512 while preserving aspect ratio let aspect_ratio = original_width as f32 / original_height as f32; let new_height = 512; let new_width = (new_height as f32 * aspect_ratio) as u32; // 5. Check that width after resize is >= 512 if new_width < 512 { return Err(ShowcaseError::ProcessImageWidthTooSmall { width: new_width }); } // Perform the resize let mut resized_img = img.resize_exact(new_width, new_height, image::imageops::FilterType::Lanczos3); // 6. Center crop the width to exactly 512 pixels if needed let final_img = if new_width > 512 { let crop_x = (new_width - 512) / 2; resized_img.crop(crop_x, 0, 512, 512) } else { resized_img }; // Save as PNG to byte buffer let mut png_buffer = std::io::Cursor::new(Vec::new()); final_img.write_to(&mut png_buffer, ImageFormat::Png)?; let png_bytes = png_buffer.into_inner(); // Write the processed image using file storage self.file_storage .write_file(output_path, &png_bytes) .await?; info!( "Processed badge image {}: {}x{} -> 512x512", blob_ref, original_width, original_height ); Ok(()) } async fn validate_signatures( &self, record: &serde_json::Value, repository: &str, collection: &str, ) -> Result> { let mut validated_issuers = Vec::new(); // Extract signatures from the record let signatures = record .get("signatures") .and_then(|v| v.as_array()) .ok_or(ShowcaseError::ProcessNoSignaturesField)?; for sig_obj in signatures { // Extract the issuer from the signature object let signature_issuer = sig_obj .get("issuer") .and_then(|v| v.as_str()) .ok_or(ShowcaseError::ProcessMissingIssuerField)?; // Retrieve the DID document for the issuer let did_document = match self .document_storage .get_document_by_did(signature_issuer) .await { Ok(Some(doc)) => doc, Ok(None) => { warn!( "error-showcase-process-16 Failed to retrieve DID document for issuer {}: not found", signature_issuer ); continue; } Err(e) => { warn!( "error-showcase-process-16 Failed to retrieve DID document for issuer {}: {}", signature_issuer, e ); continue; } }; // Iterate over all keys available in the DID document for key_string in did_document.did_keys() { // Decode the key using identify_key let key_data = match identify_key(key_string) { Ok(key_data) => key_data, Err(e) => { warn!( "error-showcase-process-17 Failed to decode key {} for issuer {}: {}", key_string, signature_issuer, e ); continue; } }; // Attempt to verify the signature with this key match self .verify_signature( signature_issuer, &key_data, record, repository, collection, sig_obj, ) .await { Ok(()) => { validated_issuers.push(signature_issuer.to_string()); info!( "Validated signature from trusted issuer {} using key: {}", signature_issuer, key_string ); break; // Stop trying other keys once we find one that works } Err(_) => { // Continue trying other keys - don't warn on each failure as this is expected continue; } } } } Ok(validated_issuers) } async fn verify_signature( &self, issuer: &str, key_data: &KeyData, record: &serde_json::Value, repository: &str, collection: &str, sig_obj: &serde_json::Value, ) -> Result<()> { // Reconstruct the $sig object as per the reference implementation let mut sig_variable = sig_obj.clone(); if let Some(sig_map) = sig_variable.as_object_mut() { sig_map.remove("signature"); sig_map.insert("repository".to_string(), json!(repository)); sig_map.insert("collection".to_string(), json!(collection)); } // Create the signed record for verification let mut signed_record = record.clone(); if let Some(record_map) = signed_record.as_object_mut() { record_map.remove("signatures"); record_map.insert("$sig".to_string(), sig_variable); } // Serialize the record using IPLD DAG-CBOR let serialized_record = serde_ipld_dagcbor::to_vec(&signed_record).map_err(|e| { ShowcaseError::ProcessRecordSerializationFailed { details: e.to_string(), } })?; // Get the signature value and decode it let signature_value = sig_obj .get("signature") .and_then(|v| v.as_str()) .ok_or(ShowcaseError::ProcessMissingSignatureField)?; let (_, signature_bytes) = multibase::decode(signature_value).map_err(|e| { ShowcaseError::ProcessSignatureDecodingFailed { details: e.to_string(), } })?; // Validate the signature validate(key_data, &signature_bytes, &serialized_record).map_err(|e| { ShowcaseError::ProcessCryptographicValidationFailed { issuer: issuer.to_string(), details: e.to_string(), } })?; Ok(()) } } #[cfg(test)] mod tests { use super::*; #[cfg(feature = "sqlite")] use crate::storage::{LocalFileStorage, SqliteStorage, SqliteStorageDidDocumentStorage}; use atproto_identity::model::Document; use serde_json::json; use std::collections::HashMap; #[cfg(feature = "sqlite")] #[tokio::test] async fn test_validate_signatures_no_signatures_field() { let config = Arc::new(Config::default()); let storage = Arc::new(SqliteStorage::new( sqlx::SqlitePool::connect(":memory:").await.unwrap(), )); let identity_resolver = create_mock_identity_resolver(); let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); let http_client = reqwest::Client::new(); let processor = BadgeProcessor::new( storage as Arc, config, identity_resolver, document_storage as Arc, http_client, create_test_file_storage(), ); // Record without signatures field let record = json!({ "did": "did:plc:test", "badge": { "$type": "strongRef", "uri": "at://did:plc:issuer/community.lexicon.badge.definition/test", "cid": "bafyreiabc123" }, "issued": "2023-01-01T00:00:00Z" }); let result = processor .validate_signatures(&record, "did:plc:test", "community.lexicon.badge.award") .await; // Should return error because no signatures field assert!(result.is_err()); if let Err(ShowcaseError::ProcessNoSignaturesField) = result { // Expected error } else { panic!("Expected ProcessNoSignaturesField error"); } } #[cfg(feature = "sqlite")] #[tokio::test] async fn test_validate_signatures_untrusted_issuer() { let mut config = Config::default(); config.badge_issuers = vec!["did:plc:trusted".to_string()]; let config = Arc::new(config); let storage = Arc::new(SqliteStorage::new( sqlx::SqlitePool::connect(":memory:").await.unwrap(), )); let identity_resolver = create_mock_identity_resolver(); let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); let http_client = reqwest::Client::new(); let processor = BadgeProcessor::new( storage as Arc, config, identity_resolver, document_storage as Arc, http_client, create_test_file_storage(), ); // Record with signature from untrusted issuer let record = json!({ "did": "did:plc:test", "badge": { "$type": "strongRef", "uri": "at://did:plc:issuer/community.lexicon.badge.definition/test", "cid": "bafyreiabc123" }, "issued": "2023-01-01T00:00:00Z", "signatures": [{ "issuer": "did:plc:untrusted", "issuedAt": "2023-01-01T00:00:00Z", "signature": "mEiDqZ4..." }] }); let result = processor .validate_signatures(&record, "did:plc:test", "community.lexicon.badge.award") .await; // Should succeed but return empty list (untrusted issuer ignored) assert!(result.is_ok()); assert_eq!(result.unwrap().len(), 0); } #[cfg(feature = "sqlite")] #[tokio::test] async fn test_validate_signatures_no_did_keys() { let mut config = Config::default(); config.badge_issuers = vec!["did:plc:trusted".to_string()]; let config = Arc::new(config); let storage = Arc::new(SqliteStorage::new( sqlx::SqlitePool::connect(":memory:").await.unwrap(), )); storage.migrate().await.unwrap(); // Initialize database tables let identity_resolver = create_mock_identity_resolver(); let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); let http_client = reqwest::Client::new(); // Create a mock DID document with no keys (using a Document that has an empty did_keys() result) let document = Document { id: "did:plc:trusted".to_string(), also_known_as: vec![], service: vec![], verification_method: vec![], // Empty verification methods means no keys extra: HashMap::new(), }; // Store the document in our storage document_storage.store_document(document).await.unwrap(); let processor = BadgeProcessor::new( storage as Arc, config, identity_resolver, document_storage as Arc, http_client, create_test_file_storage(), ); // Record with signature from trusted issuer but no valid keys in DID document let record = json!({ "did": "did:plc:test", "badge": { "$type": "strongRef", "uri": "at://did:plc:issuer/community.lexicon.badge.definition/test", "cid": "bafyreiabc123" }, "issued": "2023-01-01T00:00:00Z", "signatures": [{ "issuer": "did:plc:trusted", "issuedAt": "2023-01-01T00:00:00Z", "signature": "mEiDqZ4..." }] }); let result = processor .validate_signatures(&record, "did:plc:test", "community.lexicon.badge.award") .await; // Should succeed but return empty list (no valid keys to verify with) assert!(result.is_ok()); assert_eq!(result.unwrap().len(), 0); } #[cfg(feature = "sqlite")] #[tokio::test] async fn test_image_validation_too_large() { let config = Arc::new(Config::default()); let storage = Arc::new(SqliteStorage::new( sqlx::SqlitePool::connect(":memory:").await.unwrap(), )); let identity_resolver = create_mock_identity_resolver(); let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); let http_client = reqwest::Client::new(); let _processor = BadgeProcessor::new( storage, config, identity_resolver, document_storage, http_client, create_test_file_storage(), ); // Create test image bytes larger than 3MB let large_image_bytes = vec![0u8; 4 * 1024 * 1024]; // 4MB // We can't easily test this without mocking get_blob, but we can test the size check logic directly assert!(large_image_bytes.len() > 3 * 1024 * 1024); } #[tokio::test] async fn test_image_validation_unsupported_format() { // This would require creating actual image bytes of unsupported format // For now, we can verify the error types exist and compile correctly let error = ShowcaseError::ProcessUnsupportedImageFormat { format: "BMP".to_string(), }; assert!(error.to_string().contains("Unsupported image format")); } #[tokio::test] async fn test_image_validation_too_small() { let error = ShowcaseError::ProcessImageTooSmall { width: 256, height: 256, }; assert!(error.to_string().contains("256x256")); assert!(error.to_string().contains("minimum is 512x512")); } #[tokio::test] async fn test_image_validation_width_too_small_after_resize() { let error = ShowcaseError::ProcessImageWidthTooSmall { width: 300 }; assert!(error.to_string().contains("300")); assert!(error.to_string().contains("minimum is 512")); } #[tokio::test] async fn test_center_crop_logic() { // Test the center crop calculation logic // If we have an image that's 1024x512 after resize, it should be center cropped to 512x512 let original_width = 1024u32; let target_width = 512u32; // This is the same calculation used in download_and_process_image let crop_x = (original_width - target_width) / 2; // Should crop from x=256 to get the center 512 pixels assert_eq!(crop_x, 256); // Test with different widths let wide_width = 768u32; let crop_x_wide = (wide_width - target_width) / 2; assert_eq!(crop_x_wide, 128); // Should crop 128 pixels from each side } fn create_mock_identity_resolver() -> IdentityResolver { // Create a mock resolver - in a real test, you'd want to properly mock this // For now, we'll create one with default DNS resolver and HTTP client use atproto_identity::resolve::{InnerIdentityResolver, create_resolver}; let dns_resolver = create_resolver(&[]); let http_client = reqwest::Client::new(); IdentityResolver(Arc::new(InnerIdentityResolver { dns_resolver, http_client, plc_hostname: "plc.directory".to_string(), })) } #[cfg(feature = "sqlite")] fn create_test_file_storage() -> Arc { // Create a temporary directory for test file storage // Note: In a real test, you'd want to ensure this gets cleaned up properly // For simplicity, we'll use a simple path that gets cleaned up by the OS let temp_dir = "/tmp/showcase_test_storage"; Arc::new(LocalFileStorage::new(temp_dir.to_string())) } }