//! The smokesignal.events web application.
1use anyhow::Result;
2use atproto_attestation::verify_record;
3use atproto_client::com::atproto::repo::get_blob;
4use atproto_identity::key::IdentityDocumentKeyResolver;
5use atproto_identity::model::Document;
6use atproto_identity::resolve::IdentityResolver;
7use atproto_identity::traits::DidDocumentStorage;
8use futures::future::join_all;
9use image::GenericImageView;
10use image::ImageFormat;
11use serde_json::Value;
12use std::sync::Arc;
13
14use crate::atproto::lexicon::acceptance::NSID as AcceptanceNSID;
15
/// Assemble an `at://<did>/<nsid>/<rkey>` URI with a single allocation,
/// avoiding the formatting machinery of `format!`.
#[inline]
fn build_aturi(did: &str, nsid: &str, rkey: &str) -> String {
    // Fixed overhead: the "at://" scheme (5 bytes) plus two '/' separators.
    let mut uri = String::with_capacity(5 + 2 + did.len() + nsid.len() + rkey.len());
    for piece in ["at://", did, "/", nsid, "/", rkey] {
        uri.push_str(piece);
    }
    uri
}
30
/// Derive the content-storage key for an image blob: `<cid>.png`.
#[inline]
fn build_image_path(cid: &str) -> String {
    const EXT: &str = ".png";
    let mut path = String::with_capacity(cid.len() + EXT.len());
    path.push_str(cid);
    path.push_str(EXT);
    path
}
39use crate::atproto::lexicon::acceptance::TypedAcceptance;
40use crate::atproto::lexicon::profile::{NSID as ProfileNSID, Profile};
41use crate::processor_errors::ProcessorError;
42use crate::storage::StoragePool;
43use crate::storage::acceptance::{
44 acceptance_record_delete, acceptance_record_upsert, rsvp_update_validated_at,
45};
46use crate::storage::atproto_record::{atproto_record_delete, atproto_record_upsert};
47use crate::storage::content::ContentStorage;
48use crate::storage::denylist::denylist_exists;
49use crate::storage::event::EventInsertParams;
50use crate::storage::event::RsvpInsertParams;
51use crate::storage::event::event_delete;
52use crate::storage::event::event_exists;
53use crate::storage::event::event_insert_with_metadata;
54use crate::storage::event::rsvp_delete;
55use crate::storage::event::rsvp_insert_with_metadata;
56use crate::storage::profile::profile_delete;
57use crate::storage::profile::profile_insert;
58use atproto_record::lexicon::community::lexicon::calendar::event::{
59 Event, Media, NSID as LexiconCommunityEventNSID,
60};
61use atproto_record::lexicon::community::lexicon::calendar::rsvp::{
62 NSID as LexiconCommunityRSVPNSID, Rsvp, RsvpStatus,
63};
64
// Third-party collections that are mirrored verbatim into storage
// (handled by `handle_atproto_record_commit` / `handle_atproto_record_delete`).
const BEACONBITS_BOOKMARK_NSID: &str = "app.beaconbits.bookmark.item";
const BEACONBITS_BEACON_NSID: &str = "app.beaconbits.beacon";
const DROPANCHOR_CHECKIN_NSID: &str = "app.dropanchor.checkin";
68
/// Consumes firehose commit/delete notifications and materializes the
/// referenced records (calendar events, RSVPs, profiles, acceptances, and a
/// few mirrored third-party collections) into local storage, downloading and
/// normalizing any associated image blobs.
pub struct ContentFetcher {
    // Database pool used by all the storage helper functions.
    pool: StoragePool,
    // Object storage for processed images (avatars, banners, event media).
    content_storage: Arc<dyn ContentStorage>,
    // Resolves DIDs to identity documents when not already cached.
    identity_resolver: Arc<dyn IdentityResolver>,
    // Cache of resolved DID documents (checked before resolving).
    document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
    // Shared HTTP client for PDS blob downloads.
    http_client: reqwest::Client,
    // Record lookup used by RSVP signature verification.
    record_resolver: Arc<crate::record_resolver::StorageBackedRecordResolver>,
    // Key resolver used by RSVP signature verification.
    key_resolver: IdentityDocumentKeyResolver,
}
78
79impl ContentFetcher {
    /// Construct a `ContentFetcher` from its injected dependencies.
    pub fn new(
        pool: StoragePool,
        content_storage: Arc<dyn ContentStorage>,
        identity_resolver: Arc<dyn IdentityResolver>,
        document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
        http_client: reqwest::Client,
        record_resolver: Arc<crate::record_resolver::StorageBackedRecordResolver>,
        key_resolver: IdentityDocumentKeyResolver,
    ) -> Self {
        Self {
            pool,
            content_storage,
            identity_resolver,
            document_storage,
            http_client,
            record_resolver,
            key_resolver,
        }
    }
99
100 async fn ensure_identity_stored(&self, did: &str) -> Result<Document> {
101 // Check if we already have this identity
102 if let Some(document) = self.document_storage.get_document_by_did(did).await? {
103 return Ok(document);
104 }
105
106 let document = self.identity_resolver.resolve(did).await?;
107 self.document_storage
108 .store_document(document.clone())
109 .await?;
110 Ok(document)
111 }
112
113 /// Handle a commit event (create or update).
114 ///
115 /// The `live` flag indicates whether this is a live event (true) or backfill (false).
116 /// Takes ownership of `record` to avoid cloning during deserialization.
117 pub async fn handle_commit(
118 &self,
119 did: &str,
120 collection: &str,
121 rkey: &str,
122 cid: &str,
123 record: Value,
124 live: bool,
125 ) -> Result<()> {
126 match collection {
127 "community.lexicon.calendar.event" => {
128 self.handle_event_commit(did, rkey, cid, record, live).await
129 }
130 "community.lexicon.calendar.rsvp" => {
131 self.handle_rsvp_commit(did, rkey, cid, record, live).await
132 }
133 "events.smokesignal.profile" => {
134 self.handle_profile_commit(did, rkey, cid, record).await
135 }
136 "events.smokesignal.calendar.acceptance" => {
137 self.handle_acceptance_commit(did, rkey, cid, record).await
138 }
139 BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID | DROPANCHOR_CHECKIN_NSID => {
140 self.handle_atproto_record_commit(did, collection, rkey, cid, record)
141 .await
142 }
143 _ => Ok(()),
144 }
145 }
146
147 /// Handle a delete event.
148 ///
149 /// The `live` flag indicates whether this is a live event (true) or backfill (false).
150 pub async fn handle_delete(
151 &self,
152 did: &str,
153 collection: &str,
154 rkey: &str,
155 live: bool,
156 ) -> Result<()> {
157 match collection {
158 "community.lexicon.calendar.event" => self.handle_event_delete(did, rkey, live).await,
159 "community.lexicon.calendar.rsvp" => self.handle_rsvp_delete(did, rkey, live).await,
160 "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await,
161 "events.smokesignal.calendar.acceptance" => {
162 self.handle_acceptance_delete(did, rkey).await
163 }
164 BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID | DROPANCHOR_CHECKIN_NSID => {
165 self.handle_atproto_record_delete(did, collection, rkey)
166 .await
167 }
168 _ => Ok(()),
169 }
170 }
171
172 async fn handle_event_commit(
173 &self,
174 did: &str,
175 rkey: &str,
176 cid: &str,
177 record: Value,
178 _live: bool,
179 ) -> Result<()> {
180 tracing::info!("Processing event: {} for {}", rkey, did);
181
182 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
183
184 let event_record: Event = serde_json::from_value(record)?;
185
186 let document = self.ensure_identity_stored(did).await?;
187 let pds_endpoints = document.pds_endpoints();
188 let pds_endpoint = pds_endpoints.first().ok_or(ProcessorError::NoPdsInDid)?;
189
190 let name = event_record.name.clone();
191
192 event_insert_with_metadata(
193 &self.pool,
194 EventInsertParams {
195 aturi: &aturi,
196 cid,
197 did,
198 lexicon: LexiconCommunityEventNSID,
199 record: &event_record,
200 name: &name,
201 require_confirmed_email: false,
202 disable_direct_rsvp: false,
203 rsvp_redirect_url: None,
204 },
205 )
206 .await?;
207
208 let all_media = event_record.media;
209
210 // Download all media items in parallel
211 if !all_media.is_empty() {
212 let download_futures: Vec<_> = all_media
213 .iter()
214 .map(|media| self.download_media(pds_endpoint, did, media))
215 .collect();
216
217 let results = join_all(download_futures).await;
218
219 for result in results {
220 if let Err(err) = result {
221 tracing::error!(error = ?err, "failed processing image");
222 }
223 }
224 }
225
226 Ok(())
227 }
228
    /// Persist an RSVP record and, if it carries signatures, verify them and
    /// stamp `validated_at`.
    ///
    /// RSVPs pointing at events we do not store are silently dropped.
    /// Signature-verification failures are logged but never fail the commit.
    async fn handle_rsvp_commit(
        &self,
        did: &str,
        rkey: &str,
        cid: &str,
        record: Value,
        _live: bool,
    ) -> Result<()> {
        tracing::info!("Processing rsvp: {} for {}", rkey, did);

        let aturi = build_aturi(did, LexiconCommunityRSVPNSID, rkey);

        let rsvp_record: Rsvp = serde_json::from_value(record)?;

        // The RSVP's subject identifies the event it responds to.
        let event_aturi = rsvp_record.subject.uri.as_str();
        let event_cid = rsvp_record.subject.cid.as_str();
        // Map the typed status enum onto the string values stored in the DB.
        let status = match rsvp_record.status {
            RsvpStatus::Going => "going",
            RsvpStatus::Interested => "interested",
            RsvpStatus::NotGoing => "notgoing",
        };

        // Only keep RSVPs for events we actually have; lookup errors are
        // treated the same as "not found" (best-effort ingestion).
        match event_exists(&self.pool, event_aturi).await {
            Ok(true) => {}
            _ => return Ok(()),
        };

        // Cache the RSVP author's identity (document itself is not needed here).
        let _ = self.ensure_identity_stored(did).await?;

        // Insert first with clear_validated_at=false; validation (if any)
        // happens after the row exists.
        rsvp_insert_with_metadata(
            &self.pool,
            RsvpInsertParams {
                aturi: &aturi,
                cid,
                did,
                lexicon: LexiconCommunityRSVPNSID,
                record: &rsvp_record,
                event_aturi,
                event_cid,
                status,
                clear_validated_at: false,
            },
        )
        .await?;

        // Check if RSVP has signatures and verify them
        if !rsvp_record.signatures.is_empty() {
            tracing::info!(
                "RSVP {} has {} signature(s), verifying...",
                aturi,
                rsvp_record.signatures.len()
            );

            // Any verification error counts as "not validated" — we only
            // record success, never fail the commit.
            let key_resolver_clone = self.key_resolver.clone();
            let validated = verify_record(
                (&rsvp_record).into(),
                did,
                key_resolver_clone,
                self.record_resolver.as_ref(),
            )
            .await
            .is_ok();

            if validated {
                // Stamp the row; an update failure is logged but swallowed.
                if let Err(e) =
                    rsvp_update_validated_at(&self.pool, &aturi, Some(chrono::Utc::now())).await
                {
                    tracing::error!("Failed to update RSVP validated_at: {:?}", e);
                } else {
                    tracing::info!("RSVP {} validated with signatures", aturi);
                }
            } else {
                tracing::warn!("RSVP {} signature verification failed", aturi);
            }
        }

        Ok(())
    }
307
308 async fn handle_event_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> {
309 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
310
311 event_delete(&self.pool, &aturi).await?;
312
313 Ok(())
314 }
315
316 async fn handle_rsvp_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> {
317 let aturi = build_aturi(did, LexiconCommunityRSVPNSID, rkey);
318
319 rsvp_delete(&self.pool, &aturi).await?;
320
321 Ok(())
322 }
323
    /// Persist a profile record and download its avatar/banner blobs.
    ///
    /// Denylisted DIDs (or profile URIs) are skipped entirely. Blob download
    /// failures are logged but never fail the commit.
    async fn handle_profile_commit(
        &self,
        did: &str,
        rkey: &str,
        cid: &str,
        record: Value,
    ) -> Result<()> {
        tracing::info!("Processing profile: {} for {}", rkey, did);

        let aturi = build_aturi(did, ProfileNSID, rkey);

        // Check denylist before proceeding (matches either the DID or the
        // specific profile AT URI).
        if denylist_exists(&self.pool, &[did, &aturi]).await? {
            tracing::info!("User {} is in denylist, skipping profile update", did);
            return Ok(());
        }

        let profile_record: Profile = serde_json::from_value(record)?;

        // Get the identity to resolve the handle for display_name fallback and PDS endpoint.
        // The first also_known_as entry (minus the at:// prefix) is the handle;
        // the bare DID is the last-resort fallback.
        let document = self.ensure_identity_stored(did).await?;
        let handle = document
            .also_known_as
            .first()
            .and_then(|aka| aka.strip_prefix("at://"))
            .unwrap_or(did);

        // Use displayName from profile, or fallback to handle; a whitespace-only
        // display name counts as absent.
        let display_name = profile_record
            .display_name
            .as_ref()
            .filter(|s| !s.trim().is_empty())
            .map(|s| s.as_str())
            .unwrap_or(handle);

        profile_insert(&self.pool, &aturi, cid, did, display_name, &profile_record).await?;

        // Download avatar and banner blobs if present (in parallel)
        let pds_endpoints = document.pds_endpoints();
        if let Some(pds_endpoint) = pds_endpoints.first() {
            // Create futures for avatar and banner downloads; each resolves to
            // Ok(()) immediately when the corresponding blob is absent.
            let avatar_future = async {
                if let Some(ref avatar) = profile_record.avatar {
                    self.download_avatar(pds_endpoint, did, avatar).await
                } else {
                    Ok(())
                }
            };

            let banner_future = async {
                if let Some(ref banner) = profile_record.banner {
                    self.download_banner(pds_endpoint, did, banner).await
                } else {
                    Ok(())
                }
            };

            // Download both concurrently
            let (avatar_result, banner_result) = tokio::join!(avatar_future, banner_future);

            // Downloads are best-effort: log failures, keep the profile row.
            if let Err(e) = avatar_result {
                tracing::warn!(
                    error = ?e,
                    did = %did,
                    "Failed to download avatar for profile"
                );
            }

            if let Err(e) = banner_result {
                tracing::warn!(
                    error = ?e,
                    did = %did,
                    "Failed to download banner for profile"
                );
            }
        } else {
            tracing::debug!(did = %did, "No PDS endpoint found for profile blob download");
        }

        Ok(())
    }
405
406 async fn handle_profile_delete(&self, did: &str, rkey: &str) -> Result<()> {
407 let aturi = build_aturi(did, ProfileNSID, rkey);
408 profile_delete(&self.pool, &aturi).await?;
409 Ok(())
410 }
411
412 async fn handle_acceptance_commit(
413 &self,
414 did: &str,
415 rkey: &str,
416 cid: &str,
417 record: Value,
418 ) -> Result<()> {
419 tracing::info!("Processing acceptance: {} for {}", rkey, did);
420
421 let aturi = build_aturi(did, AcceptanceNSID, rkey);
422
423 // Deserialize and validate the acceptance record
424 let acceptance_record: TypedAcceptance = serde_json::from_value(record)?;
425 tracing::info!(?acceptance_record, "acceptance_record");
426
427 // Validate the acceptance record
428 if let Err(e) = acceptance_record.validate() {
429 tracing::warn!("Invalid acceptance record: {}", e);
430 return Ok(());
431 }
432
433 // Store the acceptance record (use deserialized record to avoid clone)
434 acceptance_record_upsert(&self.pool, &aturi, cid, did, &acceptance_record).await?;
435
436 tracing::info!("Acceptance stored: {}", aturi);
437 Ok(())
438 }
439
440 async fn handle_acceptance_delete(&self, did: &str, rkey: &str) -> Result<()> {
441 let aturi = build_aturi(did, AcceptanceNSID, rkey);
442 acceptance_record_delete(&self.pool, &aturi).await?;
443 tracing::info!("Acceptance deleted: {}", aturi);
444 Ok(())
445 }
446
447 async fn handle_atproto_record_commit(
448 &self,
449 did: &str,
450 collection: &str,
451 rkey: &str,
452 cid: &str,
453 record: Value,
454 ) -> Result<()> {
455 let aturi = build_aturi(did, collection, rkey);
456 atproto_record_upsert(&self.pool, &aturi, did, cid, collection, &record).await?;
457 tracing::info!("Stored atproto record: {} ({})", aturi, collection);
458 Ok(())
459 }
460
461 async fn handle_atproto_record_delete(
462 &self,
463 did: &str,
464 collection: &str,
465 rkey: &str,
466 ) -> Result<()> {
467 let aturi = build_aturi(did, collection, rkey);
468 atproto_record_delete(&self.pool, &aturi).await?;
469 tracing::info!("Deleted atproto record: {}", aturi);
470 Ok(())
471 }
472
    /// Download and process avatar blob (1:1 aspect ratio, max 3MB)
    ///
    /// Fetches the blob from the owner's PDS, validates mime type, declared
    /// size, and (near-)square aspect ratio, normalizes it to a 400x400 PNG,
    /// and writes it to content storage as `<cid>.png`. Every validation or
    /// download failure returns `Ok(())` after logging, so a bad avatar never
    /// fails the surrounding profile commit.
    async fn download_avatar(
        &self,
        pds: &str,
        did: &str,
        avatar: &atproto_record::lexicon::TypedBlob,
    ) -> Result<()> {
        let cid = &avatar.inner.ref_.link;
        let image_path = build_image_path(cid);

        // Check if already exists — blobs are content-addressed by CID, so a
        // hit means this exact image was already processed.
        if self.content_storage.content_exists(&image_path).await? {
            tracing::debug!(cid = %cid, "Avatar already exists in storage");
            return Ok(());
        }

        // Validate mime type
        if avatar.inner.mime_type != "image/png" && avatar.inner.mime_type != "image/jpeg" {
            tracing::debug!(
                mime_type = %avatar.inner.mime_type,
                "Skipping avatar with unsupported mime type"
            );
            return Ok(());
        }

        // Validate size (max 3MB) — this is the size declared in the blob
        // metadata, checked before downloading anything.
        if avatar.inner.size > 3_000_000 {
            tracing::debug!(
                size = avatar.inner.size,
                "Skipping avatar that exceeds max size"
            );
            return Ok(());
        }

        // Download the blob
        let image_bytes = match get_blob(&self.http_client, pds, did, cid).await {
            Ok(bytes) => bytes,
            Err(e) => {
                tracing::warn!(error = ?e, cid = %cid, "Failed to download avatar blob");
                return Ok(()); // Don't fail the whole operation
            }
        };

        // Validate and process the image
        let img = match image::load_from_memory(&image_bytes) {
            Ok(img) => img,
            Err(e) => {
                tracing::warn!(error = ?e, cid = %cid, "Failed to load avatar image");
                return Ok(());
            }
        };

        let (width, height) = img.dimensions();

        // Validate 1:1 aspect ratio (allow small deviation)
        let aspect_ratio = width as f32 / height as f32;
        if (aspect_ratio - 1.0).abs() > 0.05 {
            tracing::debug!(
                width,
                height,
                aspect_ratio,
                "Skipping avatar with non-square aspect ratio"
            );
            return Ok(());
        }

        // Resize to standard size (400x400)
        let resized = if width != 400 || height != 400 {
            img.resize_exact(400, 400, image::imageops::FilterType::Lanczos3)
        } else {
            img
        };

        // Convert to PNG with pre-allocated buffer
        // 400x400 PNG typically compresses to ~100-300KB, pre-allocate 256KB
        let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(256 * 1024));
        resized.write_to(&mut png_buffer, ImageFormat::Png)?;
        let png_bytes = png_buffer.into_inner();

        // Store in content storage
        self.content_storage
            .write_content(&image_path, &png_bytes)
            .await?;

        tracing::info!(cid = %cid, "Successfully downloaded and processed avatar");
        Ok(())
    }
