this repo has no description
1use async_trait::async_trait;
2use reqwest::Client;
3use serde_json::json;
4use std::process::Stdio;
5use std::time::Duration;
6use tokio::io::AsyncWriteExt;
7use tokio::process::Command;
8
9use super::types::{NotificationChannel, QueuedNotification};
10
11const HTTP_TIMEOUT_SECS: u64 = 30;
12const MAX_RETRIES: u32 = 3;
13const INITIAL_RETRY_DELAY_MS: u64 = 500;
14
15#[async_trait]
16pub trait NotificationSender: Send + Sync {
17 fn channel(&self) -> NotificationChannel;
18 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>;
19}
20
21#[derive(Debug, thiserror::Error)]
22pub enum SendError {
23 #[error("Failed to spawn sendmail process: {0}")]
24 ProcessSpawn(#[from] std::io::Error),
25 #[error("Sendmail exited with non-zero status: {0}")]
26 SendmailFailed(String),
27 #[error("Channel not configured: {0:?}")]
28 NotConfigured(NotificationChannel),
29 #[error("External service error: {0}")]
30 ExternalService(String),
31 #[error("Invalid recipient format: {0}")]
32 InvalidRecipient(String),
33 #[error("Request timeout")]
34 Timeout,
35 #[error("Max retries exceeded: {0}")]
36 MaxRetriesExceeded(String),
37}
38
39fn create_http_client() -> Client {
40 Client::builder()
41 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
42 .connect_timeout(Duration::from_secs(10))
43 .build()
44 .unwrap_or_else(|_| Client::new())
45}
46
47fn is_retryable_status(status: reqwest::StatusCode) -> bool {
48 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
49}
50
51async fn retry_delay(attempt: u32) {
52 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
53 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
54}
55
56pub fn sanitize_header_value(value: &str) -> String {
57 value.replace(['\r', '\n'], " ").trim().to_string()
58}
59
60pub fn is_valid_phone_number(number: &str) -> bool {
61 if number.len() < 2 || number.len() > 20 {
62 return false;
63 }
64 let mut chars = number.chars();
65 if chars.next() != Some('+') {
66 return false;
67 }
68 let remaining: String = chars.collect();
69 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit())
70}
71
72pub struct EmailSender {
73 from_address: String,
74 from_name: String,
75 sendmail_path: String,
76}
77
78impl EmailSender {
79 pub fn new(from_address: String, from_name: String) -> Self {
80 Self {
81 from_address,
82 from_name,
83 sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
84 }
85 }
86
87 pub fn from_env() -> Option<Self> {
88 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
89 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string());
90 Some(Self::new(from_address, from_name))
91 }
92
93 pub fn format_email(&self, notification: &QueuedNotification) -> String {
94 let subject = sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification"));
95 let recipient = sanitize_header_value(¬ification.recipient);
96 let from_header = if self.from_name.is_empty() {
97 self.from_address.clone()
98 } else {
99 format!("{} <{}>", sanitize_header_value(&self.from_name), self.from_address)
100 };
101 format!(
102 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
103 from_header,
104 recipient,
105 subject,
106 notification.body
107 )
108 }
109}
110
111#[async_trait]
112impl NotificationSender for EmailSender {
113 fn channel(&self) -> NotificationChannel {
114 NotificationChannel::Email
115 }
116
117 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
118 let email_content = self.format_email(notification);
119 let mut child = Command::new(&self.sendmail_path)
120 .arg("-t")
121 .arg("-oi")
122 .stdin(Stdio::piped())
123 .stdout(Stdio::piped())
124 .stderr(Stdio::piped())
125 .spawn()?;
126 if let Some(mut stdin) = child.stdin.take() {
127 stdin.write_all(email_content.as_bytes()).await?;
128 }
129 let output = child.wait_with_output().await?;
130 if !output.status.success() {
131 let stderr = String::from_utf8_lossy(&output.stderr);
132 return Err(SendError::SendmailFailed(stderr.to_string()));
133 }
134 Ok(())
135 }
136}
137
138pub struct DiscordSender {
139 webhook_url: String,
140 http_client: Client,
141}
142
143impl DiscordSender {
144 pub fn new(webhook_url: String) -> Self {
145 Self {
146 webhook_url,
147 http_client: create_http_client(),
148 }
149 }
150
151 pub fn from_env() -> Option<Self> {
152 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
153 Some(Self::new(webhook_url))
154 }
155}
156
157#[async_trait]
158impl NotificationSender for DiscordSender {
159 fn channel(&self) -> NotificationChannel {
160 NotificationChannel::Discord
161 }
162
163 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
164 let subject = notification.subject.as_deref().unwrap_or("Notification");
165 let content = format!("**{}**\n\n{}", subject, notification.body);
166 let payload = json!({
167 "content": content,
168 "username": "BSPDS"
169 });
170 let mut last_error = None;
171 for attempt in 0..MAX_RETRIES {
172 let result = self
173 .http_client
174 .post(&self.webhook_url)
175 .json(&payload)
176 .send()
177 .await;
178 match result {
179 Ok(response) => {
180 if response.status().is_success() {
181 return Ok(());
182 }
183 let status = response.status();
184 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
185 last_error = Some(format!("Discord webhook returned {}", status));
186 retry_delay(attempt).await;
187 continue;
188 }
189 let body = response.text().await.unwrap_or_default();
190 return Err(SendError::ExternalService(format!(
191 "Discord webhook returned {}: {}",
192 status, body
193 )));
194 }
195 Err(e) => {
196 if e.is_timeout() {
197 if attempt < MAX_RETRIES - 1 {
198 last_error = Some(format!("Discord request timed out"));
199 retry_delay(attempt).await;
200 continue;
201 }
202 return Err(SendError::Timeout);
203 }
204 return Err(SendError::ExternalService(format!(
205 "Discord request failed: {}",
206 e
207 )));
208 }
209 }
210 }
211 Err(SendError::MaxRetriesExceeded(
212 last_error.unwrap_or_else(|| "Unknown error".to_string()),
213 ))
214 }
215}
216
217pub struct TelegramSender {
218 bot_token: String,
219 http_client: Client,
220}
221
222impl TelegramSender {
223 pub fn new(bot_token: String) -> Self {
224 Self {
225 bot_token,
226 http_client: create_http_client(),
227 }
228 }
229
230 pub fn from_env() -> Option<Self> {
231 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
232 Some(Self::new(bot_token))
233 }
234}
235
236#[async_trait]
237impl NotificationSender for TelegramSender {
238 fn channel(&self) -> NotificationChannel {
239 NotificationChannel::Telegram
240 }
241
242 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
243 let chat_id = ¬ification.recipient;
244 let subject = notification.subject.as_deref().unwrap_or("Notification");
245 let text = format!("*{}*\n\n{}", subject, notification.body);
246 let url = format!(
247 "https://api.telegram.org/bot{}/sendMessage",
248 self.bot_token
249 );
250 let payload = json!({
251 "chat_id": chat_id,
252 "text": text,
253 "parse_mode": "Markdown"
254 });
255 let mut last_error = None;
256 for attempt in 0..MAX_RETRIES {
257 let result = self
258 .http_client
259 .post(&url)
260 .json(&payload)
261 .send()
262 .await;
263 match result {
264 Ok(response) => {
265 if response.status().is_success() {
266 return Ok(());
267 }
268 let status = response.status();
269 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
270 last_error = Some(format!("Telegram API returned {}", status));
271 retry_delay(attempt).await;
272 continue;
273 }
274 let body = response.text().await.unwrap_or_default();
275 return Err(SendError::ExternalService(format!(
276 "Telegram API returned {}: {}",
277 status, body
278 )));
279 }
280 Err(e) => {
281 if e.is_timeout() {
282 if attempt < MAX_RETRIES - 1 {
283 last_error = Some(format!("Telegram request timed out"));
284 retry_delay(attempt).await;
285 continue;
286 }
287 return Err(SendError::Timeout);
288 }
289 return Err(SendError::ExternalService(format!(
290 "Telegram request failed: {}",
291 e
292 )));
293 }
294 }
295 }
296 Err(SendError::MaxRetriesExceeded(
297 last_error.unwrap_or_else(|| "Unknown error".to_string()),
298 ))
299 }
300}
301
302pub struct SignalSender {
303 signal_cli_path: String,
304 sender_number: String,
305}
306
307impl SignalSender {
308 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
309 Self {
310 signal_cli_path,
311 sender_number,
312 }
313 }
314
315 pub fn from_env() -> Option<Self> {
316 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
317 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
318 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
319 Some(Self::new(signal_cli_path, sender_number))
320 }
321}
322
323#[async_trait]
324impl NotificationSender for SignalSender {
325 fn channel(&self) -> NotificationChannel {
326 NotificationChannel::Signal
327 }
328
329 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
330 let recipient = ¬ification.recipient;
331 if !is_valid_phone_number(recipient) {
332 return Err(SendError::InvalidRecipient(format!(
333 "Invalid phone number format: {}",
334 recipient
335 )));
336 }
337 let subject = notification.subject.as_deref().unwrap_or("Notification");
338 let message = format!("{}\n\n{}", subject, notification.body);
339 let output = Command::new(&self.signal_cli_path)
340 .arg("-u")
341 .arg(&self.sender_number)
342 .arg("send")
343 .arg("-m")
344 .arg(&message)
345 .arg(recipient)
346 .output()
347 .await?;
348 if !output.status.success() {
349 let stderr = String::from_utf8_lossy(&output.stderr);
350 return Err(SendError::ExternalService(format!(
351 "signal-cli failed: {}",
352 stderr
353 )));
354 }
355 Ok(())
356 }
357}