this repo has no description
1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use sqlx::PgPool;
7use tokio::sync::watch;
8use tokio::time::interval;
9use tracing::{debug, error, info, warn};
10use tranquil_comms::{
11 CommsChannel, CommsSender, CommsStatus, CommsType, NewComms, QueuedComms, SendError,
12 format_message, get_strings,
13};
14use uuid::Uuid;
15
16pub struct CommsService {
17 db: PgPool,
18 senders: HashMap<CommsChannel, Arc<dyn CommsSender>>,
19 poll_interval: Duration,
20 batch_size: i64,
21}
22
23impl CommsService {
24 pub fn new(db: PgPool) -> Self {
25 let poll_interval_ms: u64 = std::env::var("NOTIFICATION_POLL_INTERVAL_MS")
26 .ok()
27 .and_then(|v| v.parse().ok())
28 .unwrap_or(1000);
29 let batch_size: i64 = std::env::var("NOTIFICATION_BATCH_SIZE")
30 .ok()
31 .and_then(|v| v.parse().ok())
32 .unwrap_or(100);
33 Self {
34 db,
35 senders: HashMap::new(),
36 poll_interval: Duration::from_millis(poll_interval_ms),
37 batch_size,
38 }
39 }
40
41 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
42 self.poll_interval = interval;
43 self
44 }
45
46 pub fn with_batch_size(mut self, size: i64) -> Self {
47 self.batch_size = size;
48 self
49 }
50
51 pub fn register_sender<S: CommsSender + 'static>(mut self, sender: S) -> Self {
52 self.senders.insert(sender.channel(), Arc::new(sender));
53 self
54 }
55
56 pub async fn enqueue(&self, item: NewComms) -> Result<Uuid, sqlx::Error> {
57 let id = sqlx::query_scalar!(
58 r#"
59 INSERT INTO comms_queue
60 (user_id, channel, comms_type, recipient, subject, body, metadata)
61 VALUES ($1, $2, $3, $4, $5, $6, $7)
62 RETURNING id
63 "#,
64 item.user_id,
65 item.channel as CommsChannel,
66 item.comms_type as CommsType,
67 item.recipient,
68 item.subject,
69 item.body,
70 item.metadata
71 )
72 .fetch_one(&self.db)
73 .await?;
74 debug!(comms_id = %id, "Comms enqueued");
75 Ok(id)
76 }
77
78 pub fn has_senders(&self) -> bool {
79 !self.senders.is_empty()
80 }
81
82 pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
83 if self.senders.is_empty() {
84 warn!(
85 "Comms service starting with no senders configured. Messages will be queued but not delivered until senders are configured."
86 );
87 }
88 info!(
89 poll_interval_secs = self.poll_interval.as_secs(),
90 batch_size = self.batch_size,
91 channels = ?self.senders.keys().collect::<Vec<_>>(),
92 "Starting comms service"
93 );
94 let mut ticker = interval(self.poll_interval);
95 loop {
96 tokio::select! {
97 _ = ticker.tick() => {
98 if let Err(e) = self.process_batch().await {
99 error!(error = %e, "Failed to process comms batch");
100 }
101 }
102 _ = shutdown.changed() => {
103 if *shutdown.borrow() {
104 info!("Comms service shutting down");
105 break;
106 }
107 }
108 }
109 }
110 }
111
112 async fn process_batch(&self) -> Result<(), sqlx::Error> {
113 let items = self.fetch_pending().await?;
114 if items.is_empty() {
115 return Ok(());
116 }
117 debug!(count = items.len(), "Processing comms batch");
118 futures::future::join_all(items.into_iter().map(|item| self.process_item(item))).await;
119 Ok(())
120 }
121
122 async fn fetch_pending(&self) -> Result<Vec<QueuedComms>, sqlx::Error> {
123 let now = Utc::now();
124 sqlx::query_as!(
125 QueuedComms,
126 r#"
127 UPDATE comms_queue
128 SET status = 'processing', updated_at = NOW()
129 WHERE id IN (
130 SELECT id FROM comms_queue
131 WHERE status = 'pending'
132 AND scheduled_for <= $1
133 AND attempts < max_attempts
134 ORDER BY scheduled_for ASC
135 LIMIT $2
136 FOR UPDATE SKIP LOCKED
137 )
138 RETURNING
139 id, user_id,
140 channel as "channel: CommsChannel",
141 comms_type as "comms_type: CommsType",
142 status as "status: CommsStatus",
143 recipient, subject, body, metadata,
144 attempts, max_attempts, last_error,
145 created_at, updated_at, scheduled_for, processed_at
146 "#,
147 now,
148 self.batch_size
149 )
150 .fetch_all(&self.db)
151 .await
152 }
153
154 async fn process_item(&self, item: QueuedComms) {
155 let comms_id = item.id;
156 let channel = item.channel;
157 let result = match self.senders.get(&channel) {
158 Some(sender) => sender.send(&item).await,
159 None => {
160 warn!(
161 comms_id = %comms_id,
162 channel = ?channel,
163 "No sender registered for channel"
164 );
165 Err(SendError::NotConfigured(channel))
166 }
167 };
168 match result {
169 Ok(()) => {
170 debug!(comms_id = %comms_id, "Comms sent successfully");
171 if let Err(e) = self.mark_sent(comms_id).await {
172 error!(
173 comms_id = %comms_id,
174 error = %e,
175 "Failed to mark comms as sent"
176 );
177 }
178 }
179 Err(e) => {
180 let error_msg = e.to_string();
181 warn!(
182 comms_id = %comms_id,
183 error = %error_msg,
184 "Failed to send comms"
185 );
186 if let Err(db_err) = self.mark_failed(comms_id, &error_msg).await {
187 error!(
188 comms_id = %comms_id,
189 error = %db_err,
190 "Failed to mark comms as failed"
191 );
192 }
193 }
194 }
195 }
196
197 async fn mark_sent(&self, id: Uuid) -> Result<(), sqlx::Error> {
198 sqlx::query!(
199 r#"
200 UPDATE comms_queue
201 SET status = 'sent', processed_at = NOW(), updated_at = NOW()
202 WHERE id = $1
203 "#,
204 id
205 )
206 .execute(&self.db)
207 .await?;
208 Ok(())
209 }
210
211 async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> {
212 sqlx::query!(
213 r#"
214 UPDATE comms_queue
215 SET
216 status = CASE
217 WHEN attempts + 1 >= max_attempts THEN 'failed'::comms_status
218 ELSE 'pending'::comms_status
219 END,
220 attempts = attempts + 1,
221 last_error = $2,
222 updated_at = NOW(),
223 scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1))
224 WHERE id = $1
225 "#,
226 id,
227 error
228 )
229 .execute(&self.db)
230 .await?;
231 Ok(())
232 }
233}
234
235pub async fn enqueue_comms(db: &PgPool, item: NewComms) -> Result<Uuid, sqlx::Error> {
236 sqlx::query_scalar!(
237 r#"
238 INSERT INTO comms_queue
239 (user_id, channel, comms_type, recipient, subject, body, metadata)
240 VALUES ($1, $2, $3, $4, $5, $6, $7)
241 RETURNING id
242 "#,
243 item.user_id,
244 item.channel as CommsChannel,
245 item.comms_type as CommsType,
246 item.recipient,
247 item.subject,
248 item.body,
249 item.metadata
250 )
251 .fetch_one(db)
252 .await
253}
254
255pub struct UserCommsPrefs {
256 pub channel: CommsChannel,
257 pub email: Option<String>,
258 pub handle: crate::types::Handle,
259 pub locale: String,
260}
261
262pub async fn get_user_comms_prefs(
263 db: &PgPool,
264 user_id: Uuid,
265) -> Result<UserCommsPrefs, sqlx::Error> {
266 let row = sqlx::query!(
267 r#"
268 SELECT
269 email,
270 handle,
271 preferred_comms_channel as "channel: CommsChannel",
272 preferred_locale
273 FROM users
274 WHERE id = $1
275 "#,
276 user_id
277 )
278 .fetch_one(db)
279 .await?;
280 Ok(UserCommsPrefs {
281 channel: row.channel,
282 email: row.email,
283 handle: row.handle.into(),
284 locale: row.preferred_locale.unwrap_or_else(|| "en".to_string()),
285 })
286}
287
288pub async fn enqueue_welcome(
289 db: &PgPool,
290 user_id: Uuid,
291 hostname: &str,
292) -> Result<Uuid, sqlx::Error> {
293 let prefs = get_user_comms_prefs(db, user_id).await?;
294 let strings = get_strings(&prefs.locale);
295 let body = format_message(
296 strings.welcome_body,
297 &[("hostname", hostname), ("handle", &prefs.handle)],
298 );
299 let subject = format_message(strings.welcome_subject, &[("hostname", hostname)]);
300 enqueue_comms(
301 db,
302 NewComms::new(
303 user_id,
304 prefs.channel,
305 CommsType::Welcome,
306 prefs.email.unwrap_or_default(),
307 Some(subject),
308 body,
309 ),
310 )
311 .await
312}
313
314pub async fn enqueue_password_reset(
315 db: &PgPool,
316 user_id: Uuid,
317 code: &str,
318 hostname: &str,
319) -> Result<Uuid, sqlx::Error> {
320 let prefs = get_user_comms_prefs(db, user_id).await?;
321 let strings = get_strings(&prefs.locale);
322 let body = format_message(
323 strings.password_reset_body,
324 &[("handle", &prefs.handle), ("code", code)],
325 );
326 let subject = format_message(strings.password_reset_subject, &[("hostname", hostname)]);
327 enqueue_comms(
328 db,
329 NewComms::new(
330 user_id,
331 prefs.channel,
332 CommsType::PasswordReset,
333 prefs.email.unwrap_or_default(),
334 Some(subject),
335 body,
336 ),
337 )
338 .await
339}
340
341pub async fn enqueue_email_update(
342 db: &PgPool,
343 user_id: Uuid,
344 new_email: &str,
345 handle: &str,
346 code: &str,
347 hostname: &str,
348) -> Result<Uuid, sqlx::Error> {
349 let prefs = get_user_comms_prefs(db, user_id).await?;
350 let strings = get_strings(&prefs.locale);
351 let encoded_email = urlencoding::encode(new_email);
352 let encoded_token = urlencoding::encode(code);
353 let verify_page = format!("https://{}/app/verify", hostname);
354 let verify_link = format!(
355 "https://{}/app/verify?token={}&identifier={}",
356 hostname, encoded_token, encoded_email
357 );
358 let body = format_message(
359 strings.email_update_body,
360 &[
361 ("handle", handle),
362 ("code", code),
363 ("verify_page", &verify_page),
364 ("verify_link", &verify_link),
365 ],
366 );
367 let subject = format_message(strings.email_update_subject, &[("hostname", hostname)]);
368 enqueue_comms(
369 db,
370 NewComms::email(
371 user_id,
372 CommsType::EmailUpdate,
373 new_email.to_string(),
374 subject,
375 body,
376 ),
377 )
378 .await
379}
380
381pub async fn enqueue_email_update_token(
382 db: &PgPool,
383 user_id: Uuid,
384 code: &str,
385 hostname: &str,
386) -> Result<Uuid, sqlx::Error> {
387 let prefs = get_user_comms_prefs(db, user_id).await?;
388 let strings = get_strings(&prefs.locale);
389 let current_email = prefs.email.unwrap_or_default();
390 let verify_page = format!("https://{}/app/verify?type=email-update", hostname);
391 let verify_link = format!(
392 "https://{}/app/verify?type=email-update&token={}",
393 hostname,
394 urlencoding::encode(code)
395 );
396 let body = format_message(
397 strings.email_update_body,
398 &[
399 ("handle", &prefs.handle),
400 ("code", code),
401 ("verify_page", &verify_page),
402 ("verify_link", &verify_link),
403 ],
404 );
405 let subject = format_message(strings.email_update_subject, &[("hostname", hostname)]);
406 enqueue_comms(
407 db,
408 NewComms::email(
409 user_id,
410 CommsType::EmailUpdate,
411 current_email,
412 subject,
413 body,
414 ),
415 )
416 .await
417}
418
419pub async fn enqueue_account_deletion(
420 db: &PgPool,
421 user_id: Uuid,
422 code: &str,
423 hostname: &str,
424) -> Result<Uuid, sqlx::Error> {
425 let prefs = get_user_comms_prefs(db, user_id).await?;
426 let strings = get_strings(&prefs.locale);
427 let body = format_message(
428 strings.account_deletion_body,
429 &[("handle", &prefs.handle), ("code", code)],
430 );
431 let subject = format_message(strings.account_deletion_subject, &[("hostname", hostname)]);
432 enqueue_comms(
433 db,
434 NewComms::new(
435 user_id,
436 prefs.channel,
437 CommsType::AccountDeletion,
438 prefs.email.unwrap_or_default(),
439 Some(subject),
440 body,
441 ),
442 )
443 .await
444}
445
446pub async fn enqueue_plc_operation(
447 db: &PgPool,
448 user_id: Uuid,
449 token: &str,
450 hostname: &str,
451) -> Result<Uuid, sqlx::Error> {
452 let prefs = get_user_comms_prefs(db, user_id).await?;
453 let strings = get_strings(&prefs.locale);
454 let body = format_message(
455 strings.plc_operation_body,
456 &[("handle", &prefs.handle), ("token", token)],
457 );
458 let subject = format_message(strings.plc_operation_subject, &[("hostname", hostname)]);
459 enqueue_comms(
460 db,
461 NewComms::new(
462 user_id,
463 prefs.channel,
464 CommsType::PlcOperation,
465 prefs.email.unwrap_or_default(),
466 Some(subject),
467 body,
468 ),
469 )
470 .await
471}
472
473pub async fn enqueue_2fa_code(
474 db: &PgPool,
475 user_id: Uuid,
476 code: &str,
477 hostname: &str,
478) -> Result<Uuid, sqlx::Error> {
479 let prefs = get_user_comms_prefs(db, user_id).await?;
480 let strings = get_strings(&prefs.locale);
481 let body = format_message(
482 strings.two_factor_code_body,
483 &[("handle", &prefs.handle), ("code", code)],
484 );
485 let subject = format_message(strings.two_factor_code_subject, &[("hostname", hostname)]);
486 enqueue_comms(
487 db,
488 NewComms::new(
489 user_id,
490 prefs.channel,
491 CommsType::TwoFactorCode,
492 prefs.email.unwrap_or_default(),
493 Some(subject),
494 body,
495 ),
496 )
497 .await
498}
499
500pub async fn enqueue_passkey_recovery(
501 db: &PgPool,
502 user_id: Uuid,
503 recovery_url: &str,
504 hostname: &str,
505) -> Result<Uuid, sqlx::Error> {
506 let prefs = get_user_comms_prefs(db, user_id).await?;
507 let strings = get_strings(&prefs.locale);
508 let body = format_message(
509 strings.passkey_recovery_body,
510 &[("handle", &prefs.handle), ("url", recovery_url)],
511 );
512 let subject = format_message(strings.passkey_recovery_subject, &[("hostname", hostname)]);
513 enqueue_comms(
514 db,
515 NewComms::new(
516 user_id,
517 prefs.channel,
518 CommsType::PasskeyRecovery,
519 prefs.email.unwrap_or_default(),
520 Some(subject),
521 body,
522 ),
523 )
524 .await
525}
526
527pub fn channel_display_name(channel: CommsChannel) -> &'static str {
528 match channel {
529 CommsChannel::Email => "email",
530 CommsChannel::Discord => "Discord",
531 CommsChannel::Telegram => "Telegram",
532 CommsChannel::Signal => "Signal",
533 }
534}
535
536pub async fn enqueue_signup_verification(
537 db: &PgPool,
538 user_id: Uuid,
539 channel: &str,
540 recipient: &str,
541 code: &str,
542 locale: Option<&str>,
543) -> Result<Uuid, sqlx::Error> {
544 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
545 let comms_channel = match channel {
546 "email" => CommsChannel::Email,
547 "discord" => CommsChannel::Discord,
548 "telegram" => CommsChannel::Telegram,
549 "signal" => CommsChannel::Signal,
550 _ => CommsChannel::Email,
551 };
552 let strings = get_strings(locale.unwrap_or("en"));
553 let (verify_page, verify_link) = if comms_channel == CommsChannel::Email {
554 let encoded_email = urlencoding::encode(recipient);
555 let encoded_token = urlencoding::encode(code);
556 (
557 format!("https://{}/app/verify", hostname),
558 format!(
559 "https://{}/app/verify?token={}&identifier={}",
560 hostname, encoded_token, encoded_email
561 ),
562 )
563 } else {
564 (String::new(), String::new())
565 };
566 let body = format_message(
567 strings.signup_verification_body,
568 &[
569 ("code", code),
570 ("hostname", &hostname),
571 ("verify_page", &verify_page),
572 ("verify_link", &verify_link),
573 ],
574 );
575 let subject = match comms_channel {
576 CommsChannel::Email => Some(format_message(
577 strings.signup_verification_subject,
578 &[("hostname", &hostname)],
579 )),
580 _ => None,
581 };
582 enqueue_comms(
583 db,
584 NewComms::new(
585 user_id,
586 comms_channel,
587 CommsType::EmailVerification,
588 recipient.to_string(),
589 subject,
590 body,
591 ),
592 )
593 .await
594}
595
596pub async fn enqueue_migration_verification(
597 db: &PgPool,
598 user_id: Uuid,
599 email: &str,
600 token: &str,
601 hostname: &str,
602) -> Result<Uuid, sqlx::Error> {
603 let prefs = get_user_comms_prefs(db, user_id).await?;
604 let strings = get_strings(&prefs.locale);
605 let encoded_email = urlencoding::encode(email);
606 let encoded_token = urlencoding::encode(token);
607 let verify_page = format!("https://{}/app/verify", hostname);
608 let verify_link = format!(
609 "https://{}/app/verify?token={}&identifier={}",
610 hostname, encoded_token, encoded_email
611 );
612 let body = format_message(
613 strings.migration_verification_body,
614 &[
615 ("code", token),
616 ("hostname", hostname),
617 ("verify_page", &verify_page),
618 ("verify_link", &verify_link),
619 ],
620 );
621 let subject = format_message(
622 strings.migration_verification_subject,
623 &[("hostname", hostname)],
624 );
625 enqueue_comms(
626 db,
627 NewComms::email(
628 user_id,
629 CommsType::MigrationVerification,
630 email.to_string(),
631 subject,
632 body,
633 ),
634 )
635 .await
636}
637
638pub async fn queue_legacy_login_notification(
639 db: &PgPool,
640 user_id: Uuid,
641 hostname: &str,
642 client_ip: &str,
643 channel: CommsChannel,
644) -> Result<Uuid, sqlx::Error> {
645 let prefs = get_user_comms_prefs(db, user_id).await?;
646 let strings = get_strings(&prefs.locale);
647 let timestamp = chrono::Utc::now()
648 .format("%Y-%m-%d %H:%M:%S UTC")
649 .to_string();
650 let body = format_message(
651 strings.legacy_login_body,
652 &[
653 ("handle", &prefs.handle),
654 ("timestamp", ×tamp),
655 ("ip", client_ip),
656 ("hostname", hostname),
657 ],
658 );
659 let subject = format_message(strings.legacy_login_subject, &[("hostname", hostname)]);
660 enqueue_comms(
661 db,
662 NewComms::new(
663 user_id,
664 channel,
665 CommsType::LegacyLoginAlert,
666 prefs.email.unwrap_or_default(),
667 Some(subject),
668 body,
669 ),
670 )
671 .await
672}