// The smokesignal.events web application
// Repository view: branch `main`, 837 lines, 26 kB.
1use anyhow::{Context, Result, anyhow}; 2use atproto_attestation::verify_record; 3use atproto_client::com::atproto::repo::{ListRecordsParams, get_blob, get_record, list_records}; 4use atproto_identity::key::IdentityDocumentKeyResolver; 5use atproto_identity::model::Document; 6use atproto_identity::resolve::IdentityResolver; 7use atproto_identity::traits::DidDocumentStorage; 8use atproto_record::aturi::ATURI; 9use image::GenericImageView; 10use image::ImageFormat; 11use ordermap::OrderMap; 12use sqlx::PgPool; 13use std::str::FromStr; 14use std::sync::Arc; 15 16use crate::atproto::lexicon::acceptance::NSID as ACCEPTANCE_NSID; 17use crate::atproto::lexicon::acceptance::TypedAcceptance; 18use crate::atproto::lexicon::profile::{NSID as PROFILE_NSID, Profile}; 19use crate::storage::acceptance::{acceptance_record_upsert, rsvp_update_validated_at}; 20use crate::storage::content::ContentStorage; 21use crate::storage::denylist::denylist_exists; 22use crate::storage::event::{ 23 EventInsertParams, RsvpInsertParams, event_exists, event_insert_with_metadata, 24 rsvp_insert_with_metadata, 25}; 26use crate::storage::profile::profile_insert; 27use atproto_record::lexicon::community::lexicon::calendar::{ 28 event::{Event as CommunityEvent, Media, NSID as COMMUNITY_EVENT_NSID}, 29 rsvp::{NSID as COMMUNITY_RSVP_NSID, Rsvp as CommunityRsvp, RsvpStatus}, 30}; 31 32pub async fn import_event_records( 33 http_client: &reqwest::Client, 34 pool: &PgPool, 35 did: &str, 36 repository_endpoint: &str, 37) -> Result<ordermap::OrderMap<String, Option<String>>> { 38 const LIMIT: u32 = 100; 39 const MAX_PAGES: usize = 10; 40 41 let mut result_map: OrderMap<String, Option<String>> = OrderMap::new(); 42 let mut cursor: Option<String> = None; 43 44 for page in 0..MAX_PAGES { 45 tracing::info!("Fetching page {} of events", page + 1); 46 47 let list_params = ListRecordsParams { 48 limit: Some(LIMIT), 49 cursor: cursor.clone(), 50 reverse: None, 51 }; 52 53 let results = list_records::<CommunityEvent>( 
54 http_client, 55 &atproto_client::client::Auth::None, 56 repository_endpoint, 57 did.to_string(), 58 COMMUNITY_EVENT_NSID.to_string(), 59 list_params, 60 ) 61 .await?; 62 63 let records_count = results.records.len(); 64 tracing::info!("Retrieved {} records on page {}", records_count, page + 1); 65 66 for event_record in results.records { 67 let name = event_record.value.name.clone(); 68 69 let error_status = match event_insert_with_metadata( 70 pool, 71 EventInsertParams { 72 aturi: &event_record.uri, 73 cid: &event_record.cid, 74 did, 75 lexicon: COMMUNITY_EVENT_NSID, 76 record: &event_record.value, 77 name: &name, 78 require_confirmed_email: false, 79 disable_direct_rsvp: false, 80 rsvp_redirect_url: None, 81 }, 82 ) 83 .await 84 { 85 Ok(_) => { 86 tracing::debug!("Successfully imported event: {}", event_record.uri); 87 None 88 } 89 Err(err) => { 90 tracing::error!( 91 ?err, 92 "Failed to import event: {} ({})", 93 event_record.uri, 94 name 95 ); 96 Some(err.to_string()) 97 } 98 }; 99 100 result_map.insert(event_record.uri, error_status); 101 } 102 103 if records_count < LIMIT as usize || results.cursor.is_none() { 104 tracing::info!("No more pages to fetch"); 105 break; 106 } 107 108 cursor = results.cursor; 109 } 110 111 Ok(result_map) 112} 113 114pub async fn import_rsvp_records( 115 http_client: &reqwest::Client, 116 pool: &PgPool, 117 did: &str, 118 repository_endpoint: &str, 119) -> Result<ordermap::OrderMap<String, Option<String>>> { 120 const LIMIT: u32 = 100; 121 const MAX_PAGES: usize = 10; 122 123 let mut result_map: OrderMap<String, Option<String>> = OrderMap::new(); 124 let mut cursor: Option<String> = None; 125 126 for page in 0..MAX_PAGES { 127 tracing::info!("Fetching page {} of RSVPs", page + 1); 128 129 let list_params = ListRecordsParams { 130 limit: Some(LIMIT), 131 cursor: cursor.clone(), 132 reverse: None, 133 }; 134 135 let results = list_records::<CommunityRsvp>( 136 http_client, 137 &atproto_client::client::Auth::None, 138 
repository_endpoint, 139 did.to_string(), 140 COMMUNITY_RSVP_NSID.to_string(), 141 list_params, 142 ) 143 .await?; 144 145 let records_count = results.records.len(); 146 tracing::info!( 147 "Retrieved {} RSVP records on page {}", 148 records_count, 149 page + 1 150 ); 151 152 for rsvp_record in results.records { 153 let event_aturi = rsvp_record.value.subject.uri.clone(); 154 let event_cid = rsvp_record.value.subject.cid.clone(); 155 let status = match &rsvp_record.value.status { 156 RsvpStatus::Going => "going", 157 RsvpStatus::Interested => "interested", 158 RsvpStatus::NotGoing => "notgoing", 159 }; 160 161 let error_status = match rsvp_insert_with_metadata( 162 pool, 163 RsvpInsertParams { 164 aturi: &rsvp_record.uri, 165 cid: &rsvp_record.cid, 166 did, 167 lexicon: COMMUNITY_RSVP_NSID, 168 record: &rsvp_record.value, 169 event_aturi: &event_aturi, 170 event_cid: &event_cid, 171 status, 172 clear_validated_at: false, 173 }, 174 ) 175 .await 176 { 177 Ok(_) => { 178 tracing::debug!("Successfully imported RSVP: {}", rsvp_record.uri); 179 None 180 } 181 Err(err) => { 182 tracing::error!( 183 ?err, 184 "Failed to import RSVP: {} (event: {})", 185 rsvp_record.uri, 186 event_aturi 187 ); 188 Some(err.to_string()) 189 } 190 }; 191 192 result_map.insert(rsvp_record.uri, error_status); 193 } 194 195 if records_count < LIMIT as usize || results.cursor.is_none() { 196 tracing::info!("No more pages to fetch"); 197 break; 198 } 199 200 cursor = results.cursor; 201 } 202 203 Ok(result_map) 204} 205 206/// Import a record from an AT-URI, processing it the same way as the jetstream consumer. 207/// This includes resolving the identity, fetching the record from the PDS, and processing 208/// any blobs/content (images, avatars, banners, etc.). 
209pub async fn import_from_aturi( 210 http_client: &reqwest::Client, 211 pool: &PgPool, 212 identity_resolver: &Arc<dyn IdentityResolver>, 213 document_storage: &Arc<dyn DidDocumentStorage>, 214 content_storage: &Arc<dyn ContentStorage>, 215 record_resolver: &Arc<crate::record_resolver::StorageBackedRecordResolver>, 216 aturi: &str, 217) -> Result<()> { 218 // Parse the AT-URI 219 let parsed = ATURI::from_str(aturi).with_context(|| format!("Invalid AT-URI: {}", aturi))?; 220 221 let did = &parsed.authority; 222 let collection = &parsed.collection; 223 let rkey = &parsed.record_key; 224 225 tracing::info!("Importing from AT-URI: {}", aturi); 226 tracing::info!(" DID: {}", did); 227 tracing::info!(" Collection: {}", collection); 228 tracing::info!(" RKey: {}", rkey); 229 230 // Resolve the identity to get the PDS endpoint 231 let document = ensure_identity_stored(identity_resolver, document_storage, did).await?; 232 let pds_endpoints = document.pds_endpoints(); 233 let pds_endpoint = pds_endpoints 234 .first() 235 .ok_or_else(|| anyhow!("No PDS endpoint found for DID: {}", did))?; 236 237 tracing::info!(" PDS Endpoint: {}", pds_endpoint); 238 239 // Fetch the record from the PDS 240 let response = get_record( 241 http_client, 242 &atproto_client::client::Auth::None, 243 pds_endpoint, 244 did, 245 collection, 246 rkey, 247 None, 248 ) 249 .await 250 .with_context(|| format!("Failed to fetch record from PDS: {}", pds_endpoint))?; 251 252 let (record, cid) = match response { 253 atproto_client::com::atproto::repo::GetRecordResponse::Record { value, cid, .. 
} => { 254 (value, cid) 255 } 256 atproto_client::com::atproto::repo::GetRecordResponse::Error(error) => { 257 return Err(anyhow!("Failed to fetch record: {}", error.error_message())); 258 } 259 }; 260 261 tracing::info!(" Record CID: {}", cid); 262 263 // Process the record based on its collection 264 match collection.as_str() { 265 COMMUNITY_EVENT_NSID => { 266 handle_event_import( 267 http_client, 268 pool, 269 content_storage, 270 did, 271 rkey, 272 &cid, 273 &record, 274 pds_endpoint, 275 ) 276 .await?; 277 } 278 COMMUNITY_RSVP_NSID => { 279 handle_rsvp_import(pool, record_resolver, did, rkey, &cid, &record).await?; 280 } 281 PROFILE_NSID => { 282 handle_profile_import( 283 http_client, 284 pool, 285 content_storage, 286 &document, 287 did, 288 rkey, 289 &cid, 290 &record, 291 pds_endpoint, 292 ) 293 .await?; 294 } 295 ACCEPTANCE_NSID => { 296 handle_acceptance_import(pool, did, rkey, &cid, &record).await?; 297 } 298 _ => { 299 tracing::warn!("Unsupported collection: {}", collection); 300 } 301 } 302 303 Ok(()) 304} 305 306async fn ensure_identity_stored( 307 identity_resolver: &Arc<dyn IdentityResolver>, 308 document_storage: &Arc<dyn DidDocumentStorage>, 309 did: &str, 310) -> Result<Document> { 311 // Check if we already have this identity 312 if let Some(document) = document_storage.get_document_by_did(did).await? 
{ 313 return Ok(document); 314 } 315 316 let document = identity_resolver.resolve(did).await?; 317 document_storage.store_document(document.clone()).await?; 318 Ok(document) 319} 320 321#[allow(clippy::too_many_arguments)] 322async fn handle_event_import( 323 http_client: &reqwest::Client, 324 pool: &PgPool, 325 content_storage: &Arc<dyn ContentStorage>, 326 did: &str, 327 rkey: &str, 328 cid: &str, 329 record: &serde_json::Value, 330 pds_endpoint: &str, 331) -> Result<()> { 332 tracing::info!("Processing event: {} for {}", rkey, did); 333 334 let aturi = format!("at://{}/{}/{}", did, COMMUNITY_EVENT_NSID, rkey); 335 let event_record: CommunityEvent = serde_json::from_value(record.clone())?; 336 let name = event_record.name.clone(); 337 338 event_insert_with_metadata( 339 pool, 340 EventInsertParams { 341 aturi: &aturi, 342 cid, 343 did, 344 lexicon: COMMUNITY_EVENT_NSID, 345 record: &event_record, 346 name: &name, 347 require_confirmed_email: false, 348 disable_direct_rsvp: false, 349 rsvp_redirect_url: None, 350 }, 351 ) 352 .await?; 353 354 let all_media = event_record.media; 355 356 for media in &all_media { 357 if let Err(err) = 358 download_media(http_client, content_storage, pds_endpoint, did, media).await 359 { 360 tracing::error!(error = ?err, "failed processing image"); 361 } 362 } 363 364 tracing::info!("Successfully imported event: {}", aturi); 365 Ok(()) 366} 367 368async fn handle_rsvp_import( 369 pool: &PgPool, 370 record_resolver: &Arc<crate::record_resolver::StorageBackedRecordResolver>, 371 did: &str, 372 rkey: &str, 373 cid: &str, 374 record: &serde_json::Value, 375) -> Result<()> { 376 tracing::info!("Processing rsvp: {} for {}", rkey, did); 377 378 let aturi = format!("at://{}/{}/{}", did, COMMUNITY_RSVP_NSID, rkey); 379 let rsvp_record: CommunityRsvp = serde_json::from_value(record.clone())?; 380 381 let event_aturi = rsvp_record.subject.uri.clone(); 382 let event_cid = rsvp_record.subject.cid.clone(); 383 let status = match 
&rsvp_record.status { 384 RsvpStatus::Going => "going", 385 RsvpStatus::Interested => "interested", 386 RsvpStatus::NotGoing => "notgoing", 387 }; 388 389 // Check if the event exists 390 match event_exists(pool, &event_aturi).await { 391 Ok(true) => {} 392 _ => { 393 tracing::warn!("Event {} does not exist, skipping RSVP import", event_aturi); 394 return Ok(()); 395 } 396 }; 397 398 rsvp_insert_with_metadata( 399 pool, 400 RsvpInsertParams { 401 aturi: &aturi, 402 cid, 403 did, 404 lexicon: COMMUNITY_RSVP_NSID, 405 record: &rsvp_record, 406 event_aturi: &event_aturi, 407 event_cid: &event_cid, 408 status, 409 clear_validated_at: false, 410 }, 411 ) 412 .await?; 413 414 // Check if RSVP has signatures and verify them 415 if !rsvp_record.signatures.is_empty() { 416 tracing::info!( 417 "RSVP {} has {} signature(s), verifying...", 418 aturi, 419 rsvp_record.signatures.len() 420 ); 421 422 // Create a key resolver for signature verification 423 let identity_resolver = record_resolver.identity_resolver.clone(); 424 let key_resolver = IdentityDocumentKeyResolver::new(identity_resolver); 425 426 let validated = verify_record( 427 (&rsvp_record).into(), 428 did, 429 key_resolver, 430 record_resolver.as_ref(), 431 ) 432 .await 433 .is_ok(); 434 435 if validated { 436 if let Err(e) = rsvp_update_validated_at(pool, &aturi, Some(chrono::Utc::now())).await { 437 tracing::error!("Failed to update RSVP validated_at: {:?}", e); 438 } else { 439 tracing::info!("RSVP {} validated with signatures", aturi); 440 } 441 } else { 442 tracing::warn!("RSVP {} signature verification failed", aturi); 443 } 444 } 445 446 tracing::info!("Successfully imported RSVP: {}", aturi); 447 Ok(()) 448} 449 450#[allow(clippy::too_many_arguments)] 451async fn handle_profile_import( 452 http_client: &reqwest::Client, 453 pool: &PgPool, 454 content_storage: &Arc<dyn ContentStorage>, 455 document: &Document, 456 did: &str, 457 rkey: &str, 458 cid: &str, 459 record: &serde_json::Value, 460 pds_endpoint: &str, 
461) -> Result<()> { 462 tracing::info!("Processing profile: {} for {}", rkey, did); 463 464 let aturi = format!("at://{}/{}/{}", did, PROFILE_NSID, rkey); 465 466 // Check denylist before proceeding 467 if denylist_exists(pool, &[did, &aturi]).await? { 468 tracing::info!("User {} is in denylist, skipping profile import", did); 469 return Ok(()); 470 } 471 472 let profile_record: Profile = serde_json::from_value(record.clone())?; 473 474 // Get handle for display_name fallback 475 let handle = document 476 .also_known_as 477 .first() 478 .and_then(|aka| aka.strip_prefix("at://")) 479 .unwrap_or(did); 480 481 // Use displayName from profile, or fallback to handle 482 let display_name = profile_record 483 .display_name 484 .as_ref() 485 .filter(|s| !s.trim().is_empty()) 486 .map(|s| s.as_str()) 487 .unwrap_or(handle); 488 489 profile_insert(pool, &aturi, cid, did, display_name, &profile_record).await?; 490 491 // Download avatar and banner blobs if present 492 if let Some(ref avatar) = profile_record.avatar 493 && let Err(e) = 494 download_avatar(http_client, content_storage, pds_endpoint, did, avatar).await 495 { 496 tracing::warn!( 497 error = ?e, 498 did = %did, 499 "Failed to download avatar for profile" 500 ); 501 } 502 503 if let Some(ref banner) = profile_record.banner 504 && let Err(e) = 505 download_banner(http_client, content_storage, pds_endpoint, did, banner).await 506 { 507 tracing::warn!( 508 error = ?e, 509 did = %did, 510 "Failed to download banner for profile" 511 ); 512 } 513 514 tracing::info!("Successfully imported profile: {}", aturi); 515 Ok(()) 516} 517 518async fn handle_acceptance_import( 519 pool: &PgPool, 520 did: &str, 521 rkey: &str, 522 cid: &str, 523 record: &serde_json::Value, 524) -> Result<()> { 525 tracing::info!("Processing acceptance: {} for {}", rkey, did); 526 527 let aturi = format!("at://{}/{}/{}", did, ACCEPTANCE_NSID, rkey); 528 529 // Deserialize and validate the acceptance record 530 let acceptance_record: TypedAcceptance 
= serde_json::from_value(record.clone())?; 531 532 // Validate the acceptance record 533 if let Err(e) = acceptance_record.validate() { 534 tracing::warn!("Invalid acceptance record: {}", e); 535 return Ok(()); 536 } 537 538 // Store the acceptance record 539 acceptance_record_upsert(pool, &aturi, cid, did, record).await?; 540 541 tracing::info!("Successfully imported acceptance: {}", aturi); 542 Ok(()) 543} 544 545/// Download and process avatar blob (1:1 aspect ratio, max 3MB) 546async fn download_avatar( 547 http_client: &reqwest::Client, 548 content_storage: &Arc<dyn ContentStorage>, 549 pds: &str, 550 did: &str, 551 avatar: &atproto_record::lexicon::TypedBlob, 552) -> Result<()> { 553 let cid = &avatar.inner.ref_.link; 554 let image_path = format!("{}.png", cid); 555 556 // Check if already exists 557 if content_storage.content_exists(&image_path).await? { 558 tracing::debug!(cid = %cid, "Avatar already exists in storage"); 559 return Ok(()); 560 } 561 562 // Validate mime type 563 if avatar.inner.mime_type != "image/png" && avatar.inner.mime_type != "image/jpeg" { 564 tracing::debug!( 565 mime_type = %avatar.inner.mime_type, 566 "Skipping avatar with unsupported mime type" 567 ); 568 return Ok(()); 569 } 570 571 // Validate size (max 3MB) 572 if avatar.inner.size > 3_000_000 { 573 tracing::debug!( 574 size = avatar.inner.size, 575 "Skipping avatar that exceeds max size" 576 ); 577 return Ok(()); 578 } 579 580 // Download the blob 581 let image_bytes = match get_blob(http_client, pds, did, cid).await { 582 Ok(bytes) => bytes, 583 Err(e) => { 584 tracing::warn!(error = ?e, cid = %cid, "Failed to download avatar blob"); 585 return Ok(()); // Don't fail the whole operation 586 } 587 }; 588 589 // Validate and process the image 590 let img = match image::load_from_memory(&image_bytes) { 591 Ok(img) => img, 592 Err(e) => { 593 tracing::warn!(error = ?e, cid = %cid, "Failed to load avatar image"); 594 return Ok(()); 595 } 596 }; 597 598 let (width, height) = 
img.dimensions(); 599 600 // Validate 1:1 aspect ratio (allow small deviation) 601 let aspect_ratio = width as f32 / height as f32; 602 if (aspect_ratio - 1.0).abs() > 0.05 { 603 tracing::debug!( 604 width, 605 height, 606 aspect_ratio, 607 "Skipping avatar with non-square aspect ratio" 608 ); 609 return Ok(()); 610 } 611 612 // Resize to standard size (400x400) 613 let resized = if width != 400 || height != 400 { 614 img.resize_exact(400, 400, image::imageops::FilterType::Lanczos3) 615 } else { 616 img 617 }; 618 619 // Convert to PNG 620 let mut png_buffer = std::io::Cursor::new(Vec::new()); 621 resized.write_to(&mut png_buffer, ImageFormat::Png)?; 622 let png_bytes = png_buffer.into_inner(); 623 624 // Store in content storage 625 content_storage 626 .write_content(&image_path, &png_bytes) 627 .await?; 628 629 tracing::info!(cid = %cid, "Successfully downloaded and processed avatar"); 630 Ok(()) 631} 632 633/// Download and process banner blob (16:9 aspect ratio, max 3MB) 634async fn download_banner( 635 http_client: &reqwest::Client, 636 content_storage: &Arc<dyn ContentStorage>, 637 pds: &str, 638 did: &str, 639 banner: &atproto_record::lexicon::TypedBlob, 640) -> Result<()> { 641 let cid = &banner.inner.ref_.link; 642 let image_path = format!("{}.png", cid); 643 644 // Check if already exists 645 if content_storage.content_exists(&image_path).await? 
{ 646 tracing::debug!(cid = %cid, "Banner already exists in storage"); 647 return Ok(()); 648 } 649 650 // Validate mime type 651 if banner.inner.mime_type != "image/png" && banner.inner.mime_type != "image/jpeg" { 652 tracing::debug!( 653 mime_type = %banner.inner.mime_type, 654 "Skipping banner with unsupported mime type" 655 ); 656 return Ok(()); 657 } 658 659 // Validate size (max 3MB) 660 if banner.inner.size > 3_000_000 { 661 tracing::debug!( 662 size = banner.inner.size, 663 "Skipping banner that exceeds max size" 664 ); 665 return Ok(()); 666 } 667 668 // Download the blob 669 let image_bytes = match get_blob(http_client, pds, did, cid).await { 670 Ok(bytes) => bytes, 671 Err(e) => { 672 tracing::warn!(error = ?e, cid = %cid, "Failed to download banner blob"); 673 return Ok(()); // Don't fail the whole operation 674 } 675 }; 676 677 // Validate and process the image 678 let img = match image::load_from_memory(&image_bytes) { 679 Ok(img) => img, 680 Err(e) => { 681 tracing::warn!(error = ?e, cid = %cid, "Failed to load banner image"); 682 return Ok(()); 683 } 684 }; 685 686 let (width, height) = img.dimensions(); 687 688 // Validate 16:9 aspect ratio (allow 10% deviation) 689 let aspect_ratio = width as f32 / height as f32; 690 let expected_ratio = 16.0 / 9.0; 691 if (aspect_ratio - expected_ratio).abs() / expected_ratio > 0.10 { 692 tracing::debug!( 693 width, 694 height, 695 aspect_ratio, 696 "Skipping banner with non-16:9 aspect ratio" 697 ); 698 return Ok(()); 699 } 700 701 // Resize to standard size (1600x900) 702 let resized = if width != 1600 || height != 900 { 703 img.resize_exact(1600, 900, image::imageops::FilterType::Lanczos3) 704 } else { 705 img 706 }; 707 708 // Convert to PNG 709 let mut png_buffer = std::io::Cursor::new(Vec::new()); 710 resized.write_to(&mut png_buffer, ImageFormat::Png)?; 711 let png_bytes = png_buffer.into_inner(); 712 713 // Store in content storage 714 content_storage 715 .write_content(&image_path, &png_bytes) 716 
.await?; 717 718 tracing::info!(cid = %cid, "Successfully downloaded and processed banner"); 719 Ok(()) 720} 721 722async fn download_media( 723 http_client: &reqwest::Client, 724 content_storage: &Arc<dyn ContentStorage>, 725 pds: &str, 726 did: &str, 727 event_image: &Media, 728) -> Result<()> { 729 let content = &event_image.content; 730 let role = &event_image.role; 731 let aspect_ratio = &event_image.aspect_ratio; 732 733 if role != "header" { 734 return Ok(()); 735 } 736 737 match content.mime_type.as_str() { 738 "image/png" | "image/jpeg" | "image/webp" => {} 739 _ => return Ok(()), 740 } 741 742 let (reported_height, reported_width) = aspect_ratio 743 .as_ref() 744 .map(|value| (value.height, value.width)) 745 .unwrap_or_default(); 746 if !(755..=12000).contains(&reported_height) 747 || !(reported_height..=12000).contains(&reported_width) 748 { 749 tracing::info!( 750 ?reported_height, 751 ?reported_width, 752 "aspect ratio check 1 failed" 753 ); 754 return Ok(()); 755 } 756 let is_16_9 = 757 (((reported_width as f64) / (reported_height as f64)) - (16.0 / 9.0)).abs() < 0.02; 758 if !is_16_9 { 759 tracing::info!("aspect ratio check 2 failed"); 760 return Ok(()); 761 } 762 763 // Access the CID from the TypedBlob structure 764 let blob_ref = &content.inner.ref_.link; 765 766 let image_path = format!("{}.png", blob_ref); 767 tracing::info!(?image_path, "image_path"); 768 769 if content_storage.as_ref().content_exists(&image_path).await? 
{ 770 tracing::info!(?image_path, "content exists"); 771 return Ok(()); 772 } 773 774 let image_bytes = get_blob(http_client, pds, did, blob_ref).await?; 775 776 const MAX_SIZE: usize = 3 * 1024 * 1024; 777 if image_bytes.len() > MAX_SIZE { 778 tracing::info!("max size failed"); 779 return Ok(()); 780 } 781 782 let img = image::load_from_memory(&image_bytes)?; 783 784 let format = image::guess_format(&image_bytes)?; 785 786 match format { 787 ImageFormat::Jpeg | ImageFormat::Png | ImageFormat::WebP => { 788 // Supported formats 789 } 790 _ => { 791 tracing::info!("supported formats failed"); 792 return Ok(()); 793 } 794 } 795 796 // Check it again for fun 797 let (actual_width, actual_height) = img.dimensions(); 798 if actual_height == 0 799 || actual_height > 12000 800 || actual_width == 0 801 || actual_width > 12000 802 || actual_width <= actual_height 803 { 804 tracing::info!("aspect ratio check 3 failed"); 805 return Ok(()); 806 } 807 let is_really_16_9 = 808 (((actual_width as f64) / (actual_height as f64)) - (16.0 / 9.0)).abs() < 0.02; 809 if !is_really_16_9 { 810 tracing::info!("aspect ratio check 4 failed"); 811 return Ok(()); 812 } 813 814 let aspect_ratio = actual_width as f32 / actual_height as f32; 815 let new_height = 756; 816 let new_width = (new_height as f32 * aspect_ratio) as u32; 817 818 let final_image = { 819 if new_width != actual_width { 820 img.resize_exact(new_width, new_height, image::imageops::FilterType::Lanczos3) 821 } else { 822 img 823 } 824 }; 825 826 let mut png_buffer = std::io::Cursor::new(Vec::new()); 827 final_image.write_to(&mut png_buffer, ImageFormat::Png)?; 828 let png_bytes = png_buffer.into_inner(); 829 830 content_storage 831 .as_ref() 832 .write_content(&image_path, &png_bytes) 833 .await?; 834 835 tracing::info!("image written"); 836 Ok(()) 837}