// The smokesignal.events web application
// (listing captured from branch `main`: 609 lines, 19 kB)
1use chrono::Utc; 2 3use crate::storage::StoragePool; 4use crate::storage::errors::StorageError; 5 6pub mod model { 7 use chrono::{DateTime, Utc}; 8 use serde::{Deserialize, Serialize}; 9 use sqlx::FromRow; 10 11 /// Notification settings (without email - email comes from identity_profiles) 12 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)] 13 pub struct IdentityNotification { 14 pub did: String, 15 pub confirmed_at: Option<DateTime<Utc>>, 16 pub email_on_rsvp: bool, 17 pub email_on_event_change: bool, 18 pub email_on_24h: bool, 19 pub email_on_rsvp_summary: bool, 20 pub created_at: DateTime<Utc>, 21 pub updated_at: DateTime<Utc>, 22 } 23 24 /// Notification settings with email joined from identity_profiles 25 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)] 26 pub struct NotificationSettingsWithEmail { 27 pub did: String, 28 pub email: Option<String>, // From identity_profiles table 29 pub confirmed_at: Option<DateTime<Utc>>, 30 pub email_on_rsvp: bool, 31 pub email_on_event_change: bool, 32 pub email_on_24h: bool, 33 pub email_on_rsvp_summary: bool, 34 pub created_at: DateTime<Utc>, 35 pub updated_at: DateTime<Utc>, 36 } 37} 38 39use model::NotificationSettingsWithEmail; 40 41/// Initialize notification settings for a DID 42/// This is typically called during OAuth login flow 43pub async fn notification_init(pool: &StoragePool, did: &str) -> Result<(), StorageError> { 44 // Validate DID is not empty 45 if did.trim().is_empty() { 46 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 47 "DID cannot be empty".into(), 48 ))); 49 } 50 51 let mut tx = pool 52 .begin() 53 .await 54 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 55 56 let now = Utc::now(); 57 58 // Insert if doesn't exist 59 sqlx::query( 60 "INSERT INTO identity_notifications (did, created_at, updated_at) 61 VALUES ($1, $2, $3) 62 ON CONFLICT (did) DO UPDATE 63 SET updated_at = $3", 64 ) 65 .bind(did) 66 .bind(now) 67 .bind(now) 68 .execute(tx.as_mut()) 69 
.await 70 .map_err(StorageError::UnableToExecuteQuery)?; 71 72 tx.commit() 73 .await 74 .map_err(StorageError::CannotCommitDatabaseTransaction) 75} 76 77/// Get notification settings for a DID with email from identity_profiles 78pub async fn notification_get( 79 pool: &StoragePool, 80 did: &str, 81) -> Result<Option<NotificationSettingsWithEmail>, StorageError> { 82 // Validate DID is not empty 83 if did.trim().is_empty() { 84 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 85 "DID cannot be empty".into(), 86 ))); 87 } 88 89 let mut tx = pool 90 .begin() 91 .await 92 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 93 94 let notification = sqlx::query_as::<_, NotificationSettingsWithEmail>( 95 "SELECT 96 n.did, 97 p.email, 98 n.confirmed_at, 99 n.email_on_rsvp, 100 n.email_on_event_change, 101 n.email_on_24h, 102 n.email_on_rsvp_summary, 103 n.created_at, 104 n.updated_at 105 FROM identity_notifications n 106 INNER JOIN identity_profiles p ON n.did = p.did 107 WHERE n.did = $1", 108 ) 109 .bind(did) 110 .fetch_optional(tx.as_mut()) 111 .await 112 .map_err(StorageError::UnableToExecuteQuery)?; 113 114 tx.commit() 115 .await 116 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 117 118 Ok(notification) 119} 120 121/// Reset email confirmation for a DID 122/// This should be called when the email address changes in identity_profiles 123/// Creates the notification record if it doesn't exist (handles legacy users) 124pub async fn notification_reset_confirmation( 125 pool: &StoragePool, 126 did: &str, 127) -> Result<(), StorageError> { 128 // Validate DID is not empty 129 if did.trim().is_empty() { 130 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 131 "DID cannot be empty".into(), 132 ))); 133 } 134 135 let mut tx = pool 136 .begin() 137 .await 138 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 139 140 let now = Utc::now(); 141 142 // Use UPSERT to handle both new and existing users 143 sqlx::query( 
144 "INSERT INTO identity_notifications (did, confirmed_at, created_at, updated_at) 145 VALUES ($1, NULL, $2, $3) 146 ON CONFLICT (did) DO UPDATE 147 SET confirmed_at = NULL, updated_at = $3", 148 ) 149 .bind(did) 150 .bind(now) 151 .bind(now) 152 .execute(tx.as_mut()) 153 .await 154 .map_err(StorageError::UnableToExecuteQuery)?; 155 156 tx.commit() 157 .await 158 .map_err(StorageError::CannotCommitDatabaseTransaction) 159} 160 161/// Confirm email address for a DID 162/// Creates the notification record if it doesn't exist (handles legacy users) 163pub async fn notification_confirm_email(pool: &StoragePool, did: &str) -> Result<(), StorageError> { 164 // Validate DID is not empty 165 if did.trim().is_empty() { 166 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 167 "DID cannot be empty".into(), 168 ))); 169 } 170 171 let mut tx = pool 172 .begin() 173 .await 174 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 175 176 let now = Utc::now(); 177 178 // Use UPSERT to handle both new and existing users 179 sqlx::query( 180 "INSERT INTO identity_notifications (did, confirmed_at, created_at, updated_at) 181 VALUES ($1, $2, $3, $4) 182 ON CONFLICT (did) DO UPDATE 183 SET confirmed_at = $2, updated_at = $4", 184 ) 185 .bind(did) 186 .bind(now) 187 .bind(now) 188 .bind(now) 189 .execute(tx.as_mut()) 190 .await 191 .map_err(StorageError::UnableToExecuteQuery)?; 192 193 tx.commit() 194 .await 195 .map_err(StorageError::CannotCommitDatabaseTransaction) 196} 197 198/// Unconfirm email address for all DIDs with a specific email (used by webhook handlers) 199/// This joins with identity_profiles to find DIDs with the given email 200pub async fn notification_unconfirm_email( 201 pool: &StoragePool, 202 email: &str, 203) -> Result<(), StorageError> { 204 // Validate email is not empty 205 if email.trim().is_empty() { 206 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 207 "Email cannot be empty".into(), 208 ))); 209 } 210 211 let 
mut tx = pool 212 .begin() 213 .await 214 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 215 216 let now = Utc::now(); 217 218 sqlx::query( 219 "UPDATE identity_notifications n 220 SET confirmed_at = NULL, updated_at = $1 221 FROM identity_profiles p 222 WHERE n.did = p.did AND p.email = $2", 223 ) 224 .bind(now) 225 .bind(email) 226 .execute(tx.as_mut()) 227 .await 228 .map_err(StorageError::UnableToExecuteQuery)?; 229 230 tx.commit() 231 .await 232 .map_err(StorageError::CannotCommitDatabaseTransaction) 233} 234 235/// Update notification preference 236/// Creates the notification record if it doesn't exist (handles legacy users) 237pub async fn notification_set_preference( 238 pool: &StoragePool, 239 did: &str, 240 email_on_rsvp: Option<bool>, 241 email_on_event_change: Option<bool>, 242 email_on_24h: Option<bool>, 243 email_on_rsvp_summary: Option<bool>, 244) -> Result<(), StorageError> { 245 // Validate DID is not empty 246 if did.trim().is_empty() { 247 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 248 "DID cannot be empty".into(), 249 ))); 250 } 251 252 let mut tx = pool 253 .begin() 254 .await 255 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 256 257 let now = Utc::now(); 258 259 // Use UPSERT to handle both new and existing users 260 // Handler always passes Some values, so we can set them directly 261 let rsvp = email_on_rsvp.unwrap_or(false); 262 let event_change = email_on_event_change.unwrap_or(false); 263 let reminder_24h = email_on_24h.unwrap_or(false); 264 let rsvp_summary = email_on_rsvp_summary.unwrap_or(false); 265 266 sqlx::query( 267 "INSERT INTO identity_notifications (did, email_on_rsvp, email_on_event_change, email_on_24h, email_on_rsvp_summary, created_at, updated_at) 268 VALUES ($1, $2, $3, $4, $5, $6, $7) 269 ON CONFLICT (did) DO UPDATE SET 270 email_on_rsvp = $2, 271 email_on_event_change = $3, 272 email_on_24h = $4, 273 email_on_rsvp_summary = $5, 274 updated_at = $7" 275 ) 276 .bind(did) 277 
.bind(rsvp) 278 .bind(event_change) 279 .bind(reminder_24h) 280 .bind(rsvp_summary) 281 .bind(now) 282 .bind(now) 283 .execute(tx.as_mut()) 284 .await 285 .map_err(StorageError::UnableToExecuteQuery)?; 286 287 tx.commit() 288 .await 289 .map_err(StorageError::CannotCommitDatabaseTransaction) 290} 291 292/// Helper function to check if RSVP created notifications are enabled 293/// Returns true only if: 294/// 1. A record exists for the DID 295/// 2. The email column is not null and not empty 296/// 3. The confirmed_at value is not null 297/// 4. The email_on_rsvp field is true 298pub async fn rsvp_created_enabled(pool: &StoragePool, did: &str) -> Result<bool, StorageError> { 299 // Validate DID is not empty 300 if did.trim().is_empty() { 301 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 302 "DID cannot be empty".into(), 303 ))); 304 } 305 306 let mut tx = pool 307 .begin() 308 .await 309 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 310 311 let result = sqlx::query_scalar::<_, bool>( 312 "SELECT n.email_on_rsvp FROM identity_notifications n 313 INNER JOIN identity_profiles p ON n.did = p.did 314 WHERE n.did = $1 315 AND p.email IS NOT NULL 316 AND p.email != '' 317 AND n.confirmed_at IS NOT NULL 318 AND n.email_on_rsvp = true", 319 ) 320 .bind(did) 321 .fetch_optional(tx.as_mut()) 322 .await 323 .map_err(StorageError::UnableToExecuteQuery)?; 324 325 tx.commit() 326 .await 327 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 328 329 Ok(result.unwrap_or(false)) 330} 331 332/// Helper function to check if event change notifications are enabled 333/// Returns true only if: 334/// 1. A record exists for the DID 335/// 2. The email column (from identity_profiles) is not null and not empty 336/// 3. The confirmed_at value is not null 337/// 4. 
The email_on_event_change field is true 338pub async fn event_change_enabled(pool: &StoragePool, did: &str) -> Result<bool, StorageError> { 339 // Validate DID is not empty 340 if did.trim().is_empty() { 341 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 342 "DID cannot be empty".into(), 343 ))); 344 } 345 346 let mut tx = pool 347 .begin() 348 .await 349 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 350 351 let result = sqlx::query_scalar::<_, bool>( 352 "SELECT n.email_on_event_change FROM identity_notifications n 353 INNER JOIN identity_profiles p ON n.did = p.did 354 WHERE n.did = $1 355 AND p.email IS NOT NULL 356 AND p.email != '' 357 AND n.confirmed_at IS NOT NULL 358 AND n.email_on_event_change = true", 359 ) 360 .bind(did) 361 .fetch_optional(tx.as_mut()) 362 .await 363 .map_err(StorageError::UnableToExecuteQuery)?; 364 365 tx.commit() 366 .await 367 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 368 369 Ok(result.unwrap_or(false)) 370} 371 372/// Helper function to check if 24-hour event start notifications are enabled 373/// Returns true only if: 374/// 1. A record exists for the DID 375/// 2. The email column (from identity_profiles) is not null and not empty 376/// 3. The confirmed_at value is not null 377/// 4. 
The email_on_24h field is true 378pub async fn event_24h_start(pool: &StoragePool, did: &str) -> Result<bool, StorageError> { 379 // Validate DID is not empty 380 if did.trim().is_empty() { 381 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 382 "DID cannot be empty".into(), 383 ))); 384 } 385 386 let mut tx = pool 387 .begin() 388 .await 389 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 390 391 let result = sqlx::query_scalar::<_, bool>( 392 "SELECT n.email_on_24h FROM identity_notifications n 393 INNER JOIN identity_profiles p ON n.did = p.did 394 WHERE n.did = $1 395 AND p.email IS NOT NULL 396 AND p.email != '' 397 AND n.confirmed_at IS NOT NULL 398 AND n.email_on_24h = true", 399 ) 400 .bind(did) 401 .fetch_optional(tx.as_mut()) 402 .await 403 .map_err(StorageError::UnableToExecuteQuery)?; 404 405 tx.commit() 406 .await 407 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 408 409 Ok(result.unwrap_or(false)) 410} 411 412/// Helper function to check if RSVP summary notifications are enabled 413/// Returns true only if: 414/// 1. A record exists for the DID 415/// 2. The email column (from identity_profiles) is not null and not empty 416/// 3. The confirmed_at value is not null 417/// 4. 
The email_on_rsvp_summary field is true 418pub async fn rsvp_summary_enabled(pool: &StoragePool, did: &str) -> Result<bool, StorageError> { 419 // Validate DID is not empty 420 if did.trim().is_empty() { 421 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 422 "DID cannot be empty".into(), 423 ))); 424 } 425 426 let mut tx = pool 427 .begin() 428 .await 429 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 430 431 let result = sqlx::query_scalar::<_, bool>( 432 "SELECT n.email_on_rsvp_summary FROM identity_notifications n 433 INNER JOIN identity_profiles p ON n.did = p.did 434 WHERE n.did = $1 435 AND p.email IS NOT NULL 436 AND p.email != '' 437 AND n.confirmed_at IS NOT NULL 438 AND n.email_on_rsvp_summary = true", 439 ) 440 .bind(did) 441 .fetch_optional(tx.as_mut()) 442 .await 443 .map_err(StorageError::UnableToExecuteQuery)?; 444 445 tx.commit() 446 .await 447 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 448 449 Ok(result.unwrap_or(false)) 450} 451 452/// Get email address for a DID (only if confirmed) 453/// Joins with identity_profiles to get the email 454pub async fn notification_get_confirmed_email( 455 pool: &StoragePool, 456 did: &str, 457) -> Result<Option<String>, StorageError> { 458 // Validate DID is not empty 459 if did.trim().is_empty() { 460 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 461 "DID cannot be empty".into(), 462 ))); 463 } 464 465 let mut tx = pool 466 .begin() 467 .await 468 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 469 470 let email = sqlx::query_scalar::<_, String>( 471 "SELECT p.email FROM identity_notifications n 472 INNER JOIN identity_profiles p ON n.did = p.did 473 WHERE n.did = $1 474 AND p.email IS NOT NULL 475 AND p.email != '' 476 AND n.confirmed_at IS NOT NULL", 477 ) 478 .bind(did) 479 .fetch_optional(tx.as_mut()) 480 .await 481 .map_err(StorageError::UnableToExecuteQuery)?; 482 483 tx.commit() 484 .await 485 
.map_err(StorageError::CannotCommitDatabaseTransaction)?; 486 487 Ok(email) 488} 489 490/// Disable a specific notification preference 491/// Creates the notification record if it doesn't exist (handles legacy users) 492pub async fn notification_disable_preference( 493 pool: &StoragePool, 494 did: &str, 495 preference: NotificationPreference, 496) -> Result<(), StorageError> { 497 // Validate DID is not empty 498 if did.trim().is_empty() { 499 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 500 "DID cannot be empty".into(), 501 ))); 502 } 503 504 let mut tx = pool 505 .begin() 506 .await 507 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 508 509 let now = Utc::now(); 510 511 // Use UPSERT to handle both new and existing users 512 let (column_name, default_rsvp, default_event_change, default_24h, default_summary) = 513 match preference { 514 NotificationPreference::EmailOnRsvp => ("email_on_rsvp", false, false, false, true), 515 NotificationPreference::EmailOnEventChange => { 516 ("email_on_event_change", false, false, false, true) 517 } 518 NotificationPreference::EmailOn24h => ("email_on_24h", false, false, false, true), 519 NotificationPreference::EmailOnRsvpSummary => { 520 ("email_on_rsvp_summary", false, false, false, false) 521 } 522 }; 523 524 let query = format!( 525 "INSERT INTO identity_notifications (did, email_on_rsvp, email_on_event_change, email_on_24h, email_on_rsvp_summary, created_at, updated_at) 526 VALUES ($1, $2, $3, $4, $5, $6, $7) 527 ON CONFLICT (did) DO UPDATE SET 528 {} = false, 529 updated_at = $7", 530 column_name 531 ); 532 533 sqlx::query(&query) 534 .bind(did) 535 .bind(default_rsvp) 536 .bind(default_event_change) 537 .bind(default_24h) 538 .bind(default_summary) 539 .bind(now) 540 .bind(now) 541 .execute(tx.as_mut()) 542 .await 543 .map_err(StorageError::UnableToExecuteQuery)?; 544 545 tx.commit() 546 .await 547 .map_err(StorageError::CannotCommitDatabaseTransaction) 548} 549 550/// Notification 
preference types 551#[derive(Debug, Clone, Copy)] 552pub enum NotificationPreference { 553 EmailOnRsvp, 554 EmailOnEventChange, 555 EmailOn24h, 556 EmailOnRsvpSummary, 557} 558 559/// List identity notifications with pagination 560/// Returns (total_count, records) 561/// page is 0-indexed 562pub async fn notification_list( 563 pool: &StoragePool, 564 page: i64, 565 page_size: i64, 566) -> Result<(i64, Vec<NotificationSettingsWithEmail>), StorageError> { 567 let mut tx = pool 568 .begin() 569 .await 570 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 571 572 // Get total count 573 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM identity_notifications") 574 .fetch_one(tx.as_mut()) 575 .await 576 .map_err(StorageError::UnableToExecuteQuery)?; 577 578 // Fetch one extra record to determine if there are more pages 579 let offset = page * page_size; 580 let fetch_size = page_size + 1; 581 582 let records = sqlx::query_as::<_, NotificationSettingsWithEmail>( 583 "SELECT 584 n.did, 585 p.email, 586 n.confirmed_at, 587 n.email_on_rsvp, 588 n.email_on_event_change, 589 n.email_on_24h, 590 n.email_on_rsvp_summary, 591 n.created_at, 592 n.updated_at 593 FROM identity_notifications n 594 INNER JOIN identity_profiles p ON n.did = p.did 595 ORDER BY n.created_at DESC 596 LIMIT $1 OFFSET $2", 597 ) 598 .bind(fetch_size) 599 .bind(offset) 600 .fetch_all(tx.as_mut()) 601 .await 602 .map_err(StorageError::UnableToExecuteQuery)?; 603 604 tx.commit() 605 .await 606 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 607 608 Ok((total_count, records)) 609}