The smokesignal.events web application
at main 762 lines 26 kB view raw
1use anyhow::Result; 2use atproto_attestation::verify_record; 3use atproto_client::com::atproto::repo::get_blob; 4use atproto_identity::key::IdentityDocumentKeyResolver; 5use atproto_identity::model::Document; 6use atproto_identity::resolve::IdentityResolver; 7use atproto_identity::traits::DidDocumentStorage; 8use futures::future::join_all; 9use image::GenericImageView; 10use image::ImageFormat; 11use serde_json::Value; 12use std::sync::Arc; 13 14use crate::atproto::lexicon::acceptance::NSID as AcceptanceNSID; 15 16/// Build an AT URI with pre-allocated capacity to avoid format! overhead. 17#[inline] 18fn build_aturi(did: &str, nsid: &str, rkey: &str) -> String { 19 // "at://" (5) + "/" (1) + "/" (1) = 7 fixed chars 20 let capacity = 7 + did.len() + nsid.len() + rkey.len(); 21 let mut uri = String::with_capacity(capacity); 22 uri.push_str("at://"); 23 uri.push_str(did); 24 uri.push('/'); 25 uri.push_str(nsid); 26 uri.push('/'); 27 uri.push_str(rkey); 28 uri 29} 30 31/// Build an image storage path with pre-allocated capacity. 32#[inline] 33fn build_image_path(cid: &str) -> String { 34 let mut path = String::with_capacity(cid.len() + 4); 35 path.push_str(cid); 36 path.push_str(".png"); 37 path 38} 39use crate::atproto::lexicon::acceptance::TypedAcceptance; 40use crate::atproto::lexicon::profile::{NSID as ProfileNSID, Profile}; 41use crate::processor_errors::ProcessorError; 42use crate::storage::StoragePool; 43use crate::storage::acceptance::{ 44 acceptance_record_delete, acceptance_record_upsert, rsvp_update_validated_at, 45}; 46use crate::storage::atproto_record::{atproto_record_delete, atproto_record_upsert}; 47use crate::storage::content::ContentStorage; 48use crate::storage::denylist::denylist_exists; 49use crate::storage::event::EventInsertParams; 50use crate::storage::event::RsvpInsertParams; 51use crate::storage::event::event_delete; 52use crate::storage::event::event_exists; 53use crate::storage::event::event_insert_with_metadata; 54use crate::storage::event::rsvp_delete; 55use crate::storage::event::rsvp_insert_with_metadata; 56use crate::storage::profile::profile_delete; 57use crate::storage::profile::profile_insert; 58use atproto_record::lexicon::community::lexicon::calendar::event::{ 59 Event, Media, NSID as LexiconCommunityEventNSID, 60}; 61use atproto_record::lexicon::community::lexicon::calendar::rsvp::{ 62 NSID as LexiconCommunityRSVPNSID, Rsvp, RsvpStatus, 63}; 64 65const BEACONBITS_BOOKMARK_NSID: &str = "app.beaconbits.bookmark.item"; 66const BEACONBITS_BEACON_NSID: &str = "app.beaconbits.beacon"; 67const DROPANCHOR_CHECKIN_NSID: &str = "app.dropanchor.checkin"; 68 69pub struct ContentFetcher { 70 pool: StoragePool, 71 content_storage: Arc<dyn ContentStorage>, 72 identity_resolver: Arc<dyn IdentityResolver>, 73 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>, 74 http_client: reqwest::Client, 75 record_resolver: Arc<crate::record_resolver::StorageBackedRecordResolver>, 76 key_resolver: IdentityDocumentKeyResolver, 77} 78 79impl ContentFetcher { 80 pub fn new( 81 pool: StoragePool, 82 content_storage: Arc<dyn ContentStorage>, 83 identity_resolver: Arc<dyn IdentityResolver>, 84 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>, 85 http_client: reqwest::Client, 86 record_resolver: Arc<crate::record_resolver::StorageBackedRecordResolver>, 87 key_resolver: IdentityDocumentKeyResolver, 88 ) -> Self { 89 Self { 90 pool, 91 content_storage, 92 identity_resolver, 93 document_storage, 94 http_client, 95 record_resolver, 96 key_resolver, 97 } 98 } 99 100 async fn ensure_identity_stored(&self, did: &str) -> Result<Document> { 101 // Check if we already have this identity 102 if let Some(document) = self.document_storage.get_document_by_did(did).await? { 103 return Ok(document); 104 } 105 106 let document = self.identity_resolver.resolve(did).await?; 107 self.document_storage 108 .store_document(document.clone()) 109 .await?; 110 Ok(document) 111 } 112 113 /// Handle a commit event (create or update). 114 /// 115 /// The `live` flag indicates whether this is a live event (true) or backfill (false). 116 /// Takes ownership of `record` to avoid cloning during deserialization. 117 pub async fn handle_commit( 118 &self, 119 did: &str, 120 collection: &str, 121 rkey: &str, 122 cid: &str, 123 record: Value, 124 live: bool, 125 ) -> Result<()> { 126 match collection { 127 "community.lexicon.calendar.event" => { 128 self.handle_event_commit(did, rkey, cid, record, live).await 129 } 130 "community.lexicon.calendar.rsvp" => { 131 self.handle_rsvp_commit(did, rkey, cid, record, live).await 132 } 133 "events.smokesignal.profile" => { 134 self.handle_profile_commit(did, rkey, cid, record).await 135 } 136 "events.smokesignal.calendar.acceptance" => { 137 self.handle_acceptance_commit(did, rkey, cid, record).await 138 } 139 BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID | DROPANCHOR_CHECKIN_NSID => { 140 self.handle_atproto_record_commit(did, collection, rkey, cid, record) 141 .await 142 } 143 _ => Ok(()), 144 } 145 } 146 147 /// Handle a delete event. 148 /// 149 /// The `live` flag indicates whether this is a live event (true) or backfill (false). 150 pub async fn handle_delete( 151 &self, 152 did: &str, 153 collection: &str, 154 rkey: &str, 155 live: bool, 156 ) -> Result<()> { 157 match collection { 158 "community.lexicon.calendar.event" => self.handle_event_delete(did, rkey, live).await, 159 "community.lexicon.calendar.rsvp" => self.handle_rsvp_delete(did, rkey, live).await, 160 "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await, 161 "events.smokesignal.calendar.acceptance" => { 162 self.handle_acceptance_delete(did, rkey).await 163 } 164 BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID | DROPANCHOR_CHECKIN_NSID => { 165 self.handle_atproto_record_delete(did, collection, rkey) 166 .await 167 } 168 _ => Ok(()), 169 } 170 } 171 172 async fn handle_event_commit( 173 &self, 174 did: &str, 175 rkey: &str, 176 cid: &str, 177 record: Value, 178 _live: bool, 179 ) -> Result<()> { 180 tracing::info!("Processing event: {} for {}", rkey, did); 181 182 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey); 183 184 let event_record: Event = serde_json::from_value(record)?; 185 186 let document = self.ensure_identity_stored(did).await?; 187 let pds_endpoints = document.pds_endpoints(); 188 let pds_endpoint = pds_endpoints.first().ok_or(ProcessorError::NoPdsInDid)?; 189 190 let name = event_record.name.clone(); 191 192 event_insert_with_metadata( 193 &self.pool, 194 EventInsertParams { 195 aturi: &aturi, 196 cid, 197 did, 198 lexicon: LexiconCommunityEventNSID, 199 record: &event_record, 200 name: &name, 201 require_confirmed_email: false, 202 disable_direct_rsvp: false, 203 rsvp_redirect_url: None, 204 }, 205 ) 206 .await?; 207 208 let all_media = event_record.media; 209 210 // Download all media items in parallel 211 if !all_media.is_empty() { 212 let download_futures: Vec<_> = all_media 213 .iter() 214 .map(|media| self.download_media(pds_endpoint, did, media)) 215 .collect(); 216 217 let results = join_all(download_futures).await; 218 219 for result in results { 220 if let Err(err) = result { 221 tracing::error!(error = ?err, "failed processing image"); 222 } 223 } 224 } 225 226 Ok(()) 227 } 228 229 async fn handle_rsvp_commit( 230 &self, 231 did: &str, 232 rkey: &str, 233 cid: &str, 234 record: Value, 235 _live: bool, 236 ) -> Result<()> { 237 tracing::info!("Processing rsvp: {} for {}", rkey, did); 238 239 let aturi = build_aturi(did, LexiconCommunityRSVPNSID, rkey); 240 241 let rsvp_record: Rsvp = serde_json::from_value(record)?; 242 243 let event_aturi = rsvp_record.subject.uri.as_str(); 244 let event_cid = rsvp_record.subject.cid.as_str(); 245 let status = match rsvp_record.status { 246 RsvpStatus::Going => "going", 247 RsvpStatus::Interested => "interested", 248 RsvpStatus::NotGoing => "notgoing", 249 }; 250 251 match event_exists(&self.pool, event_aturi).await { 252 Ok(true) => {} 253 _ => return Ok(()), 254 }; 255 256 let _ = self.ensure_identity_stored(did).await?; 257 258 rsvp_insert_with_metadata( 259 &self.pool, 260 RsvpInsertParams { 261 aturi: &aturi, 262 cid, 263 did, 264 lexicon: LexiconCommunityRSVPNSID, 265 record: &rsvp_record, 266 event_aturi, 267 event_cid, 268 status, 269 clear_validated_at: false, 270 }, 271 ) 272 .await?; 273 274 // Check if RSVP has signatures and verify them 275 if !rsvp_record.signatures.is_empty() { 276 tracing::info!( 277 "RSVP {} has {} signature(s), verifying...", 278 aturi, 279 rsvp_record.signatures.len() 280 ); 281 282 let key_resolver_clone = self.key_resolver.clone(); 283 let validated = verify_record( 284 (&rsvp_record).into(), 285 did, 286 key_resolver_clone, 287 self.record_resolver.as_ref(), 288 ) 289 .await 290 .is_ok(); 291 292 if validated { 293 if let Err(e) = 294 rsvp_update_validated_at(&self.pool, &aturi, Some(chrono::Utc::now())).await 295 { 296 tracing::error!("Failed to update RSVP validated_at: {:?}", e); 297 } else { 298 tracing::info!("RSVP {} validated with signatures", aturi); 299 } 300 } else { 301 tracing::warn!("RSVP {} signature verification failed", aturi); 302 } 303 } 304 305 Ok(()) 306 } 307 308 async fn handle_event_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> { 309 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey); 310 311 event_delete(&self.pool, &aturi).await?; 312 313 Ok(()) 314 } 315 316 async fn handle_rsvp_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> { 317 let aturi = build_aturi(did, LexiconCommunityRSVPNSID, rkey); 318 319 rsvp_delete(&self.pool, &aturi).await?; 320 321 Ok(()) 322 } 323 324 async fn handle_profile_commit( 325 &self, 326 did: &str, 327 rkey: &str, 328 cid: &str, 329 record: Value, 330 ) -> Result<()> { 331 tracing::info!("Processing profile: {} for {}", rkey, did); 332 333 let aturi = build_aturi(did, ProfileNSID, rkey); 334 335 // Check denylist before proceeding 336 if denylist_exists(&self.pool, &[did, &aturi]).await? { 337 tracing::info!("User {} is in denylist, skipping profile update", did); 338 return Ok(()); 339 } 340 341 let profile_record: Profile = serde_json::from_value(record)?; 342 343 // Get the identity to resolve the handle for display_name fallback and PDS endpoint 344 let document = self.ensure_identity_stored(did).await?; 345 let handle = document 346 .also_known_as 347 .first() 348 .and_then(|aka| aka.strip_prefix("at://")) 349 .unwrap_or(did); 350 351 // Use displayName from profile, or fallback to handle 352 let display_name = profile_record 353 .display_name 354 .as_ref() 355 .filter(|s| !s.trim().is_empty()) 356 .map(|s| s.as_str()) 357 .unwrap_or(handle); 358 359 profile_insert(&self.pool, &aturi, cid, did, display_name, &profile_record).await?; 360 361 // Download avatar and banner blobs if present (in parallel) 362 let pds_endpoints = document.pds_endpoints(); 363 if let Some(pds_endpoint) = pds_endpoints.first() { 364 // Create futures for avatar and banner downloads 365 let avatar_future = async { 366 if let Some(ref avatar) = profile_record.avatar { 367 self.download_avatar(pds_endpoint, did, avatar).await 368 } else { 369 Ok(()) 370 } 371 }; 372 373 let banner_future = async { 374 if let Some(ref banner) = profile_record.banner { 375 self.download_banner(pds_endpoint, did, banner).await 376 } else { 377 Ok(()) 378 } 379 }; 380 381 // Download both concurrently 382 let (avatar_result, banner_result) = tokio::join!(avatar_future, banner_future); 383 384 if let Err(e) = avatar_result { 385 tracing::warn!( 386 error = ?e, 387 did = %did, 388 "Failed to download avatar for profile" 389 ); 390 } 391 392 if let Err(e) = banner_result { 393 tracing::warn!( 394 error = ?e, 395 did = %did, 396 "Failed to download banner for profile" 397 ); 398 } 399 } else { 400 tracing::debug!(did = %did, "No PDS endpoint found for profile blob download"); 401 } 402 403 Ok(()) 404 } 405 406 async fn handle_profile_delete(&self, did: &str, rkey: &str) -> Result<()> { 407 let aturi = build_aturi(did, ProfileNSID, rkey); 408 profile_delete(&self.pool, &aturi).await?; 409 Ok(()) 410 } 411 412 async fn handle_acceptance_commit( 413 &self, 414 did: &str, 415 rkey: &str, 416 cid: &str, 417 record: Value, 418 ) -> Result<()> { 419 tracing::info!("Processing acceptance: {} for {}", rkey, did); 420 421 let aturi = build_aturi(did, AcceptanceNSID, rkey); 422 423 // Deserialize and validate the acceptance record 424 let acceptance_record: TypedAcceptance = serde_json::from_value(record)?; 425 tracing::info!(?acceptance_record, "acceptance_record"); 426 427 // Validate the acceptance record 428 if let Err(e) = acceptance_record.validate() { 429 tracing::warn!("Invalid acceptance record: {}", e); 430 return Ok(()); 431 } 432 433 // Store the acceptance record (use deserialized record to avoid clone) 434 acceptance_record_upsert(&self.pool, &aturi, cid, did, &acceptance_record).await?; 435 436 tracing::info!("Acceptance stored: {}", aturi); 437 Ok(()) 438 } 439 440 async fn handle_acceptance_delete(&self, did: &str, rkey: &str) -> Result<()> { 441 let aturi = build_aturi(did, AcceptanceNSID, rkey); 442 acceptance_record_delete(&self.pool, &aturi).await?; 443 tracing::info!("Acceptance deleted: {}", aturi); 444 Ok(()) 445 } 446 447 async fn handle_atproto_record_commit( 448 &self, 449 did: &str, 450 collection: &str, 451 rkey: &str, 452 cid: &str, 453 record: Value, 454 ) -> Result<()> { 455 let aturi = build_aturi(did, collection, rkey); 456 atproto_record_upsert(&self.pool, &aturi, did, cid, collection, &record).await?; 457 tracing::info!("Stored atproto record: {} ({})", aturi, collection); 458 Ok(()) 459 } 460 461 async fn handle_atproto_record_delete( 462 &self, 463 did: &str, 464 collection: &str, 465 rkey: &str, 466 ) -> Result<()> { 467 let aturi = build_aturi(did, collection, rkey); 468 atproto_record_delete(&self.pool, &aturi).await?; 469 tracing::info!("Deleted atproto record: {}", aturi); 470 Ok(()) 471 } 472 473 /// Download and process avatar blob (1:1 aspect ratio, max 3MB) 474 async fn download_avatar( 475 &self, 476 pds: &str, 477 did: &str, 478 avatar: &atproto_record::lexicon::TypedBlob, 479 ) -> Result<()> { 480 let cid = &avatar.inner.ref_.link; 481 let image_path = build_image_path(cid); 482 483 // Check if already exists 484 if self.content_storage.content_exists(&image_path).await? { 485 tracing::debug!(cid = %cid, "Avatar already exists in storage"); 486 return Ok(()); 487 } 488 489 // Validate mime type 490 if avatar.inner.mime_type != "image/png" && avatar.inner.mime_type != "image/jpeg" { 491 tracing::debug!( 492 mime_type = %avatar.inner.mime_type, 493 "Skipping avatar with unsupported mime type" 494 ); 495 return Ok(()); 496 } 497 498 // Validate size (max 3MB) 499 if avatar.inner.size > 3_000_000 { 500 tracing::debug!( 501 size = avatar.inner.size, 502 "Skipping avatar that exceeds max size" 503 ); 504 return Ok(()); 505 } 506 507 // Download the blob 508 let image_bytes = match get_blob(&self.http_client, pds, did, cid).await { 509 Ok(bytes) => bytes, 510 Err(e) => { 511 tracing::warn!(error = ?e, cid = %cid, "Failed to download avatar blob"); 512 return Ok(()); // Don't fail the whole operation 513 } 514 }; 515 516 // Validate and process the image 517 let img = match image::load_from_memory(&image_bytes) { 518 Ok(img) => img, 519 Err(e) => { 520 tracing::warn!(error = ?e, cid = %cid, "Failed to load avatar image"); 521 return Ok(()); 522 } 523 }; 524 525 let (width, height) = img.dimensions(); 526 527 // Validate 1:1 aspect ratio (allow small deviation) 528 let aspect_ratio = width as f32 / height as f32; 529 if (aspect_ratio - 1.0).abs() > 0.05 { 530 tracing::debug!( 531 width, 532 height, 533 aspect_ratio, 534 "Skipping avatar with non-square aspect ratio" 535 ); 536 return Ok(()); 537 } 538 539 // Resize to standard size (400x400) 540 let resized = if width != 400 || height != 400 { 541 img.resize_exact(400, 400, image::imageops::FilterType::Lanczos3) 542 } else { 543 img 544 }; 545 546 // Convert to PNG with pre-allocated buffer 547 // 400x400 PNG typically compresses to ~100-300KB, pre-allocate 256KB 548 let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(256 * 1024)); 549 resized.write_to(&mut png_buffer, ImageFormat::Png)?; 550 let png_bytes = png_buffer.into_inner(); 551 552 // Store in content storage 553 self.content_storage 554 .write_content(&image_path, &png_bytes) 555 .await?; 556 557 tracing::info!(cid = %cid, "Successfully downloaded and processed avatar"); 558 Ok(()) 559 } 560 561 /// Download and process banner blob (16:9 aspect ratio, max 3MB) 562 async fn download_banner( 563 &self, 564 pds: &str, 565 did: &str, 566 banner: &atproto_record::lexicon::TypedBlob, 567 ) -> Result<()> { 568 let cid = &banner.inner.ref_.link; 569 let image_path = build_image_path(cid); 570 571 // Check if already exists 572 if self.content_storage.content_exists(&image_path).await? { 573 tracing::debug!(cid = %cid, "Banner already exists in storage"); 574 return Ok(()); 575 } 576 577 // Validate mime type 578 if banner.inner.mime_type != "image/png" && banner.inner.mime_type != "image/jpeg" { 579 tracing::debug!( 580 mime_type = %banner.inner.mime_type, 581 "Skipping banner with unsupported mime type" 582 ); 583 return Ok(()); 584 } 585 586 // Validate size (max 3MB) 587 if banner.inner.size > 3_000_000 { 588 tracing::debug!( 589 size = banner.inner.size, 590 "Skipping banner that exceeds max size" 591 ); 592 return Ok(()); 593 } 594 595 // Download the blob 596 let image_bytes = match get_blob(&self.http_client, pds, did, cid).await { 597 Ok(bytes) => bytes, 598 Err(e) => { 599 tracing::warn!(error = ?e, cid = %cid, "Failed to download banner blob"); 600 return Ok(()); // Don't fail the whole operation 601 } 602 }; 603 604 // Validate and process the image 605 let img = match image::load_from_memory(&image_bytes) { 606 Ok(img) => img, 607 Err(e) => { 608 tracing::warn!(error = ?e, cid = %cid, "Failed to load banner image"); 609 return Ok(()); 610 } 611 }; 612 613 let (width, height) = img.dimensions(); 614 615 // Validate 16:9 aspect ratio (allow 10% deviation) 616 let aspect_ratio = width as f32 / height as f32; 617 let expected_ratio = 16.0 / 9.0; 618 if (aspect_ratio - expected_ratio).abs() / expected_ratio > 0.10 { 619 tracing::debug!( 620 width, 621 height, 622 aspect_ratio, 623 "Skipping banner with non-16:9 aspect ratio" 624 ); 625 return Ok(()); 626 } 627 628 // Resize to standard size (1600x900) 629 let resized = if width != 1600 || height != 900 { 630 img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3) 631 } else { 632 img 633 }; 634 635 // Convert to PNG with pre-allocated buffer 636 // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB 637 let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024)); 638 resized.write_to(&mut png_buffer, ImageFormat::Png)?; 639 let png_bytes = png_buffer.into_inner(); 640 641 // Store in content storage 642 self.content_storage 643 .write_content(&image_path, &png_bytes) 644 .await?; 645 646 tracing::info!(cid = %cid, "Successfully downloaded and processed banner"); 647 Ok(()) 648 } 649 650 async fn download_media(&self, pds: &str, did: &str, event_image: &Media) -> Result<()> { 651 let content = &event_image.content; 652 let role = &event_image.role; 653 let aspect_ratio = &event_image.aspect_ratio; 654 655 if role != "header" { 656 return Ok(()); 657 } 658 659 match content.mime_type.as_str() { 660 "image/png" | "image/jpeg" | "image/webp" => {} 661 _ => return Ok(()), 662 } 663 664 let (reported_height, reported_width) = aspect_ratio 665 .as_ref() 666 .map(|value| (value.height, value.width)) 667 .unwrap_or_default(); 668 if !(755..=12000).contains(&reported_height) 669 || !(reported_height..=12000).contains(&reported_width) 670 { 671 tracing::info!( 672 ?reported_height, 673 ?reported_width, 674 "aspect ratio check 1 failed" 675 ); 676 return Ok(()); 677 } 678 let is_16_9 = 679 (((reported_width as f64) / (reported_height as f64)) - (16.0 / 9.0)).abs() < 0.02; 680 if !is_16_9 { 681 tracing::info!("aspect ratio check 2 failed"); 682 return Ok(()); 683 } 684 685 // Access the CID from the TypedBlob structure 686 let blob_ref = &content.inner.ref_.link; 687 688 let image_path = build_image_path(blob_ref); 689 tracing::info!(?image_path, "image_path"); 690 691 if self 692 .content_storage 693 .as_ref() 694 .content_exists(&image_path) 695 .await? 696 { 697 tracing::info!(?image_path, "content exists"); 698 return Ok(()); 699 } 700 701 let image_bytes = get_blob(&self.http_client, pds, did, blob_ref).await?; 702 703 const MAX_SIZE: usize = 3 * 1024 * 1024; 704 if image_bytes.len() > MAX_SIZE { 705 tracing::info!("max size failed"); 706 return Ok(()); 707 } 708 709 let img = image::load_from_memory(&image_bytes)?; 710 711 let format = image::guess_format(&image_bytes)?; 712 713 match format { 714 ImageFormat::Jpeg | ImageFormat::Png | ImageFormat::WebP => { 715 // Supported formats 716 } 717 _ => { 718 tracing::info!("supported formats failed"); 719 return Ok(()); 720 } 721 } 722 723 // Check it again for fun 724 let (actual_width, actual_height) = img.dimensions(); 725 if actual_height == 0 726 || actual_height > 12000 727 || actual_width == 0 728 || actual_width > 12000 729 || actual_width <= actual_height 730 { 731 tracing::info!("aspect ratio check 3 failed"); 732 return Ok(()); 733 } 734 let is_really_16_9 = 735 (((actual_width as f64) / (actual_height as f64)) - (16.0 / 9.0)).abs() < 0.02; 736 if !is_really_16_9 { 737 tracing::info!("aspect ratio check 4 failed"); 738 return Ok(()); 739 } 740 741 // Resize to standard dimensions (1600x900) to match process_event_header() 742 let final_image = if actual_width != 1600 || actual_height != 900 { 743 img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3) 744 } else { 745 img 746 }; 747 748 // Convert to PNG with pre-allocated buffer 749 // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB 750 let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024)); 751 final_image.write_to(&mut png_buffer, ImageFormat::Png)?; 752 let png_bytes = png_buffer.into_inner(); 753 754 self.content_storage 755 .as_ref() 756 .write_content(&image_path, &png_bytes) 757 .await?; 758 759 tracing::info!("image written"); 760 Ok(()) 761 } 762}