this repo has no description
1use async_trait::async_trait;
2use reqwest::Client;
3use serde_json::json;
4use std::process::Stdio;
5use std::time::Duration;
6use tokio::io::AsyncWriteExt;
7use tokio::process::Command;
8
9use super::types::{NotificationChannel, QueuedNotification};
10
/// Overall per-request timeout, in seconds, configured on the HTTP client.
const HTTP_TIMEOUT_SECS: u64 = 30;
/// Number of attempts the HTTP-based senders make before giving up.
const MAX_RETRIES: u32 = 3;
/// Base delay, in milliseconds, that is doubled for each successive retry.
const INITIAL_RETRY_DELAY_MS: u64 = 500;
14
/// A delivery backend for a single notification channel.
///
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
#[async_trait]
pub trait NotificationSender: Send + Sync {
    /// Returns the channel this sender delivers on.
    fn channel(&self) -> NotificationChannel;
    /// Attempts to deliver `notification`, returning a [`SendError`] on failure.
    async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>;
}
20
/// Errors produced while delivering a notification.
#[derive(Debug, thiserror::Error)]
pub enum SendError {
    /// An I/O error from process handling. Because of `#[from]`, every
    /// `std::io::Error` propagated with `?` becomes this variant, whichever
    /// step (spawning, writing to stdin, waiting) produced it.
    #[error("Failed to spawn sendmail process: {0}")]
    ProcessSpawn(#[from] std::io::Error),

    /// The sendmail process exited unsuccessfully; the payload is its captured stderr.
    #[error("Sendmail exited with non-zero status: {0}")]
    SendmailFailed(String),

    /// The given channel has no configured sender.
    #[error("Channel not configured: {0:?}")]
    NotConfigured(NotificationChannel),

    /// A webhook, API, or external CLI reported a failure; the payload describes it.
    #[error("External service error: {0}")]
    ExternalService(String),

    /// The recipient did not pass validation for the target channel.
    #[error("Invalid recipient format: {0}")]
    InvalidRecipient(String),

    /// The final attempt of an HTTP request timed out.
    #[error("Request timeout")]
    Timeout,

    /// A retry loop finished without producing a result; the payload is the last recorded error.
    #[error("Max retries exceeded: {0}")]
    MaxRetriesExceeded(String),
}
44
45fn create_http_client() -> Client {
46 Client::builder()
47 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
48 .connect_timeout(Duration::from_secs(10))
49 .build()
50 .unwrap_or_else(|_| Client::new())
51}
52
53fn is_retryable_status(status: reqwest::StatusCode) -> bool {
54 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
55}
56
57async fn retry_delay(attempt: u32) {
58 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
59 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
60}
61
/// Makes `value` safe to place on a single header line.
///
/// Every carriage return and line feed is replaced by a space, then leading and
/// trailing whitespace is trimmed from the result.
pub fn sanitize_header_value(value: &str) -> String {
    let flattened: String = value
        .chars()
        .map(|ch| match ch {
            '\r' | '\n' => ' ',
            other => other,
        })
        .collect();
    flattened.trim().to_owned()
}
65
/// Checks that `number` is a `+` followed only by ASCII digits.
///
/// The whole string must be between 2 and 20 bytes long (inclusive), so at least
/// one digit must follow the leading `+`.
pub fn is_valid_phone_number(number: &str) -> bool {
    if !(2..=20).contains(&number.len()) {
        return false;
    }
    match number.strip_prefix('+') {
        Some(digits) => !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
77
/// Sends notifications as plain-text email by piping a message to a sendmail executable.
pub struct EmailSender {
    /// Address written into the `From:` header.
    from_address: String,
    /// Display name for the `From:` header; when empty, only the address is used.
    from_name: String,
    /// Path of the sendmail executable that is spawned for each message.
    sendmail_path: String,
}
83
84impl EmailSender {
85 pub fn new(from_address: String, from_name: String) -> Self {
86 Self {
87 from_address,
88 from_name,
89 sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
90 }
91 }
92
93 pub fn from_env() -> Option<Self> {
94 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
95 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string());
96 Some(Self::new(from_address, from_name))
97 }
98
99 pub fn format_email(&self, notification: &QueuedNotification) -> String {
100 let subject = sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification"));
101 let recipient = sanitize_header_value(¬ification.recipient);
102 let from_header = if self.from_name.is_empty() {
103 self.from_address.clone()
104 } else {
105 format!("{} <{}>", sanitize_header_value(&self.from_name), self.from_address)
106 };
107
108 format!(
109 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
110 from_header,
111 recipient,
112 subject,
113 notification.body
114 )
115 }
116}
117
118#[async_trait]
119impl NotificationSender for EmailSender {
120 fn channel(&self) -> NotificationChannel {
121 NotificationChannel::Email
122 }
123
124 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
125 let email_content = self.format_email(notification);
126
127 let mut child = Command::new(&self.sendmail_path)
128 .arg("-t")
129 .arg("-oi")
130 .stdin(Stdio::piped())
131 .stdout(Stdio::piped())
132 .stderr(Stdio::piped())
133 .spawn()?;
134
135 if let Some(mut stdin) = child.stdin.take() {
136 stdin.write_all(email_content.as_bytes()).await?;
137 }
138
139 let output = child.wait_with_output().await?;
140
141 if !output.status.success() {
142 let stderr = String::from_utf8_lossy(&output.stderr);
143 return Err(SendError::SendmailFailed(stderr.to_string()));
144 }
145
146 Ok(())
147 }
148}
149
/// Sends notifications by POSTing JSON to a Discord webhook.
pub struct DiscordSender {
    /// URL that every notification is POSTed to.
    webhook_url: String,
    /// HTTP client used for the webhook requests.
    http_client: Client,
}
154
155impl DiscordSender {
156 pub fn new(webhook_url: String) -> Self {
157 Self {
158 webhook_url,
159 http_client: create_http_client(),
160 }
161 }
162
163 pub fn from_env() -> Option<Self> {
164 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
165 Some(Self::new(webhook_url))
166 }
167}
168
169#[async_trait]
170impl NotificationSender for DiscordSender {
171 fn channel(&self) -> NotificationChannel {
172 NotificationChannel::Discord
173 }
174
175 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
176 let subject = notification.subject.as_deref().unwrap_or("Notification");
177 let content = format!("**{}**\n\n{}", subject, notification.body);
178
179 let payload = json!({
180 "content": content,
181 "username": "BSPDS"
182 });
183
184 let mut last_error = None;
185 for attempt in 0..MAX_RETRIES {
186 let result = self
187 .http_client
188 .post(&self.webhook_url)
189 .json(&payload)
190 .send()
191 .await;
192
193 match result {
194 Ok(response) => {
195 if response.status().is_success() {
196 return Ok(());
197 }
198
199 let status = response.status();
200 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
201 last_error = Some(format!("Discord webhook returned {}", status));
202 retry_delay(attempt).await;
203 continue;
204 }
205
206 let body = response.text().await.unwrap_or_default();
207 return Err(SendError::ExternalService(format!(
208 "Discord webhook returned {}: {}",
209 status, body
210 )));
211 }
212 Err(e) => {
213 if e.is_timeout() {
214 if attempt < MAX_RETRIES - 1 {
215 last_error = Some(format!("Discord request timed out"));
216 retry_delay(attempt).await;
217 continue;
218 }
219 return Err(SendError::Timeout);
220 }
221 return Err(SendError::ExternalService(format!(
222 "Discord request failed: {}",
223 e
224 )));
225 }
226 }
227 }
228
229 Err(SendError::MaxRetriesExceeded(
230 last_error.unwrap_or_else(|| "Unknown error".to_string()),
231 ))
232 }
233}
234
/// Sends notifications through the Telegram Bot API `sendMessage` endpoint.
pub struct TelegramSender {
    /// Bot token interpolated into the API URL.
    bot_token: String,
    /// HTTP client used for the API requests.
    http_client: Client,
}
239
240impl TelegramSender {
241 pub fn new(bot_token: String) -> Self {
242 Self {
243 bot_token,
244 http_client: create_http_client(),
245 }
246 }
247
248 pub fn from_env() -> Option<Self> {
249 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
250 Some(Self::new(bot_token))
251 }
252}
253
254#[async_trait]
255impl NotificationSender for TelegramSender {
256 fn channel(&self) -> NotificationChannel {
257 NotificationChannel::Telegram
258 }
259
260 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
261 let chat_id = ¬ification.recipient;
262 let subject = notification.subject.as_deref().unwrap_or("Notification");
263 let text = format!("*{}*\n\n{}", subject, notification.body);
264
265 let url = format!(
266 "https://api.telegram.org/bot{}/sendMessage",
267 self.bot_token
268 );
269
270 let payload = json!({
271 "chat_id": chat_id,
272 "text": text,
273 "parse_mode": "Markdown"
274 });
275
276 let mut last_error = None;
277 for attempt in 0..MAX_RETRIES {
278 let result = self
279 .http_client
280 .post(&url)
281 .json(&payload)
282 .send()
283 .await;
284
285 match result {
286 Ok(response) => {
287 if response.status().is_success() {
288 return Ok(());
289 }
290
291 let status = response.status();
292 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
293 last_error = Some(format!("Telegram API returned {}", status));
294 retry_delay(attempt).await;
295 continue;
296 }
297
298 let body = response.text().await.unwrap_or_default();
299 return Err(SendError::ExternalService(format!(
300 "Telegram API returned {}: {}",
301 status, body
302 )));
303 }
304 Err(e) => {
305 if e.is_timeout() {
306 if attempt < MAX_RETRIES - 1 {
307 last_error = Some(format!("Telegram request timed out"));
308 retry_delay(attempt).await;
309 continue;
310 }
311 return Err(SendError::Timeout);
312 }
313 return Err(SendError::ExternalService(format!(
314 "Telegram request failed: {}",
315 e
316 )));
317 }
318 }
319 }
320
321 Err(SendError::MaxRetriesExceeded(
322 last_error.unwrap_or_else(|| "Unknown error".to_string()),
323 ))
324 }
325}
326
/// Sends notifications by invoking a signal-cli executable.
pub struct SignalSender {
    /// Path of the signal-cli executable that is run for each message.
    signal_cli_path: String,
    /// Value passed to signal-cli after the `-u` flag.
    sender_number: String,
}
331
332impl SignalSender {
333 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
334 Self {
335 signal_cli_path,
336 sender_number,
337 }
338 }
339
340 pub fn from_env() -> Option<Self> {
341 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
342 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
343 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
344 Some(Self::new(signal_cli_path, sender_number))
345 }
346}
347
348#[async_trait]
349impl NotificationSender for SignalSender {
350 fn channel(&self) -> NotificationChannel {
351 NotificationChannel::Signal
352 }
353
354 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
355 let recipient = ¬ification.recipient;
356
357 if !is_valid_phone_number(recipient) {
358 return Err(SendError::InvalidRecipient(format!(
359 "Invalid phone number format: {}",
360 recipient
361 )));
362 }
363
364 let subject = notification.subject.as_deref().unwrap_or("Notification");
365 let message = format!("{}\n\n{}", subject, notification.body);
366
367 let output = Command::new(&self.signal_cli_path)
368 .arg("-u")
369 .arg(&self.sender_number)
370 .arg("send")
371 .arg("-m")
372 .arg(&message)
373 .arg(recipient)
374 .output()
375 .await?;
376
377 if !output.status.success() {
378 let stderr = String::from_utf8_lossy(&output.stderr);
379 return Err(SendError::ExternalService(format!(
380 "signal-cli failed: {}",
381 stderr
382 )));
383 }
384
385 Ok(())
386 }
387}