560
    /// Download and process banner blob (16:9 aspect ratio, max 3MB)
    ///
    /// Mirrors `download_avatar` but targets a 1600x900 PNG and accepts a
    /// 16:9 aspect ratio within 10% relative deviation. Every validation or
    /// download failure returns `Ok(())` after logging, so a bad banner never
    /// fails the surrounding profile commit.
    async fn download_banner(
        &self,
        pds: &str,
        did: &str,
        banner: &atproto_record::lexicon::TypedBlob,
    ) -> Result<()> {
        let cid = &banner.inner.ref_.link;
        let image_path = build_image_path(cid);

        // Check if already exists — blobs are content-addressed by CID.
        if self.content_storage.content_exists(&image_path).await? {
            tracing::debug!(cid = %cid, "Banner already exists in storage");
            return Ok(());
        }

        // Validate mime type
        if banner.inner.mime_type != "image/png" && banner.inner.mime_type != "image/jpeg" {
            tracing::debug!(
                mime_type = %banner.inner.mime_type,
                "Skipping banner with unsupported mime type"
            );
            return Ok(());
        }

        // Validate size (max 3MB) — declared metadata size, checked before download.
        if banner.inner.size > 3_000_000 {
            tracing::debug!(
                size = banner.inner.size,
                "Skipping banner that exceeds max size"
            );
            return Ok(());
        }

        // Download the blob
        let image_bytes = match get_blob(&self.http_client, pds, did, cid).await {
            Ok(bytes) => bytes,
            Err(e) => {
                tracing::warn!(error = ?e, cid = %cid, "Failed to download banner blob");
                return Ok(()); // Don't fail the whole operation
            }
        };

        // Validate and process the image
        let img = match image::load_from_memory(&image_bytes) {
            Ok(img) => img,
            Err(e) => {
                tracing::warn!(error = ?e, cid = %cid, "Failed to load banner image");
                return Ok(());
            }
        };

        let (width, height) = img.dimensions();

        // Validate 16:9 aspect ratio (allow 10% deviation)
        let aspect_ratio = width as f32 / height as f32;
        let expected_ratio = 16.0 / 9.0;
        if (aspect_ratio - expected_ratio).abs() / expected_ratio > 0.10 {
            tracing::debug!(
                width,
                height,
                aspect_ratio,
                "Skipping banner with non-16:9 aspect ratio"
            );
            return Ok(());
        }

        // Resize to standard size (1600x900)
        let resized = if width != 1600 || height != 900 {
            img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3)
        } else {
            img
        };

        // Convert to PNG with pre-allocated buffer
        // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB
        let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024));
        resized.write_to(&mut png_buffer, ImageFormat::Png)?;
        let png_bytes = png_buffer.into_inner();

        // Store in content storage
        self.content_storage
            .write_content(&image_path, &png_bytes)
            .await?;

        tracing::info!(cid = %cid, "Successfully downloaded and processed banner");
        Ok(())
    }
