this repo has no description
1use std::collections::HashMap; 2use std::sync::Arc; 3use std::time::Duration; 4 5use chrono::Utc; 6use sqlx::PgPool; 7use tokio::sync::watch; 8use tokio::time::interval; 9use tracing::{debug, error, info, warn}; 10use uuid::Uuid; 11 12use super::sender::{NotificationSender, SendError}; 13use super::types::{NewNotification, NotificationChannel, NotificationStatus, QueuedNotification}; 14 15pub struct NotificationService { 16 db: PgPool, 17 senders: HashMap<NotificationChannel, Arc<dyn NotificationSender>>, 18 poll_interval: Duration, 19 batch_size: i64, 20} 21 22impl NotificationService { 23 pub fn new(db: PgPool) -> Self { 24 Self { 25 db, 26 senders: HashMap::new(), 27 poll_interval: Duration::from_secs(5), 28 batch_size: 10, 29 } 30 } 31 32 pub fn with_poll_interval(mut self, interval: Duration) -> Self { 33 self.poll_interval = interval; 34 self 35 } 36 37 pub fn with_batch_size(mut self, size: i64) -> Self { 38 self.batch_size = size; 39 self 40 } 41 42 pub fn register_sender<S: NotificationSender + 'static>(mut self, sender: S) -> Self { 43 self.senders.insert(sender.channel(), Arc::new(sender)); 44 self 45 } 46 47 pub async fn enqueue(&self, notification: NewNotification) -> Result<Uuid, sqlx::Error> { 48 let id = sqlx::query_scalar!( 49 r#" 50 INSERT INTO notification_queue 51 (user_id, channel, notification_type, recipient, subject, body, metadata) 52 VALUES ($1, $2, $3, $4, $5, $6, $7) 53 RETURNING id 54 "#, 55 notification.user_id, 56 notification.channel as NotificationChannel, 57 notification.notification_type as super::types::NotificationType, 58 notification.recipient, 59 notification.subject, 60 notification.body, 61 notification.metadata 62 ) 63 .fetch_one(&self.db) 64 .await?; 65 66 debug!(notification_id = %id, "Notification enqueued"); 67 Ok(id) 68 } 69 70 pub fn has_senders(&self) -> bool { 71 !self.senders.is_empty() 72 } 73 74 pub async fn run(self, mut shutdown: watch::Receiver<bool>) { 75 if self.senders.is_empty() { 76 warn!("Notification service starting with no senders configured. Notifications will be queued but not delivered until senders are configured."); 77 } 78 79 info!( 80 poll_interval_secs = self.poll_interval.as_secs(), 81 batch_size = self.batch_size, 82 channels = ?self.senders.keys().collect::<Vec<_>>(), 83 "Starting notification service" 84 ); 85 86 let mut ticker = interval(self.poll_interval); 87 88 loop { 89 tokio::select! { 90 _ = ticker.tick() => { 91 if let Err(e) = self.process_batch().await { 92 error!(error = %e, "Failed to process notification batch"); 93 } 94 } 95 _ = shutdown.changed() => { 96 if *shutdown.borrow() { 97 info!("Notification service shutting down"); 98 break; 99 } 100 } 101 } 102 } 103 } 104 105 async fn process_batch(&self) -> Result<(), sqlx::Error> { 106 let notifications = self.fetch_pending_notifications().await?; 107 108 if notifications.is_empty() { 109 return Ok(()); 110 } 111 112 debug!(count = notifications.len(), "Processing notification batch"); 113 114 for notification in notifications { 115 self.process_notification(notification).await; 116 } 117 118 Ok(()) 119 } 120 121 async fn fetch_pending_notifications(&self) -> Result<Vec<QueuedNotification>, sqlx::Error> { 122 let now = Utc::now(); 123 124 sqlx::query_as!( 125 QueuedNotification, 126 r#" 127 UPDATE notification_queue 128 SET status = 'processing', updated_at = NOW() 129 WHERE id IN ( 130 SELECT id FROM notification_queue 131 WHERE status = 'pending' 132 AND scheduled_for <= $1 133 AND attempts < max_attempts 134 ORDER BY scheduled_for ASC 135 LIMIT $2 136 FOR UPDATE SKIP LOCKED 137 ) 138 RETURNING 139 id, user_id, 140 channel as "channel: NotificationChannel", 141 notification_type as "notification_type: super::types::NotificationType", 142 status as "status: NotificationStatus", 143 recipient, subject, body, metadata, 144 attempts, max_attempts, last_error, 145 created_at, updated_at, scheduled_for, processed_at 146 "#, 147 now, 148 self.batch_size 149 ) 150 .fetch_all(&self.db) 151 .await 152 } 153 154 async fn process_notification(&self, notification: QueuedNotification) { 155 let notification_id = notification.id; 156 let channel = notification.channel; 157 158 let result = match self.senders.get(&channel) { 159 Some(sender) => sender.send(&notification).await, 160 None => { 161 warn!( 162 notification_id = %notification_id, 163 channel = ?channel, 164 "No sender registered for channel" 165 ); 166 Err(SendError::NotConfigured(channel)) 167 } 168 }; 169 170 match result { 171 Ok(()) => { 172 debug!(notification_id = %notification_id, "Notification sent successfully"); 173 if let Err(e) = self.mark_sent(notification_id).await { 174 error!( 175 notification_id = %notification_id, 176 error = %e, 177 "Failed to mark notification as sent" 178 ); 179 } 180 } 181 Err(e) => { 182 let error_msg = e.to_string(); 183 warn!( 184 notification_id = %notification_id, 185 error = %error_msg, 186 "Failed to send notification" 187 ); 188 if let Err(db_err) = self.mark_failed(notification_id, &error_msg).await { 189 error!( 190 notification_id = %notification_id, 191 error = %db_err, 192 "Failed to mark notification as failed" 193 ); 194 } 195 } 196 } 197 } 198 199 async fn mark_sent(&self, id: Uuid) -> Result<(), sqlx::Error> { 200 sqlx::query!( 201 r#" 202 UPDATE notification_queue 203 SET status = 'sent', processed_at = NOW(), updated_at = NOW() 204 WHERE id = $1 205 "#, 206 id 207 ) 208 .execute(&self.db) 209 .await?; 210 Ok(()) 211 } 212 213 async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> { 214 sqlx::query!( 215 r#" 216 UPDATE notification_queue 217 SET 218 status = CASE 219 WHEN attempts + 1 >= max_attempts THEN 'failed'::notification_status 220 ELSE 'pending'::notification_status 221 END, 222 attempts = attempts + 1, 223 last_error = $2, 224 updated_at = NOW(), 225 scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1)) 226 WHERE id = $1 227 "#, 228 id, 229 error 230 ) 231 .execute(&self.db) 232 .await?; 233 Ok(()) 234 } 235} 236 237pub async fn enqueue_notification(db: &PgPool, notification: NewNotification) -> Result<Uuid, sqlx::Error> { 238 sqlx::query_scalar!( 239 r#" 240 INSERT INTO notification_queue 241 (user_id, channel, notification_type, recipient, subject, body, metadata) 242 VALUES ($1, $2, $3, $4, $5, $6, $7) 243 RETURNING id 244 "#, 245 notification.user_id, 246 notification.channel as NotificationChannel, 247 notification.notification_type as super::types::NotificationType, 248 notification.recipient, 249 notification.subject, 250 notification.body, 251 notification.metadata 252 ) 253 .fetch_one(db) 254 .await 255} 256 257pub async fn enqueue_welcome_email( 258 db: &PgPool, 259 user_id: Uuid, 260 email: &str, 261 handle: &str, 262 hostname: &str, 263) -> Result<Uuid, sqlx::Error> { 264 let body = format!( 265 "Welcome to {}!\n\nYour handle is: @{}\n\nThank you for joining us.", 266 hostname, handle 267 ); 268 269 enqueue_notification( 270 db, 271 NewNotification::email( 272 user_id, 273 super::types::NotificationType::Welcome, 274 email.to_string(), 275 format!("Welcome to {}", hostname), 276 body, 277 ), 278 ) 279 .await 280} 281 282pub async fn enqueue_email_verification( 283 db: &PgPool, 284 user_id: Uuid, 285 email: &str, 286 handle: &str, 287 code: &str, 288 hostname: &str, 289) -> Result<Uuid, sqlx::Error> { 290 let body = format!( 291 "Hello @{},\n\nYour email verification code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.", 292 handle, code 293 ); 294 295 enqueue_notification( 296 db, 297 NewNotification::email( 298 user_id, 299 super::types::NotificationType::EmailVerification, 300 email.to_string(), 301 format!("Verify your email - {}", hostname), 302 body, 303 ), 304 ) 305 .await 306} 307 308pub async fn enqueue_password_reset( 309 db: &PgPool, 310 user_id: Uuid, 311 email: &str, 312 handle: &str, 313 code: &str, 314 hostname: &str, 315) -> Result<Uuid, sqlx::Error> { 316 let body = format!( 317 "Hello @{},\n\nYour password reset code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.", 318 handle, code 319 ); 320 321 enqueue_notification( 322 db, 323 NewNotification::email( 324 user_id, 325 super::types::NotificationType::PasswordReset, 326 email.to_string(), 327 format!("Password Reset - {}", hostname), 328 body, 329 ), 330 ) 331 .await 332} 333 334pub async fn enqueue_email_update( 335 db: &PgPool, 336 user_id: Uuid, 337 new_email: &str, 338 handle: &str, 339 code: &str, 340 hostname: &str, 341) -> Result<Uuid, sqlx::Error> { 342 let body = format!( 343 "Hello @{},\n\nYour email update confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.", 344 handle, code 345 ); 346 347 enqueue_notification( 348 db, 349 NewNotification::email( 350 user_id, 351 super::types::NotificationType::EmailUpdate, 352 new_email.to_string(), 353 format!("Confirm your new email - {}", hostname), 354 body, 355 ), 356 ) 357 .await 358} 359 360pub async fn enqueue_account_deletion( 361 db: &PgPool, 362 user_id: Uuid, 363 email: &str, 364 handle: &str, 365 code: &str, 366 hostname: &str, 367) -> Result<Uuid, sqlx::Error> { 368 let body = format!( 369 "Hello @{},\n\nYour account deletion confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please secure your account immediately.", 370 handle, code 371 ); 372 373 enqueue_notification( 374 db, 375 NewNotification::email( 376 user_id, 377 super::types::NotificationType::AccountDeletion, 378 email.to_string(), 379 format!("Account Deletion Request - {}", hostname), 380 body, 381 ), 382 ) 383 .await 384}