The smokesignal.events web application
1use chrono::Utc;
2
3use crate::storage::StoragePool;
4use crate::storage::errors::StorageError;
5
6pub mod model {
7 use chrono::{DateTime, Utc};
8 use serde::{Deserialize, Serialize};
9 use sqlx::FromRow;
10
11 /// Notification settings (without email - email comes from identity_profiles)
12 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)]
13 pub struct IdentityNotification {
14 pub did: String,
15 pub confirmed_at: Option<DateTime<Utc>>,
16 pub email_on_rsvp: bool,
17 pub email_on_event_change: bool,
18 pub email_on_24h: bool,
19 pub email_on_rsvp_summary: bool,
20 pub created_at: DateTime<Utc>,
21 pub updated_at: DateTime<Utc>,
22 }
23
24 /// Notification settings with email joined from identity_profiles
25 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)]
26 pub struct NotificationSettingsWithEmail {
27 pub did: String,
28 pub email: Option<String>, // From identity_profiles table
29 pub confirmed_at: Option<DateTime<Utc>>,
30 pub email_on_rsvp: bool,
31 pub email_on_event_change: bool,
32 pub email_on_24h: bool,
33 pub email_on_rsvp_summary: bool,
34 pub created_at: DateTime<Utc>,
35 pub updated_at: DateTime<Utc>,
36 }
37}
38
39use model::NotificationSettingsWithEmail;
40
41/// Initialize notification settings for a DID
42/// This is typically called during OAuth login flow
43pub async fn notification_init(pool: &StoragePool, did: &str) -> Result<(), StorageError> {
44 // Validate DID is not empty
45 if did.trim().is_empty() {
46 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
47 "DID cannot be empty".into(),
48 )));
49 }
50
51 let mut tx = pool
52 .begin()
53 .await
54 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
55
56 let now = Utc::now();
57
58 // Insert if doesn't exist
59 sqlx::query(
60 "INSERT INTO identity_notifications (did, created_at, updated_at)
61 VALUES ($1, $2, $3)
62 ON CONFLICT (did) DO UPDATE
63 SET updated_at = $3",
64 )
65 .bind(did)
66 .bind(now)
67 .bind(now)
68 .execute(tx.as_mut())
69 .await
70 .map_err(StorageError::UnableToExecuteQuery)?;
71
72 tx.commit()
73 .await
74 .map_err(StorageError::CannotCommitDatabaseTransaction)
75}
76
77/// Get notification settings for a DID with email from identity_profiles
78pub async fn notification_get(
79 pool: &StoragePool,
80 did: &str,
81) -> Result<Option<NotificationSettingsWithEmail>, StorageError> {
82 // Validate DID is not empty
83 if did.trim().is_empty() {
84 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
85 "DID cannot be empty".into(),
86 )));
87 }
88
89 let mut tx = pool
90 .begin()
91 .await
92 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
93
94 let notification = sqlx::query_as::<_, NotificationSettingsWithEmail>(
95 "SELECT
96 n.did,
97 p.email,
98 n.confirmed_at,
99 n.email_on_rsvp,
100 n.email_on_event_change,
101 n.email_on_24h,
102 n.email_on_rsvp_summary,
103 n.created_at,
104 n.updated_at
105 FROM identity_notifications n
106 INNER JOIN identity_profiles p ON n.did = p.did
107 WHERE n.did = $1",
108 )
109 .bind(did)
110 .fetch_optional(tx.as_mut())
111 .await
112 .map_err(StorageError::UnableToExecuteQuery)?;
113
114 tx.commit()
115 .await
116 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
117
118 Ok(notification)
119}
120
121/// Reset email confirmation for a DID
122/// This should be called when the email address changes in identity_profiles
123/// Creates the notification record if it doesn't exist (handles legacy users)
124pub async fn notification_reset_confirmation(
125 pool: &StoragePool,
126 did: &str,
127) -> Result<(), StorageError> {
128 // Validate DID is not empty
129 if did.trim().is_empty() {
130 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
131 "DID cannot be empty".into(),
132 )));
133 }
134
135 let mut tx = pool
136 .begin()
137 .await
138 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
139
140 let now = Utc::now();
141
142 // Use UPSERT to handle both new and existing users
143 sqlx::query(
144 "INSERT INTO identity_notifications (did, confirmed_at, created_at, updated_at)
145 VALUES ($1, NULL, $2, $3)
146 ON CONFLICT (did) DO UPDATE
147 SET confirmed_at = NULL, updated_at = $3",
148 )
149 .bind(did)
150 .bind(now)
151 .bind(now)
152 .execute(tx.as_mut())
153 .await
154 .map_err(StorageError::UnableToExecuteQuery)?;
155
156 tx.commit()
157 .await
158 .map_err(StorageError::CannotCommitDatabaseTransaction)
159}
160
161/// Confirm email address for a DID
162/// Creates the notification record if it doesn't exist (handles legacy users)
163pub async fn notification_confirm_email(pool: &StoragePool, did: &str) -> Result<(), StorageError> {
164 // Validate DID is not empty
165 if did.trim().is_empty() {
166 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
167 "DID cannot be empty".into(),
168 )));
169 }
170
171 let mut tx = pool
172 .begin()
173 .await
174 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
175
176 let now = Utc::now();
177
178 // Use UPSERT to handle both new and existing users
179 sqlx::query(
180 "INSERT INTO identity_notifications (did, confirmed_at, created_at, updated_at)
181 VALUES ($1, $2, $3, $4)
182 ON CONFLICT (did) DO UPDATE
183 SET confirmed_at = $2, updated_at = $4",
184 )
185 .bind(did)
186 .bind(now)
187 .bind(now)
188 .bind(now)
189 .execute(tx.as_mut())
190 .await
191 .map_err(StorageError::UnableToExecuteQuery)?;
192
193 tx.commit()
194 .await
195 .map_err(StorageError::CannotCommitDatabaseTransaction)
196}
197
198/// Unconfirm email address for all DIDs with a specific email (used by webhook handlers)
199/// This joins with identity_profiles to find DIDs with the given email
200pub async fn notification_unconfirm_email(
201 pool: &StoragePool,
202 email: &str,
203) -> Result<(), StorageError> {
204 // Validate email is not empty
205 if email.trim().is_empty() {
206 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
207 "Email cannot be empty".into(),
208 )));
209 }
210
211 let mut tx = pool
212 .begin()
213 .await
214 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
215
216 let now = Utc::now();
217
218 sqlx::query(
219 "UPDATE identity_notifications n
220 SET confirmed_at = NULL, updated_at = $1
221 FROM identity_profiles p
222 WHERE n.did = p.did AND p.email = $2",
223 )
224 .bind(now)
225 .bind(email)
226 .execute(tx.as_mut())
227 .await
228 .map_err(StorageError::UnableToExecuteQuery)?;
229
230 tx.commit()
231 .await
232 .map_err(StorageError::CannotCommitDatabaseTransaction)
233}
234
235/// Update notification preference
236/// Creates the notification record if it doesn't exist (handles legacy users)
237pub async fn notification_set_preference(
238 pool: &StoragePool,
239 did: &str,
240 email_on_rsvp: Option<bool>,
241 email_on_event_change: Option<bool>,
242 email_on_24h: Option<bool>,
243 email_on_rsvp_summary: Option<bool>,
244) -> Result<(), StorageError> {
245 // Validate DID is not empty
246 if did.trim().is_empty() {
247 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
248 "DID cannot be empty".into(),
249 )));
250 }
251
252 let mut tx = pool
253 .begin()
254 .await
255 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
256
257 let now = Utc::now();
258
259 // Use UPSERT to handle both new and existing users
260 // Handler always passes Some values, so we can set them directly
261 let rsvp = email_on_rsvp.unwrap_or(false);
262 let event_change = email_on_event_change.unwrap_or(false);
263 let reminder_24h = email_on_24h.unwrap_or(false);
264 let rsvp_summary = email_on_rsvp_summary.unwrap_or(false);
265
266 sqlx::query(
267 "INSERT INTO identity_notifications (did, email_on_rsvp, email_on_event_change, email_on_24h, email_on_rsvp_summary, created_at, updated_at)
268 VALUES ($1, $2, $3, $4, $5, $6, $7)
269 ON CONFLICT (did) DO UPDATE SET
270 email_on_rsvp = $2,
271 email_on_event_change = $3,
272 email_on_24h = $4,
273 email_on_rsvp_summary = $5,
274 updated_at = $7"
275 )
276 .bind(did)
277 .bind(rsvp)
278 .bind(event_change)
279 .bind(reminder_24h)
280 .bind(rsvp_summary)
281 .bind(now)
282 .bind(now)
283 .execute(tx.as_mut())
284 .await
285 .map_err(StorageError::UnableToExecuteQuery)?;
286
287 tx.commit()
288 .await
289 .map_err(StorageError::CannotCommitDatabaseTransaction)
290}
291
292/// Helper function to check if RSVP created notifications are enabled
293/// Returns true only if:
294/// 1. A record exists for the DID
295/// 2. The email column is not null and not empty
296/// 3. The confirmed_at value is not null
297/// 4. The email_on_rsvp field is true
298pub async fn rsvp_created_enabled(pool: &StoragePool, did: &str) -> Result<bool, StorageError> {
299 // Validate DID is not empty
300 if did.trim().is_empty() {
301 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
302 "DID cannot be empty".into(),
303 )));
304 }
305
306 let mut tx = pool
307 .begin()
308 .await
309 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
310
311 let result = sqlx::query_scalar::<_, bool>(
312 "SELECT n.email_on_rsvp FROM identity_notifications n
313 INNER JOIN identity_profiles p ON n.did = p.did
314 WHERE n.did = $1
315 AND p.email IS NOT NULL
316 AND p.email != ''
317 AND n.confirmed_at IS NOT NULL
318 AND n.email_on_rsvp = true",
319 )
320 .bind(did)
321 .fetch_optional(tx.as_mut())
322 .await
323 .map_err(StorageError::UnableToExecuteQuery)?;
324
325 tx.commit()
326 .await
327 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
328
329 Ok(result.unwrap_or(false))
330}
331
332/// Helper function to check if event change notifications are enabled
333/// Returns true only if:
334/// 1. A record exists for the DID
335/// 2. The email column (from identity_profiles) is not null and not empty
336/// 3. The confirmed_at value is not null
337/// 4. The email_on_event_change field is true
338pub async fn event_change_enabled(pool: &StoragePool, did: &str) -> Result<bool, StorageError> {
339 // Validate DID is not empty
340 if did.trim().is_empty() {
341 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
342 "DID cannot be empty".into(),
343 )));
344 }
345
346 let mut tx = pool
347 .begin()
348 .await
349 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
350
351 let result = sqlx::query_scalar::<_, bool>(
352 "SELECT n.email_on_event_change FROM identity_notifications n
353 INNER JOIN identity_profiles p ON n.did = p.did
354 WHERE n.did = $1
355 AND p.email IS NOT NULL
356 AND p.email != ''
357 AND n.confirmed_at IS NOT NULL
358 AND n.email_on_event_change = true",
359 )
360 .bind(did)
361 .fetch_optional(tx.as_mut())
362 .await
363 .map_err(StorageError::UnableToExecuteQuery)?;
364
365 tx.commit()
366 .await
367 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
368
369 Ok(result.unwrap_or(false))
370}
371
372/// Helper function to check if 24-hour event start notifications are enabled
373/// Returns true only if:
374/// 1. A record exists for the DID
375/// 2. The email column (from identity_profiles) is not null and not empty
376/// 3. The confirmed_at value is not null
377/// 4. The email_on_24h field is true
378pub async fn event_24h_start(pool: &StoragePool, did: &str) -> Result<bool, StorageError> {
379 // Validate DID is not empty
380 if did.trim().is_empty() {
381 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
382 "DID cannot be empty".into(),
383 )));
384 }
385
386 let mut tx = pool
387 .begin()
388 .await
389 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
390
391 let result = sqlx::query_scalar::<_, bool>(
392 "SELECT n.email_on_24h FROM identity_notifications n
393 INNER JOIN identity_profiles p ON n.did = p.did
394 WHERE n.did = $1
395 AND p.email IS NOT NULL
396 AND p.email != ''
397 AND n.confirmed_at IS NOT NULL
398 AND n.email_on_24h = true",
399 )
400 .bind(did)
401 .fetch_optional(tx.as_mut())
402 .await
403 .map_err(StorageError::UnableToExecuteQuery)?;
404
405 tx.commit()
406 .await
407 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
408
409 Ok(result.unwrap_or(false))
410}
411
412/// Helper function to check if RSVP summary notifications are enabled
413/// Returns true only if:
414/// 1. A record exists for the DID
415/// 2. The email column (from identity_profiles) is not null and not empty
416/// 3. The confirmed_at value is not null
417/// 4. The email_on_rsvp_summary field is true
418pub async fn rsvp_summary_enabled(pool: &StoragePool, did: &str) -> Result<bool, StorageError> {
419 // Validate DID is not empty
420 if did.trim().is_empty() {
421 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
422 "DID cannot be empty".into(),
423 )));
424 }
425
426 let mut tx = pool
427 .begin()
428 .await
429 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
430
431 let result = sqlx::query_scalar::<_, bool>(
432 "SELECT n.email_on_rsvp_summary FROM identity_notifications n
433 INNER JOIN identity_profiles p ON n.did = p.did
434 WHERE n.did = $1
435 AND p.email IS NOT NULL
436 AND p.email != ''
437 AND n.confirmed_at IS NOT NULL
438 AND n.email_on_rsvp_summary = true",
439 )
440 .bind(did)
441 .fetch_optional(tx.as_mut())
442 .await
443 .map_err(StorageError::UnableToExecuteQuery)?;
444
445 tx.commit()
446 .await
447 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
448
449 Ok(result.unwrap_or(false))
450}
451
452/// Get email address for a DID (only if confirmed)
453/// Joins with identity_profiles to get the email
454pub async fn notification_get_confirmed_email(
455 pool: &StoragePool,
456 did: &str,
457) -> Result<Option<String>, StorageError> {
458 // Validate DID is not empty
459 if did.trim().is_empty() {
460 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
461 "DID cannot be empty".into(),
462 )));
463 }
464
465 let mut tx = pool
466 .begin()
467 .await
468 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
469
470 let email = sqlx::query_scalar::<_, String>(
471 "SELECT p.email FROM identity_notifications n
472 INNER JOIN identity_profiles p ON n.did = p.did
473 WHERE n.did = $1
474 AND p.email IS NOT NULL
475 AND p.email != ''
476 AND n.confirmed_at IS NOT NULL",
477 )
478 .bind(did)
479 .fetch_optional(tx.as_mut())
480 .await
481 .map_err(StorageError::UnableToExecuteQuery)?;
482
483 tx.commit()
484 .await
485 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
486
487 Ok(email)
488}
489
490/// Disable a specific notification preference
491/// Creates the notification record if it doesn't exist (handles legacy users)
492pub async fn notification_disable_preference(
493 pool: &StoragePool,
494 did: &str,
495 preference: NotificationPreference,
496) -> Result<(), StorageError> {
497 // Validate DID is not empty
498 if did.trim().is_empty() {
499 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
500 "DID cannot be empty".into(),
501 )));
502 }
503
504 let mut tx = pool
505 .begin()
506 .await
507 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
508
509 let now = Utc::now();
510
511 // Use UPSERT to handle both new and existing users
512 let (column_name, default_rsvp, default_event_change, default_24h, default_summary) =
513 match preference {
514 NotificationPreference::EmailOnRsvp => ("email_on_rsvp", false, false, false, true),
515 NotificationPreference::EmailOnEventChange => {
516 ("email_on_event_change", false, false, false, true)
517 }
518 NotificationPreference::EmailOn24h => ("email_on_24h", false, false, false, true),
519 NotificationPreference::EmailOnRsvpSummary => {
520 ("email_on_rsvp_summary", false, false, false, false)
521 }
522 };
523
524 let query = format!(
525 "INSERT INTO identity_notifications (did, email_on_rsvp, email_on_event_change, email_on_24h, email_on_rsvp_summary, created_at, updated_at)
526 VALUES ($1, $2, $3, $4, $5, $6, $7)
527 ON CONFLICT (did) DO UPDATE SET
528 {} = false,
529 updated_at = $7",
530 column_name
531 );
532
533 sqlx::query(&query)
534 .bind(did)
535 .bind(default_rsvp)
536 .bind(default_event_change)
537 .bind(default_24h)
538 .bind(default_summary)
539 .bind(now)
540 .bind(now)
541 .execute(tx.as_mut())
542 .await
543 .map_err(StorageError::UnableToExecuteQuery)?;
544
545 tx.commit()
546 .await
547 .map_err(StorageError::CannotCommitDatabaseTransaction)
548}
549
550/// Notification preference types
551#[derive(Debug, Clone, Copy)]
552pub enum NotificationPreference {
553 EmailOnRsvp,
554 EmailOnEventChange,
555 EmailOn24h,
556 EmailOnRsvpSummary,
557}
558
559/// List identity notifications with pagination
560/// Returns (total_count, records)
561/// page is 0-indexed
562pub async fn notification_list(
563 pool: &StoragePool,
564 page: i64,
565 page_size: i64,
566) -> Result<(i64, Vec<NotificationSettingsWithEmail>), StorageError> {
567 let mut tx = pool
568 .begin()
569 .await
570 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
571
572 // Get total count
573 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM identity_notifications")
574 .fetch_one(tx.as_mut())
575 .await
576 .map_err(StorageError::UnableToExecuteQuery)?;
577
578 // Fetch one extra record to determine if there are more pages
579 let offset = page * page_size;
580 let fetch_size = page_size + 1;
581
582 let records = sqlx::query_as::<_, NotificationSettingsWithEmail>(
583 "SELECT
584 n.did,
585 p.email,
586 n.confirmed_at,
587 n.email_on_rsvp,
588 n.email_on_event_change,
589 n.email_on_24h,
590 n.email_on_rsvp_summary,
591 n.created_at,
592 n.updated_at
593 FROM identity_notifications n
594 INNER JOIN identity_profiles p ON n.did = p.did
595 ORDER BY n.created_at DESC
596 LIMIT $1 OFFSET $2",
597 )
598 .bind(fetch_size)
599 .bind(offset)
600 .fetch_all(tx.as_mut())
601 .await
602 .map_err(StorageError::UnableToExecuteQuery)?;
603
604 tx.commit()
605 .await
606 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
607
608 Ok((total_count, records))
609}