649
    /// Download and normalize an event's "header" media image to a 1600x900 PNG.
    ///
    /// Non-header roles, unsupported mime types, missing/implausible declared
    /// dimensions, non-16:9 images, and oversized blobs are all skipped with
    /// an informational log and `Ok(())`. Only download/decode/storage errors
    /// from the trailing `?` operators propagate as `Err`.
    async fn download_media(&self, pds: &str, did: &str, event_image: &Media) -> Result<()> {
        let content = &event_image.content;
        let role = &event_image.role;
        let aspect_ratio = &event_image.aspect_ratio;

        // Only the header image is stored; other roles are ignored.
        if role != "header" {
            return Ok(());
        }

        match content.mime_type.as_str() {
            "image/png" | "image/jpeg" | "image/webp" => {}
            _ => return Ok(()),
        }

        // Declared dimensions from the record; (0, 0) when absent, which
        // always fails the range check below — i.e. media without declared
        // aspect-ratio metadata is rejected.
        let (reported_height, reported_width) = aspect_ratio
            .as_ref()
            .map(|value| (value.height, value.width))
            .unwrap_or_default();
        // Require height in 755..=12000 and a landscape orientation
        // (width >= height) capped at 12000.
        if !(755..=12000).contains(&reported_height)
            || !(reported_height..=12000).contains(&reported_width)
        {
            tracing::info!(
                ?reported_height,
                ?reported_width,
                "aspect ratio check 1 failed"
            );
            return Ok(());
        }
        // Declared ratio must be within 0.02 of 16:9.
        let is_16_9 =
            (((reported_width as f64) / (reported_height as f64)) - (16.0 / 9.0)).abs() < 0.02;
        if !is_16_9 {
            tracing::info!("aspect ratio check 2 failed");
            return Ok(());
        }

        // Access the CID from the TypedBlob structure
        let blob_ref = &content.inner.ref_.link;

        let image_path = build_image_path(blob_ref);
        tracing::info!(?image_path, "image_path");

        // Content-addressed: skip if this exact blob was already processed.
        if self
            .content_storage
            .as_ref()
            .content_exists(&image_path)
            .await?
        {
            tracing::info!(?image_path, "content exists");
            return Ok(());
        }

        let image_bytes = get_blob(&self.http_client, pds, did, blob_ref).await?;

        // Size is enforced on the actual downloaded bytes, not metadata.
        const MAX_SIZE: usize = 3 * 1024 * 1024;
        if image_bytes.len() > MAX_SIZE {
            tracing::info!("max size failed");
            return Ok(());
        }

        let img = image::load_from_memory(&image_bytes)?;

        // Re-check the container format from the bytes themselves; the
        // declared mime type is not trusted.
        let format = image::guess_format(&image_bytes)?;

        match format {
            ImageFormat::Jpeg | ImageFormat::Png | ImageFormat::WebP => {
                // Supported formats
            }
            _ => {
                tracing::info!("supported formats failed");
                return Ok(());
            }
        }

        // Check it again for fun — same dimension constraints as above, but
        // against the decoded image's real dimensions.
        let (actual_width, actual_height) = img.dimensions();
        if actual_height == 0
            || actual_height > 12000
            || actual_width == 0
            || actual_width > 12000
            || actual_width <= actual_height
        {
            tracing::info!("aspect ratio check 3 failed");
            return Ok(());
        }
        let is_really_16_9 =
            (((actual_width as f64) / (actual_height as f64)) - (16.0 / 9.0)).abs() < 0.02;
        if !is_really_16_9 {
            tracing::info!("aspect ratio check 4 failed");
            return Ok(());
        }

        // Resize to standard dimensions (1600x900) to match process_event_header()
        let final_image = if actual_width != 1600 || actual_height != 900 {
            img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3)
        } else {
            img
        };

        // Convert to PNG with pre-allocated buffer
        // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB
        let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024));
        final_image.write_to(&mut png_buffer, ImageFormat::Png)?;
        let png_bytes = png_buffer.into_inner();

        self.content_storage
            .as_ref()
            .write_content(&image_path, &png_bytes)
            .await?;

        tracing::info!("image written");
        Ok(())
    }
762}