this repo has no description

Initial notification sender service

lewis 32968c93 0dd5e380

+17
.env.example
··· 13 13 PDS_HOSTNAME=localhost:3000 14 14 PLC_URL=plc.directory 15 15 16 + # Notification Service Configuration 17 + # At least one notification channel should be configured for user notifications to work. 18 + # Email notifications (via sendmail/msmtp) 19 + # MAIL_FROM_ADDRESS=noreply@example.com 20 + # MAIL_FROM_NAME=My PDS 21 + # SENDMAIL_PATH=/usr/sbin/sendmail 22 + 23 + # Discord notifications (not yet implemented) 24 + # DISCORD_BOT_TOKEN=your-bot-token 25 + 26 + # Telegram notifications (not yet implemented) 27 + # TELEGRAM_BOT_TOKEN=your-bot-token 28 + 29 + # Signal notifications (not yet implemented) 30 + # SIGNAL_CLI_PATH=/usr/local/bin/signal-cli 31 + # SIGNAL_PHONE_NUMBER=+1234567890 32 + 16 33 CARGO_MOMMYS_LITTLE=mister 17 34 CARGO_MOMMYS_PRONOUNS=his 18 35 CARGO_MOMMYS_ROLES=daddy
+15
.sqlx/query-2c6cb8f15fe71cb5f38ffd7f5085b60bc852c4f1042c95a76fce773efd369511.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n UPDATE notification_queue\n SET\n status = CASE\n WHEN attempts + 1 >= max_attempts THEN 'failed'::notification_status\n ELSE 'pending'::notification_status\n END,\n attempts = attempts + 1,\n last_error = $2,\n updated_at = NOW(),\n scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1))\n WHERE id = $1\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "Text" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "2c6cb8f15fe71cb5f38ffd7f5085b60bc852c4f1042c95a76fce773efd369511" 15 + }
+53
.sqlx/query-303777d97e6ed344f8c699eae37b7b0c241c734a5b7726019c2a59ae277caee6.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO notification_queue\n (user_id, channel, notification_type, recipient, subject, body, metadata)\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING id\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "id", 9 + "type_info": "Uuid" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Uuid", 15 + { 16 + "Custom": { 17 + "name": "notification_channel", 18 + "kind": { 19 + "Enum": [ 20 + "email", 21 + "discord", 22 + "telegram", 23 + "signal" 24 + ] 25 + } 26 + } 27 + }, 28 + { 29 + "Custom": { 30 + "name": "notification_type", 31 + "kind": { 32 + "Enum": [ 33 + "welcome", 34 + "email_verification", 35 + "password_reset", 36 + "email_update", 37 + "account_deletion" 38 + ] 39 + } 40 + } 41 + }, 42 + "Text", 43 + "Text", 44 + "Text", 45 + "Jsonb" 46 + ] 47 + }, 48 + "nullable": [ 49 + false 50 + ] 51 + }, 52 + "hash": "303777d97e6ed344f8c699eae37b7b0c241c734a5b7726019c2a59ae277caee6" 53 + }
+14
.sqlx/query-344c851d3f1b026e8632aa2f04052dcbc957b7077c856da6a1a256ec2fe85ad3.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n UPDATE notification_queue\n SET status = 'sent', processed_at = NOW(), updated_at = NOW()\n WHERE id = $1\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid" 9 + ] 10 + }, 11 + "nullable": [] 12 + }, 13 + "hash": "344c851d3f1b026e8632aa2f04052dcbc957b7077c856da6a1a256ec2fe85ad3" 14 + }
+46
.sqlx/query-55c4e13e5ff23aaa71c3ab417891a5f56542571ba3f15c6d9dae153405bc4275.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT s.did, u.id as user_id, u.email, u.handle, k.key_bytes\n FROM sessions s\n JOIN users u ON s.did = u.did\n JOIN user_keys k ON u.id = k.user_id\n WHERE s.access_jwt = $1\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "did", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "user_id", 14 + "type_info": "Uuid" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "email", 19 + "type_info": "Text" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "handle", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "key_bytes", 29 + "type_info": "Bytea" 30 + } 31 + ], 32 + "parameters": { 33 + "Left": [ 34 + "Text" 35 + ] 36 + }, 37 + "nullable": [ 38 + false, 39 + false, 40 + false, 41 + false, 42 + false 43 + ] 44 + }, 45 + "hash": "55c4e13e5ff23aaa71c3ab417891a5f56542571ba3f15c6d9dae153405bc4275" 46 + }
+53
.sqlx/query-5d49bbf0307a0c642b0174d641de748fa648c97f8109255120e969c957ff95bf.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO notification_queue\n (user_id, channel, notification_type, recipient, subject, body, metadata)\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING id\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "id", 9 + "type_info": "Uuid" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Uuid", 15 + { 16 + "Custom": { 17 + "name": "notification_channel", 18 + "kind": { 19 + "Enum": [ 20 + "email", 21 + "discord", 22 + "telegram", 23 + "signal" 24 + ] 25 + } 26 + } 27 + }, 28 + { 29 + "Custom": { 30 + "name": "notification_type", 31 + "kind": { 32 + "Enum": [ 33 + "welcome", 34 + "email_verification", 35 + "password_reset", 36 + "email_update", 37 + "account_deletion" 38 + ] 39 + } 40 + } 41 + }, 42 + "Text", 43 + "Text", 44 + "Text", 45 + "Jsonb" 46 + ] 47 + }, 48 + "nullable": [ 49 + false 50 + ] 51 + }, 52 + "hash": "5d49bbf0307a0c642b0174d641de748fa648c97f8109255120e969c957ff95bf" 53 + }
+150
.sqlx/query-cb6f48aaba124c79308d20e66c23adb44d1196296b7f93fad19b2d17548ed3de.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n UPDATE notification_queue\n SET status = 'processing', updated_at = NOW()\n WHERE id IN (\n SELECT id FROM notification_queue\n WHERE status = 'pending'\n AND scheduled_for <= $1\n AND attempts < max_attempts\n ORDER BY scheduled_for ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n )\n RETURNING\n id, user_id,\n channel as \"channel: NotificationChannel\",\n notification_type as \"notification_type: super::types::NotificationType\",\n status as \"status: NotificationStatus\",\n recipient, subject, body, metadata,\n attempts, max_attempts, last_error,\n created_at, updated_at, scheduled_for, processed_at\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "id", 9 + "type_info": "Uuid" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "user_id", 14 + "type_info": "Uuid" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "channel: NotificationChannel", 19 + "type_info": { 20 + "Custom": { 21 + "name": "notification_channel", 22 + "kind": { 23 + "Enum": [ 24 + "email", 25 + "discord", 26 + "telegram", 27 + "signal" 28 + ] 29 + } 30 + } 31 + } 32 + }, 33 + { 34 + "ordinal": 3, 35 + "name": "notification_type: super::types::NotificationType", 36 + "type_info": { 37 + "Custom": { 38 + "name": "notification_type", 39 + "kind": { 40 + "Enum": [ 41 + "welcome", 42 + "email_verification", 43 + "password_reset", 44 + "email_update", 45 + "account_deletion" 46 + ] 47 + } 48 + } 49 + } 50 + }, 51 + { 52 + "ordinal": 4, 53 + "name": "status: NotificationStatus", 54 + "type_info": { 55 + "Custom": { 56 + "name": "notification_status", 57 + "kind": { 58 + "Enum": [ 59 + "pending", 60 + "processing", 61 + "sent", 62 + "failed" 63 + ] 64 + } 65 + } 66 + } 67 + }, 68 + { 69 + "ordinal": 5, 70 + "name": "recipient", 71 + "type_info": "Text" 72 + }, 73 + { 74 + "ordinal": 6, 75 + "name": "subject", 76 + "type_info": "Text" 77 + }, 78 + { 79 + "ordinal": 7, 80 + "name": "body", 81 + "type_info": "Text" 82 + }, 83 + { 84 + "ordinal": 8, 85 + "name": "metadata", 86 + "type_info": "Jsonb" 87 + }, 88 + { 89 + "ordinal": 9, 90 + "name": "attempts", 91 + "type_info": "Int4" 92 + }, 93 + { 94 + "ordinal": 10, 95 + "name": "max_attempts", 96 + "type_info": "Int4" 97 + }, 98 + { 99 + "ordinal": 11, 100 + "name": "last_error", 101 + "type_info": "Text" 102 + }, 103 + { 104 + "ordinal": 12, 105 + "name": "created_at", 106 + "type_info": "Timestamptz" 107 + }, 108 + { 109 + "ordinal": 13, 110 + "name": "updated_at", 111 + "type_info": "Timestamptz" 112 + }, 113 + { 114 + "ordinal": 14, 115 + "name": "scheduled_for", 116 + "type_info": "Timestamptz" 117 + }, 118 + { 119 + "ordinal": 15, 120 + "name": "processed_at", 121 + "type_info": "Timestamptz" 122 + } 123 + ], 124 + "parameters": { 125 + "Left": [ 126 + "Timestamptz", 127 + "Int8" 128 + ] 129 + }, 130 + "nullable": [ 131 + false, 132 + false, 133 + false, 134 + false, 135 + false, 136 + false, 137 + true, 138 + false, 139 + true, 140 + false, 141 + false, 142 + true, 143 + false, 144 + false, 145 + false, 146 + true 147 + ] 148 + }, 149 + "hash": "cb6f48aaba124c79308d20e66c23adb44d1196296b7f93fad19b2d17548ed3de" 150 + }
+1 -1
Cargo.toml
··· 30 30 sha2 = "0.10.9" 31 31 sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "json"] } 32 32 thiserror = "2.0.17" 33 - tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "time"] } 33 + tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "time", "signal", "process"] } 34 34 tracing = "0.1.43" 35 35 tracing-subscriber = "0.3.22" 36 36 uuid = { version = "1.19.0", features = ["v4", "fast-rng"] }
+9 -3
TODO.md
··· 126 126 - [ ] Implement caching layer for DID resolution (Redis or in-memory). 127 127 - [ ] Handle cache invalidation/expiry. 128 128 - [ ] Background Jobs 129 - - [ ] Implement background queue for async tasks (crawler notifications, discord/telegram 2FA sending instead of email). 130 129 - [ ] Implement `Crawlers` service (debounce notifications to relays). 131 - - [ ] Mailer equivalent 132 - - [ ] Implement code/notification sending service as a replacement for the mailer because there's no way I'm starting with email. :D 130 + - [x] Notification Service 131 + - [x] Queue-based notification system with database table 132 + - [x] Background worker polling for pending notifications 133 + - [x] Extensible sender trait for multiple channels 134 + - [x] Email sender via OS sendmail/msmtp 135 + - [ ] Discord bot sender 136 + - [ ] Telegram bot sender 137 + - [ ] Signal bot sender 138 + - [x] Helper functions for common notification types (welcome, password reset, email verification, etc.) 133 139 - [ ] Image Processing 134 140 - [ ] Implement image resize/formatting pipeline (for blob uploads). 135 141 - [ ] IPLD & MST
+36
migrations/202512212000_notification_queue.sql
··· 1 + CREATE TYPE notification_channel AS ENUM ('email', 'discord', 'telegram', 'signal'); 2 + CREATE TYPE notification_status AS ENUM ('pending', 'processing', 'sent', 'failed'); 3 + CREATE TYPE notification_type AS ENUM ( 4 + 'welcome', 5 + 'email_verification', 6 + 'password_reset', 7 + 'email_update', 8 + 'account_deletion' 9 + ); 10 + 11 + CREATE TABLE IF NOT EXISTS notification_queue ( 12 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), 13 + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, 14 + channel notification_channel NOT NULL DEFAULT 'email', 15 + notification_type notification_type NOT NULL, 16 + status notification_status NOT NULL DEFAULT 'pending', 17 + recipient TEXT NOT NULL, 18 + subject TEXT, 19 + body TEXT NOT NULL, 20 + metadata JSONB, 21 + attempts INT NOT NULL DEFAULT 0, 22 + max_attempts INT NOT NULL DEFAULT 3, 23 + last_error TEXT, 24 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 25 + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 26 + scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(), 27 + processed_at TIMESTAMPTZ 28 + ); 29 + 30 + CREATE INDEX idx_notification_queue_status_scheduled 31 + ON notification_queue(status, scheduled_for) 32 + WHERE status = 'pending'; 33 + 34 + CREATE INDEX idx_notification_queue_user_id ON notification_queue(user_id); 35 + 36 + ALTER TABLE users ADD COLUMN IF NOT EXISTS preferred_notification_channel notification_channel NOT NULL DEFAULT 'email';
+14 -1
src/api/identity/account.rs
··· 14 14 use serde::{Deserialize, Serialize}; 15 15 use serde_json::json; 16 16 use std::sync::Arc; 17 - use tracing::{error, info}; 17 + use tracing::{error, info, warn}; 18 18 19 19 #[derive(Deserialize)] 20 20 pub struct CreateAccountInput { ··· 330 330 Json(json!({"error": "InternalError"})), 331 331 ) 332 332 .into_response(); 333 + } 334 + 335 + let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 336 + if let Err(e) = crate::notifications::enqueue_welcome_email( 337 + &state.db, 338 + user_id, 339 + &input.email, 340 + &input.handle, 341 + &hostname, 342 + ) 343 + .await 344 + { 345 + warn!("Failed to enqueue welcome email: {:?}", e); 333 346 } 334 347 335 348 (
+18 -5
src/api/server/session.rs
··· 363 363 364 364 let session = sqlx::query!( 365 365 r#" 366 - SELECT s.did, k.key_bytes 366 + SELECT s.did, u.id as user_id, u.email, u.handle, k.key_bytes 367 367 FROM sessions s 368 368 JOIN users u ON s.did = u.did 369 369 JOIN user_keys k ON u.id = k.user_id ··· 374 374 .fetch_optional(&state.db) 375 375 .await; 376 376 377 - let (did, key_bytes) = match session { 378 - Ok(Some(row)) => (row.did, row.key_bytes), 377 + let (did, user_id, email, handle, key_bytes) = match session { 378 + Ok(Some(row)) => (row.did, row.user_id, row.email, row.handle, row.key_bytes), 379 379 Ok(None) => { 380 380 return ( 381 381 StatusCode::UNAUTHORIZED, ··· 422 422 .into_response(); 423 423 } 424 424 425 - // TODO: Send email or other notification 426 - info!("Account deletion requested for user {}, token: {}", did, confirmation_token); 425 + let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 426 + if let Err(e) = crate::notifications::enqueue_account_deletion( 427 + &state.db, 428 + user_id, 429 + &email, 430 + &handle, 431 + &confirmation_token, 432 + &hostname, 433 + ) 434 + .await 435 + { 436 + warn!("Failed to enqueue account deletion notification: {:?}", e); 437 + } 438 + 439 + info!("Account deletion requested for user {}", did); 427 440 428 441 (StatusCode::OK, Json(json!({}))).into_response() 429 442 }
+1
src/lib.rs
··· 1 1 pub mod api; 2 2 pub mod auth; 3 + pub mod notifications; 3 4 pub mod repo; 4 5 pub mod state; 5 6 pub mod storage;
+54 -3
src/main.rs
··· 1 + use bspds::notifications::{EmailSender, NotificationService}; 1 2 use bspds::state::AppState; 2 3 use std::net::SocketAddr; 3 - use tracing::info; 4 + use tokio::sync::watch; 5 + use tracing::{info, warn}; 4 6 5 7 #[tokio::main] 6 8 async fn main() { ··· 20 22 .await 21 23 .expect("Failed to run migrations"); 22 24 23 - let state = AppState::new(pool).await; 25 + let state = AppState::new(pool.clone()).await; 26 + 27 + let (shutdown_tx, shutdown_rx) = watch::channel(false); 28 + 29 + let mut notification_service = NotificationService::new(pool); 30 + 31 + if let Some(email_sender) = EmailSender::from_env() { 32 + info!("Email notifications enabled"); 33 + notification_service = notification_service.register_sender(email_sender); 34 + } else { 35 + warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)"); 36 + } 37 + 38 + let notification_handle = tokio::spawn(notification_service.run(shutdown_rx)); 24 39 25 40 let app = bspds::app(state); 26 41 27 42 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 28 43 info!("listening on {}", addr); 29 44 let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); 30 - axum::serve(listener, app).await.unwrap(); 45 + 46 + let server_result = axum::serve(listener, app) 47 + .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 48 + .await; 49 + 50 + notification_handle.await.ok(); 51 + 52 + if let Err(e) = server_result { 53 + tracing::error!("Server error: {}", e); 54 + } 55 + } 56 + 57 + async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 58 + let ctrl_c = async { 59 + tokio::signal::ctrl_c() 60 + .await 61 + .expect("Failed to install Ctrl+C handler"); 62 + }; 63 + 64 + #[cfg(unix)] 65 + let terminate = async { 66 + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) 67 + .expect("Failed to install signal handler") 68 + .recv() 69 + .await; 70 + }; 71 + 72 + #[cfg(not(unix))] 73 + let terminate = std::future::pending::<()>(); 74 + 75 + tokio::select! { 76 + _ = ctrl_c => {}, 77 + _ = terminate => {}, 78 + } 79 + 80 + info!("Shutdown signal received, stopping services..."); 81 + shutdown_tx.send(true).ok(); 31 82 }
+12
src/notifications/mod.rs
··· 1 + mod sender; 2 + mod service; 3 + mod types; 4 + 5 + pub use sender::{EmailSender, NotificationSender}; 6 + pub use service::{ 7 + enqueue_account_deletion, enqueue_email_update, enqueue_email_verification, 8 + enqueue_notification, enqueue_password_reset, enqueue_welcome_email, NotificationService, 9 + }; 10 + pub use types::{ 11 + NewNotification, NotificationChannel, NotificationStatus, NotificationType, QueuedNotification, 12 + };
+98
src/notifications/sender.rs
··· 1 + use async_trait::async_trait; 2 + use std::process::Stdio; 3 + use tokio::io::AsyncWriteExt; 4 + use tokio::process::Command; 5 + 6 + use super::types::{NotificationChannel, QueuedNotification}; 7 + 8 + #[async_trait] 9 + pub trait NotificationSender: Send + Sync { 10 + fn channel(&self) -> NotificationChannel; 11 + async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>; 12 + } 13 + 14 + #[derive(Debug, thiserror::Error)] 15 + pub enum SendError { 16 + #[error("Failed to spawn sendmail process: {0}")] 17 + ProcessSpawn(#[from] std::io::Error), 18 + 19 + #[error("Sendmail exited with non-zero status: {0}")] 20 + SendmailFailed(String), 21 + 22 + #[error("Channel not configured: {0:?}")] 23 + NotConfigured(NotificationChannel), 24 + 25 + #[error("External service error: {0}")] 26 + ExternalService(String), 27 + } 28 + 29 + pub struct EmailSender { 30 + from_address: String, 31 + from_name: String, 32 + sendmail_path: String, 33 + } 34 + 35 + impl EmailSender { 36 + pub fn new(from_address: String, from_name: String) -> Self { 37 + Self { 38 + from_address, 39 + from_name, 40 + sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 41 + } 42 + } 43 + 44 + pub fn from_env() -> Option<Self> { 45 + let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 46 + let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string()); 47 + Some(Self::new(from_address, from_name)) 48 + } 49 + 50 + fn format_email(&self, notification: &QueuedNotification) -> String { 51 + let subject = notification.subject.as_deref().unwrap_or("Notification"); 52 + let from_header = if self.from_name.is_empty() { 53 + self.from_address.clone() 54 + } else { 55 + format!("{} <{}>", self.from_name, self.from_address) 56 + }; 57 + 58 + format!( 59 + "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 60 + from_header, 61 + notification.recipient, 62 + subject, 63 + notification.body 64 + ) 65 + } 66 + } 67 + 68 + #[async_trait] 69 + impl NotificationSender for EmailSender { 70 + fn channel(&self) -> NotificationChannel { 71 + NotificationChannel::Email 72 + } 73 + 74 + async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 75 + let email_content = self.format_email(notification); 76 + 77 + let mut child = Command::new(&self.sendmail_path) 78 + .arg("-t") 79 + .arg("-oi") 80 + .stdin(Stdio::piped()) 81 + .stdout(Stdio::piped()) 82 + .stderr(Stdio::piped()) 83 + .spawn()?; 84 + 85 + if let Some(mut stdin) = child.stdin.take() { 86 + stdin.write_all(email_content.as_bytes()).await?; 87 + } 88 + 89 + let output = child.wait_with_output().await?; 90 + 91 + if !output.status.success() { 92 + let stderr = String::from_utf8_lossy(&output.stderr); 93 + return Err(SendError::SendmailFailed(stderr.to_string())); 94 + } 95 + 96 + Ok(()) 97 + } 98 + }
+384
src/notifications/service.rs
··· 1 + use std::collections::HashMap; 2 + use std::sync::Arc; 3 + use std::time::Duration; 4 + 5 + use chrono::Utc; 6 + use sqlx::PgPool; 7 + use tokio::sync::watch; 8 + use tokio::time::interval; 9 + use tracing::{debug, error, info, warn}; 10 + use uuid::Uuid; 11 + 12 + use super::sender::{NotificationSender, SendError}; 13 + use super::types::{NewNotification, NotificationChannel, NotificationStatus, QueuedNotification}; 14 + 15 + pub struct NotificationService { 16 + db: PgPool, 17 + senders: HashMap<NotificationChannel, Arc<dyn NotificationSender>>, 18 + poll_interval: Duration, 19 + batch_size: i64, 20 + } 21 + 22 + impl NotificationService { 23 + pub fn new(db: PgPool) -> Self { 24 + Self { 25 + db, 26 + senders: HashMap::new(), 27 + poll_interval: Duration::from_secs(5), 28 + batch_size: 10, 29 + } 30 + } 31 + 32 + pub fn with_poll_interval(mut self, interval: Duration) -> Self { 33 + self.poll_interval = interval; 34 + self 35 + } 36 + 37 + pub fn with_batch_size(mut self, size: i64) -> Self { 38 + self.batch_size = size; 39 + self 40 + } 41 + 42 + pub fn register_sender<S: NotificationSender + 'static>(mut self, sender: S) -> Self { 43 + self.senders.insert(sender.channel(), Arc::new(sender)); 44 + self 45 + } 46 + 47 + pub async fn enqueue(&self, notification: NewNotification) -> Result<Uuid, sqlx::Error> { 48 + let id = sqlx::query_scalar!( 49 + r#" 50 + INSERT INTO notification_queue 51 + (user_id, channel, notification_type, recipient, subject, body, metadata) 52 + VALUES ($1, $2, $3, $4, $5, $6, $7) 53 + RETURNING id 54 + "#, 55 + notification.user_id, 56 + notification.channel as NotificationChannel, 57 + notification.notification_type as super::types::NotificationType, 58 + notification.recipient, 59 + notification.subject, 60 + notification.body, 61 + notification.metadata 62 + ) 63 + .fetch_one(&self.db) 64 + .await?; 65 + 66 + debug!(notification_id = %id, "Notification enqueued"); 67 + Ok(id) 68 + } 69 + 70 + pub fn has_senders(&self) -> bool { 71 + !self.senders.is_empty() 72 + } 73 + 74 + pub async fn run(self, mut shutdown: watch::Receiver<bool>) { 75 + if self.senders.is_empty() { 76 + warn!("Notification service starting with no senders configured. Notifications will be queued but not delivered until senders are configured."); 77 + } 78 + 79 + info!( 80 + poll_interval_secs = self.poll_interval.as_secs(), 81 + batch_size = self.batch_size, 82 + channels = ?self.senders.keys().collect::<Vec<_>>(), 83 + "Starting notification service" 84 + ); 85 + 86 + let mut ticker = interval(self.poll_interval); 87 + 88 + loop { 89 + tokio::select! { 90 + _ = ticker.tick() => { 91 + if let Err(e) = self.process_batch().await { 92 + error!(error = %e, "Failed to process notification batch"); 93 + } 94 + } 95 + _ = shutdown.changed() => { 96 + if *shutdown.borrow() { 97 + info!("Notification service shutting down"); 98 + break; 99 + } 100 + } 101 + } 102 + } 103 + } 104 + 105 + async fn process_batch(&self) -> Result<(), sqlx::Error> { 106 + let notifications = self.fetch_pending_notifications().await?; 107 + 108 + if notifications.is_empty() { 109 + return Ok(()); 110 + } 111 + 112 + debug!(count = notifications.len(), "Processing notification batch"); 113 + 114 + for notification in notifications { 115 + self.process_notification(notification).await; 116 + } 117 + 118 + Ok(()) 119 + } 120 + 121 + async fn fetch_pending_notifications(&self) -> Result<Vec<QueuedNotification>, sqlx::Error> { 122 + let now = Utc::now(); 123 + 124 + sqlx::query_as!( 125 + QueuedNotification, 126 + r#" 127 + UPDATE notification_queue 128 + SET status = 'processing', updated_at = NOW() 129 + WHERE id IN ( 130 + SELECT id FROM notification_queue 131 + WHERE status = 'pending' 132 + AND scheduled_for <= $1 133 + AND attempts < max_attempts 134 + ORDER BY scheduled_for ASC 135 + LIMIT $2 136 + FOR UPDATE SKIP LOCKED 137 + ) 138 + RETURNING 139 + id, user_id, 140 + channel as "channel: NotificationChannel", 141 + notification_type as "notification_type: super::types::NotificationType", 142 + status as "status: NotificationStatus", 143 + recipient, subject, body, metadata, 144 + attempts, max_attempts, last_error, 145 + created_at, updated_at, scheduled_for, processed_at 146 + "#, 147 + now, 148 + self.batch_size 149 + ) 150 + .fetch_all(&self.db) 151 + .await 152 + } 153 + 154 + async fn process_notification(&self, notification: QueuedNotification) { 155 + let notification_id = notification.id; 156 + let channel = notification.channel; 157 + 158 + let result = match self.senders.get(&channel) { 159 + Some(sender) => sender.send(&notification).await, 160 + None => { 161 + warn!( 162 + notification_id = %notification_id, 163 + channel = ?channel, 164 + "No sender registered for channel" 165 + ); 166 + Err(SendError::NotConfigured(channel)) 167 + } 168 + }; 169 + 170 + match result { 171 + Ok(()) => { 172 + debug!(notification_id = %notification_id, "Notification sent successfully"); 173 + if let Err(e) = self.mark_sent(notification_id).await { 174 + error!( 175 + notification_id = %notification_id, 176 + error = %e, 177 + "Failed to mark notification as sent" 178 + ); 179 + } 180 + } 181 + Err(e) => { 182 + let error_msg = e.to_string(); 183 + warn!( 184 + notification_id = %notification_id, 185 + error = %error_msg, 186 + "Failed to send notification" 187 + ); 188 + if let Err(db_err) = self.mark_failed(notification_id, &error_msg).await { 189 + error!( 190 + notification_id = %notification_id, 191 + error = %db_err, 192 + "Failed to mark notification as failed" 193 + ); 194 + } 195 + } 196 + } 197 + } 198 + 199 + async fn mark_sent(&self, id: Uuid) -> Result<(), sqlx::Error> { 200 + sqlx::query!( 201 + r#" 202 + UPDATE notification_queue 203 + SET status = 'sent', processed_at = NOW(), updated_at = NOW() 204 + WHERE id = $1 205 + "#, 206 + id 207 + ) 208 + .execute(&self.db) 209 + .await?; 210 + Ok(()) 211 + } 212 + 213 + async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> { 214 + sqlx::query!( 215 + r#" 216 + UPDATE notification_queue 217 + SET 218 + status = CASE 219 + WHEN attempts + 1 >= max_attempts THEN 'failed'::notification_status 220 + ELSE 'pending'::notification_status 221 + END, 222 + attempts = attempts + 1, 223 + last_error = $2, 224 + updated_at = NOW(), 225 + scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1)) 226 + WHERE id = $1 227 + "#, 228 + id, 229 + error 230 + ) 231 + .execute(&self.db) 232 + .await?; 233 + Ok(()) 234 + } 235 + } 236 + 237 + pub async fn enqueue_notification(db: &PgPool, notification: NewNotification) -> Result<Uuid, sqlx::Error> { 238 + sqlx::query_scalar!( 239 + r#" 240 + INSERT INTO notification_queue 241 + (user_id, channel, notification_type, recipient, subject, body, metadata) 242 + VALUES ($1, $2, $3, $4, $5, $6, $7) 243 + RETURNING id 244 + "#, 245 + notification.user_id, 246 + notification.channel as NotificationChannel, 247 + notification.notification_type as super::types::NotificationType, 248 + notification.recipient, 249 + notification.subject, 250 + notification.body, 251 + notification.metadata 252 + ) 253 + .fetch_one(db) 254 + .await 255 + } 256 + 257 + pub async fn enqueue_welcome_email( 258 + db: &PgPool, 259 + user_id: Uuid, 260 + email: &str, 261 + handle: &str, 262 + hostname: &str, 263 + ) -> Result<Uuid, sqlx::Error> { 264 + let body = format!( 265 + "Welcome to {}!\n\nYour handle is: @{}\n\nThank you for joining us.", 266 + hostname, handle 267 + ); 268 + 269 + enqueue_notification( 270 + db, 271 + NewNotification::email( 272 + user_id, 273 + super::types::NotificationType::Welcome, 274 + email.to_string(), 275 + format!("Welcome to {}", hostname), 276 + body, 277 + ), 278 + ) 279 + .await 280 + } 281 + 282 + pub async fn enqueue_email_verification( 283 + db: &PgPool, 284 + user_id: Uuid, 285 + email: &str, 286 + handle: &str, 287 + code: &str, 288 + hostname: &str, 289 + ) -> Result<Uuid, sqlx::Error> { 290 + let body = format!( 291 + "Hello @{},\n\nYour email verification code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.", 292 + handle, code 293 + ); 294 + 295 + enqueue_notification( 296 + db, 297 + NewNotification::email( 298 + user_id, 299 + super::types::NotificationType::EmailVerification, 300 + email.to_string(), 301 + format!("Verify your email - {}", hostname), 302 + body, 303 + ), 304 + ) 305 + .await 306 + } 307 + 308 + pub async fn enqueue_password_reset( 309 + db: &PgPool, 310 + user_id: Uuid, 311 + email: &str, 312 + handle: &str, 313 + code: &str, 314 + hostname: &str, 315 + ) -> Result<Uuid, sqlx::Error> { 316 + let body = format!( 317 + "Hello @{},\n\nYour password reset code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.", 318 + handle, code 319 + ); 320 + 321 + enqueue_notification( 322 + db, 323 + NewNotification::email( 324 + user_id, 325 + super::types::NotificationType::PasswordReset, 326 + email.to_string(), 327 + format!("Password Reset - {}", hostname), 328 + body, 329 + ), 330 + ) 331 + .await 332 + } 333 + 334 + pub async fn enqueue_email_update( 335 + db: &PgPool, 336 + user_id: Uuid, 337 + new_email: &str, 338 + handle: &str, 339 + code: &str, 340 + hostname: &str, 341 + ) -> Result<Uuid, sqlx::Error> { 342 + let body = format!( 343 + "Hello @{},\n\nYour email update confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.", 344 + handle, code 345 + ); 346 + 347 + enqueue_notification( 348 + db, 349 + NewNotification::email( 350 + user_id, 351 + super::types::NotificationType::EmailUpdate, 352 + new_email.to_string(), 353 + format!("Confirm your new email - {}", hostname), 354 + body, 355 + ), 356 + ) 357 + .await 358 + } 359 + 360 + pub async fn enqueue_account_deletion( 361 + db: &PgPool, 362 + user_id: Uuid, 363 + email: &str, 364 + handle: &str, 365 + code: &str, 366 + hostname: &str, 367 + ) -> Result<Uuid, sqlx::Error> { 368 + let body = format!( 369 + "Hello @{},\n\nYour account deletion confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please secure your account immediately.", 370 + handle, code 371 + ); 372 + 373 + enqueue_notification( 374 + db, 375 + NewNotification::email( 376 + user_id, 377 + super::types::NotificationType::AccountDeletion, 378 + email.to_string(), 379 + format!("Account Deletion Request - {}", hostname), 380 + body, 381 + ), 382 + ) 383 + .await 384 + }
+82
src/notifications/types.rs
··· 1 + use chrono::{DateTime, Utc}; 2 + use serde::{Deserialize, Serialize}; 3 + use sqlx::FromRow; 4 + use uuid::Uuid; 5 + 6 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, sqlx::Type, Serialize, Deserialize)] 7 + #[sqlx(type_name = "notification_channel", rename_all = "lowercase")] 8 + pub enum NotificationChannel { 9 + Email, 10 + Discord, 11 + Telegram, 12 + Signal, 13 + } 14 + 15 + #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, Serialize, Deserialize)] 16 + #[sqlx(type_name = "notification_status", rename_all = "lowercase")] 17 + pub enum NotificationStatus { 18 + Pending, 19 + Processing, 20 + Sent, 21 + Failed, 22 + } 23 + 24 + #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, Serialize, Deserialize)] 25 + #[sqlx(type_name = "notification_type", rename_all = "snake_case")] 26 + pub enum NotificationType { 27 + Welcome, 28 + EmailVerification, 29 + PasswordReset, 30 + EmailUpdate, 31 + AccountDeletion, 32 + } 33 + 34 + #[derive(Debug, Clone, FromRow)] 35 + pub struct QueuedNotification { 36 + pub id: Uuid, 37 + pub user_id: Uuid, 38 + pub channel: NotificationChannel, 39 + pub notification_type: NotificationType, 40 + pub status: NotificationStatus, 41 + pub recipient: String, 42 + pub subject: Option<String>, 43 + pub body: String, 44 + pub metadata: Option<serde_json::Value>, 45 + pub attempts: i32, 46 + pub max_attempts: i32, 47 + pub last_error: Option<String>, 48 + pub created_at: DateTime<Utc>, 49 + pub updated_at: DateTime<Utc>, 50 + pub scheduled_for: DateTime<Utc>, 51 + pub processed_at: Option<DateTime<Utc>>, 52 + } 53 + 54 + pub struct NewNotification { 55 + pub user_id: Uuid, 56 + pub channel: NotificationChannel, 57 + pub notification_type: NotificationType, 58 + pub recipient: String, 59 + pub subject: Option<String>, 60 + pub body: String, 61 + pub metadata: Option<serde_json::Value>, 62 + } 63 + 64 + impl NewNotification { 65 + pub fn email( 66 + user_id: Uuid, 67 + notification_type: NotificationType, 68 + recipient: String, 69 + subject: String, 70 + body: String, 71 + ) -> Self { 72 + Self { 73 + user_id, 74 + channel: NotificationChannel::Email, 75 + notification_type, 76 + recipient, 77 + subject: Some(subject), 78 + body, 79 + metadata: None, 80 + } 81 + } 82 + }
+144
tests/notifications.rs
··· 1 + mod common; 2 + 3 + use bspds::notifications::{ 4 + enqueue_notification, enqueue_welcome_email, NewNotification, NotificationChannel, 5 + NotificationStatus, NotificationType, 6 + }; 7 + use sqlx::PgPool; 8 + 9 + async fn get_pool() -> PgPool { 10 + let conn_str = common::get_db_connection_string().await; 11 + sqlx::postgres::PgPoolOptions::new() 12 + .max_connections(5) 13 + .connect(&conn_str) 14 + .await 15 + .expect("Failed to connect to test database") 16 + } 17 + 18 + #[tokio::test] 19 + async fn test_enqueue_notification() { 20 + let pool = get_pool().await; 21 + 22 + let (_, did) = common::create_account_and_login(&common::client()).await; 23 + 24 + let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 25 + .fetch_one(&pool) 26 + .await 27 + .expect("User not found"); 28 + 29 + let notification = NewNotification::email( 30 + user_id, 31 + NotificationType::Welcome, 32 + "test@example.com".to_string(), 33 + "Test Subject".to_string(), 34 + "Test body".to_string(), 35 + ); 36 + 37 + let notification_id = enqueue_notification(&pool, notification) 38 + .await 39 + .expect("Failed to enqueue notification"); 40 + 41 + let row = sqlx::query!( 42 + r#" 43 + SELECT 44 + id, user_id, recipient, subject, body, 45 + channel as "channel: NotificationChannel", 46 + notification_type as "notification_type: NotificationType", 47 + status as "status: NotificationStatus" 48 + FROM notification_queue 49 + WHERE id = $1 50 + "#, 51 + notification_id 52 + ) 53 + .fetch_one(&pool) 54 + .await 55 + .expect("Notification not found"); 56 + 57 + assert_eq!(row.user_id, user_id); 58 + assert_eq!(row.recipient, "test@example.com"); 59 + assert_eq!(row.subject.as_deref(), Some("Test Subject")); 60 + assert_eq!(row.body, "Test body"); 61 + assert_eq!(row.channel, NotificationChannel::Email); 62 + assert_eq!(row.notification_type, NotificationType::Welcome); 63 + assert_eq!(row.status, NotificationStatus::Pending); 64 + } 65 + 66 + #[tokio::test] 67 + async fn test_enqueue_welcome_email() { 68 + let pool = get_pool().await; 69 + 70 + let (_, did) = common::create_account_and_login(&common::client()).await; 71 + 72 + let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 73 + .fetch_one(&pool) 74 + .await 75 + .expect("User not found"); 76 + 77 + let notification_id = enqueue_welcome_email(&pool, user_id, "user@example.com", "testhandle", "example.com") 78 + .await 79 + .expect("Failed to enqueue welcome email"); 80 + 81 + let row = sqlx::query!( 82 + r#" 83 + SELECT 84 + recipient, subject, body, 85 + notification_type as "notification_type: NotificationType" 86 + FROM notification_queue 87 + WHERE id = $1 88 + "#, 89 + notification_id 90 + ) 91 + .fetch_one(&pool) 92 + .await 93 + .expect("Notification not found"); 94 + 95 + assert_eq!(row.recipient, "user@example.com"); 96 + assert_eq!(row.subject.as_deref(), Some("Welcome to example.com")); 97 + assert!(row.body.contains("@testhandle")); 98 + assert_eq!(row.notification_type, NotificationType::Welcome); 99 + } 100 + 101 + #[tokio::test] 102 + async fn test_notification_queue_status_index() { 103 + let pool = get_pool().await; 104 + 105 + let (_, did) = common::create_account_and_login(&common::client()).await; 106 + 107 + let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 108 + .fetch_one(&pool) 109 + .await 110 + .expect("User not found"); 111 + 112 + let initial_count: i64 = sqlx::query_scalar!( 113 + "SELECT COUNT(*) FROM notification_queue WHERE status = 'pending' AND user_id = $1", 114 + user_id 115 + ) 116 + .fetch_one(&pool) 117 + .await 118 + .expect("Failed to count") 119 + .unwrap_or(0); 120 + 121 + for i in 0..5 { 122 + let notification = NewNotification::email( 123 + user_id, 124 + NotificationType::PasswordReset, 125 + format!("test{}@example.com", i), 126 + "Test".to_string(), 127 + "Body".to_string(), 128 + ); 129 + enqueue_notification(&pool, notification) 130 + .await 131 + .expect("Failed to enqueue"); 132 + } 133 + 134 + let final_count: i64 = sqlx::query_scalar!( 135 + "SELECT COUNT(*) FROM notification_queue WHERE status = 'pending' AND user_id = $1", 136 + user_id 137 + ) 138 + .fetch_one(&pool) 139 + .await 140 + .expect("Failed to count") 141 + .unwrap_or(0); 142 + 143 + assert_eq!(final_count - initial_count, 5); 144 + }