this repo has no description
at main 20 kB view raw
1use std::collections::HashMap; 2use std::sync::Arc; 3use std::time::Duration; 4 5use chrono::Utc; 6use sqlx::PgPool; 7use tokio::sync::watch; 8use tokio::time::interval; 9use tracing::{debug, error, info, warn}; 10use tranquil_comms::{ 11 CommsChannel, CommsSender, CommsStatus, CommsType, NewComms, QueuedComms, SendError, 12 format_message, get_strings, 13}; 14use uuid::Uuid; 15 16pub struct CommsService { 17 db: PgPool, 18 senders: HashMap<CommsChannel, Arc<dyn CommsSender>>, 19 poll_interval: Duration, 20 batch_size: i64, 21} 22 23impl CommsService { 24 pub fn new(db: PgPool) -> Self { 25 let poll_interval_ms: u64 = std::env::var("NOTIFICATION_POLL_INTERVAL_MS") 26 .ok() 27 .and_then(|v| v.parse().ok()) 28 .unwrap_or(1000); 29 let batch_size: i64 = std::env::var("NOTIFICATION_BATCH_SIZE") 30 .ok() 31 .and_then(|v| v.parse().ok()) 32 .unwrap_or(100); 33 Self { 34 db, 35 senders: HashMap::new(), 36 poll_interval: Duration::from_millis(poll_interval_ms), 37 batch_size, 38 } 39 } 40 41 pub fn with_poll_interval(mut self, interval: Duration) -> Self { 42 self.poll_interval = interval; 43 self 44 } 45 46 pub fn with_batch_size(mut self, size: i64) -> Self { 47 self.batch_size = size; 48 self 49 } 50 51 pub fn register_sender<S: CommsSender + 'static>(mut self, sender: S) -> Self { 52 self.senders.insert(sender.channel(), Arc::new(sender)); 53 self 54 } 55 56 pub async fn enqueue(&self, item: NewComms) -> Result<Uuid, sqlx::Error> { 57 let id = sqlx::query_scalar!( 58 r#" 59 INSERT INTO comms_queue 60 (user_id, channel, comms_type, recipient, subject, body, metadata) 61 VALUES ($1, $2, $3, $4, $5, $6, $7) 62 RETURNING id 63 "#, 64 item.user_id, 65 item.channel as CommsChannel, 66 item.comms_type as CommsType, 67 item.recipient, 68 item.subject, 69 item.body, 70 item.metadata 71 ) 72 .fetch_one(&self.db) 73 .await?; 74 debug!(comms_id = %id, "Comms enqueued"); 75 Ok(id) 76 } 77 78 pub fn has_senders(&self) -> bool { 79 !self.senders.is_empty() 80 } 81 82 pub async fn run(self, mut shutdown: watch::Receiver<bool>) { 83 if self.senders.is_empty() { 84 warn!( 85 "Comms service starting with no senders configured. Messages will be queued but not delivered until senders are configured." 86 ); 87 } 88 info!( 89 poll_interval_secs = self.poll_interval.as_secs(), 90 batch_size = self.batch_size, 91 channels = ?self.senders.keys().collect::<Vec<_>>(), 92 "Starting comms service" 93 ); 94 let mut ticker = interval(self.poll_interval); 95 loop { 96 tokio::select! { 97 _ = ticker.tick() => { 98 if let Err(e) = self.process_batch().await { 99 error!(error = %e, "Failed to process comms batch"); 100 } 101 } 102 _ = shutdown.changed() => { 103 if *shutdown.borrow() { 104 info!("Comms service shutting down"); 105 break; 106 } 107 } 108 } 109 } 110 } 111 112 async fn process_batch(&self) -> Result<(), sqlx::Error> { 113 let items = self.fetch_pending().await?; 114 if items.is_empty() { 115 return Ok(()); 116 } 117 debug!(count = items.len(), "Processing comms batch"); 118 futures::future::join_all(items.into_iter().map(|item| self.process_item(item))).await; 119 Ok(()) 120 } 121 122 async fn fetch_pending(&self) -> Result<Vec<QueuedComms>, sqlx::Error> { 123 let now = Utc::now(); 124 sqlx::query_as!( 125 QueuedComms, 126 r#" 127 UPDATE comms_queue 128 SET status = 'processing', updated_at = NOW() 129 WHERE id IN ( 130 SELECT id FROM comms_queue 131 WHERE status = 'pending' 132 AND scheduled_for <= $1 133 AND attempts < max_attempts 134 ORDER BY scheduled_for ASC 135 LIMIT $2 136 FOR UPDATE SKIP LOCKED 137 ) 138 RETURNING 139 id, user_id, 140 channel as "channel: CommsChannel", 141 comms_type as "comms_type: CommsType", 142 status as "status: CommsStatus", 143 recipient, subject, body, metadata, 144 attempts, max_attempts, last_error, 145 created_at, updated_at, scheduled_for, processed_at 146 "#, 147 now, 148 self.batch_size 149 ) 150 .fetch_all(&self.db) 151 .await 152 } 153 154 async fn process_item(&self, item: QueuedComms) { 155 let comms_id = item.id; 156 let channel = item.channel; 157 let result = match self.senders.get(&channel) { 158 Some(sender) => sender.send(&item).await, 159 None => { 160 warn!( 161 comms_id = %comms_id, 162 channel = ?channel, 163 "No sender registered for channel" 164 ); 165 Err(SendError::NotConfigured(channel)) 166 } 167 }; 168 match result { 169 Ok(()) => { 170 debug!(comms_id = %comms_id, "Comms sent successfully"); 171 if let Err(e) = self.mark_sent(comms_id).await { 172 error!( 173 comms_id = %comms_id, 174 error = %e, 175 "Failed to mark comms as sent" 176 ); 177 } 178 } 179 Err(e) => { 180 let error_msg = e.to_string(); 181 warn!( 182 comms_id = %comms_id, 183 error = %error_msg, 184 "Failed to send comms" 185 ); 186 if let Err(db_err) = self.mark_failed(comms_id, &error_msg).await { 187 error!( 188 comms_id = %comms_id, 189 error = %db_err, 190 "Failed to mark comms as failed" 191 ); 192 } 193 } 194 } 195 } 196 197 async fn mark_sent(&self, id: Uuid) -> Result<(), sqlx::Error> { 198 sqlx::query!( 199 r#" 200 UPDATE comms_queue 201 SET status = 'sent', processed_at = NOW(), updated_at = NOW() 202 WHERE id = $1 203 "#, 204 id 205 ) 206 .execute(&self.db) 207 .await?; 208 Ok(()) 209 } 210 211 async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> { 212 sqlx::query!( 213 r#" 214 UPDATE comms_queue 215 SET 216 status = CASE 217 WHEN attempts + 1 >= max_attempts THEN 'failed'::comms_status 218 ELSE 'pending'::comms_status 219 END, 220 attempts = attempts + 1, 221 last_error = $2, 222 updated_at = NOW(), 223 scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1)) 224 WHERE id = $1 225 "#, 226 id, 227 error 228 ) 229 .execute(&self.db) 230 .await?; 231 Ok(()) 232 } 233} 234 235pub async fn enqueue_comms(db: &PgPool, item: NewComms) -> Result<Uuid, sqlx::Error> { 236 sqlx::query_scalar!( 237 r#" 238 INSERT INTO comms_queue 239 (user_id, channel, comms_type, recipient, subject, body, metadata) 240 VALUES ($1, $2, $3, $4, $5, $6, $7) 241 RETURNING id 242 "#, 243 item.user_id, 244 item.channel as CommsChannel, 245 item.comms_type as CommsType, 246 item.recipient, 247 item.subject, 248 item.body, 249 item.metadata 250 ) 251 .fetch_one(db) 252 .await 253} 254 255pub struct UserCommsPrefs { 256 pub channel: CommsChannel, 257 pub email: Option<String>, 258 pub handle: crate::types::Handle, 259 pub locale: String, 260} 261 262pub async fn get_user_comms_prefs( 263 db: &PgPool, 264 user_id: Uuid, 265) -> Result<UserCommsPrefs, sqlx::Error> { 266 let row = sqlx::query!( 267 r#" 268 SELECT 269 email, 270 handle, 271 preferred_comms_channel as "channel: CommsChannel", 272 preferred_locale 273 FROM users 274 WHERE id = $1 275 "#, 276 user_id 277 ) 278 .fetch_one(db) 279 .await?; 280 Ok(UserCommsPrefs { 281 channel: row.channel, 282 email: row.email, 283 handle: row.handle.into(), 284 locale: row.preferred_locale.unwrap_or_else(|| "en".to_string()), 285 }) 286} 287 288pub async fn enqueue_welcome( 289 db: &PgPool, 290 user_id: Uuid, 291 hostname: &str, 292) -> Result<Uuid, sqlx::Error> { 293 let prefs = get_user_comms_prefs(db, user_id).await?; 294 let strings = get_strings(&prefs.locale); 295 let body = format_message( 296 strings.welcome_body, 297 &[("hostname", hostname), ("handle", &prefs.handle)], 298 ); 299 let subject = format_message(strings.welcome_subject, &[("hostname", hostname)]); 300 enqueue_comms( 301 db, 302 NewComms::new( 303 user_id, 304 prefs.channel, 305 CommsType::Welcome, 306 prefs.email.unwrap_or_default(), 307 Some(subject), 308 body, 309 ), 310 ) 311 .await 312} 313 314pub async fn enqueue_password_reset( 315 db: &PgPool, 316 user_id: Uuid, 317 code: &str, 318 hostname: &str, 319) -> Result<Uuid, sqlx::Error> { 320 let prefs = get_user_comms_prefs(db, user_id).await?; 321 let strings = get_strings(&prefs.locale); 322 let body = format_message( 323 strings.password_reset_body, 324 &[("handle", &prefs.handle), ("code", code)], 325 ); 326 let subject = format_message(strings.password_reset_subject, &[("hostname", hostname)]); 327 enqueue_comms( 328 db, 329 NewComms::new( 330 user_id, 331 prefs.channel, 332 CommsType::PasswordReset, 333 prefs.email.unwrap_or_default(), 334 Some(subject), 335 body, 336 ), 337 ) 338 .await 339} 340 341pub async fn enqueue_email_update( 342 db: &PgPool, 343 user_id: Uuid, 344 new_email: &str, 345 handle: &str, 346 code: &str, 347 hostname: &str, 348) -> Result<Uuid, sqlx::Error> { 349 let prefs = get_user_comms_prefs(db, user_id).await?; 350 let strings = get_strings(&prefs.locale); 351 let encoded_email = urlencoding::encode(new_email); 352 let encoded_token = urlencoding::encode(code); 353 let verify_page = format!("https://{}/app/verify", hostname); 354 let verify_link = format!( 355 "https://{}/app/verify?token={}&identifier={}", 356 hostname, encoded_token, encoded_email 357 ); 358 let body = format_message( 359 strings.email_update_body, 360 &[ 361 ("handle", handle), 362 ("code", code), 363 ("verify_page", &verify_page), 364 ("verify_link", &verify_link), 365 ], 366 ); 367 let subject = format_message(strings.email_update_subject, &[("hostname", hostname)]); 368 enqueue_comms( 369 db, 370 NewComms::email( 371 user_id, 372 CommsType::EmailUpdate, 373 new_email.to_string(), 374 subject, 375 body, 376 ), 377 ) 378 .await 379} 380 381pub async fn enqueue_email_update_token( 382 db: &PgPool, 383 user_id: Uuid, 384 code: &str, 385 hostname: &str, 386) -> Result<Uuid, sqlx::Error> { 387 let prefs = get_user_comms_prefs(db, user_id).await?; 388 let strings = get_strings(&prefs.locale); 389 let current_email = prefs.email.unwrap_or_default(); 390 let verify_page = format!("https://{}/app/verify?type=email-update", hostname); 391 let verify_link = format!( 392 "https://{}/app/verify?type=email-update&token={}", 393 hostname, 394 urlencoding::encode(code) 395 ); 396 let body = format_message( 397 strings.email_update_body, 398 &[ 399 ("handle", &prefs.handle), 400 ("code", code), 401 ("verify_page", &verify_page), 402 ("verify_link", &verify_link), 403 ], 404 ); 405 let subject = format_message(strings.email_update_subject, &[("hostname", hostname)]); 406 enqueue_comms( 407 db, 408 NewComms::email( 409 user_id, 410 CommsType::EmailUpdate, 411 current_email, 412 subject, 413 body, 414 ), 415 ) 416 .await 417} 418 419pub async fn enqueue_account_deletion( 420 db: &PgPool, 421 user_id: Uuid, 422 code: &str, 423 hostname: &str, 424) -> Result<Uuid, sqlx::Error> { 425 let prefs = get_user_comms_prefs(db, user_id).await?; 426 let strings = get_strings(&prefs.locale); 427 let body = format_message( 428 strings.account_deletion_body, 429 &[("handle", &prefs.handle), ("code", code)], 430 ); 431 let subject = format_message(strings.account_deletion_subject, &[("hostname", hostname)]); 432 enqueue_comms( 433 db, 434 NewComms::new( 435 user_id, 436 prefs.channel, 437 CommsType::AccountDeletion, 438 prefs.email.unwrap_or_default(), 439 Some(subject), 440 body, 441 ), 442 ) 443 .await 444} 445 446pub async fn enqueue_plc_operation( 447 db: &PgPool, 448 user_id: Uuid, 449 token: &str, 450 hostname: &str, 451) -> Result<Uuid, sqlx::Error> { 452 let prefs = get_user_comms_prefs(db, user_id).await?; 453 let strings = get_strings(&prefs.locale); 454 let body = format_message( 455 strings.plc_operation_body, 456 &[("handle", &prefs.handle), ("token", token)], 457 ); 458 let subject = format_message(strings.plc_operation_subject, &[("hostname", hostname)]); 459 enqueue_comms( 460 db, 461 NewComms::new( 462 user_id, 463 prefs.channel, 464 CommsType::PlcOperation, 465 prefs.email.unwrap_or_default(), 466 Some(subject), 467 body, 468 ), 469 ) 470 .await 471} 472 473pub async fn enqueue_2fa_code( 474 db: &PgPool, 475 user_id: Uuid, 476 code: &str, 477 hostname: &str, 478) -> Result<Uuid, sqlx::Error> { 479 let prefs = get_user_comms_prefs(db, user_id).await?; 480 let strings = get_strings(&prefs.locale); 481 let body = format_message( 482 strings.two_factor_code_body, 483 &[("handle", &prefs.handle), ("code", code)], 484 ); 485 let subject = format_message(strings.two_factor_code_subject, &[("hostname", hostname)]); 486 enqueue_comms( 487 db, 488 NewComms::new( 489 user_id, 490 prefs.channel, 491 CommsType::TwoFactorCode, 492 prefs.email.unwrap_or_default(), 493 Some(subject), 494 body, 495 ), 496 ) 497 .await 498} 499 500pub async fn enqueue_passkey_recovery( 501 db: &PgPool, 502 user_id: Uuid, 503 recovery_url: &str, 504 hostname: &str, 505) -> Result<Uuid, sqlx::Error> { 506 let prefs = get_user_comms_prefs(db, user_id).await?; 507 let strings = get_strings(&prefs.locale); 508 let body = format_message( 509 strings.passkey_recovery_body, 510 &[("handle", &prefs.handle), ("url", recovery_url)], 511 ); 512 let subject = format_message(strings.passkey_recovery_subject, &[("hostname", hostname)]); 513 enqueue_comms( 514 db, 515 NewComms::new( 516 user_id, 517 prefs.channel, 518 CommsType::PasskeyRecovery, 519 prefs.email.unwrap_or_default(), 520 Some(subject), 521 body, 522 ), 523 ) 524 .await 525} 526 527pub fn channel_display_name(channel: CommsChannel) -> &'static str { 528 match channel { 529 CommsChannel::Email => "email", 530 CommsChannel::Discord => "Discord", 531 CommsChannel::Telegram => "Telegram", 532 CommsChannel::Signal => "Signal", 533 } 534} 535 536pub async fn enqueue_signup_verification( 537 db: &PgPool, 538 user_id: Uuid, 539 channel: &str, 540 recipient: &str, 541 code: &str, 542 locale: Option<&str>, 543) -> Result<Uuid, sqlx::Error> { 544 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 545 let comms_channel = match channel { 546 "email" => CommsChannel::Email, 547 "discord" => CommsChannel::Discord, 548 "telegram" => CommsChannel::Telegram, 549 "signal" => CommsChannel::Signal, 550 _ => CommsChannel::Email, 551 }; 552 let strings = get_strings(locale.unwrap_or("en")); 553 let (verify_page, verify_link) = if comms_channel == CommsChannel::Email { 554 let encoded_email = urlencoding::encode(recipient); 555 let encoded_token = urlencoding::encode(code); 556 ( 557 format!("https://{}/app/verify", hostname), 558 format!( 559 "https://{}/app/verify?token={}&identifier={}", 560 hostname, encoded_token, encoded_email 561 ), 562 ) 563 } else { 564 (String::new(), String::new()) 565 }; 566 let body = format_message( 567 strings.signup_verification_body, 568 &[ 569 ("code", code), 570 ("hostname", &hostname), 571 ("verify_page", &verify_page), 572 ("verify_link", &verify_link), 573 ], 574 ); 575 let subject = match comms_channel { 576 CommsChannel::Email => Some(format_message( 577 strings.signup_verification_subject, 578 &[("hostname", &hostname)], 579 )), 580 _ => None, 581 }; 582 enqueue_comms( 583 db, 584 NewComms::new( 585 user_id, 586 comms_channel, 587 CommsType::EmailVerification, 588 recipient.to_string(), 589 subject, 590 body, 591 ), 592 ) 593 .await 594} 595 596pub async fn enqueue_migration_verification( 597 db: &PgPool, 598 user_id: Uuid, 599 email: &str, 600 token: &str, 601 hostname: &str, 602) -> Result<Uuid, sqlx::Error> { 603 let prefs = get_user_comms_prefs(db, user_id).await?; 604 let strings = get_strings(&prefs.locale); 605 let encoded_email = urlencoding::encode(email); 606 let encoded_token = urlencoding::encode(token); 607 let verify_page = format!("https://{}/app/verify", hostname); 608 let verify_link = format!( 609 "https://{}/app/verify?token={}&identifier={}", 610 hostname, encoded_token, encoded_email 611 ); 612 let body = format_message( 613 strings.migration_verification_body, 614 &[ 615 ("code", token), 616 ("hostname", hostname), 617 ("verify_page", &verify_page), 618 ("verify_link", &verify_link), 619 ], 620 ); 621 let subject = format_message( 622 strings.migration_verification_subject, 623 &[("hostname", hostname)], 624 ); 625 enqueue_comms( 626 db, 627 NewComms::email( 628 user_id, 629 CommsType::MigrationVerification, 630 email.to_string(), 631 subject, 632 body, 633 ), 634 ) 635 .await 636} 637 638pub async fn queue_legacy_login_notification( 639 db: &PgPool, 640 user_id: Uuid, 641 hostname: &str, 642 client_ip: &str, 643 channel: CommsChannel, 644) -> Result<Uuid, sqlx::Error> { 645 let prefs = get_user_comms_prefs(db, user_id).await?; 646 let strings = get_strings(&prefs.locale); 647 let timestamp = chrono::Utc::now() 648 .format("%Y-%m-%d %H:%M:%S UTC") 649 .to_string(); 650 let body = format_message( 651 strings.legacy_login_body, 652 &[ 653 ("handle", &prefs.handle), 654 ("timestamp", &timestamp), 655 ("ip", client_ip), 656 ("hostname", hostname), 657 ], 658 ); 659 let subject = format_message(strings.legacy_login_subject, &[("hostname", hostname)]); 660 enqueue_comms( 661 db, 662 NewComms::new( 663 user_id, 664 channel, 665 CommsType::LegacyLoginAlert, 666 prefs.email.unwrap_or_default(), 667 Some(subject), 668 body, 669 ), 670 ) 671 .await 672}