this repo has no description
1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use sqlx::PgPool;
7use tokio::sync::watch;
8use tokio::time::interval;
9use tracing::{debug, error, info, warn};
10use uuid::Uuid;
11
12use super::sender::{NotificationSender, SendError};
13use super::types::{NewNotification, NotificationChannel, NotificationStatus, QueuedNotification};
14
15pub struct NotificationService {
16 db: PgPool,
17 senders: HashMap<NotificationChannel, Arc<dyn NotificationSender>>,
18 poll_interval: Duration,
19 batch_size: i64,
20}
21
22impl NotificationService {
23 pub fn new(db: PgPool) -> Self {
24 Self {
25 db,
26 senders: HashMap::new(),
27 poll_interval: Duration::from_secs(5),
28 batch_size: 10,
29 }
30 }
31
32 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
33 self.poll_interval = interval;
34 self
35 }
36
37 pub fn with_batch_size(mut self, size: i64) -> Self {
38 self.batch_size = size;
39 self
40 }
41
42 pub fn register_sender<S: NotificationSender + 'static>(mut self, sender: S) -> Self {
43 self.senders.insert(sender.channel(), Arc::new(sender));
44 self
45 }
46
47 pub async fn enqueue(&self, notification: NewNotification) -> Result<Uuid, sqlx::Error> {
48 let id = sqlx::query_scalar!(
49 r#"
50 INSERT INTO notification_queue
51 (user_id, channel, notification_type, recipient, subject, body, metadata)
52 VALUES ($1, $2, $3, $4, $5, $6, $7)
53 RETURNING id
54 "#,
55 notification.user_id,
56 notification.channel as NotificationChannel,
57 notification.notification_type as super::types::NotificationType,
58 notification.recipient,
59 notification.subject,
60 notification.body,
61 notification.metadata
62 )
63 .fetch_one(&self.db)
64 .await?;
65
66 debug!(notification_id = %id, "Notification enqueued");
67 Ok(id)
68 }
69
70 pub fn has_senders(&self) -> bool {
71 !self.senders.is_empty()
72 }
73
74 pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
75 if self.senders.is_empty() {
76 warn!("Notification service starting with no senders configured. Notifications will be queued but not delivered until senders are configured.");
77 }
78
79 info!(
80 poll_interval_secs = self.poll_interval.as_secs(),
81 batch_size = self.batch_size,
82 channels = ?self.senders.keys().collect::<Vec<_>>(),
83 "Starting notification service"
84 );
85
86 let mut ticker = interval(self.poll_interval);
87
88 loop {
89 tokio::select! {
90 _ = ticker.tick() => {
91 if let Err(e) = self.process_batch().await {
92 error!(error = %e, "Failed to process notification batch");
93 }
94 }
95 _ = shutdown.changed() => {
96 if *shutdown.borrow() {
97 info!("Notification service shutting down");
98 break;
99 }
100 }
101 }
102 }
103 }
104
105 async fn process_batch(&self) -> Result<(), sqlx::Error> {
106 let notifications = self.fetch_pending_notifications().await?;
107
108 if notifications.is_empty() {
109 return Ok(());
110 }
111
112 debug!(count = notifications.len(), "Processing notification batch");
113
114 for notification in notifications {
115 self.process_notification(notification).await;
116 }
117
118 Ok(())
119 }
120
121 async fn fetch_pending_notifications(&self) -> Result<Vec<QueuedNotification>, sqlx::Error> {
122 let now = Utc::now();
123
124 sqlx::query_as!(
125 QueuedNotification,
126 r#"
127 UPDATE notification_queue
128 SET status = 'processing', updated_at = NOW()
129 WHERE id IN (
130 SELECT id FROM notification_queue
131 WHERE status = 'pending'
132 AND scheduled_for <= $1
133 AND attempts < max_attempts
134 ORDER BY scheduled_for ASC
135 LIMIT $2
136 FOR UPDATE SKIP LOCKED
137 )
138 RETURNING
139 id, user_id,
140 channel as "channel: NotificationChannel",
141 notification_type as "notification_type: super::types::NotificationType",
142 status as "status: NotificationStatus",
143 recipient, subject, body, metadata,
144 attempts, max_attempts, last_error,
145 created_at, updated_at, scheduled_for, processed_at
146 "#,
147 now,
148 self.batch_size
149 )
150 .fetch_all(&self.db)
151 .await
152 }
153
154 async fn process_notification(&self, notification: QueuedNotification) {
155 let notification_id = notification.id;
156 let channel = notification.channel;
157
158 let result = match self.senders.get(&channel) {
159 Some(sender) => sender.send(¬ification).await,
160 None => {
161 warn!(
162 notification_id = %notification_id,
163 channel = ?channel,
164 "No sender registered for channel"
165 );
166 Err(SendError::NotConfigured(channel))
167 }
168 };
169
170 match result {
171 Ok(()) => {
172 debug!(notification_id = %notification_id, "Notification sent successfully");
173 if let Err(e) = self.mark_sent(notification_id).await {
174 error!(
175 notification_id = %notification_id,
176 error = %e,
177 "Failed to mark notification as sent"
178 );
179 }
180 }
181 Err(e) => {
182 let error_msg = e.to_string();
183 warn!(
184 notification_id = %notification_id,
185 error = %error_msg,
186 "Failed to send notification"
187 );
188 if let Err(db_err) = self.mark_failed(notification_id, &error_msg).await {
189 error!(
190 notification_id = %notification_id,
191 error = %db_err,
192 "Failed to mark notification as failed"
193 );
194 }
195 }
196 }
197 }
198
199 async fn mark_sent(&self, id: Uuid) -> Result<(), sqlx::Error> {
200 sqlx::query!(
201 r#"
202 UPDATE notification_queue
203 SET status = 'sent', processed_at = NOW(), updated_at = NOW()
204 WHERE id = $1
205 "#,
206 id
207 )
208 .execute(&self.db)
209 .await?;
210 Ok(())
211 }
212
213 async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> {
214 sqlx::query!(
215 r#"
216 UPDATE notification_queue
217 SET
218 status = CASE
219 WHEN attempts + 1 >= max_attempts THEN 'failed'::notification_status
220 ELSE 'pending'::notification_status
221 END,
222 attempts = attempts + 1,
223 last_error = $2,
224 updated_at = NOW(),
225 scheduled_for = NOW() + (INTERVAL '1 minute' * (attempts + 1))
226 WHERE id = $1
227 "#,
228 id,
229 error
230 )
231 .execute(&self.db)
232 .await?;
233 Ok(())
234 }
235}
236
237pub async fn enqueue_notification(db: &PgPool, notification: NewNotification) -> Result<Uuid, sqlx::Error> {
238 sqlx::query_scalar!(
239 r#"
240 INSERT INTO notification_queue
241 (user_id, channel, notification_type, recipient, subject, body, metadata)
242 VALUES ($1, $2, $3, $4, $5, $6, $7)
243 RETURNING id
244 "#,
245 notification.user_id,
246 notification.channel as NotificationChannel,
247 notification.notification_type as super::types::NotificationType,
248 notification.recipient,
249 notification.subject,
250 notification.body,
251 notification.metadata
252 )
253 .fetch_one(db)
254 .await
255}
256
257pub async fn enqueue_welcome_email(
258 db: &PgPool,
259 user_id: Uuid,
260 email: &str,
261 handle: &str,
262 hostname: &str,
263) -> Result<Uuid, sqlx::Error> {
264 let body = format!(
265 "Welcome to {}!\n\nYour handle is: @{}\n\nThank you for joining us.",
266 hostname, handle
267 );
268
269 enqueue_notification(
270 db,
271 NewNotification::email(
272 user_id,
273 super::types::NotificationType::Welcome,
274 email.to_string(),
275 format!("Welcome to {}", hostname),
276 body,
277 ),
278 )
279 .await
280}
281
282pub async fn enqueue_email_verification(
283 db: &PgPool,
284 user_id: Uuid,
285 email: &str,
286 handle: &str,
287 code: &str,
288 hostname: &str,
289) -> Result<Uuid, sqlx::Error> {
290 let body = format!(
291 "Hello @{},\n\nYour email verification code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.",
292 handle, code
293 );
294
295 enqueue_notification(
296 db,
297 NewNotification::email(
298 user_id,
299 super::types::NotificationType::EmailVerification,
300 email.to_string(),
301 format!("Verify your email - {}", hostname),
302 body,
303 ),
304 )
305 .await
306}
307
308pub async fn enqueue_password_reset(
309 db: &PgPool,
310 user_id: Uuid,
311 email: &str,
312 handle: &str,
313 code: &str,
314 hostname: &str,
315) -> Result<Uuid, sqlx::Error> {
316 let body = format!(
317 "Hello @{},\n\nYour password reset code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.",
318 handle, code
319 );
320
321 enqueue_notification(
322 db,
323 NewNotification::email(
324 user_id,
325 super::types::NotificationType::PasswordReset,
326 email.to_string(),
327 format!("Password Reset - {}", hostname),
328 body,
329 ),
330 )
331 .await
332}
333
334pub async fn enqueue_email_update(
335 db: &PgPool,
336 user_id: Uuid,
337 new_email: &str,
338 handle: &str,
339 code: &str,
340 hostname: &str,
341) -> Result<Uuid, sqlx::Error> {
342 let body = format!(
343 "Hello @{},\n\nYour email update confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.",
344 handle, code
345 );
346
347 enqueue_notification(
348 db,
349 NewNotification::email(
350 user_id,
351 super::types::NotificationType::EmailUpdate,
352 new_email.to_string(),
353 format!("Confirm your new email - {}", hostname),
354 body,
355 ),
356 )
357 .await
358}
359
360pub async fn enqueue_account_deletion(
361 db: &PgPool,
362 user_id: Uuid,
363 email: &str,
364 handle: &str,
365 code: &str,
366 hostname: &str,
367) -> Result<Uuid, sqlx::Error> {
368 let body = format!(
369 "Hello @{},\n\nYour account deletion confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please secure your account immediately.",
370 handle, code
371 );
372
373 enqueue_notification(
374 db,
375 NewNotification::email(
376 user_id,
377 super::types::NotificationType::AccountDeletion,
378 email.to_string(),
379 format!("Account Deletion Request - {}", hostname),
380 body,
381 ),
382 )
383 .await
384}