···11+{
22+ "lexicon": 1,
33+ "id": "events.smokesignal.calendar.acceptance",
44+ "defs": {
55+ "main": {
66+ "type": "record",
77+ "description": "A cryptographic proof record that contains RSVP acceptance data.",
88+ "key": "tid",
99+ "record": {
1010+ "type": "object",
1111+ "required": [
1212+ "cid"
1313+ ],
1414+ "properties": {
1515+ "cid": {
1616+ "type": "string",
1717+ "format": "cid",
1818+ "description": "The CID (Content Identifier) of the rsvp that this proof validates."
1919+ }
2020+ }
2121+ }
2222+ }
2323+ }
2424+}
+28
migrations/20251105000000_rsvp_acceptance.sql
···11+-- Add validated_at column to RSVPs table
22+ALTER TABLE rsvps ADD COLUMN validated_at TIMESTAMP WITH TIME ZONE DEFAULT NULL;
33+44+-- Create acceptance_tickets table for storing RSVP acceptance tickets
55+CREATE TABLE acceptance_tickets (
66+ aturi VARCHAR(1024) PRIMARY KEY,
77+ did VARCHAR(256) NOT NULL,
88+ rsvp_did VARCHAR(256) NOT NULL,
99+ event_aturi VARCHAR(1024) NOT NULL,
1010+ record JSON NOT NULL,
1111+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
1212+);
1313+1414+CREATE INDEX idx_acceptance_tickets_did ON acceptance_tickets (did);
1515+CREATE INDEX idx_acceptance_tickets_rsvp_did ON acceptance_tickets (rsvp_did);
1616+CREATE INDEX idx_acceptance_tickets_event_aturi ON acceptance_tickets (event_aturi);
1717+1818+-- Create acceptance_records table for storing RSVP acceptance records
1919+CREATE TABLE acceptance_records (
2020+ aturi VARCHAR(1024) PRIMARY KEY,
2121+ cid VARCHAR(256) NOT NULL,
2222+ did VARCHAR(256) NOT NULL,
2323+ record JSON NOT NULL,
2424+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
2525+ updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
2626+);
2727+2828+CREATE INDEX idx_acceptance_records_did ON acceptance_records (did);
+30
src/atproto/lexicon/acceptance.rs
···11+use atproto_record::typed::{LexiconType, TypedLexicon};
22+use serde::{Deserialize, Serialize};
33+44+pub const NSID: &str = "events.smokesignal.calendar.acceptance";
55+66+/// RSVP acceptance proof record
77+#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
88+#[serde(rename_all = "camelCase")]
99+pub struct Acceptance {
1010+ /// The CID (Content Identifier) of the RSVP that this proof validates
1111+ pub cid: String,
1212+}
1313+1414+pub type TypedAcceptance = TypedLexicon<Acceptance>;
1515+1616+impl LexiconType for Acceptance {
1717+ fn lexicon_type() -> &'static str {
1818+ NSID
1919+ }
2020+}
2121+2222+impl Acceptance {
2323+ /// Validates the acceptance record
2424+ pub fn validate(&self) -> Result<(), String> {
2525+ if self.cid.trim().is_empty() {
2626+ return Err("CID cannot be empty".to_string());
2727+ }
2828+ Ok(())
2929+ }
3030+}
···4343 return Err("Display name must be 200 characters or less".to_string());
4444 }
45454646- if let Some(description) = &self.description {
4747- if description.len() > 5000 {
4848- return Err("Description must be 5000 characters or less".to_string());
4949- }
4646+ if let Some(description) = &self.description
4747+ && description.len() > 5000
4848+ {
4949+ return Err("Description must be 5000 characters or less".to_string());
5050 }
51515252 if let Some(profile_host) = &self.profile_host
···370370 while let Some(ch) = chars.next() {
371371 if ch == '<'
372372 && let Some(&next_ch) = chars.peek()
373373+ && (next_ch.is_ascii_alphabetic() || next_ch == '/' || next_ch == '!')
373374 {
374374- if next_ch.is_ascii_alphabetic() || next_ch == '/' || next_ch == '!' {
375375- return true;
376376- }
375375+ return true;
377376 }
378377 }
379378 false
+30-15
src/bin/smokesignal.rs
···11use anyhow::Result;
22-use atproto_identity::key::{identify_key, to_public};
22+use atproto_identity::key::{IdentityDocumentKeyResolver, identify_key, to_public};
33use atproto_identity::resolve::{
44 HickoryDnsResolver, InnerIdentityResolver, SharedIdentityResolver,
55};
···199199 };
200200201201 // Initialize email throttle (15 emails per 15 minutes using 5-minute windows)
202202- let email_throttle = Arc::new(smokesignal::throttle_redis::RedisRollingWindowThrottle::new(
203203- cache_pool.clone(),
204204- "throttle:email".to_string(),
205205- 300, // 5-minute windows
206206- 3, // Check 3 windows (15 minutes total)
207207- 15, // Max 15 emails
208208- ));
202202+ let email_throttle = Arc::new(
203203+ smokesignal::throttle_redis::RedisRollingWindowThrottle::new(
204204+ cache_pool.clone(),
205205+ "throttle:email".to_string(),
206206+ 300, // 5-minute windows
207207+ 3, // Check 3 windows (15 minutes total)
208208+ 15, // Max 15 emails
209209+ ),
210210+ );
209211210212 // Initialize emailer if SMTP is configured
211213 let emailer: Option<Arc<dyn smokesignal::emailer::Emailer>> =
···218220 email_throttle,
219221 ) {
220222 Ok(lettre_emailer) => {
221221- tracing::info!("Email notifications enabled with throttling (max 15 emails per 15 minutes)");
223223+ tracing::info!(
224224+ "Email notifications enabled with throttling (max 15 emails per 15 minutes)"
225225+ );
222226 Some(Arc::new(lettre_emailer))
223227 }
224228 Err(err) => {
···337341 (None, None)
338342 };
339343344344+ // Create resolvers for signature verification
345345+ let record_resolver = Arc::new(
346346+ smokesignal::record_resolver::StorageBackedRecordResolver::new(
347347+ http_client.clone(),
348348+ identity_resolver.clone(),
349349+ pool.clone(),
350350+ ),
351351+ );
352352+ let key_resolver = IdentityDocumentKeyResolver::new(identity_resolver.clone());
353353+340354 let content_fetcher = ContentFetcher::new(
341355 pool.clone(),
342356 content_storage.clone(),
343357 identity_resolver.clone(),
344358 document_storage.clone(),
345359 http_client.clone(),
360360+ record_resolver,
361361+ key_resolver,
346362 );
347363348364 let inner_token = token.clone();
···390406 }
391407392408 // Register search indexer handler if enabled
393393- if let Some(search_handler) = &search_indexer_handler {
394394- if let Err(err) = jetstream_consumer.register_handler(search_handler.clone()).await {
395395- tracing::error!("Failed to register search indexer handler: {}", err);
396396- inner_token.cancel();
397397- break;
398398- }
409409+ if let Some(search_handler) = &search_indexer_handler
410410+ && let Err(err) = jetstream_consumer.register_handler(search_handler.clone()).await {
411411+ tracing::error!("Failed to register search indexer handler: {}", err);
412412+ inner_token.cancel();
413413+ break;
399414 }
400415401416 tokio::select! {
+3-4
src/config.rs
···174174 };
175175176176 // Parse email secret key (required for email confirmation tokens)
177177- let email_secret_key: EmailSecretKey = require_env("EMAIL_SECRET_KEY")
178178- .and_then(|value| value.try_into())?;
177177+ let email_secret_key: EmailSecretKey =
178178+ require_env("EMAIL_SECRET_KEY").and_then(|value| value.try_into())?;
179179180180 // Parse facet limit configuration
181181 let facets_mentions_max = default_env("FACETS_MENTIONS_MAX", "5")
···313313 type Error = anyhow::Error;
314314 fn try_from(value: String) -> Result<Self, Self::Error> {
315315 // Decode hex string to bytes
316316- let decoded = hex::decode(&value)
317317- .map_err(|err| ConfigError::EmailSecretKeyDecodeFailed(err))?;
316316+ let decoded = hex::decode(&value).map_err(ConfigError::EmailSecretKeyDecodeFailed)?;
318317319318 // Require at least 32 bytes (256 bits) for security
320319 if decoded.len() < 32 {
+3-1
src/config_errors.rs
···162162 ///
163163 /// This error occurs when the decoded EMAIL_SECRET_KEY is less than
164164 /// 32 bytes (256 bits), which is the minimum required for security.
165165- #[error("error-smokesignal-config-22 EMAIL_SECRET_KEY must be at least 32 bytes, got {0} bytes")]
165165+ #[error(
166166+ "error-smokesignal-config-22 EMAIL_SECRET_KEY must be at least 32 bytes, got {0} bytes"
167167+ )]
166168 EmailSecretKeyTooShort(usize),
167169}
···11+use anyhow::{Result, anyhow};
22+33+use crate::{
44+ http::{
55+ context::WebContext,
66+ errors::{CommonError, WebError},
77+ utils::url_from_aturi,
88+ },
99+ storage::{
1010+ event::{Event, event_get},
1111+ identity_profile::handle_for_did,
1212+ },
1313+};
1414+1515+/// Verify that the current user is the organizer of the specified event.
1616+/// Returns the Event if authorized, or an error if not found or not authorized.
1717+pub(crate) async fn verify_event_organizer_authorization(
1818+ web_context: &WebContext,
1919+ event_aturi: &str,
2020+ organizer_did: &str,
2121+) -> Result<Event, WebError> {
2222+ // Get the event from storage
2323+ let event = event_get(&web_context.pool, event_aturi)
2424+ .await
2525+ .map_err(|e| anyhow!("Failed to get event: {}", e))?;
2626+2727+ // Verify the current user is the event organizer
2828+ if event.did != organizer_did {
2929+ return Err(CommonError::NotAuthorized.into());
3030+ }
3131+3232+ Ok(event)
3333+}
3434+3535+/// Send an email notification to the subject about their RSVP acceptance.
3636+/// This function never fails - errors are logged but the function always returns successfully.
3737+pub async fn send_acceptance_email_notification(
3838+ web_context: &WebContext,
3939+ subject_did: &str,
4040+ event_name: &str,
4141+ event_aturi: &str,
4242+) {
4343+ // Get the subject's profile for email notification
4444+ let subject_profile = match handle_for_did(&web_context.pool, subject_did).await {
4545+ Ok(profile) => profile,
4646+ Err(e) => {
4747+ tracing::warn!(
4848+ "Failed to get profile for DID {} to send acceptance notification: {:?}",
4949+ subject_did,
5050+ e
5151+ );
5252+ return;
5353+ }
5454+ };
5555+5656+ // Generate event URL
5757+ let event_url = match url_from_aturi(&web_context.config.external_base, event_aturi) {
5858+ Ok(url) => url,
5959+ Err(e) => {
6060+ tracing::error!(
6161+ "Failed to generate event URL from AT-URI {}: {:?}",
6262+ event_aturi,
6363+ e
6464+ );
6565+ return;
6666+ }
6767+ };
6868+6969+ // Send email notification if email is available
7070+ if let Some(email) = &subject_profile.email
7171+ && let Some(emailer) = &web_context.emailer
7272+ {
7373+ if let Err(e) = emailer
7474+ .notify_rsvp_accepted(email, subject_did, event_name, &event_url)
7575+ .await
7676+ {
7777+ tracing::error!("Failed to send RSVP accepted email to {}: {:?}", email, e);
7878+ } else {
7979+ tracing::info!(
8080+ "Sent RSVP acceptance notification to {} for event {}",
8181+ email,
8282+ event_name
8383+ );
8484+ }
8585+ }
8686+}
8787+8888+/// Format an error message as an HTML notification for HTMX responses.
8989+pub fn format_error_html(title: &str, message: &str, details: Option<&str>) -> String {
9090+ if let Some(details) = details {
9191+ format!(
9292+ r#"<div class="notification is-danger">
9393+ <p><strong>Error!</strong> {}</p>
9494+ <p>{}</p>
9595+ <p class="is-size-7 mt-2">Details: {}</p>
9696+ </div>"#,
9797+ title, message, details
9898+ )
9999+ } else {
100100+ format!(
101101+ r#"<div class="notification is-danger">
102102+ <p><strong>Error!</strong> {}</p>
103103+ <p>{}</p>
104104+ </div>"#,
105105+ title, message
106106+ )
107107+ }
108108+}
109109+110110+/// Format a success message as an HTML notification for HTMX responses.
111111+pub fn format_success_html(
112112+ title: &str,
113113+ message: &str,
114114+ additional_info: Option<Vec<String>>,
115115+) -> String {
116116+ let mut html = format!(
117117+ r#"<div class="notification is-success">
118118+ <p><strong>Success!</strong> {}</p>
119119+ <p>{}</p>"#,
120120+ title, message
121121+ );
122122+123123+ if let Some(info) = additional_info {
124124+ for line in info {
125125+ html.push_str(&format!("\n <p>{}</p>", line));
126126+ }
127127+ }
128128+129129+ html.push_str("\n </div>");
130130+ html
131131+}
+214
src/http/handle_accept_rsvp.rs
···11+use anyhow::Result;
22+use atproto_attestation::cid::create_attestation_cid;
33+use atproto_client::com::atproto::repo::{PutRecordRequest, PutRecordResponse, put_record};
44+use atproto_record::{
55+ lexicon::community::lexicon::calendar::rsvp::{Rsvp, TypedRsvp},
66+ tid::Tid,
77+ typed::TypedLexicon,
88+};
99+use axum::{extract::State, response::IntoResponse};
1010+use axum_extra::extract::{Cached, Form};
1111+use http::StatusCode;
1212+use serde::Deserialize;
1313+1414+use crate::{
1515+ atproto::{
1616+ auth::{create_dpop_auth_from_aip_session, create_dpop_auth_from_oauth_session},
1717+ lexicon::acceptance::{Acceptance, NSID as ACCEPTANCE_NSID, TypedAcceptance},
1818+ },
1919+ config::OAuthBackendConfig,
2020+ http::{
2121+ acceptance_utils::{
2222+ format_error_html, format_success_html, send_acceptance_email_notification,
2323+ verify_event_organizer_authorization,
2424+ },
2525+ context::WebContext,
2626+ errors::{CommonError, WebError},
2727+ middleware_auth::Auth,
2828+ middleware_i18n::Language,
2929+ },
3030+ storage::{acceptance::acceptance_ticket_upsert, event::rsvp_get},
3131+};
3232+3333+#[derive(Debug, Deserialize)]
3434+pub struct AcceptRsvpForm {
3535+ /// The AT-URI of the RSVP to accept
3636+ rsvp_aturi: String,
3737+}
3838+3939+pub(crate) async fn handle_accept_rsvp(
4040+ State(web_context): State<WebContext>,
4141+ Language(_language): Language,
4242+ Cached(auth): Cached<Auth>,
4343+ Form(form): Form<AcceptRsvpForm>,
4444+) -> Result<impl IntoResponse, WebError> {
4545+ let current_handle = auth.require("/accept_rsvp")?;
4646+4747+ // Get the RSVP from storage
4848+ let rsvp = match rsvp_get(&web_context.pool, &form.rsvp_aturi).await {
4949+ Ok(Some(rsvp)) => rsvp,
5050+ Ok(None) => {
5151+ return Ok((
5252+ StatusCode::NOT_FOUND,
5353+ format_error_html(
5454+ "RSVP not found",
5555+ &format!("No RSVP found with URI: {}", form.rsvp_aturi),
5656+ None,
5757+ ),
5858+ )
5959+ .into_response());
6060+ }
6161+ Err(e) => {
6262+ return Ok((
6363+ StatusCode::INTERNAL_SERVER_ERROR,
6464+ format_error_html(
6565+ "Failed to retrieve RSVP",
6666+ "Could not fetch the RSVP from the database.",
6767+ Some(&e.to_string()),
6868+ ),
6969+ )
7070+ .into_response());
7171+ }
7272+ };
7373+7474+ let rsvp_record = serde_json::from_value::<Rsvp>(rsvp.record.0)
7575+ .map_err(|_| anyhow::anyhow!("unable to deserialize rsvp record"))?;
7676+7777+ let typed_rsvp: TypedRsvp = TypedLexicon::new(rsvp_record);
7878+7979+ // Verify the current user is the event organizer
8080+ let event = match verify_event_organizer_authorization(
8181+ &web_context,
8282+ &rsvp.event_aturi,
8383+ ¤t_handle.did,
8484+ )
8585+ .await
8686+ {
8787+ Ok(event) => event,
8888+ Err(e) => {
8989+ return Ok((
9090+ StatusCode::FORBIDDEN,
9191+ format_error_html(
9292+ "Not authorized",
9393+ "You must be the event organizer to accept RSVPs.",
9494+ Some(&e.to_string()),
9595+ ),
9696+ )
9797+ .into_response());
9898+ }
9999+ };
100100+101101+ let content_cid = create_attestation_cid(
102102+ typed_rsvp.into(),
103103+ serde_json::json!({ "$type": ACCEPTANCE_NSID }).into(),
104104+ ¤t_handle.did,
105105+ )
106106+ .map_err(|e| anyhow::anyhow!("Failed to create remote attestation proof: {}", e))?;
107107+108108+ // Create the acceptance record
109109+ let acceptance = Acceptance {
110110+ cid: content_cid.to_string(),
111111+ };
112112+113113+ let record_key = Tid::new().to_string();
114114+115115+ // Create DPoP auth based on OAuth backend type
116116+ let dpop_auth = match (&auth, &web_context.config.oauth_backend) {
117117+ (Auth::Pds { session, .. }, OAuthBackendConfig::ATProtocol { .. }) => {
118118+ create_dpop_auth_from_oauth_session(session)?
119119+ }
120120+ (Auth::Aip { access_token, .. }, OAuthBackendConfig::AIP { hostname, .. }) => {
121121+ create_dpop_auth_from_aip_session(&web_context.http_client, hostname, access_token)
122122+ .await?
123123+ }
124124+ _ => return Err(CommonError::NotAuthorized.into()),
125125+ };
126126+127127+ let typed_acceptance = TypedAcceptance::new(acceptance.clone());
128128+129129+ // Write the acceptance record to the current identity's PDS
130130+ let put_request = PutRecordRequest {
131131+ repo: current_handle.did.clone(),
132132+ collection: ACCEPTANCE_NSID.to_string(),
133133+ validate: false,
134134+ record_key: record_key.clone(),
135135+ record: typed_acceptance.clone(),
136136+ swap_commit: None,
137137+ swap_record: None,
138138+ };
139139+140140+ let put_result = put_record(
141141+ &web_context.http_client,
142142+ &atproto_client::client::Auth::DPoP(dpop_auth),
143143+ ¤t_handle.pds,
144144+ put_request,
145145+ )
146146+ .await;
147147+148148+ let published_acceptance = match put_result {
149149+ Ok(PutRecordResponse::StrongRef { uri, cid, .. }) => {
150150+ atproto_record::lexicon::com::atproto::repo::StrongRef { uri, cid }
151151+ }
152152+ Ok(PutRecordResponse::Error(err)) => {
153153+ return Ok((
154154+ StatusCode::BAD_REQUEST,
155155+ format_error_html(
156156+ "Failed to create acceptance record",
157157+ "The AT Protocol server rejected the acceptance record creation.",
158158+ Some(&err.error_message()),
159159+ ),
160160+ )
161161+ .into_response());
162162+ }
163163+ Err(err) => {
164164+ return Ok((
165165+ StatusCode::INTERNAL_SERVER_ERROR,
166166+ format_error_html(
167167+ "Failed to publish acceptance",
168168+ "Could not publish the acceptance record to the AT Protocol network.",
169169+ Some(&err.to_string()),
170170+ ),
171171+ )
172172+ .into_response());
173173+ }
174174+ };
175175+176176+ // Store the acceptance ticket using individual fields
177177+ if let Err(e) = acceptance_ticket_upsert(
178178+ &web_context.pool,
179179+ &published_acceptance.uri,
180180+ ¤t_handle.did,
181181+ &rsvp.did,
182182+ &event.aturi,
183183+ &acceptance,
184184+ )
185185+ .await
186186+ {
187187+ return Ok((
188188+ StatusCode::INTERNAL_SERVER_ERROR,
189189+ format_error_html(
190190+ "Failed to store acceptance",
191191+ "Could not save the acceptance ticket to the database.",
192192+ Some(&e.to_string()),
193193+ ),
194194+ )
195195+ .into_response());
196196+ }
197197+198198+ // Send email notification to the RSVP creator (async, never fails)
199199+ send_acceptance_email_notification(&web_context, &rsvp.did, &event.name, &event.aturi).await;
200200+201201+ // Return success with HTMX-compatible HTML
202202+ Ok((
203203+ StatusCode::OK,
204204+ format_success_html(
205205+ "RSVP accepted successfully",
206206+ &format!("The RSVP from {} has been accepted.", rsvp.did),
207207+ Some(vec![
208208+ format!("Acceptance record published: {}", published_acceptance.uri),
209209+ "The user has been notified via email.".to_string(),
210210+ ]),
211211+ ),
212212+ )
213213+ .into_response())
214214+}
+23-24
src/http/handle_create_event.rs
···343343 }
344344345345 // Send webhooks if enabled
346346- if web_context.config.enable_webhooks {
347347- if let Some(webhook_sender) = &web_context.webhook_sender {
348348- // Get all enabled webhooks for the user
349349- if let Ok(webhooks) =
350350- webhook_list_enabled_by_did(&web_context.pool, ¤t_handle.did)
351351- .await
352352- {
353353- // Prepare context with email if shared
354354- let context = json!({});
346346+ if web_context.config.enable_webhooks
347347+ && let Some(webhook_sender) = &web_context.webhook_sender
348348+ {
349349+ // Get all enabled webhooks for the user
350350+ if let Ok(webhooks) =
351351+ webhook_list_enabled_by_did(&web_context.pool, ¤t_handle.did).await
352352+ {
353353+ // Prepare context with email if shared
354354+ let context = json!({});
355355356356- let record_json = json!({
357357- "uri": &create_record_response.uri,
358358- "cit": &create_record_response.cid,
359359- });
356356+ let record_json = json!({
357357+ "uri": &create_record_response.uri,
358358+ "cit": &create_record_response.cid,
359359+ });
360360361361- // Send webhook for each enabled webhook
362362- for webhook in webhooks {
363363- let _ = webhook_sender
364364- .send(TaskWork::EventCreated {
365365- identity: current_handle.did.clone(),
366366- service: webhook.service,
367367- record: record_json.clone(),
368368- context: context.clone(),
369369- })
370370- .await;
371371- }
361361+ // Send webhook for each enabled webhook
362362+ for webhook in webhooks {
363363+ let _ = webhook_sender
364364+ .send(TaskWork::EventCreated {
365365+ identity: current_handle.did.clone(),
366366+ service: webhook.service,
367367+ record: record_json.clone(),
368368+ context: context.clone(),
369369+ })
370370+ .await;
372371 }
373372 }
374373 }
+143-146
src/http/handle_create_rsvp.rs
···232232 }
233233234234 // Send email notification if RSVP is "going" and emailer is enabled
235235- if is_going_rsvp {
236236- if let Some(ref emailer) = web_context.emailer {
237237- // Extract event creator DID from event aturi
238238- if let Ok(event_aturi_parsed) =
239239- ATURI::from_str(build_rsvp_form.subject_aturi.as_ref().unwrap())
240240- {
241241- let event_creator_did = event_aturi_parsed.authority;
235235+ if is_going_rsvp && let Some(ref emailer) = web_context.emailer {
236236+ // Extract event creator DID from event aturi
237237+ if let Ok(event_aturi_parsed) =
238238+ ATURI::from_str(build_rsvp_form.subject_aturi.as_ref().unwrap())
239239+ {
240240+ let event_creator_did = event_aturi_parsed.authority;
241241+242242+ // Get event URL (used by both notifications)
243243+ let event_url = url_from_aturi(
244244+ &web_context.config.external_base,
245245+ build_rsvp_form.subject_aturi.as_ref().unwrap(),
246246+ )
247247+ .unwrap_or_else(|_| {
248248+ format!("https://{}/event", web_context.config.external_base)
249249+ });
242250243243- // Get event URL (used by both notifications)
244244- let event_url = url_from_aturi(
245245- &web_context.config.external_base,
246246- build_rsvp_form.subject_aturi.as_ref().unwrap(),
247247- )
248248- .unwrap_or_else(|_| {
249249- format!("https://{}/event", web_context.config.external_base)
250250- });
251251+ // Get full event details from database (used by both notifications)
252252+ let event_result = crate::storage::event::event_get(
253253+ &web_context.pool,
254254+ build_rsvp_form.subject_aturi.as_ref().unwrap(),
255255+ )
256256+ .await;
251257252252- // Get full event details from database (used by both notifications)
253253- let event_result = crate::storage::event::event_get(
258258+ // Send notification to event creator
259259+ if let Ok(event_creator_profile) =
260260+ crate::storage::identity_profile::handle_for_did(
254261 &web_context.pool,
255255- build_rsvp_form.subject_aturi.as_ref().unwrap(),
262262+ &event_creator_did,
256263 )
257257- .await;
264264+ .await
265265+ && let Some(creator_email) = &event_creator_profile.email
266266+ && !creator_email.is_empty()
267267+ {
268268+ let event_name = if let Ok(ref event) = event_result {
269269+ if event.name.is_empty() {
270270+ "Untitled Event"
271271+ } else {
272272+ &event.name
273273+ }
274274+ } else {
275275+ "Untitled Event"
276276+ };
258277259259- // Send notification to event creator
260260- if let Ok(event_creator_profile) =
261261- crate::storage::identity_profile::handle_for_did(
262262- &web_context.pool,
278278+ if let Err(err) = emailer
279279+ .notify_rsvp_going(
280280+ creator_email,
263281 &event_creator_did,
282282+ ¤t_handle.did,
283283+ ¤t_handle.handle,
284284+ event_name,
285285+ &event_url,
264286 )
265287 .await
266288 {
267267- if let Some(creator_email) = &event_creator_profile.email {
268268- if !creator_email.is_empty() {
269269- let event_name = if let Ok(ref event) = event_result {
270270- if event.name.is_empty() {
271271- "Untitled Event"
272272- } else {
273273- &event.name
274274- }
275275- } else {
276276- "Untitled Event"
277277- };
278278-279279- if let Err(err) = emailer
280280- .notify_rsvp_going(
281281- creator_email,
282282- &event_creator_did,
283283- ¤t_handle.did,
284284- ¤t_handle.handle,
285285- event_name,
286286- &event_url,
287287- )
288288- .await
289289- {
290290- tracing::warn!(
291291- ?err,
292292- "Failed to send RSVP notification email to event creator"
293293- );
294294- }
295295- }
296296- }
289289+ tracing::warn!(
290290+ ?err,
291291+ "Failed to send RSVP notification email to event creator"
292292+ );
297293 }
294294+ }
298295299299- // Send event summary to RSVPer (current user)
300300- if let Ok(event) = event_result {
301301- // Check if user has a confirmed email
302302- if let Ok(Some(rsvper_email)) =
303303- crate::storage::notification::notification_get_confirmed_email(
304304- &web_context.pool,
305305- ¤t_handle.did,
306306- )
307307- .await
308308- {
309309- // Extract event details from the record
310310- let event_details =
311311- crate::storage::event::extract_event_details(&event);
296296+ // Send event summary to RSVPer (current user)
297297+ if let Ok(event) = event_result {
298298+ // Check if user has a confirmed email
299299+ if let Ok(Some(rsvper_email)) =
300300+ crate::storage::notification::notification_get_confirmed_email(
301301+ &web_context.pool,
302302+ ¤t_handle.did,
303303+ )
304304+ .await
305305+ {
306306+ // Extract event details from the record
307307+ let event_details =
308308+ crate::storage::event::extract_event_details(&event);
312309313313- // Only send if event has start time
314314- if let Some(starts_at) = event_details.starts_at {
315315- // Extract location string from locations array
316316- let location_opt = event_details
310310+ // Only send if event has start time
311311+ if let Some(starts_at) = event_details.starts_at {
312312+ // Extract location string from locations array
313313+ let location_opt = event_details
317314 .locations
318315 .iter()
319316 .filter_map(|loc| {
···325322 })
326323 .next(); // Take the first address found
327324328328- // Generate ICS file
329329- let ics_content = crate::ics_helpers::generate_event_ics(
330330- &event.aturi,
331331- &event_details.name,
332332- if event_details.description.is_empty() {
333333- None
334334- } else {
335335- Some(event_details.description.as_ref())
336336- },
337337- location_opt.as_deref(),
338338- starts_at,
339339- event_details.ends_at,
340340- Some(&event_url),
341341- );
325325+ // Generate ICS file
326326+ let ics_content = crate::ics_helpers::generate_event_ics(
327327+ &event.aturi,
328328+ &event_details.name,
329329+ if event_details.description.is_empty() {
330330+ None
331331+ } else {
332332+ Some(event_details.description.as_ref())
333333+ },
334334+ location_opt.as_deref(),
335335+ starts_at,
336336+ event_details.ends_at,
337337+ Some(&event_url),
338338+ );
342339343343- if let Ok(ics_string) = ics_content {
344344- let ics_bytes = ics_string.into_bytes();
340340+ if let Ok(ics_string) = ics_content {
341341+ let ics_bytes = ics_string.into_bytes();
345342346346- // Format times for email display
347347- let event_start_time =
348348- starts_at.format("%B %e, %Y at %l:%M %p %Z").to_string();
349349- let event_end_time = event_details
350350- .ends_at
351351- .map(|t| t.format("%B %e, %Y at %l:%M %p %Z").to_string());
343343+ // Format times for email display
344344+ let event_start_time = starts_at
345345+ .format("%B %e, %Y at %l:%M %p %Z")
346346+ .to_string();
347347+ let event_end_time = event_details.ends_at.map(|t| {
348348+ t.format("%B %e, %Y at %l:%M %p %Z").to_string()
349349+ });
352350353353- // Send event summary email with ICS attachment
354354- if let Err(err) = emailer
355355- .send_event_summary(
356356- &rsvper_email,
357357- ¤t_handle.did,
358358- &event_details.name,
359359- if event_details.description.is_empty() {
360360- None
361361- } else {
362362- Some(event_details.description.as_ref())
363363- },
364364- location_opt.as_deref(),
365365- &event_start_time,
366366- event_end_time.as_deref(),
367367- &event_url,
368368- ics_bytes,
369369- )
370370- .await
371371- {
372372- tracing::warn!(
373373- ?err,
374374- "Failed to send event summary email to RSVPer"
375375- );
376376- }
377377- } else {
351351+ // Send event summary email with ICS attachment
352352+ if let Err(err) = emailer
353353+ .send_event_summary(
354354+ &rsvper_email,
355355+ ¤t_handle.did,
356356+ &event_details.name,
357357+ if event_details.description.is_empty() {
358358+ None
359359+ } else {
360360+ Some(event_details.description.as_ref())
361361+ },
362362+ location_opt.as_deref(),
363363+ &event_start_time,
364364+ event_end_time.as_deref(),
365365+ &event_url,
366366+ ics_bytes,
367367+ )
368368+ .await
369369+ {
378370 tracing::warn!(
379379- "Failed to generate ICS file for event summary"
371371+ ?err,
372372+ "Failed to send event summary email to RSVPer"
380373 );
381374 }
375375+ } else {
376376+ tracing::warn!(
377377+ "Failed to generate ICS file for event summary"
378378+ );
382379 }
383380 }
384381 }
···387384 }
388385389386 // Send webhooks if enabled
390390- if web_context.config.enable_webhooks {
391391- if let Some(webhook_sender) = &web_context.webhook_sender {
392392- let webhook_identity =
393393- ATURI::from_str(build_rsvp_form.subject_aturi.as_ref().unwrap())
394394- .map(|value| value.authority)
395395- .unwrap_or_default();
387387+ if web_context.config.enable_webhooks
388388+ && let Some(webhook_sender) = &web_context.webhook_sender
389389+ {
390390+ let webhook_identity =
391391+ ATURI::from_str(build_rsvp_form.subject_aturi.as_ref().unwrap())
392392+ .map(|value| value.authority)
393393+ .unwrap_or_default();
396394397397- // Get all enabled webhooks for the user
398398- if let Ok(webhooks) =
399399- webhook_list_enabled_by_did(&web_context.pool, &webhook_identity).await
400400- {
401401- // Prepare context (empty - email sharing removed)
402402- let context = serde_json::json!({});
395395+ // Get all enabled webhooks for the user
396396+ if let Ok(webhooks) =
397397+ webhook_list_enabled_by_did(&web_context.pool, &webhook_identity).await
398398+ {
399399+ // Prepare context (empty - email sharing removed)
400400+ let context = serde_json::json!({});
403401404404- // Convert the RSVP record to JSON
405405- let record_json = serde_json::json!({
406406- "uri": &create_record_result.uri,
407407- "cit": &create_record_result.cid,
408408- });
402402+ // Convert the RSVP record to JSON
403403+ let record_json = serde_json::json!({
404404+ "uri": &create_record_result.uri,
405405+ "cit": &create_record_result.cid,
406406+ });
409407410410- // Send webhook for each enabled webhook
411411- for webhook in webhooks {
412412- let _ = webhook_sender
413413- .send(TaskWork::RSVPCreated {
414414- identity: current_handle.did.clone(),
415415- service: webhook.service,
416416- record: record_json.clone(),
417417- context: context.clone(),
418418- })
419419- .await;
420420- }
408408+ // Send webhook for each enabled webhook
409409+ for webhook in webhooks {
410410+ let _ = webhook_sender
411411+ .send(TaskWork::RSVPCreated {
412412+ identity: current_handle.did.clone(),
413413+ service: webhook.service,
414414+ record: record_json.clone(),
415415+ context: context.clone(),
416416+ })
417417+ .await;
421418 }
422419 }
423420 }
+68-48
src/http/handle_edit_event.rs
···2525 http::utils::url_from_aturi,
2626 select_template,
2727 storage::{
2828- event::{event_get, event_update_with_metadata},
2828+ event::{event_get, event_update_with_metadata, get_event_rsvps_for_export},
2929 identity_profile::{handle_for_did, handle_for_handle},
3030 },
3131};
···105105106106 let event = event.unwrap();
107107108108+ // Fetch RSVPs for the event
109109+ let mut rsvps = get_event_rsvps_for_export(&ctx.web_context.pool, &lookup_aturi)
110110+ .await
111111+ .unwrap_or_else(|err| {
112112+ tracing::warn!(?err, "Failed to fetch RSVPs for event");
113113+ Vec::new()
114114+ });
115115+116116+ // Sort RSVPs alphabetically by handle (None values go at the end)
117117+ rsvps.sort_by(|a, b| match (&a.handle, &b.handle) {
118118+ (Some(h_a), Some(h_b)) => h_a.to_lowercase().cmp(&h_b.to_lowercase()),
119119+ (Some(_), None) => std::cmp::Ordering::Less,
120120+ (None, Some(_)) => std::cmp::Ordering::Greater,
121121+ (None, None) => a.did.cmp(&b.did),
122122+ });
123123+108124 // Check if this is a community calendar event (we only support editing those)
109125 if event.lexicon != LexiconCommunityEventNSID {
110126 return contextual_error!(
···353369 location_form,
354370 event_rkey,
355371 handle_slug,
372372+ event,
373373+ rsvps,
356374 timezones,
357375 is_development,
358376 locations_editable,
···635653 }
636654637655 // Send email notifications to RSVP holders if checkbox is checked and emailer is enabled
638638- if build_event_form.send_notifications.unwrap_or(false) {
639639- if let Some(ref emailer) = ctx.web_context.emailer {
640640- // Get all "going" RSVPs for this event
641641- if let Ok(rsvps) = crate::storage::event::get_event_rsvps(
642642- &ctx.web_context.pool,
643643- &lookup_aturi,
644644- Some("going"),
645645- )
646646- .await
647647- {
648648- let event_url =
649649- url_from_aturi(&ctx.web_context.config.external_base, &lookup_aturi)
650650- .unwrap_or_else(|_| {
651651- format!(
652652- "https://{}/event",
653653- ctx.web_context.config.external_base
654654- )
655655- });
656656+ if build_event_form.send_notifications.unwrap_or(false)
657657+ && let Some(ref emailer) = ctx.web_context.emailer
658658+ {
659659+ // Get all "going" RSVPs for this event
660660+ if let Ok(rsvps) = crate::storage::event::get_event_rsvps(
661661+ &ctx.web_context.pool,
662662+ &lookup_aturi,
663663+ Some("going"),
664664+ )
665665+ .await
666666+ {
667667+ let event_url =
668668+ url_from_aturi(&ctx.web_context.config.external_base, &lookup_aturi)
669669+ .unwrap_or_else(|_| {
670670+ format!(
671671+ "https://{}/event",
672672+ ctx.web_context.config.external_base
673673+ )
674674+ });
656675657657- // Get the event updater's DID from current_handle
658658- let updater_did = ctx
659659- .current_handle
660660- .as_ref()
661661- .map(|h| h.did.as_str())
662662- .unwrap_or("");
676676+ // Get the event updater's DID from current_handle
677677+ let updater_did = ctx
678678+ .current_handle
679679+ .as_ref()
680680+ .map(|h| h.did.as_str())
681681+ .unwrap_or("");
663682664664- // Send notification to each RSVP holder with "going" status
665665- for (rsvp_did, _status) in rsvps {
666666- // Get the RSVP holder's profile (for email)
667667- if let Ok(rsvp_holder_profile) =
668668- crate::storage::identity_profile::handle_for_did(
669669- &ctx.web_context.pool,
683683+ // Send notification to each RSVP holder with "going" status
684684+ for (rsvp_did, _status) in rsvps {
685685+ // Get the RSVP holder's profile (for email)
686686+ if let Ok(rsvp_holder_profile) =
687687+ crate::storage::identity_profile::handle_for_did(
688688+ &ctx.web_context.pool,
689689+ &rsvp_did,
690690+ )
691691+ .await
692692+ && let Some(rsvp_holder_email) = &rsvp_holder_profile.email
693693+ && !rsvp_holder_email.is_empty()
694694+ {
695695+ // Send notification
696696+ if let Err(err) = emailer
697697+ .notify_event_changed(
698698+ rsvp_holder_email,
670699 &rsvp_did,
700700+ updater_did,
701701+ name,
702702+ &event_url,
671703 )
672704 .await
673705 {
674674- if let Some(rsvp_holder_email) = &rsvp_holder_profile.email {
675675- if !rsvp_holder_email.is_empty() {
676676- // Send notification
677677- if let Err(err) = emailer
678678- .notify_event_changed(
679679- rsvp_holder_email,
680680- &rsvp_did,
681681- updater_did,
682682- &name,
683683- &event_url,
684684- )
685685- .await
686686- {
687687- tracing::warn!(?err, rsvp_holder_did = ?rsvp_did, "Failed to send event change notification email");
688688- }
689689- }
690690- }
706706+ tracing::warn!(?err, rsvp_holder_did = ?rsvp_did, "Failed to send event change notification email");
691707 }
692708 }
693709 }
···710726 event_url,
711727 event_rkey,
712728 handle_slug,
729729+ event,
730730+ rsvps,
713731 timezones,
714732 is_development,
715733 locations_editable,
···734752 location_form,
735753 event_rkey,
736754 handle_slug,
755755+ event,
756756+ rsvps,
737757 timezones,
738758 is_development,
739759 locations_editable,
+193
src/http/handle_finalize_acceptance.rs
···11+use anyhow::{Result, anyhow};
22+use atproto_attestation::append_remote_attestation;
33+use atproto_record::aturi::ATURI;
44+use atproto_record::lexicon::community::lexicon::calendar::rsvp::{NSID as RSVP_NSID, Rsvp};
55+use atproto_record::typed::TypedLexicon;
66+use axum::{extract::State, response::IntoResponse};
77+use axum_extra::extract::{Cached, Form};
88+use http::StatusCode;
99+use serde::Deserialize;
1010+use std::str::FromStr;
1111+1212+use crate::{
1313+ atproto::auth::{create_dpop_auth_from_aip_session, create_dpop_auth_from_oauth_session},
1414+ config::OAuthBackendConfig,
1515+ http::{
1616+ acceptance_utils::format_success_html,
1717+ context::WebContext,
1818+ errors::{CommonError, WebError},
1919+ middleware_auth::Auth,
2020+ middleware_i18n::Language,
2121+ },
2222+ storage::{
2323+ acceptance::{acceptance_ticket_delete, acceptance_ticket_get},
2424+ event::rsvp_get_by_event_and_did,
2525+ },
2626+};
2727+use atproto_client::com::atproto::repo::{PutRecordRequest, PutRecordResponse, put_record};
2828+2929+#[derive(Debug, Deserialize)]
3030+pub struct FinalizeAcceptanceForm {
3131+ /// The AT-URI of the acceptance ticket to finalize
3232+ acceptance_aturi: String,
3333+}
3434+3535+pub(crate) async fn handle_finalize_acceptance(
3636+ State(web_context): State<WebContext>,
3737+ Language(_language): Language,
3838+ Cached(auth): Cached<Auth>,
3939+ Form(form): Form<FinalizeAcceptanceForm>,
4040+) -> Result<impl IntoResponse, WebError> {
4141+ let current_handle = auth.require("/finalize_acceptance")?;
4242+4343+ // Get the acceptance ticket from storage
4444+ let ticket = acceptance_ticket_get(&web_context.pool, &form.acceptance_aturi)
4545+ .await
4646+ .map_err(|e| anyhow!("Failed to get acceptance ticket: {}", e))?
4747+ .ok_or_else(|| anyhow!("Acceptance ticket not found"))?;
4848+4949+ // Verify the current user is the RSVP creator (recipient of the acceptance)
5050+ if ticket.rsvp_did != current_handle.did {
5151+ return Err(CommonError::NotAuthorized.into());
5252+ }
5353+5454+ // Get the RSVP to verify it exists and get its aturi
5555+ let rsvp = rsvp_get_by_event_and_did(&web_context.pool, &ticket.event_aturi, &ticket.rsvp_did)
5656+ .await
5757+ .map_err(|e| anyhow!("Failed to get RSVP for validation: {}", e))?
5858+ .ok_or_else(|| anyhow!("RSVP not found"))?;
5959+6060+ // Parse the acceptance AT-URI to fetch it from the organizer's PDS
6161+ let parsed_acceptance_aturi =
6262+ ATURI::from_str(&ticket.aturi).map_err(|e| anyhow!("Invalid acceptance AT-URI: {}", e))?;
6363+6464+ // Resolve the organizer's DID to get their PDS endpoint
6565+ let organizer_document = web_context
6666+ .identity_resolver
6767+ .resolve(&ticket.did)
6868+ .await
6969+ .map_err(|e| anyhow!("Failed to resolve organizer DID: {}", e))?;
7070+7171+ let organizer_pds = organizer_document
7272+ .service
7373+ .iter()
7474+ .find(|s| s.r#type == "AtprotoPersonalDataServer")
7575+ .map(|s| s.service_endpoint.as_str())
7676+ .ok_or_else(|| anyhow!("Organizer has no PDS endpoint"))?;
7777+7878+ // Fetch the acceptance record from the organizer's PDS
7979+ let acceptance_record_resp = atproto_client::com::atproto::repo::get_record(
8080+ &web_context.http_client,
8181+ &atproto_client::client::Auth::None,
8282+ organizer_pds,
8383+ &parsed_acceptance_aturi.authority,
8484+ &parsed_acceptance_aturi.collection,
8585+ &parsed_acceptance_aturi.record_key,
8686+ None,
8787+ )
8888+ .await
8989+ .map_err(|e| {
9090+ anyhow!(
9191+ "Failed to fetch acceptance record from organizer's PDS: {}",
9292+ e
9393+ )
9494+ })?;
9595+9696+ let acceptance_record = match acceptance_record_resp {
9797+ atproto_client::com::atproto::repo::GetRecordResponse::Record { value, .. } => value,
9898+ atproto_client::com::atproto::repo::GetRecordResponse::Error(error) => {
9999+ return Err(
100100+ anyhow!("Failed to get acceptance record: {}", error.error_message()).into(),
101101+ );
102102+ }
103103+ };
104104+105105+ // Deserialize the RSVP record from storage
106106+ let rsvp_record: Rsvp = serde_json::from_value(rsvp.record.0.clone())
107107+ .map_err(|e| anyhow!("Failed to deserialize RSVP record: {}", e))?;
108108+109109+ // Create a base RSVP without signatures for attestation
110110+ let base_rsvp = Rsvp {
111111+ subject: rsvp_record.subject.clone(),
112112+ status: rsvp_record.status.clone(),
113113+ created_at: rsvp_record.created_at,
114114+ extra: rsvp_record.extra.clone(),
115115+ signatures: vec![], // Clear signatures before appending
116116+ };
117117+ let typed_base_rsvp = TypedLexicon::new(base_rsvp);
118118+119119+ // Append the remote attestation (acceptance) to the RSVP
120120+ let updated_rsvp_record = append_remote_attestation(
121121+ typed_base_rsvp.into(),
122122+ acceptance_record.into(), // Convert to AnyInput
123123+ &ticket.did, // organizer's DID (receiver of the RSVP, giver of acceptance)
124124+ &ticket.aturi, // acceptance AT-URI
125125+ )
126126+ .map_err(|e| anyhow!("Failed to append remote attestation: {}", e))?;
127127+128128+ // Create DPoP auth based on OAuth backend type
129129+ let dpop_auth = match (&auth, &web_context.config.oauth_backend) {
130130+ (Auth::Pds { session, .. }, OAuthBackendConfig::ATProtocol { .. }) => {
131131+ create_dpop_auth_from_oauth_session(session)?
132132+ }
133133+ (Auth::Aip { access_token, .. }, OAuthBackendConfig::AIP { hostname, .. }) => {
134134+ create_dpop_auth_from_aip_session(&web_context.http_client, hostname, access_token)
135135+ .await?
136136+ }
137137+ _ => return Err(CommonError::NotAuthorized.into()),
138138+ };
139139+140140+ // Parse the RSVP AT-URI to extract the record key
141141+ let parsed_rsvp_aturi =
142142+ ATURI::from_str(&rsvp.aturi).map_err(|e| anyhow!("Invalid RSVP AT-URI: {}", e))?;
143143+144144+ // Update the RSVP in the user's PDS with the new signatures
145145+ let put_record_request = PutRecordRequest {
146146+ repo: current_handle.did.clone(),
147147+ collection: RSVP_NSID.to_string(),
148148+ validate: false,
149149+ record_key: parsed_rsvp_aturi.record_key.clone(),
150150+ record: updated_rsvp_record,
151151+ swap_commit: None,
152152+ swap_record: None,
153153+ };
154154+155155+ let put_record_result = put_record(
156156+ &web_context.http_client,
157157+ &atproto_client::client::Auth::DPoP(dpop_auth),
158158+ ¤t_handle.pds,
159159+ put_record_request,
160160+ )
161161+ .await;
162162+163163+ let _updated_rsvp = match put_record_result {
164164+ Ok(PutRecordResponse::StrongRef { uri, cid, .. }) => {
165165+ atproto_record::lexicon::com::atproto::repo::StrongRef { uri, cid }
166166+ }
167167+ Ok(PutRecordResponse::Error(err)) => {
168168+ return Err(anyhow!("AT Protocol error updating RSVP: {}", err.error_message()).into());
169169+ }
170170+ Err(err) => {
171171+ return Err(anyhow!("Failed to update RSVP: {}", err).into());
172172+ }
173173+ };
174174+175175+ // Delete the acceptance ticket from storage (cleanup)
176176+ acceptance_ticket_delete(&web_context.pool, &ticket.aturi)
177177+ .await
178178+ .map_err(|e| anyhow!("Failed to delete acceptance ticket: {}", e))?;
179179+180180+ // Return success with HTMX-compatible HTML
181181+ Ok((
182182+ StatusCode::OK,
183183+ format_success_html(
184184+ "Acceptance finalized successfully",
185185+ "Your RSVP has been updated with the organizer's acceptance.",
186186+ Some(vec![
187187+ "The acceptance signature has been added to your RSVP.".to_string(),
188188+ "This RSVP is now verified.".to_string(),
189189+ ]),
190190+ ),
191191+ )
192192+ .into_response())
193193+}
+5-5
src/http/handle_mailgun_webhook.rs
···4242 .is_some_and(|value| value == "suppress-bounce");
4343 if is_permanent && is_suppress_bounce {
4444 // Clear confirmation for ALL users with this email address
4545- if let Err(err) = notification_unconfirm_email(&web_context.pool, &email).await {
4545+ if let Err(err) = notification_unconfirm_email(&web_context.pool, email).await {
4646 tracing::error!(?err, recipient = %email, "Failed to unconfirm email after rejected event");
4747 } else {
4848 tracing::debug!(recipient = %email, "Email unconfirmed after rejected event");
···5151 }
52525353 "unsubscribed" => {
5454- if let Err(err) = notification_unconfirm_email(&web_context.pool, &email).await {
5454+ if let Err(err) = notification_unconfirm_email(&web_context.pool, email).await {
5555 tracing::error!(?err, recipient = %email, "Failed to unconfirm email after unsubscribe");
5656 } else {
5757 tracing::info!(recipient = %email, "Email unconfirmed after unsubscribe");
···59596060 let reason = format!("{} unsubscribed via mailgun webhook", email);
6161 if let Err(err) =
6262- denylist_add_or_update(&web_context.pool, Cow::Borrowed(&email), Cow::Owned(reason))
6262+ denylist_add_or_update(&web_context.pool, Cow::Borrowed(email), Cow::Owned(reason))
6363 .await
6464 {
6565 tracing::error!(?err, recipient = %email, "Failed to add email to denylist after unsubscribe");
···7171 "complained" => {
7272 // User marked email as spam
7373 // Disable notifications for ALL users with this email address
7474- if let Err(err) = notification_unconfirm_email(&web_context.pool, &email).await {
7474+ if let Err(err) = notification_unconfirm_email(&web_context.pool, email).await {
7575 tracing::error!(?err, recipient = %email, "Failed to unconfirm email after complaint");
7676 } else {
7777 tracing::info!(recipient = %email, "Email unconfirmed after complaint");
···79798080 let reason = format!("{} complained via mailgun webhook", email);
8181 if let Err(err) =
8282- denylist_add_or_update(&web_context.pool, Cow::Borrowed(&email), Cow::Owned(reason))
8282+ denylist_add_or_update(&web_context.pool, Cow::Borrowed(email), Cow::Owned(reason))
8383 .await
8484 {
8585 tracing::error!(?err, recipient = %email, "Failed to add email to denylist after complaint");
+12-15
src/http/handle_oauth_aip_login.rs
···104104 state: state.clone(),
105105 nonce: nonce.clone(),
106106 code_challenge,
107107- scope: "openid email profile atproto account:email blob:image/* repo:community.lexicon.calendar.event repo:community.lexicon.calendar.rsvp repo:events.smokesignal.profile".to_string(),
107107+ scope: "openid email profile atproto account:email blob:image/* repo:community.lexicon.calendar.event repo:community.lexicon.calendar.rsvp repo:events.smokesignal.profile repo:events.smokesignal.calendar.acceptance".to_string(),
108108 };
109109110110 // Get AIP server configuration - config validation ensures these are set when oauth_backend is AIP
···188188 return contextual_error!(web_context, language, error_template, default_context, err);
189189 }
190190191191- if let Some(ref dest) = login_form.destination {
192192- if dest != "/" {
193193- // Create a direct instance to access the set_destination method
194194- let postgres_storage = crate::storage::atproto::PostgresOAuthRequestStorage::new(
195195- web_context.pool.clone(),
196196- );
197197- if let Err(err) = postgres_storage.set_destination(&state, dest).await {
198198- tracing::error!(?err, "set_destination");
199199- // Don't fail the login flow if we can't store the destination
200200- }
191191+ if let Some(ref dest) = login_form.destination
192192+ && dest != "/"
193193+ {
194194+ // Create a direct instance to access the set_destination method
195195+ let postgres_storage =
196196+ crate::storage::atproto::PostgresOAuthRequestStorage::new(web_context.pool.clone());
197197+ if let Err(err) = postgres_storage.set_destination(&state, dest).await {
198198+ tracing::error!(?err, "set_destination");
199199+ // Don't fail the login flow if we can't store the destination
201200 }
202201 }
203202···219218 stringify(oauth_args)
220219 );
221220222222- if hx_request {
223223- if let Ok(hx_redirect) = HxRedirect::try_from(destination.as_str()) {
224224- return Ok((StatusCode::OK, hx_redirect, "").into_response());
225225- }
221221+ if hx_request && let Ok(hx_redirect) = HxRedirect::try_from(destination.as_str()) {
222222+ return Ok((StatusCode::OK, hx_redirect, "").into_response());
226223 }
227224228225 return Ok(Redirect::temporary(destination.as_str()).into_response());
+12-13
src/http/handle_oauth_login.rs
···309309 }
310310311311 // Store destination if provided and not "/"
312312- if let Some(ref dest) = destination.destination {
313313- if dest != "/" {
314314- // Create a direct instance to access the set_destination method
315315- let postgres_storage = crate::storage::atproto::PostgresOAuthRequestStorage::new(
316316- web_context.pool.clone(),
317317- );
318318- if let Err(err) = postgres_storage
319319- .set_destination(&oauth_request_state.state, dest)
320320- .await
321321- {
322322- tracing::error!(?err, "set_destination");
323323- // Don't fail the login flow if we can't store the destination
324324- }
312312+ if let Some(ref dest) = destination.destination
313313+ && dest != "/"
314314+ {
315315+ // Create a direct instance to access the set_destination method
316316+ let postgres_storage =
317317+ crate::storage::atproto::PostgresOAuthRequestStorage::new(web_context.pool.clone());
318318+ if let Err(err) = postgres_storage
319319+ .set_destination(&oauth_request_state.state, dest)
320320+ .await
321321+ {
322322+ tracing::error!(?err, "set_destination");
323323+ // Don't fail the login flow if we can't store the destination
325324 }
326325 }
327326
+4-8
src/http/handle_profile.rs
···123123 let display_name = prof_rec.display_name.clone();
124124125125 // Parse the record JSON to get full profile data
126126- let prof_data = if let Ok(prof_data) = serde_json::from_value::<
127127- crate::atproto::lexicon::profile::Profile,
128128- >(prof_rec.record.0.clone())
129129- {
130130- Some(prof_data)
131131- } else {
132132- None
133133- };
126126+ let prof_data = serde_json::from_value::<crate::atproto::lexicon::profile::Profile>(
127127+ prof_rec.record.0.clone(),
128128+ )
129129+ .ok();
134130135131 let description_html = prof_data
136132 .as_ref()
+4-5
src/http/handle_set_language.rs
···6464 }
6565 let found = found.unwrap();
66666767- if let Some(handle) = auth.profile() {
6868- if let Err(err) = handle_update_field(
6767+ if let Some(handle) = auth.profile()
6868+ && let Err(err) = handle_update_field(
6969 &web_context.pool,
7070 &handle.did,
7171 HandleField::Language(Cow::Owned(found.to_string())),
7272 )
7373 .await
7474- {
7575- tracing::error!(error = ?err, "Failed to update language");
7676- }
7474+ {
7575+ tracing::error!(error = ?err, "Failed to update language");
7776 }
78777978 let mut cookie = Cookie::new(COOKIE_LANG, found.to_string());
+61-183
src/http/handle_settings.rs
···5454}
55555656#[derive(Deserialize, Clone, Debug)]
5757-pub(crate) struct DiscoverEventsForm {
5858- discover_events: Option<String>,
5959-}
6060-6161-#[derive(Deserialize, Clone, Debug)]
6262-pub(crate) struct DiscoverRsvpsForm {
6363- discover_rsvps: Option<String>,
6464-}
6565-6666-#[derive(Deserialize, Clone, Debug)]
6757pub(crate) struct WebhookForm {
6858 service: String,
6959}
···415405 .into_response())
416406}
417407418418-pub(crate) async fn handle_discover_events_update(
419419- State(web_context): State<WebContext>,
420420- Language(language): Language,
421421- Cached(auth): Cached<Auth>,
422422- Form(discover_form): Form<DiscoverEventsForm>,
423423-) -> Result<impl IntoResponse, WebError> {
424424- let current_handle = auth.require_flat()?;
425425-426426- let default_context = template_context! {
427427- current_handle => current_handle.clone(),
428428- language => language.to_string(),
429429- };
430430-431431- let error_template = select_template!(false, true, language);
432432- let render_template = format!(
433433- "{}/settings.discover_events.html",
434434- language.to_string().to_lowercase()
435435- );
436436-437437- // Parse the boolean value from the form
438438- let discover_events = discover_form
439439- .discover_events
440440- .as_ref()
441441- .map(|s| s == "true")
442442- .unwrap_or(false);
443443-444444- if let Err(err) = handle_update_field(
445445- &web_context.pool,
446446- ¤t_handle.did,
447447- HandleField::DiscoverEvents(discover_events),
448448- )
449449- .await
450450- {
451451- return contextual_error!(web_context, language, error_template, default_context, err);
452452- }
453453-454454- let current_handle = match handle_for_did(&web_context.pool, ¤t_handle.did).await {
455455- Ok(value) => value,
456456- Err(err) => {
457457- return contextual_error!(web_context, language, error_template, default_context, err);
458458- }
459459- };
460460-461461- Ok((
462462- StatusCode::OK,
463463- RenderHtml(
464464- &render_template,
465465- web_context.engine.clone(),
466466- template_context! {
467467- current_handle,
468468- discover_events_updated => true,
469469- ..default_context
470470- },
471471- ),
472472- )
473473- .into_response())
474474-}
475475-476476-pub(crate) async fn handle_discover_rsvps_update(
477477- State(web_context): State<WebContext>,
478478- Language(language): Language,
479479- Cached(auth): Cached<Auth>,
480480- Form(discover_form): Form<DiscoverRsvpsForm>,
481481-) -> Result<impl IntoResponse, WebError> {
482482- let current_handle = auth.require_flat()?;
483483-484484- let default_context = template_context! {
485485- current_handle => current_handle.clone(),
486486- language => language.to_string(),
487487- };
488488-489489- let error_template = select_template!(false, true, language);
490490- let render_template = format!(
491491- "{}/settings.discover_rsvps.html",
492492- language.to_string().to_lowercase()
493493- );
494494-495495- // Parse the boolean value from the form
496496- let discover_rsvps = discover_form
497497- .discover_rsvps
498498- .as_ref()
499499- .map(|s| s == "true")
500500- .unwrap_or(false);
501501-502502- if let Err(err) = handle_update_field(
503503- &web_context.pool,
504504- ¤t_handle.did,
505505- HandleField::DiscoverRsvps(discover_rsvps),
506506- )
507507- .await
508508- {
509509- return contextual_error!(web_context, language, error_template, default_context, err);
510510- }
511511-512512- let current_handle = match handle_for_did(&web_context.pool, ¤t_handle.did).await {
513513- Ok(value) => value,
514514- Err(err) => {
515515- return contextual_error!(web_context, language, error_template, default_context, err);
516516- }
517517- };
518518-519519- Ok((
520520- StatusCode::OK,
521521- RenderHtml(
522522- &render_template,
523523- web_context.engine.clone(),
524524- template_context! {
525525- current_handle,
526526- discover_rsvps_updated => true,
527527- ..default_context
528528- },
529529- ),
530530- )
531531- .into_response())
532532-}
533533-534408pub(crate) async fn handle_add_webhook(
535409 State(web_context): State<WebContext>,
536410 identity_resolver: State<Arc<dyn IdentityResolver>>,
···846720 });
847721848722 // Validate profile_host if provided
849849- if let Some(ref host) = profile_host {
850850- if host != "bsky.app" && host != "blacksky.community" && host != "smokesignal.events" {
851851- return contextual_error!(
852852- web_context,
853853- language,
854854- error_template,
855855- default_context,
856856- "Invalid profile host value"
857857- );
858858- }
723723+ if let Some(ref host) = profile_host
724724+ && host != "bsky.app"
725725+ && host != "blacksky.community"
726726+ && host != "smokesignal.events"
727727+ {
728728+ return contextual_error!(
729729+ web_context,
730730+ language,
731731+ error_template,
732732+ default_context,
733733+ "Invalid profile host value"
734734+ );
859735 }
860736861737 // Get existing profile from storage to get CID for swap record (CAS operation)
···1034910 tracing::error!(error = err.error_message(), "Failed to update profile");
1035911 let error_msg = format!("{:?}", err.error_message());
1036912 if error_msg.contains("InvalidSwap") {
10371037- return contextual_error!(
913913+ contextual_error!(
1038914 web_context,
1039915 language,
1040916 error_template,
1041917 default_context,
1042918 "Your recent profile changes are still syncing. Please wait a moment and try again."
10431043- );
919919+ )
1044920 } else {
10451045- return contextual_error!(
921921+ contextual_error!(
1046922 web_context,
1047923 language,
1048924 error_template,
1049925 default_context,
1050926 format!("Failed to update profile: {:?}", err.error_message())
10511051- );
927927+ )
1052928 }
1053929 }
1054930 Err(err) => {
1055931 tracing::error!(?err, "Failed to update profile");
1056932 let error_msg = err.to_string();
1057933 if error_msg.contains("InvalidSwap") {
10581058- return contextual_error!(
934934+ contextual_error!(
1059935 web_context,
1060936 language,
1061937 error_template,
1062938 default_context,
1063939 "Your recent profile changes are still syncing. Please wait a moment and try again."
10641064- );
940940+ )
1065941 } else {
10661066- return contextual_error!(
942942+ contextual_error!(
1067943 web_context,
1068944 language,
1069945 error_template,
1070946 default_context,
1071947 format!("Failed to update profile: {}", err)
10721072- );
948948+ )
1073949 }
1074950 }
1075951 }
···1102978 .filter(|e| !e.is_empty());
11039791104980 // Validate email format if provided
11051105- if let Some(email_addr) = email {
11061106- if !email_addr.contains('@') || !email_addr.contains('.') {
11071107- return contextual_error!(
11081108- web_context,
11091109- language,
11101110- error_template,
11111111- default_context,
11121112- "error-smokesignal-email-7 Invalid format: Email address must contain @ and domain"
11131113- );
11141114- }
981981+ if let Some(email_addr) = email
982982+ && (!email_addr.contains('@') || !email_addr.contains('.'))
983983+ {
984984+ return contextual_error!(
985985+ web_context,
986986+ language,
987987+ error_template,
988988+ default_context,
989989+ "error-smokesignal-email-7 Invalid format: Email address must contain @ and domain"
990990+ );
1115991 }
11169921117993 // Get current notification settings to check if email has changed
11181118- let current_notification_settings = notification_get(&web_context.pool, ¤t_handle.did).await?;
994994+ let current_notification_settings =
995995+ notification_get(&web_context.pool, ¤t_handle.did).await?;
1119996 let current_email = current_notification_settings
1120997 .as_ref()
11211121- .and_then(|settings| settings.email.as_ref().map(|e| e.as_str()));
998998+ .and_then(|settings| settings.email.as_deref());
112299911231000 // Check if email is unchanged - if so, skip update (no-op)
11241001 let email_unchanged = match (email, current_email) {
···11421019 }
1143102011441021 // Reset email confirmation in notification settings since email changed
11451145- if let Err(err) = notification_reset_confirmation(&web_context.pool, ¤t_handle.did).await
10221022+ if let Err(err) =
10231023+ notification_reset_confirmation(&web_context.pool, ¤t_handle.did).await
11461024 {
11471025 tracing::error!(?err, "Failed to reset notification confirmation");
11481026 // Don't fail the request if this update fails, just log it
11491027 }
1150102811511029 // Automatically send confirmation email if email was set (not cleared)
11521152- if let Some(email_addr) = email {
11531153- if let Some(ref emailer) = web_context.emailer {
11541154- // Generate signed confirmation token
11551155- let secret = web_context.config.http_cookie_key.as_ref().master();
11561156- match crate::email_confirmation::generate_confirmation_token(
11571157- ¤t_handle.did,
11581158- email_addr,
11591159- secret,
11601160- ) {
11611161- Ok(token) => {
11621162- // Generate confirmation URL and send email
11631163- let confirmation_url = format!(
11641164- "https://{}/settings/confirm-email/{}",
11651165- web_context.config.external_base, token
11661166- );
10301030+ if let Some(email_addr) = email
10311031+ && let Some(ref emailer) = web_context.emailer
10321032+ {
10331033+ // Generate signed confirmation token
10341034+ let secret = web_context.config.http_cookie_key.as_ref().master();
10351035+ match crate::email_confirmation::generate_confirmation_token(
10361036+ ¤t_handle.did,
10371037+ email_addr,
10381038+ secret,
10391039+ ) {
10401040+ Ok(token) => {
10411041+ // Generate confirmation URL and send email
10421042+ let confirmation_url = format!(
10431043+ "https://{}/settings/confirm-email/{}",
10441044+ web_context.config.external_base, token
10451045+ );
1167104611681168- if let Err(err) = emailer
11691169- .send_email_confirmation(email_addr, &confirmation_url)
11701170- .await
11711171- {
11721172- tracing::error!(?err, "Failed to send confirmation email automatically");
11731173- } else {
11741174- confirmation_sent = true;
11751175- }
10471047+ if let Err(err) = emailer
10481048+ .send_email_confirmation(email_addr, &confirmation_url)
10491049+ .await
10501050+ {
10511051+ tracing::error!(?err, "Failed to send confirmation email automatically");
10521052+ } else {
10531053+ confirmation_sent = true;
11761054 }
11771177- Err(err) => {
11781178- tracing::error!(?err, "Failed to generate confirmation token");
11791179- }
10551055+ }
10561056+ Err(err) => {
10571057+ tracing::error!(?err, "Failed to generate confirmation token");
11801058 }
11811059 }
11821060 }
···2222use crate::http::utils::url_from_aturi;
2323use crate::select_template;
2424use crate::storage::StoragePool;
2525+use crate::storage::acceptance::acceptance_ticket_get_by_event_and_rsvp_did;
2626+use crate::storage::event::RsvpDisplayData;
2527use crate::storage::event::count_event_rsvps;
2628use crate::storage::event::event_exists;
2729use crate::storage::event::event_get;
2828-use crate::storage::event::get_event_rsvps;
3030+use crate::storage::event::get_event_rsvps_with_validation;
2931use crate::storage::event::get_user_rsvp_with_email_shared;
3032use crate::storage::identity_profile::handle_for_did;
3133use crate::storage::identity_profile::handle_for_handle;
···192194 profile.did, event_rkey
193195 );
194196195195- if let Ok(exists) = event_exists(&ctx.web_context.pool, &legacy_aturi).await {
196196- if exists {
197197- return contextual_error!(
198198- ctx.web_context,
199199- ctx.language,
200200- error_template,
201201- default_context,
202202- ViewEventError::LegacyEventNotSupported,
203203- StatusCode::NOT_FOUND
204204- );
205205- }
197197+ if let Ok(exists) = event_exists(&ctx.web_context.pool, &legacy_aturi).await
198198+ && exists
199199+ {
200200+ return contextual_error!(
201201+ ctx.web_context,
202202+ ctx.language,
203203+ error_template,
204204+ default_context,
205205+ ViewEventError::LegacyEventNotSupported,
206206+ StatusCode::NOT_FOUND
207207+ );
206208 }
207209 }
208210···298300 None
299301 };
300302303303+ // Check if there's a pending acceptance ticket for the current user's RSVP
304304+ let pending_acceptance = if let Some(current_entity) = &ctx.current_handle {
305305+ match acceptance_ticket_get_by_event_and_rsvp_did(
306306+ &ctx.web_context.pool,
307307+ &aturi,
308308+ ¤t_entity.did,
309309+ )
310310+ .await
311311+ {
312312+ Ok(Some(ticket)) => Some(ticket.aturi),
313313+ Ok(None) => None,
314314+ Err(err) => {
315315+ tracing::warn!("Error checking for pending acceptance: {:?}", err);
316316+ None
317317+ }
318318+ }
319319+ } else {
320320+ None
321321+ };
322322+301323 // Get counts for all RSVP statuses
302324 let going_count = count_event_rsvps(&ctx.web_context.pool, &aturi, "going")
303325 .await
···311333 .await
312334 .unwrap_or_default();
313335314314- // Only get handles for the active tab
315315- let active_tab_handles = {
336336+ // Get RSVPs with validation data for the active tab
337337+ let active_tab_rsvps = {
316338 let tab_status = match tab {
317339 RSVPTab::Going => "going",
318340 RSVPTab::Interested => "interested",
319341 RSVPTab::NotGoing => "notgoing",
320342 };
321343322322- let rsvps = get_event_rsvps(&ctx.web_context.pool, &aturi, Some(tab_status))
323323- .await
324324- .unwrap_or_default();
344344+ let rsvps =
345345+ get_event_rsvps_with_validation(&ctx.web_context.pool, &aturi, Some(tab_status))
346346+ .await
347347+ .unwrap_or_default();
325348326349 // Extract DIDs for batch lookup
327350 let dids: Vec<String> = rsvps.iter().map(|(did, _)| did.clone()).collect();
···331354 .await
332355 .unwrap_or_default();
333356334334- // Extract handle strings in the same order as the original rsvps
335335- let mut handles = Vec::new();
336336- for (did, _) in &rsvps {
337337- if let Some(profile) = handle_profiles.get(did) {
338338- handles.push(profile.handle.clone());
339339- }
357357+ // Create RsvpDisplayData objects with handle and validation info
358358+ let mut rsvp_display_data = Vec::new();
359359+ for (did, validated_at) in &rsvps {
360360+ let handle = handle_profiles.get(did).map(|p| p.handle.clone());
361361+ rsvp_display_data.push(RsvpDisplayData {
362362+ did: did.clone(),
363363+ handle,
364364+ validated_at: *validated_at,
365365+ });
340366 }
341341- handles
367367+ rsvp_display_data
342368 };
343369344370 // Set counts on event
···366392 event => event_with_counts,
367393 is_self,
368394 can_edit,
369369- active_tab_handles,
395395+ active_tab_rsvps,
370396 active_tab => tab_name,
371397 user_rsvp_status,
398398+ pending_acceptance,
372399 handle_slug,
373400 event_rkey,
374401 collection => collection.clone(),
+8-9
src/http/handle_xrpc_search_events.rs
···152152 };
153153154154 let mut event_ids = Vec::new();
155155- if let Some(outer_hits) = serach_results_value.get("hits") {
156156- if let Some(inner_hits) = outer_hits.get("hits") {
157157- if let Some(hits) = inner_hits.as_array() {
158158- for hit in hits {
159159- let document_id = hit.get("_id").and_then(|value| value.as_str());
160160- if let Some(document_id_value) = document_id {
161161- event_ids.push(document_id_value.to_string());
162162- }
163163- }
155155+ if let Some(outer_hits) = serach_results_value.get("hits")
156156+ && let Some(inner_hits) = outer_hits.get("hits")
157157+ && let Some(hits) = inner_hits.as_array()
158158+ {
159159+ for hit in hits {
160160+ let document_id = hit.get("_id").and_then(|value| value.as_str());
161161+ if let Some(document_id_value) = document_id {
162162+ event_ids.push(document_id_value.to_string());
164163 }
165164 }
166165 }
+5-5
src/http/middleware_i18n.rs
···119119 let auth: Auth = Cached::<Auth>::from_request_parts(parts, context).await?.0;
120120121121 // 1. Try to get language from user's profile settings
122122- if let Some(handle) = auth.profile() {
123123- if let Ok(auth_lang) = handle.language.parse::<LanguageIdentifier>() {
124124- debug!(language = %auth_lang, "Using language from user profile");
125125- return Ok(Self(auth_lang));
126126- }
122122+ if let Some(handle) = auth.profile()
123123+ && let Ok(auth_lang) = handle.language.parse::<LanguageIdentifier>()
124124+ {
125125+ debug!(language = %auth_lang, "Using language from user profile");
126126+ return Ok(Self(auth_lang));
127127 }
128128129129 // 2. Try to get language from cookies
+3
src/http/mod.rs
···11+pub mod acceptance_utils;
12pub mod auth_utils;
23pub mod cache_countries;
34pub mod context;
45pub mod errors;
56pub mod event_form;
67pub mod event_view;
88+pub mod handle_accept_rsvp;
79pub mod handle_admin_denylist;
810pub mod handle_admin_event;
911pub mod handle_admin_events;
···2224pub mod handle_email_confirm;
2325pub mod handle_export_ics;
2426pub mod handle_export_rsvps;
2727+pub mod handle_finalize_acceptance;
2528pub mod handle_health;
2629pub mod handle_host_meta;
2730pub mod handle_import;
···6969 /// In-memory LRU cache
7070 memory_cache: Arc<RwLock<LruCache<String, CachedDocument>>>,
71717272- handle_cache: Arc<RwLock<LruCache<String, String>>>,
7373-7472 /// Cache configuration
7573 config: CacheConfig,
7674}
···9492 base_resolver,
9593 storage,
9694 memory_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
9797- handle_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
9895 config,
9996 }
10097 }
···170167 // Store in database
171168 if let Err(e) = self.storage.as_ref().store_document(document.clone()).await {
172169 warn!("Failed to store document in database cache: {}", e);
173173- } else {
174170 }
175171176172 // Also store by handle if the subject was a handle
+1
src/lib.rs
···1515pub mod key_provider;
1616pub mod processor;
1717pub mod processor_errors;
1818+pub mod record_resolver;
1819pub mod refresh_tokens_errors;
1920pub mod service;
2021pub mod storage;
+100-33
src/processor.rs
···11use anyhow::Result;
22+use atproto_attestation::verify_record;
23use atproto_client::com::atproto::repo::get_blob;
44+use atproto_identity::key::IdentityDocumentKeyResolver;
35use atproto_identity::model::Document;
46use atproto_identity::resolve::IdentityResolver;
57use atproto_identity::traits::DidDocumentStorage;
···810use serde_json::Value;
911use std::sync::Arc;
10121313+use crate::atproto::lexicon::acceptance::{Acceptance, NSID as AcceptanceNSID};
1114use crate::atproto::lexicon::profile::{NSID as ProfileNSID, Profile};
1215use crate::consumer::SmokeSignalEvent;
1316use crate::consumer::SmokeSignalEventReceiver;
1417use crate::processor_errors::ProcessorError;
1518use crate::storage::StoragePool;
1919+use crate::storage::acceptance::{
2020+ acceptance_record_delete, acceptance_record_upsert, rsvp_update_validated_at,
2121+};
1622use crate::storage::content::ContentStorage;
1723use crate::storage::denylist::denylist_exists;
1824use crate::storage::event::RsvpInsertParams;
···2127use crate::storage::event::event_insert_with_metadata;
2228use crate::storage::event::rsvp_delete;
2329use crate::storage::event::rsvp_insert_with_metadata;
2424-use crate::storage::identity_profile::identity_profile_allow_discover_events;
2525-use crate::storage::identity_profile::identity_profile_allow_discover_rsvps;
2630use crate::storage::profile::profile_delete;
2731use crate::storage::profile::profile_insert;
2832use atproto_record::lexicon::community::lexicon::calendar::event::{
···3842 identity_resolver: Arc<dyn IdentityResolver>,
3943 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
4044 http_client: reqwest::Client,
4545+ record_resolver: Arc<crate::record_resolver::StorageBackedRecordResolver>,
4646+ key_resolver: IdentityDocumentKeyResolver,
4147}
42484349impl ContentFetcher {
···4753 identity_resolver: Arc<dyn IdentityResolver>,
4854 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>,
4955 http_client: reqwest::Client,
5656+ record_resolver: Arc<crate::record_resolver::StorageBackedRecordResolver>,
5757+ key_resolver: IdentityDocumentKeyResolver,
5058 ) -> Self {
5159 Self {
5260 pool,
···5462 identity_resolver,
5563 document_storage,
5664 http_client,
6565+ record_resolver,
6666+ key_resolver,
5767 }
5868 }
5969···95105 }
96106 "events.smokesignal.profile" => {
97107 self.handle_profile_commit(did, rkey, cid, record).await
108108+ }
109109+ "events.smokesignal.calendar.acceptance" => {
110110+ self.handle_acceptance_commit(did, rkey, cid, record).await
98111 }
99112 _ => Ok(()),
100113 };
···116129 self.handle_rsvp_delete(did, rkey).await
117130 }
118131 "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await,
132132+ "events.smokesignal.calendar.acceptance" => {
133133+ self.handle_acceptance_delete(did, rkey).await
134134+ }
119135 _ => Ok(()),
120136 };
121137 if let Err(e) = result {
···137153 record: &Value,
138154 ) -> Result<()> {
139155 tracing::info!("Processing event: {} for {}", rkey, did);
140140-141141- // Check if the user allows event discovery
142142- let allow_discover = identity_profile_allow_discover_events(&self.pool, did).await?;
143143- // Default to false if the profile doesn't exist yet
144144- if !allow_discover.unwrap_or(false) {
145145- tracing::info!("User {} has opted out of event discovery", did);
146146- return Ok(());
147147- }
148156149157 let aturi = format!("at://{did}/{LexiconCommunityEventNSID}/{rkey}");
150158···186194 record: &Value,
187195 ) -> Result<()> {
188196 tracing::info!("Processing rsvp: {} for {}", rkey, did);
189189-190190- let allow_discover = identity_profile_allow_discover_rsvps(&self.pool, did).await?;
191191- if !allow_discover.unwrap_or(true) {
192192- tracing::info!("User {} has opted out of event discovery", did);
193193- return Ok(());
194194- }
195197196198 let aturi = format!("at://{did}/{LexiconCommunityRSVPNSID}/{rkey}");
197199···228230 )
229231 .await?;
230232233233+ // Check if RSVP has signatures and verify them
234234+ if !rsvp_record.signatures.is_empty() {
235235+ tracing::info!(
236236+ "RSVP {} has {} signature(s), verifying...",
237237+ aturi,
238238+ rsvp_record.signatures.len()
239239+ );
240240+241241+ let key_resolver_clone = self.key_resolver.clone();
242242+ let validated = verify_record(
243243+ (&rsvp_record).into(),
244244+ did,
245245+ key_resolver_clone,
246246+ self.record_resolver.as_ref(),
247247+ )
248248+ .await
249249+ .is_ok();
250250+251251+ if validated {
252252+ if let Err(e) =
253253+ rsvp_update_validated_at(&self.pool, &aturi, Some(chrono::Utc::now())).await
254254+ {
255255+ tracing::error!("Failed to update RSVP validated_at: {:?}", e);
256256+ } else {
257257+ tracing::info!("RSVP {} validated with signatures", aturi);
258258+ }
259259+ } else {
260260+ tracing::warn!("RSVP {} signature verification failed", aturi);
261261+ }
262262+ }
263263+231264 Ok(())
232265 }
233266···288321 let pds_endpoints = document.pds_endpoints();
289322 if let Some(pds_endpoint) = pds_endpoints.first() {
290323 // Download avatar if present
291291- if let Some(ref avatar) = profile_record.avatar {
292292- if let Err(e) = self.download_avatar(pds_endpoint, did, avatar).await {
293293- tracing::warn!(
294294- error = ?e,
295295- did = %did,
296296- "Failed to download avatar for profile"
297297- );
298298- }
324324+ if let Some(ref avatar) = profile_record.avatar
325325+ && let Err(e) = self.download_avatar(pds_endpoint, did, avatar).await
326326+ {
327327+ tracing::warn!(
328328+ error = ?e,
329329+ did = %did,
330330+ "Failed to download avatar for profile"
331331+ );
299332 }
300333301334 // Download banner if present
302302- if let Some(ref banner) = profile_record.banner {
303303- if let Err(e) = self.download_banner(pds_endpoint, did, banner).await {
304304- tracing::warn!(
305305- error = ?e,
306306- did = %did,
307307- "Failed to download banner for profile"
308308- );
309309- }
335335+ if let Some(ref banner) = profile_record.banner
336336+ && let Err(e) = self.download_banner(pds_endpoint, did, banner).await
337337+ {
338338+ tracing::warn!(
339339+ error = ?e,
340340+ did = %did,
341341+ "Failed to download banner for profile"
342342+ );
310343 }
311344 } else {
312345 tracing::debug!(did = %did, "No PDS endpoint found for profile blob download");
···318351 async fn handle_profile_delete(&self, did: &str, rkey: &str) -> Result<()> {
319352 let aturi = format!("at://{did}/{ProfileNSID}/{rkey}");
320353 profile_delete(&self.pool, &aturi).await?;
354354+ Ok(())
355355+ }
356356+357357+ async fn handle_acceptance_commit(
358358+ &self,
359359+ did: &str,
360360+ rkey: &str,
361361+ _cid: &str,
362362+ record: &Value,
363363+ ) -> Result<()> {
364364+ tracing::info!("Processing acceptance: {} for {}", rkey, did);
365365+366366+ let aturi = format!("at://{did}/{AcceptanceNSID}/{rkey}");
367367+368368+ // Deserialize and validate the acceptance record
369369+ let acceptance_record: Acceptance = serde_json::from_value(record.clone())?;
370370+371371+ // Validate the acceptance record
372372+ if let Err(e) = acceptance_record.validate() {
373373+ tracing::warn!("Invalid acceptance record: {}", e);
374374+ return Ok(());
375375+ }
376376+377377+ // Store the acceptance record
378378+ acceptance_record_upsert(&self.pool, &aturi, &acceptance_record.cid, did, record).await?;
379379+380380+ tracing::info!("Acceptance stored: {}", aturi);
381381+ Ok(())
382382+ }
383383+384384+ async fn handle_acceptance_delete(&self, did: &str, rkey: &str) -> Result<()> {
385385+ let aturi = format!("at://{did}/{AcceptanceNSID}/{rkey}");
386386+ acceptance_record_delete(&self.pool, &aturi).await?;
387387+ tracing::info!("Acceptance deleted: {}", aturi);
321388 Ok(())
322389 }
323390···554621 return Ok(());
555622 }
556623557557- let image_bytes = get_blob(&self.http_client, pds, did, &blob_ref).await?;
624624+ let image_bytes = get_blob(&self.http_client, pds, did, blob_ref).await?;
558625559626 const MAX_SIZE: usize = 3 * 1024 * 1024;
560627 if image_bytes.len() > MAX_SIZE {
+156
src/record_resolver.rs
···11+//! Storage-backed record resolver for ATProto records with network fallback.
22+//!
33+//! This module provides a `RecordResolver` implementation that:
44+//! 1. First checks local storage for cached acceptance records
55+//! 2. Falls back to fetching from the network via the identity's PDS
66+//! 3. Caches fetched records in storage for future use
77+88+use crate::storage::StoragePool;
99+use crate::storage::acceptance::{acceptance_record_get, acceptance_record_upsert};
1010+use anyhow::{Context, anyhow};
1111+use atproto_identity::resolve::IdentityResolver;
1212+use atproto_record::aturi::ATURI;
1313+use std::str::FromStr;
1414+use std::sync::Arc;
1515+use tracing::{debug, warn};
1616+1717+/// Record resolver that uses storage as a cache and falls back to network fetching.
1818+///
1919+/// This resolver attempts to retrieve acceptance records from local storage first.
2020+/// If not found, it resolves the DID to find the PDS endpoint, fetches the record from
2121+/// the network, and stores it for future use.
2222+pub struct StorageBackedRecordResolver {
2323+ http_client: reqwest::Client,
2424+ identity_resolver: Arc<dyn IdentityResolver>,
2525+ pool: StoragePool,
2626+}
2727+2828+impl StorageBackedRecordResolver {
2929+ /// Create a new storage-backed record resolver.
3030+ pub fn new(
3131+ http_client: reqwest::Client,
3232+ identity_resolver: Arc<dyn IdentityResolver>,
3333+ pool: StoragePool,
3434+ ) -> Self {
3535+ Self {
3636+ http_client,
3737+ identity_resolver,
3838+ pool,
3939+ }
4040+ }
4141+}
4242+4343+#[async_trait::async_trait]
4444+impl atproto_client::record_resolver::RecordResolver for StorageBackedRecordResolver {
4545+ async fn resolve<T>(&self, aturi: &str) -> anyhow::Result<T>
4646+ where
4747+ T: serde::de::DeserializeOwned + Send,
4848+ {
4949+ // Parse the AT-URI
5050+ let parsed = ATURI::from_str(aturi).map_err(|e| anyhow!("Invalid AT-URI: {}", e))?;
5151+ tracing::debug!(?parsed, "RecordResolver resolve parsed");
5252+5353+ // Try to get from storage first (only for acceptance records)
5454+ if parsed.collection == "events.smokesignal.calendar.acceptance" {
5555+ debug!(aturi = %aturi, "Checking storage for acceptance record");
5656+5757+ match acceptance_record_get(&self.pool, aturi).await {
5858+ Ok(Some(acceptance_record)) => {
5959+ debug!(aturi = %aturi, "Found acceptance record in storage cache");
6060+ // Deserialize the stored record
6161+ return serde_json::from_value(acceptance_record.record.0)
6262+ .map_err(|e| anyhow!("Failed to deserialize cached record: {}", e));
6363+ }
6464+ Ok(None) => {
6565+ debug!(aturi = %aturi, "Acceptance record not found in storage, fetching from network");
6666+ }
6767+ Err(e) => {
6868+ warn!(
6969+ aturi = %aturi,
7070+ error = %e,
7171+ "Failed to check storage for acceptance record, will fetch from network"
7272+ );
7373+ }
7474+ }
7575+ }
7676+7777+ // Not in storage or not an acceptance record - fetch from network
7878+ debug!(aturi = %aturi, authority = %parsed.authority, "Resolving DID to fetch record from PDS");
7979+8080+ // Resolve the DID to get the PDS endpoint
8181+ let document = self
8282+ .identity_resolver
8383+ .resolve(&parsed.authority)
8484+ .await
8585+ .with_context(|| format!("Failed to resolve DID: {}", parsed.authority))?;
8686+8787+ // Find the PDS endpoint
8888+ let pds_endpoint = document
8989+ .service
9090+ .iter()
9191+ .find(|s| s.r#type == "AtprotoPersonalDataServer")
9292+ .map(|s| s.service_endpoint.as_str())
9393+ .ok_or_else(|| anyhow!("No PDS endpoint found for DID: {}", parsed.authority))?;
9494+9595+ debug!(
9696+ aturi = %aturi,
9797+ pds_endpoint = %pds_endpoint,
9898+ "Fetching record from PDS"
9999+ );
100100+101101+ // Fetch the record using the XRPC client
102102+ let response = atproto_client::com::atproto::repo::get_record(
103103+ &self.http_client,
104104+ &atproto_client::client::Auth::None,
105105+ pds_endpoint,
106106+ &parsed.authority,
107107+ &parsed.collection,
108108+ &parsed.record_key,
109109+ None,
110110+ )
111111+ .await
112112+ .with_context(|| format!("Failed to fetch record from PDS: {}", pds_endpoint))?;
113113+114114+ match response {
115115+ atproto_client::com::atproto::repo::GetRecordResponse::Record {
116116+ value, cid, ..
117117+ } => {
118118+ // If this is an acceptance record, store it for future use
119119+ if parsed.collection == "events.smokesignal.calendar.acceptance" {
120120+ debug!(aturi = %aturi, "Caching acceptance record in storage");
121121+122122+ // Store asynchronously, but don't fail if storage fails
123123+ if let Err(e) =
124124+ acceptance_record_upsert(&self.pool, aturi, &cid, &parsed.authority, &value)
125125+ .await
126126+ {
127127+ warn!(
128128+ aturi = %aturi,
129129+ error = %e,
130130+ "Failed to cache acceptance record in storage"
131131+ );
132132+ }
133133+ }
134134+135135+ // Deserialize and return the record
136136+ serde_json::from_value(value)
137137+ .map_err(|e| anyhow!("Failed to deserialize record: {}", e))
138138+ }
139139+ atproto_client::com::atproto::repo::GetRecordResponse::Error(error) => {
140140+ Err(anyhow!("Failed to fetch record: {}", error.error_message()))
141141+ }
142142+ }
143143+ }
144144+}
145145+146146+// Implement RecordResolver for &StorageBackedRecordResolver to allow passing by reference
147147+#[async_trait::async_trait]
148148+impl atproto_client::record_resolver::RecordResolver for &StorageBackedRecordResolver {
149149+ async fn resolve<T>(&self, aturi: &str) -> anyhow::Result<T>
150150+ where
151151+ T: serde::de::DeserializeOwned + Send,
152152+ {
153153+ // Delegate to the implementation for StorageBackedRecordResolver
154154+ (*self).resolve(aturi).await
155155+ }
156156+}
+336
src/storage/acceptance.rs
···11+use anyhow::Result;
22+use chrono::{DateTime, Utc};
33+use serde_json::json;
44+use sqlx::FromRow;
55+66+use super::StoragePool;
77+use super::errors::StorageError;
88+99+/// Model for RSVP acceptance ticket
1010+#[derive(Clone, FromRow, Debug)]
1111+pub struct AcceptanceTicket {
1212+ pub aturi: String,
1313+ pub did: String,
1414+ pub rsvp_did: String,
1515+ pub event_aturi: String,
1616+ pub record: sqlx::types::Json<serde_json::Value>,
1717+ pub created_at: DateTime<Utc>,
1818+}
1919+2020+/// Model for RSVP acceptance record
2121+#[derive(Clone, FromRow, Debug)]
2222+pub struct AcceptanceRecord {
2323+ pub aturi: String,
2424+ pub cid: String,
2525+ pub did: String,
2626+ pub record: sqlx::types::Json<serde_json::Value>,
2727+ pub created_at: DateTime<Utc>,
2828+ pub updated_at: DateTime<Utc>,
2929+}
3030+3131+/// Insert or update an acceptance ticket
3232+pub async fn acceptance_ticket_upsert<T: serde::Serialize>(
3333+ pool: &StoragePool,
3434+ aturi: &str,
3535+ did: &str,
3636+ rsvp_did: &str,
3737+ event_aturi: &str,
3838+ record: &T,
3939+) -> Result<(), StorageError> {
4040+ let mut tx = pool
4141+ .begin()
4242+ .await
4343+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
4444+4545+ let now = Utc::now();
4646+4747+ sqlx::query(
4848+ "INSERT INTO acceptance_tickets (aturi, did, rsvp_did, event_aturi, record, created_at)
4949+ VALUES ($1, $2, $3, $4, $5, $6)
5050+ ON CONFLICT (aturi) DO UPDATE
5151+ SET did = $2, rsvp_did = $3, event_aturi = $4, record = $5",
5252+ )
5353+ .bind(aturi)
5454+ .bind(did)
5555+ .bind(rsvp_did)
5656+ .bind(event_aturi)
5757+ .bind(json!(record))
5858+ .bind(now)
5959+ .execute(tx.as_mut())
6060+ .await
6161+ .map_err(StorageError::UnableToExecuteQuery)?;
6262+6363+ tx.commit()
6464+ .await
6565+ .map_err(StorageError::CannotCommitDatabaseTransaction)
6666+}
6767+6868+/// Get acceptance ticket by AT-URI
6969+pub async fn acceptance_ticket_get(
7070+ pool: &StoragePool,
7171+ aturi: &str,
7272+) -> Result<Option<AcceptanceTicket>, StorageError> {
7373+ let mut tx = pool
7474+ .begin()
7575+ .await
7676+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
7777+7878+ let record =
7979+ sqlx::query_as::<_, AcceptanceTicket>("SELECT * FROM acceptance_tickets WHERE aturi = $1")
8080+ .bind(aturi)
8181+ .fetch_optional(tx.as_mut())
8282+ .await
8383+ .map_err(StorageError::UnableToExecuteQuery)?;
8484+8585+ tx.commit()
8686+ .await
8787+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
8888+8989+ Ok(record)
9090+}
9191+9292+/// List acceptance tickets created by a specific DID
9393+pub async fn acceptance_ticket_list_by_did(
9494+ pool: &StoragePool,
9595+ did: &str,
9696+) -> Result<Vec<AcceptanceTicket>, StorageError> {
9797+ let mut tx = pool
9898+ .begin()
9999+ .await
100100+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
101101+102102+ let records = sqlx::query_as::<_, AcceptanceTicket>(
103103+ "SELECT * FROM acceptance_tickets WHERE did = $1 ORDER BY created_at DESC",
104104+ )
105105+ .bind(did)
106106+ .fetch_all(tx.as_mut())
107107+ .await
108108+ .map_err(StorageError::UnableToExecuteQuery)?;
109109+110110+ tx.commit()
111111+ .await
112112+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
113113+114114+ Ok(records)
115115+}
116116+117117+/// List acceptance tickets for a specific RSVP DID (tickets created for this user)
118118+pub async fn acceptance_ticket_list_by_rsvp_did(
119119+ pool: &StoragePool,
120120+ rsvp_did: &str,
121121+) -> Result<Vec<AcceptanceTicket>, StorageError> {
122122+ let mut tx = pool
123123+ .begin()
124124+ .await
125125+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
126126+127127+ let records = sqlx::query_as::<_, AcceptanceTicket>(
128128+ "SELECT * FROM acceptance_tickets WHERE rsvp_did = $1 ORDER BY created_at DESC",
129129+ )
130130+ .bind(rsvp_did)
131131+ .fetch_all(tx.as_mut())
132132+ .await
133133+ .map_err(StorageError::UnableToExecuteQuery)?;
134134+135135+ tx.commit()
136136+ .await
137137+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
138138+139139+ Ok(records)
140140+}
141141+142142+/// List acceptance tickets for a specific event
143143+pub async fn acceptance_ticket_list_by_event(
144144+ pool: &StoragePool,
145145+ event_aturi: &str,
146146+) -> Result<Vec<AcceptanceTicket>, StorageError> {
147147+ let mut tx = pool
148148+ .begin()
149149+ .await
150150+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
151151+152152+ let records = sqlx::query_as::<_, AcceptanceTicket>(
153153+ "SELECT * FROM acceptance_tickets WHERE event_aturi = $1 ORDER BY created_at DESC",
154154+ )
155155+ .bind(event_aturi)
156156+ .fetch_all(tx.as_mut())
157157+ .await
158158+ .map_err(StorageError::UnableToExecuteQuery)?;
159159+160160+ tx.commit()
161161+ .await
162162+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
163163+164164+ Ok(records)
165165+}
166166+167167+/// Delete an acceptance ticket
168168+pub async fn acceptance_ticket_delete(pool: &StoragePool, aturi: &str) -> Result<(), StorageError> {
169169+ let mut tx = pool
170170+ .begin()
171171+ .await
172172+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
173173+174174+ sqlx::query("DELETE FROM acceptance_tickets WHERE aturi = $1")
175175+ .bind(aturi)
176176+ .execute(tx.as_mut())
177177+ .await
178178+ .map_err(StorageError::UnableToExecuteQuery)?;
179179+180180+ tx.commit()
181181+ .await
182182+ .map_err(StorageError::CannotCommitDatabaseTransaction)
183183+}
184184+185185+/// Insert or update an acceptance record
186186+pub async fn acceptance_record_upsert<T: serde::Serialize>(
187187+ pool: &StoragePool,
188188+ aturi: &str,
189189+ cid: &str,
190190+ did: &str,
191191+ record: &T,
192192+) -> Result<(), StorageError> {
193193+ let mut tx = pool
194194+ .begin()
195195+ .await
196196+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
197197+198198+ let now = Utc::now();
199199+200200+ sqlx::query(
201201+ "INSERT INTO acceptance_records (aturi, cid, did, record, created_at, updated_at)
202202+ VALUES ($1, $2, $3, $4, $5, $6)
203203+ ON CONFLICT (aturi) DO UPDATE
204204+ SET cid = $2, did = $3, record = $4, updated_at = $6",
205205+ )
206206+ .bind(aturi)
207207+ .bind(cid)
208208+ .bind(did)
209209+ .bind(json!(record))
210210+ .bind(now)
211211+ .bind(now)
212212+ .execute(tx.as_mut())
213213+ .await
214214+ .map_err(StorageError::UnableToExecuteQuery)?;
215215+216216+ tx.commit()
217217+ .await
218218+ .map_err(StorageError::CannotCommitDatabaseTransaction)
219219+}
220220+221221+/// Get acceptance record by AT-URI
222222+pub async fn acceptance_record_get(
223223+ pool: &StoragePool,
224224+ aturi: &str,
225225+) -> Result<Option<AcceptanceRecord>, StorageError> {
226226+ let mut tx = pool
227227+ .begin()
228228+ .await
229229+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
230230+231231+ let record =
232232+ sqlx::query_as::<_, AcceptanceRecord>("SELECT * FROM acceptance_records WHERE aturi = $1")
233233+ .bind(aturi)
234234+ .fetch_optional(tx.as_mut())
235235+ .await
236236+ .map_err(StorageError::UnableToExecuteQuery)?;
237237+238238+ tx.commit()
239239+ .await
240240+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
241241+242242+ Ok(record)
243243+}
244244+245245+/// Get acceptance record by RSVP CID
246246+pub async fn acceptance_record_get_by_cid(
247247+ pool: &StoragePool,
248248+ cid: &str,
249249+) -> Result<Option<AcceptanceRecord>, StorageError> {
250250+ let mut tx = pool
251251+ .begin()
252252+ .await
253253+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
254254+255255+ let record = sqlx::query_as::<_, AcceptanceRecord>(
256256+ "SELECT * FROM acceptance_records WHERE cid = $1 LIMIT 1",
257257+ )
258258+ .bind(cid)
259259+ .fetch_optional(tx.as_mut())
260260+ .await
261261+ .map_err(StorageError::UnableToExecuteQuery)?;
262262+263263+ tx.commit()
264264+ .await
265265+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
266266+267267+ Ok(record)
268268+}
269269+270270+/// Delete an acceptance record
271271+pub async fn acceptance_record_delete(pool: &StoragePool, aturi: &str) -> Result<(), StorageError> {
272272+ let mut tx = pool
273273+ .begin()
274274+ .await
275275+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
276276+277277+ sqlx::query("DELETE FROM acceptance_records WHERE aturi = $1")
278278+ .bind(aturi)
279279+ .execute(tx.as_mut())
280280+ .await
281281+ .map_err(StorageError::UnableToExecuteQuery)?;
282282+283283+ tx.commit()
284284+ .await
285285+ .map_err(StorageError::CannotCommitDatabaseTransaction)
286286+}
287287+288288+/// Get acceptance ticket for a specific event and RSVP DID combination
289289+pub async fn acceptance_ticket_get_by_event_and_rsvp_did(
290290+ pool: &StoragePool,
291291+ event_aturi: &str,
292292+ rsvp_did: &str,
293293+) -> Result<Option<AcceptanceTicket>, StorageError> {
294294+ let mut tx = pool
295295+ .begin()
296296+ .await
297297+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
298298+299299+ let record = sqlx::query_as::<_, AcceptanceTicket>(
300300+ "SELECT * FROM acceptance_tickets WHERE event_aturi = $1 AND rsvp_did = $2 LIMIT 1",
301301+ )
302302+ .bind(event_aturi)
303303+ .bind(rsvp_did)
304304+ .fetch_optional(tx.as_mut())
305305+ .await
306306+ .map_err(StorageError::UnableToExecuteQuery)?;
307307+308308+ tx.commit()
309309+ .await
310310+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
311311+312312+ Ok(record)
313313+}
314314+315315+/// Update RSVP validated_at timestamp
316316+pub async fn rsvp_update_validated_at(
317317+ pool: &StoragePool,
318318+ rsvp_aturi: &str,
319319+ validated_at: Option<DateTime<Utc>>,
320320+) -> Result<(), StorageError> {
321321+ let mut tx = pool
322322+ .begin()
323323+ .await
324324+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
325325+326326+ sqlx::query("UPDATE rsvps SET validated_at = $1 WHERE aturi = $2")
327327+ .bind(validated_at)
328328+ .bind(rsvp_aturi)
329329+ .execute(tx.as_mut())
330330+ .await
331331+ .map_err(StorageError::UnableToExecuteQuery)?;
332332+333333+ tx.commit()
334334+ .await
335335+ .map_err(StorageError::CannotCommitDatabaseTransaction)
336336+}
+133-30
src/storage/event.rs
···3344use anyhow::Result;
55use chrono::Utc;
66+use serde::Serialize;
67use serde_json::json;
78use sqlx::{Postgres, QueryBuilder};
89···13141415use super::StoragePool;
1516use super::errors::StorageError;
1616-use model::{ActivityItem, Event, EventWithRole, Rsvp};
1717+use model::{ActivityItem, EventWithRole, Rsvp};
17181818-pub mod model {
1919+pub(crate) mod model {
1920 use chrono::{DateTime, Utc};
2021 use serde::{Deserialize, Serialize};
2122 use sqlx::FromRow;
22232324 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)]
2424- pub(crate) struct Event {
2525+ pub struct Event {
2526 pub aturi: String,
2627 pub cid: String,
2728···5859 pub event_cid: String,
5960 pub status: String,
6061 pub email_shared: bool,
6262+ pub validated_at: Option<DateTime<Utc>>,
61636264 pub updated_at: Option<DateTime<Utc>>,
6365 }
···7375 pub created_at: Option<DateTime<Utc>>,
7476 }
7577}
7878+7979+// Import Event for use within this module and re-export for external use
8080+pub use model::Event;
76817782pub async fn event_insert(
7883 pool: &StoragePool,
···214219 let mut parts = Vec::new();
215220216221 // Add parts in specified order, omitting empty values
217217- if let Some(name_val) = name {
218218- if !name_val.trim().is_empty() {
219219- parts.push(name_val.clone());
220220- }
222222+ if let Some(name_val) = name
223223+ && !name_val.trim().is_empty()
224224+ {
225225+ parts.push(name_val.clone());
221226 }
222227223223- if let Some(street_val) = street {
224224- if !street_val.trim().is_empty() {
225225- parts.push(street_val.clone());
226226- }
228228+ if let Some(street_val) = street
229229+ && !street_val.trim().is_empty()
230230+ {
231231+ parts.push(street_val.clone());
227232 }
228233229229- if let Some(locality_val) = locality {
230230- if !locality_val.trim().is_empty() {
231231- parts.push(locality_val.clone());
232232- }
234234+ if let Some(locality_val) = locality
235235+ && !locality_val.trim().is_empty()
236236+ {
237237+ parts.push(locality_val.clone());
233238 }
234239235235- if let Some(region_val) = region {
236236- if !region_val.trim().is_empty() {
237237- parts.push(region_val.clone());
238238- }
240240+ if let Some(region_val) = region
241241+ && !region_val.trim().is_empty()
242242+ {
243243+ parts.push(region_val.clone());
239244 }
240245241241- if let Some(postal_val) = postal_code {
242242- if !postal_val.trim().is_empty() {
243243- parts.push(postal_val.clone());
244244- }
246246+ if let Some(postal_val) = postal_code
247247+ && !postal_val.trim().is_empty()
248248+ {
249249+ parts.push(postal_val.clone());
245250 }
246251247252 // Country is required so no need to check if it's empty
···447452 }
448453449454 // If status is provided, validate it's not empty
450450- if let Some(status_val) = status {
451451- if status_val.trim().is_empty() {
452452- return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
453453- "Status cannot be empty".into(),
454454- )));
455455- }
455455+ if let Some(status_val) = status
456456+ && status_val.trim().is_empty()
457457+ {
458458+ return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
459459+ "Status cannot be empty".into(),
460460+ )));
456461 }
457462458463 let mut tx = pool
···487492 Ok(rsvps)
488493}
489494495495+pub async fn get_event_rsvps_with_validation(
496496+ pool: &StoragePool,
497497+ event_aturi: &str,
498498+ status: Option<&str>,
499499+) -> Result<Vec<(String, Option<chrono::DateTime<chrono::Utc>>)>, StorageError> {
500500+ // Validate event_aturi is not empty
501501+ if event_aturi.trim().is_empty() {
502502+ return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
503503+ "Event URI cannot be empty".into(),
504504+ )));
505505+ }
506506+507507+ // If status is provided, validate it's not empty
508508+ if let Some(status_val) = status
509509+ && status_val.trim().is_empty()
510510+ {
511511+ return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
512512+ "Status cannot be empty".into(),
513513+ )));
514514+ }
515515+516516+ let mut tx = pool
517517+ .begin()
518518+ .await
519519+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
520520+521521+ let query = if status.is_some() {
522522+ "SELECT did, validated_at FROM rsvps WHERE event_aturi = $1 AND status = $2"
523523+ } else {
524524+ "SELECT did, validated_at FROM rsvps WHERE event_aturi = $1"
525525+ };
526526+527527+ let rsvps = if let Some(status_value) = status {
528528+ sqlx::query_as::<_, (String, Option<chrono::DateTime<chrono::Utc>>)>(query)
529529+ .bind(event_aturi)
530530+ .bind(status_value)
531531+ .fetch_all(tx.as_mut())
532532+ .await
533533+ } else {
534534+ sqlx::query_as::<_, (String, Option<chrono::DateTime<chrono::Utc>>)>(query)
535535+ .bind(event_aturi)
536536+ .fetch_all(tx.as_mut())
537537+ .await
538538+ }
539539+ .map_err(StorageError::UnableToExecuteQuery)?;
540540+541541+ tx.commit()
542542+ .await
543543+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
544544+545545+ Ok(rsvps)
546546+}
547547+490548pub async fn get_user_rsvp(
491549 pool: &StoragePool,
492550 event_aturi: &str,
···565623 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
566624567625 Ok(result)
626626+}
627627+628628+pub async fn rsvp_get_by_event_and_did(
629629+ pool: &StoragePool,
630630+ event_aturi: &str,
631631+ did: &str,
632632+) -> Result<Option<Rsvp>, StorageError> {
633633+ // Validate event_aturi is not empty
634634+ if event_aturi.trim().is_empty() {
635635+ return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
636636+ "Event URI cannot be empty".into(),
637637+ )));
638638+ }
639639+640640+ // Validate did is not empty
641641+ if did.trim().is_empty() {
642642+ return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
643643+ "DID cannot be empty".into(),
644644+ )));
645645+ }
646646+647647+ let mut tx = pool
648648+ .begin()
649649+ .await
650650+ .map_err(StorageError::CannotBeginDatabaseTransaction)?;
651651+652652+ let rsvp = sqlx::query_as::<_, Rsvp>("SELECT * FROM rsvps WHERE event_aturi = $1 AND did = $2")
653653+ .bind(event_aturi)
654654+ .bind(did)
655655+ .fetch_optional(tx.as_mut())
656656+ .await
657657+ .map_err(StorageError::UnableToExecuteQuery)?;
658658+659659+ tx.commit()
660660+ .await
661661+ .map_err(StorageError::CannotCommitDatabaseTransaction)?;
662662+663663+ Ok(rsvp)
568664}
569665570666pub async fn rsvp_get(pool: &StoragePool, aturi: &str) -> Result<Option<Rsvp>, StorageError> {
···10701166}
1071116710721168// Structure to hold RSVP export data
10731073-#[derive(Debug)]
11691169+#[derive(Debug, Serialize)]
10741170pub struct RsvpExportData {
10751171 pub event_aturi: String,
10761172 pub rsvp_aturi: String,
···10781174 pub handle: Option<String>,
10791175 pub status: String,
10801176 pub created_at: Option<chrono::DateTime<chrono::Utc>>,
11771177+}
11781178+11791179+#[derive(Debug, Serialize, Clone)]
11801180+pub struct RsvpDisplayData {
11811181+ pub did: String,
11821182+ pub handle: Option<String>,
11831183+ pub validated_at: Option<chrono::DateTime<chrono::Utc>>,
10811184}
1082118510831186/// Get all RSVPs for an event with detailed information for CSV export
···11+pub mod acceptance;
12pub mod atproto;
23pub mod cache;
34pub mod content;
+6-6
src/storage/oauth.rs
···9292 }
93939494 // If did is provided, validate it's not empty
9595- if let Some(did_value) = did {
9696- if did_value.trim().is_empty() {
9797- return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
9898- "DID cannot be empty".into(),
9999- )));
100100- }
9595+ if let Some(did_value) = did
9696+ && did_value.trim().is_empty()
9797+ {
9898+ return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
9999+ "DID cannot be empty".into(),
100100+ )));
101101 }
102102103103 let mut tx = pool
+1-1
src/throttle_redis.rs
···11use async_trait::async_trait;
22-use deadpool_redis::{redis::AsyncCommands, Pool as RedisPool};
22+use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
33use std::time::{SystemTime, UNIX_EPOCH};
4455use crate::throttle::{Throttle, ThrottleError};
+2-6
src/unsubscribe_token.rs
···8383 Ok(UnsubscribeAction::UnsubscribeAll {
8484 receiver: dids[0].clone(),
8585 })
8686- } else if remainder.starts_with(':') {
8686+ } else if let Some(preference) = remainder.strip_prefix(':') {
8787 // [did]:preference_type
8888- let preference = &remainder[1..];
8988 let receiver = dids[0].clone();
90899190 match preference {
···131130}
132131133132/// Generate an unsubscribe token
134134-pub fn generate_token(
135135- action: &UnsubscribeAction,
136136- secret_key: &EmailSecretKey,
137137-) -> Result<String> {
133133+pub fn generate_token(action: &UnsubscribeAction, secret_key: &EmailSecretKey) -> Result<String> {
138134 let payload = action.to_payload();
139135140136 // Create HMAC-SHA256 signature
···11+Hello!
22+33+Your RSVP has been accepted for the event: {{ event_name }}
44+55+View the event and finalize your RSVP: {{ event_url }}
66+77+---
88+99+Update your notification settings: https://smokesignal.events/settings
1010+Unsubscribe from these notifications: {{ unsubscribe_url }}