this repo has no description
1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use sqlx::PgPool;
7use tokio::sync::watch;
8use tokio::time::interval;
9use tracing::{debug, error, info, warn};
10use uuid::Uuid;
11
12use super::sender::{NotificationSender, SendError};
13use super::types::{NewNotification, NotificationChannel, NotificationStatus, QueuedNotification};
14
15pub struct NotificationService {
16 db: PgPool,
17 senders: HashMap<NotificationChannel, Arc<dyn NotificationSender>>,
18 poll_interval: Duration,
19 batch_size: i64,
20}
21
22impl NotificationService {
23 pub fn new(db: PgPool) -> Self {
24 Self {
25 db,
26 senders: HashMap::new(),
27 poll_interval: Duration::from_secs(5),
28 batch_size: 10,
29 }
30 }
31
32 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
33 self.poll_interval = interval;
34 self
35 }
36
37 pub fn with_batch_size(mut self, size: i64) -> Self {
38 self.batch_size = size;
39 self
40 }
41
42 pub fn register_sender<S: NotificationSender + 'static>(mut self, sender: S) -> Self {
43 self.senders.insert(sender.channel(), Arc::new(sender));
44 self
45 }
46
47 pub async fn enqueue(&self, notification: NewNotification) -> Result<Uuid, sqlx::Error> {
48 let id = sqlx::query_scalar!(
49 r#"
50 INSERT INTO notification_queue
51 (user_id, channel, notification_type, recipient, subject, body, metadata)
52 VALUES ($1, $2, $3, $4, $5, $6, $7)
53 RETURNING id
54 "#,
55 notification.user_id,
56 notification.channel as NotificationChannel,
57 notification.notification_type as super::types::NotificationType,
58 notification.recipient,
59 notification.subject,
60 notification.body,
61 notification.metadata
62 )
63 .fetch_one(&self.db)
64 .await?;
65
66 debug!(notification_id = %id, "Notification enqueued");
67 Ok(id)
68 }
69
70 pub fn has_senders(&self) -> bool {
71 !self.senders.is_empty()
72 }
73
74 pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
75 if self.senders.is_empty() {
76 warn!("Notification service starting with no senders configured. Notifications will be queued but not delivered until senders are configured.");
77 }
78
79 info!(
80 poll_interval_secs = self.poll_interval.as_secs(),
81 batch_size = self.batch_size,
82 channels = ?self.senders.keys().collect::<Vec<_>>(),
83 "Starting notification service"
84 );
85
86 let mut ticker = interval(self.poll_interval);
87
88 loop {
89 tokio::select! {
90 _ = ticker.tick() => {
91 if let Err(e) = self.process_batch().await {
92 error!(error = %e, "Failed to process notification batch");
93 }
94 }
95 _ = shutdown.changed() => {
96 if *shutdown.borrow() {
97 info!("Notification service shutting down");
98 break;
99 }
100 }
101 }
102 }
103 }
104
105 async fn process_batch(&self) -> Result<(), sqlx::Error> {
106 let notifications = self.fetch_pending_notifications().await?;
107
108 if notifications.is_empty() {
109 return Ok(());
110 }
111
112 debug!(count = notifications.len(), "Processing notification batch");
113
114 for notification in notifications {
115 self.process_notification(notification).await;
116 }
117
118 Ok(())
119 }
120
121 async fn fetch_pending_notifications(&self) -> Result<Vec<QueuedNotification>, sqlx::Error> {
122 let now = Utc::now();
123
124 sqlx::query_as!(
125 QueuedNotification,
126 r#"
127 UPDATE notification_queue
128 SET status = 'processing', updated_at = NOW()
129 WHERE id IN (
130 SELECT id FROM notification_queue
131 WHERE status = 'pending'
132 AND scheduled_for <= $1
133 AND attempts < max_attempts
134 ORDER BY scheduled_for ASC
135 LIMIT $2
136 FOR UPDATE SKIP LOCKED
137 )
138 RETURNING
139 id, user_id,
140 channel as "channel: NotificationChannel",
141 notification_type as "notification_type: super::types::NotificationType",
142 status as "status: NotificationStatus",
143 recipient, subject, body, metadata,
144 attempts, max_attempts, last_error,
145 created_at, updated_at, scheduled_for, processed_at
146 "#,
147 now,
148 self.batch_size
149 )
150 .fetch_all(&self.db)
151 .await
152 }
153
154 async fn process_notification(&self, notification: QueuedNotification) {
155 let notification_id = notification.id;
156 let channel = notification.channel;
157
158 let result = match self.senders.get(&channel) {
159 Some(sender) => sender.send(¬ification).await,
160 None => {
161 warn!(
162 notification_id = %notification_id,
163 channel = ?channel,
164 "No sender registered for channel"
165 );
166 Err(SendError::NotConfigured(channel))
167 }
168 };
169
170 match result {
171 Ok(()) => {
172 debug!(notification_id = %notification_id, "Notification sent successfully");
173 if let Err(e) = self.mark_sent(notification_id).await {
174 error!(
175 notification_id = %notification_id,
176 error = %e,
177 "Failed to mark notification as sent"
178 );
179 }
180 }
181 Err(e) => {
182 let error_msg = e.to_string();
183 warn!(
184 notification_id = %notification_id,
185 error = %error_msg,
186 "Failed to send notification"
187 );
188 if let Err(db_err) = self.mark_failed(notification_id, &error_msg).await {
189 error!(
190 notification_id = %notification_id,
191 error = %db_err,
192 "Failed to mark notification as failed"
193 );
194 }
195 }
196 }
197 }
198
199 async fn mark_sent(&self, id: Uuid) -> Result<(), sqlx::Error> {
200 sqlx::query!(
201 r#"
202 UPDATE notification_queue
203 SET status = 'sent', processed_at = NOW(), updated_at = NOW()
204 WHERE id = $1
205 "#,
206 id
207 )
208 .execute(&self.db)
209 .await?;
210 Ok(())
211 }
212
213 async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> {
214 sqlx::query!(
215 r#"
216 UPDATE notification_queue
217 SET
218 status = CASE
219 WHEN attempts + 1 >= max_attempts THEN 'failed'::notification_status
220 ELSE 'pending'::notification_status
221 END,
222 attempts = attempts + 1,
223 last_error = $2,
224 updated_at = NOW(),
225 scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1))
226 WHERE id = $1
227 "#,
228 id,
229 error
230 )
231 .execute(&self.db)
232 .await?;
233 Ok(())
234 }
235}
236
237pub async fn enqueue_notification(db: &PgPool, notification: NewNotification) -> Result<Uuid, sqlx::Error> {
238 sqlx::query_scalar!(
239 r#"
240 INSERT INTO notification_queue
241 (user_id, channel, notification_type, recipient, subject, body, metadata)
242 VALUES ($1, $2, $3, $4, $5, $6, $7)
243 RETURNING id
244 "#,
245 notification.user_id,
246 notification.channel as NotificationChannel,
247 notification.notification_type as super::types::NotificationType,
248 notification.recipient,
249 notification.subject,
250 notification.body,
251 notification.metadata
252 )
253 .fetch_one(db)
254 .await
255}
256
257pub struct UserNotificationPrefs {
258 pub channel: NotificationChannel,
259 pub email: String,
260 pub handle: String,
261}
262
263pub async fn get_user_notification_prefs(
264 db: &PgPool,
265 user_id: Uuid,
266) -> Result<UserNotificationPrefs, sqlx::Error> {
267 let row = sqlx::query!(
268 r#"
269 SELECT
270 email,
271 handle,
272 preferred_notification_channel as "channel: NotificationChannel"
273 FROM users
274 WHERE id = $1
275 "#,
276 user_id
277 )
278 .fetch_one(db)
279 .await?;
280
281 Ok(UserNotificationPrefs {
282 channel: row.channel,
283 email: row.email,
284 handle: row.handle,
285 })
286}
287
288pub async fn enqueue_welcome(
289 db: &PgPool,
290 user_id: Uuid,
291 hostname: &str,
292) -> Result<Uuid, sqlx::Error> {
293 let prefs = get_user_notification_prefs(db, user_id).await?;
294
295 let body = format!(
296 "Welcome to {}!\n\nYour handle is: @{}\n\nThank you for joining us.",
297 hostname, prefs.handle
298 );
299
300 enqueue_notification(
301 db,
302 NewNotification::new(
303 user_id,
304 prefs.channel,
305 super::types::NotificationType::Welcome,
306 prefs.email.clone(),
307 Some(format!("Welcome to {}", hostname)),
308 body,
309 ),
310 )
311 .await
312}
313
314pub async fn enqueue_email_verification(
315 db: &PgPool,
316 user_id: Uuid,
317 email: &str,
318 handle: &str,
319 code: &str,
320 hostname: &str,
321) -> Result<Uuid, sqlx::Error> {
322 let body = format!(
323 "Hello @{},\n\nYour email verification code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.",
324 handle, code
325 );
326
327 enqueue_notification(
328 db,
329 NewNotification::email(
330 user_id,
331 super::types::NotificationType::EmailVerification,
332 email.to_string(),
333 format!("Verify your email - {}", hostname),
334 body,
335 ),
336 )
337 .await
338}
339
340pub async fn enqueue_password_reset(
341 db: &PgPool,
342 user_id: Uuid,
343 code: &str,
344 hostname: &str,
345) -> Result<Uuid, sqlx::Error> {
346 let prefs = get_user_notification_prefs(db, user_id).await?;
347
348 let body = format!(
349 "Hello @{},\n\nYour password reset code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this message.",
350 prefs.handle, code
351 );
352
353 enqueue_notification(
354 db,
355 NewNotification::new(
356 user_id,
357 prefs.channel,
358 super::types::NotificationType::PasswordReset,
359 prefs.email.clone(),
360 Some(format!("Password Reset - {}", hostname)),
361 body,
362 ),
363 )
364 .await
365}
366
367pub async fn enqueue_email_update(
368 db: &PgPool,
369 user_id: Uuid,
370 new_email: &str,
371 handle: &str,
372 code: &str,
373 hostname: &str,
374) -> Result<Uuid, sqlx::Error> {
375 let body = format!(
376 "Hello @{},\n\nYour email update confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.",
377 handle, code
378 );
379
380 enqueue_notification(
381 db,
382 NewNotification::email(
383 user_id,
384 super::types::NotificationType::EmailUpdate,
385 new_email.to_string(),
386 format!("Confirm your new email - {}", hostname),
387 body,
388 ),
389 )
390 .await
391}
392
393pub async fn enqueue_account_deletion(
394 db: &PgPool,
395 user_id: Uuid,
396 code: &str,
397 hostname: &str,
398) -> Result<Uuid, sqlx::Error> {
399 let prefs = get_user_notification_prefs(db, user_id).await?;
400
401 let body = format!(
402 "Hello @{},\n\nYour account deletion confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please secure your account immediately.",
403 prefs.handle, code
404 );
405
406 enqueue_notification(
407 db,
408 NewNotification::new(
409 user_id,
410 prefs.channel,
411 super::types::NotificationType::AccountDeletion,
412 prefs.email.clone(),
413 Some(format!("Account Deletion Request - {}", hostname)),
414 body,
415 ),
416 )
417 .await
418}
419
420pub async fn enqueue_plc_operation(
421 db: &PgPool,
422 user_id: Uuid,
423 token: &str,
424 hostname: &str,
425) -> Result<Uuid, sqlx::Error> {
426 let prefs = get_user_notification_prefs(db, user_id).await?;
427
428 let body = format!(
429 "Hello @{},\n\nYou requested to sign a PLC operation for your account.\n\nYour verification token is: {}\n\nThis token will expire in 10 minutes.\n\nIf you did not request this, you can safely ignore this message.",
430 prefs.handle, token
431 );
432
433 enqueue_notification(
434 db,
435 NewNotification::new(
436 user_id,
437 prefs.channel,
438 super::types::NotificationType::PlcOperation,
439 prefs.email.clone(),
440 Some(format!("{} - PLC Operation Token", hostname)),
441 body,
442 ),
443 )
444 .await
445}