use anyhow::Result; use atproto_attestation::verify_record; use atproto_client::com::atproto::repo::get_blob; use atproto_identity::key::IdentityDocumentKeyResolver; use atproto_identity::model::Document; use atproto_identity::resolve::IdentityResolver; use atproto_identity::traits::DidDocumentStorage; use futures::future::join_all; use image::GenericImageView; use image::ImageFormat; use serde_json::Value; use std::sync::Arc; use crate::atproto::lexicon::acceptance::NSID as AcceptanceNSID; /// Build an AT URI with pre-allocated capacity to avoid format! overhead. #[inline] fn build_aturi(did: &str, nsid: &str, rkey: &str) -> String { // "at://" (5) + "/" (1) + "/" (1) = 7 fixed chars let capacity = 7 + did.len() + nsid.len() + rkey.len(); let mut uri = String::with_capacity(capacity); uri.push_str("at://"); uri.push_str(did); uri.push('/'); uri.push_str(nsid); uri.push('/'); uri.push_str(rkey); uri } /// Build an image storage path with pre-allocated capacity. #[inline] fn build_image_path(cid: &str) -> String { let mut path = String::with_capacity(cid.len() + 4); path.push_str(cid); path.push_str(".png"); path } use crate::atproto::lexicon::acceptance::TypedAcceptance; use crate::atproto::lexicon::profile::{NSID as ProfileNSID, Profile}; use crate::processor_errors::ProcessorError; use crate::storage::StoragePool; use crate::storage::acceptance::{ acceptance_record_delete, acceptance_record_upsert, rsvp_update_validated_at, }; use crate::storage::atproto_record::{atproto_record_delete, atproto_record_upsert}; use crate::storage::content::ContentStorage; use crate::storage::denylist::denylist_exists; use crate::storage::event::EventInsertParams; use crate::storage::event::RsvpInsertParams; use crate::storage::event::event_delete; use crate::storage::event::event_exists; use crate::storage::event::event_insert_with_metadata; use crate::storage::event::rsvp_delete; use crate::storage::event::rsvp_insert_with_metadata; use crate::storage::profile::profile_delete; use crate::storage::profile::profile_insert; use atproto_record::lexicon::community::lexicon::calendar::event::{ Event, Media, NSID as LexiconCommunityEventNSID, }; use atproto_record::lexicon::community::lexicon::calendar::rsvp::{ NSID as LexiconCommunityRSVPNSID, Rsvp, RsvpStatus, }; const BEACONBITS_BOOKMARK_NSID: &str = "app.beaconbits.bookmark.item"; const BEACONBITS_BEACON_NSID: &str = "app.beaconbits.beacon"; const DROPANCHOR_CHECKIN_NSID: &str = "app.dropanchor.checkin"; pub struct ContentFetcher { pool: StoragePool, content_storage: Arc, identity_resolver: Arc, document_storage: Arc, http_client: reqwest::Client, record_resolver: Arc, key_resolver: IdentityDocumentKeyResolver, } impl ContentFetcher { pub fn new( pool: StoragePool, content_storage: Arc, identity_resolver: Arc, document_storage: Arc, http_client: reqwest::Client, record_resolver: Arc, key_resolver: IdentityDocumentKeyResolver, ) -> Self { Self { pool, content_storage, identity_resolver, document_storage, http_client, record_resolver, key_resolver, } } async fn ensure_identity_stored(&self, did: &str) -> Result { // Check if we already have this identity if let Some(document) = self.document_storage.get_document_by_did(did).await? { return Ok(document); } let document = self.identity_resolver.resolve(did).await?; self.document_storage .store_document(document.clone()) .await?; Ok(document) } /// Handle a commit event (create or update). /// /// The `live` flag indicates whether this is a live event (true) or backfill (false). /// Takes ownership of `record` to avoid cloning during deserialization. pub async fn handle_commit( &self, did: &str, collection: &str, rkey: &str, cid: &str, record: Value, live: bool, ) -> Result<()> { match collection { "community.lexicon.calendar.event" => { self.handle_event_commit(did, rkey, cid, record, live).await } "community.lexicon.calendar.rsvp" => { self.handle_rsvp_commit(did, rkey, cid, record, live).await } "events.smokesignal.profile" => { self.handle_profile_commit(did, rkey, cid, record).await } "events.smokesignal.calendar.acceptance" => { self.handle_acceptance_commit(did, rkey, cid, record).await } BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID | DROPANCHOR_CHECKIN_NSID => { self.handle_atproto_record_commit(did, collection, rkey, cid, record) .await } _ => Ok(()), } } /// Handle a delete event. /// /// The `live` flag indicates whether this is a live event (true) or backfill (false). pub async fn handle_delete( &self, did: &str, collection: &str, rkey: &str, live: bool, ) -> Result<()> { match collection { "community.lexicon.calendar.event" => self.handle_event_delete(did, rkey, live).await, "community.lexicon.calendar.rsvp" => self.handle_rsvp_delete(did, rkey, live).await, "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await, "events.smokesignal.calendar.acceptance" => { self.handle_acceptance_delete(did, rkey).await } BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID | DROPANCHOR_CHECKIN_NSID => { self.handle_atproto_record_delete(did, collection, rkey) .await } _ => Ok(()), } } async fn handle_event_commit( &self, did: &str, rkey: &str, cid: &str, record: Value, _live: bool, ) -> Result<()> { tracing::info!("Processing event: {} for {}", rkey, did); let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey); let event_record: Event = serde_json::from_value(record)?; let document = self.ensure_identity_stored(did).await?; let pds_endpoints = document.pds_endpoints(); let pds_endpoint = pds_endpoints.first().ok_or(ProcessorError::NoPdsInDid)?; let name = event_record.name.clone(); event_insert_with_metadata( &self.pool, EventInsertParams { aturi: &aturi, cid, did, lexicon: LexiconCommunityEventNSID, record: &event_record, name: &name, require_confirmed_email: false, disable_direct_rsvp: false, rsvp_redirect_url: None, }, ) .await?; let all_media = event_record.media; // Download all media items in parallel if !all_media.is_empty() { let download_futures: Vec<_> = all_media .iter() .map(|media| self.download_media(pds_endpoint, did, media)) .collect(); let results = join_all(download_futures).await; for result in results { if let Err(err) = result { tracing::error!(error = ?err, "failed processing image"); } } } Ok(()) } async fn handle_rsvp_commit( &self, did: &str, rkey: &str, cid: &str, record: Value, _live: bool, ) -> Result<()> { tracing::info!("Processing rsvp: {} for {}", rkey, did); let aturi = build_aturi(did, LexiconCommunityRSVPNSID, rkey); let rsvp_record: Rsvp = serde_json::from_value(record)?; let event_aturi = rsvp_record.subject.uri.as_str(); let event_cid = rsvp_record.subject.cid.as_str(); let status = match rsvp_record.status { RsvpStatus::Going => "going", RsvpStatus::Interested => "interested", RsvpStatus::NotGoing => "notgoing", }; match event_exists(&self.pool, event_aturi).await { Ok(true) => {} _ => return Ok(()), }; let _ = self.ensure_identity_stored(did).await?; rsvp_insert_with_metadata( &self.pool, RsvpInsertParams { aturi: &aturi, cid, did, lexicon: LexiconCommunityRSVPNSID, record: &rsvp_record, event_aturi, event_cid, status, clear_validated_at: false, }, ) .await?; // Check if RSVP has signatures and verify them if !rsvp_record.signatures.is_empty() { tracing::info!( "RSVP {} has {} signature(s), verifying...", aturi, rsvp_record.signatures.len() ); let key_resolver_clone = self.key_resolver.clone(); let validated = verify_record( (&rsvp_record).into(), did, key_resolver_clone, self.record_resolver.as_ref(), ) .await .is_ok(); if validated { if let Err(e) = rsvp_update_validated_at(&self.pool, &aturi, Some(chrono::Utc::now())).await { tracing::error!("Failed to update RSVP validated_at: {:?}", e); } else { tracing::info!("RSVP {} validated with signatures", aturi); } } else { tracing::warn!("RSVP {} signature verification failed", aturi); } } Ok(()) } async fn handle_event_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> { let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey); event_delete(&self.pool, &aturi).await?; Ok(()) } async fn handle_rsvp_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> { let aturi = build_aturi(did, LexiconCommunityRSVPNSID, rkey); rsvp_delete(&self.pool, &aturi).await?; Ok(()) } async fn handle_profile_commit( &self, did: &str, rkey: &str, cid: &str, record: Value, ) -> Result<()> { tracing::info!("Processing profile: {} for {}", rkey, did); let aturi = build_aturi(did, ProfileNSID, rkey); // Check denylist before proceeding if denylist_exists(&self.pool, &[did, &aturi]).await? { tracing::info!("User {} is in denylist, skipping profile update", did); return Ok(()); } let profile_record: Profile = serde_json::from_value(record)?; // Get the identity to resolve the handle for display_name fallback and PDS endpoint let document = self.ensure_identity_stored(did).await?; let handle = document .also_known_as .first() .and_then(|aka| aka.strip_prefix("at://")) .unwrap_or(did); // Use displayName from profile, or fallback to handle let display_name = profile_record .display_name .as_ref() .filter(|s| !s.trim().is_empty()) .map(|s| s.as_str()) .unwrap_or(handle); profile_insert(&self.pool, &aturi, cid, did, display_name, &profile_record).await?; // Download avatar and banner blobs if present (in parallel) let pds_endpoints = document.pds_endpoints(); if let Some(pds_endpoint) = pds_endpoints.first() { // Create futures for avatar and banner downloads let avatar_future = async { if let Some(ref avatar) = profile_record.avatar { self.download_avatar(pds_endpoint, did, avatar).await } else { Ok(()) } }; let banner_future = async { if let Some(ref banner) = profile_record.banner { self.download_banner(pds_endpoint, did, banner).await } else { Ok(()) } }; // Download both concurrently let (avatar_result, banner_result) = tokio::join!(avatar_future, banner_future); if let Err(e) = avatar_result { tracing::warn!( error = ?e, did = %did, "Failed to download avatar for profile" ); } if let Err(e) = banner_result { tracing::warn!( error = ?e, did = %did, "Failed to download banner for profile" ); } } else { tracing::debug!(did = %did, "No PDS endpoint found for profile blob download"); } Ok(()) } async fn handle_profile_delete(&self, did: &str, rkey: &str) -> Result<()> { let aturi = build_aturi(did, ProfileNSID, rkey); profile_delete(&self.pool, &aturi).await?; Ok(()) } async fn handle_acceptance_commit( &self, did: &str, rkey: &str, cid: &str, record: Value, ) -> Result<()> { tracing::info!("Processing acceptance: {} for {}", rkey, did); let aturi = build_aturi(did, AcceptanceNSID, rkey); // Deserialize and validate the acceptance record let acceptance_record: TypedAcceptance = serde_json::from_value(record)?; tracing::info!(?acceptance_record, "acceptance_record"); // Validate the acceptance record if let Err(e) = acceptance_record.validate() { tracing::warn!("Invalid acceptance record: {}", e); return Ok(()); } // Store the acceptance record (use deserialized record to avoid clone) acceptance_record_upsert(&self.pool, &aturi, cid, did, &acceptance_record).await?; tracing::info!("Acceptance stored: {}", aturi); Ok(()) } async fn handle_acceptance_delete(&self, did: &str, rkey: &str) -> Result<()> { let aturi = build_aturi(did, AcceptanceNSID, rkey); acceptance_record_delete(&self.pool, &aturi).await?; tracing::info!("Acceptance deleted: {}", aturi); Ok(()) } async fn handle_atproto_record_commit( &self, did: &str, collection: &str, rkey: &str, cid: &str, record: Value, ) -> Result<()> { let aturi = build_aturi(did, collection, rkey); atproto_record_upsert(&self.pool, &aturi, did, cid, collection, &record).await?; tracing::info!("Stored atproto record: {} ({})", aturi, collection); Ok(()) } async fn handle_atproto_record_delete( &self, did: &str, collection: &str, rkey: &str, ) -> Result<()> { let aturi = build_aturi(did, collection, rkey); atproto_record_delete(&self.pool, &aturi).await?; tracing::info!("Deleted atproto record: {}", aturi); Ok(()) } /// Download and process avatar blob (1:1 aspect ratio, max 3MB) async fn download_avatar( &self, pds: &str, did: &str, avatar: &atproto_record::lexicon::TypedBlob, ) -> Result<()> { let cid = &avatar.inner.ref_.link; let image_path = build_image_path(cid); // Check if already exists if self.content_storage.content_exists(&image_path).await? { tracing::debug!(cid = %cid, "Avatar already exists in storage"); return Ok(()); } // Validate mime type if avatar.inner.mime_type != "image/png" && avatar.inner.mime_type != "image/jpeg" { tracing::debug!( mime_type = %avatar.inner.mime_type, "Skipping avatar with unsupported mime type" ); return Ok(()); } // Validate size (max 3MB) if avatar.inner.size > 3_000_000 { tracing::debug!( size = avatar.inner.size, "Skipping avatar that exceeds max size" ); return Ok(()); } // Download the blob let image_bytes = match get_blob(&self.http_client, pds, did, cid).await { Ok(bytes) => bytes, Err(e) => { tracing::warn!(error = ?e, cid = %cid, "Failed to download avatar blob"); return Ok(()); // Don't fail the whole operation } }; // Validate and process the image let img = match image::load_from_memory(&image_bytes) { Ok(img) => img, Err(e) => { tracing::warn!(error = ?e, cid = %cid, "Failed to load avatar image"); return Ok(()); } }; let (width, height) = img.dimensions(); // Validate 1:1 aspect ratio (allow small deviation) let aspect_ratio = width as f32 / height as f32; if (aspect_ratio - 1.0).abs() > 0.05 { tracing::debug!( width, height, aspect_ratio, "Skipping avatar with non-square aspect ratio" ); return Ok(()); } // Resize to standard size (400x400) let resized = if width != 400 || height != 400 { img.resize_exact(400, 400, image::imageops::FilterType::Lanczos3) } else { img }; // Convert to PNG with pre-allocated buffer // 400x400 PNG typically compresses to ~100-300KB, pre-allocate 256KB let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(256 * 1024)); resized.write_to(&mut png_buffer, ImageFormat::Png)?; let png_bytes = png_buffer.into_inner(); // Store in content storage self.content_storage .write_content(&image_path, &png_bytes) .await?; tracing::info!(cid = %cid, "Successfully downloaded and processed avatar"); Ok(()) } /// Download and process banner blob (16:9 aspect ratio, max 3MB) async fn download_banner( &self, pds: &str, did: &str, banner: &atproto_record::lexicon::TypedBlob, ) -> Result<()> { let cid = &banner.inner.ref_.link; let image_path = build_image_path(cid); // Check if already exists if self.content_storage.content_exists(&image_path).await? { tracing::debug!(cid = %cid, "Banner already exists in storage"); return Ok(()); } // Validate mime type if banner.inner.mime_type != "image/png" && banner.inner.mime_type != "image/jpeg" { tracing::debug!( mime_type = %banner.inner.mime_type, "Skipping banner with unsupported mime type" ); return Ok(()); } // Validate size (max 3MB) if banner.inner.size > 3_000_000 { tracing::debug!( size = banner.inner.size, "Skipping banner that exceeds max size" ); return Ok(()); } // Download the blob let image_bytes = match get_blob(&self.http_client, pds, did, cid).await { Ok(bytes) => bytes, Err(e) => { tracing::warn!(error = ?e, cid = %cid, "Failed to download banner blob"); return Ok(()); // Don't fail the whole operation } }; // Validate and process the image let img = match image::load_from_memory(&image_bytes) { Ok(img) => img, Err(e) => { tracing::warn!(error = ?e, cid = %cid, "Failed to load banner image"); return Ok(()); } }; let (width, height) = img.dimensions(); // Validate 16:9 aspect ratio (allow 10% deviation) let aspect_ratio = width as f32 / height as f32; let expected_ratio = 16.0 / 9.0; if (aspect_ratio - expected_ratio).abs() / expected_ratio > 0.10 { tracing::debug!( width, height, aspect_ratio, "Skipping banner with non-16:9 aspect ratio" ); return Ok(()); } // Resize to standard size (1600x900) let resized = if width != 1600 || height != 900 { img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3) } else { img }; // Convert to PNG with pre-allocated buffer // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024)); resized.write_to(&mut png_buffer, ImageFormat::Png)?; let png_bytes = png_buffer.into_inner(); // Store in content storage self.content_storage .write_content(&image_path, &png_bytes) .await?; tracing::info!(cid = %cid, "Successfully downloaded and processed banner"); Ok(()) } async fn download_media(&self, pds: &str, did: &str, event_image: &Media) -> Result<()> { let content = &event_image.content; let role = &event_image.role; let aspect_ratio = &event_image.aspect_ratio; if role != "header" { return Ok(()); } match content.mime_type.as_str() { "image/png" | "image/jpeg" | "image/webp" => {} _ => return Ok(()), } let (reported_height, reported_width) = aspect_ratio .as_ref() .map(|value| (value.height, value.width)) .unwrap_or_default(); if !(755..=12000).contains(&reported_height) || !(reported_height..=12000).contains(&reported_width) { tracing::info!( ?reported_height, ?reported_width, "aspect ratio check 1 failed" ); return Ok(()); } let is_16_9 = (((reported_width as f64) / (reported_height as f64)) - (16.0 / 9.0)).abs() < 0.02; if !is_16_9 { tracing::info!("aspect ratio check 2 failed"); return Ok(()); } // Access the CID from the TypedBlob structure let blob_ref = &content.inner.ref_.link; let image_path = build_image_path(blob_ref); tracing::info!(?image_path, "image_path"); if self .content_storage .as_ref() .content_exists(&image_path) .await? { tracing::info!(?image_path, "content exists"); return Ok(()); } let image_bytes = get_blob(&self.http_client, pds, did, blob_ref).await?; const MAX_SIZE: usize = 3 * 1024 * 1024; if image_bytes.len() > MAX_SIZE { tracing::info!("max size failed"); return Ok(()); } let img = image::load_from_memory(&image_bytes)?; let format = image::guess_format(&image_bytes)?; match format { ImageFormat::Jpeg | ImageFormat::Png | ImageFormat::WebP => { // Supported formats } _ => { tracing::info!("supported formats failed"); return Ok(()); } } // Check it again for fun let (actual_width, actual_height) = img.dimensions(); if actual_height == 0 || actual_height > 12000 || actual_width == 0 || actual_width > 12000 || actual_width <= actual_height { tracing::info!("aspect ratio check 3 failed"); return Ok(()); } let is_really_16_9 = (((actual_width as f64) / (actual_height as f64)) - (16.0 / 9.0)).abs() < 0.02; if !is_really_16_9 { tracing::info!("aspect ratio check 4 failed"); return Ok(()); } // Resize to standard dimensions (1600x900) to match process_event_header() let final_image = if actual_width != 1600 || actual_height != 900 { img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3) } else { img }; // Convert to PNG with pre-allocated buffer // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024)); final_image.write_to(&mut png_buffer, ImageFormat::Png)?; let png_bytes = png_buffer.into_inner(); self.content_storage .as_ref() .write_content(&image_path, &png_bytes) .await?; tracing::info!("image written"); Ok(()) } }