// This repo has no description.
1use async_trait::async_trait;
2use reqwest::Client;
3use serde_json::json;
4use std::process::Stdio;
5use std::time::Duration;
6use tokio::io::AsyncWriteExt;
7use tokio::process::Command;
8use super::types::{NotificationChannel, QueuedNotification};
/// Total per-request timeout, in seconds, configured on the HTTP client.
const HTTP_TIMEOUT_SECS: u64 = 30;

/// Upper bound on delivery attempts made by the HTTP-based senders.
const MAX_RETRIES: u32 = 3;

/// Back-off delay, in milliseconds, used before the first retry; later retries
/// scale this value up.
const INITIAL_RETRY_DELAY_MS: u64 = 500;
12#[async_trait]
13pub trait NotificationSender: Send + Sync {
14 fn channel(&self) -> NotificationChannel;
15 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>;
16}
17#[derive(Debug, thiserror::Error)]
18pub enum SendError {
19 #[error("Failed to spawn sendmail process: {0}")]
20 ProcessSpawn(#[from] std::io::Error),
21 #[error("Sendmail exited with non-zero status: {0}")]
22 SendmailFailed(String),
23 #[error("Channel not configured: {0:?}")]
24 NotConfigured(NotificationChannel),
25 #[error("External service error: {0}")]
26 ExternalService(String),
27 #[error("Invalid recipient format: {0}")]
28 InvalidRecipient(String),
29 #[error("Request timeout")]
30 Timeout,
31 #[error("Max retries exceeded: {0}")]
32 MaxRetriesExceeded(String),
33}
34fn create_http_client() -> Client {
35 Client::builder()
36 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
37 .connect_timeout(Duration::from_secs(10))
38 .build()
39 .unwrap_or_else(|_| Client::new())
40}
41fn is_retryable_status(status: reqwest::StatusCode) -> bool {
42 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
43}
44async fn retry_delay(attempt: u32) {
45 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
46 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
47}
/// Makes `value` safe to place on a single header line.
///
/// Every carriage return and line feed is replaced by a space, then leading
/// and trailing whitespace is trimmed. Because each CR/LF becomes its own
/// space, a `"\r\n"` pair inside the value turns into two spaces.
pub fn sanitize_header_value(value: &str) -> String {
    let single_line: String = value
        .chars()
        .map(|ch| if matches!(ch, '\r' | '\n') { ' ' } else { ch })
        .collect();
    single_line.trim().to_string()
}
/// Checks that `number` looks like an international phone number: a leading
/// `+` followed by one or more ASCII digits, with a total length of 2 to 20
/// bytes inclusive.
///
/// No other characters (spaces, dashes, non-ASCII digits) are accepted.
pub fn is_valid_phone_number(number: &str) -> bool {
    // Length is measured in bytes; any accepted input is pure ASCII anyway.
    if !(2..=20).contains(&number.len()) {
        return false;
    }
    // Borrow the digits after the '+' instead of collecting them into a new String.
    match number.strip_prefix('+') {
        Some(digits) => !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
/// Sends notifications as e-mail by handing a formatted message to a sendmail
/// executable.
pub struct EmailSender {
    /// Address used in the `From:` header.
    from_address: String,
    /// Display name shown before the address in `From:`; may be empty.
    from_name: String,
    /// Path of the sendmail executable to run.
    sendmail_path: String,
}
67impl EmailSender {
68 pub fn new(from_address: String, from_name: String) -> Self {
69 Self {
70 from_address,
71 from_name,
72 sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
73 }
74 }
75 pub fn from_env() -> Option<Self> {
76 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
77 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string());
78 Some(Self::new(from_address, from_name))
79 }
80 pub fn format_email(&self, notification: &QueuedNotification) -> String {
81 let subject = sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification"));
82 let recipient = sanitize_header_value(¬ification.recipient);
83 let from_header = if self.from_name.is_empty() {
84 self.from_address.clone()
85 } else {
86 format!("{} <{}>", sanitize_header_value(&self.from_name), self.from_address)
87 };
88 format!(
89 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
90 from_header,
91 recipient,
92 subject,
93 notification.body
94 )
95 }
96}
97#[async_trait]
98impl NotificationSender for EmailSender {
99 fn channel(&self) -> NotificationChannel {
100 NotificationChannel::Email
101 }
102 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
103 let email_content = self.format_email(notification);
104 let mut child = Command::new(&self.sendmail_path)
105 .arg("-t")
106 .arg("-oi")
107 .stdin(Stdio::piped())
108 .stdout(Stdio::piped())
109 .stderr(Stdio::piped())
110 .spawn()?;
111 if let Some(mut stdin) = child.stdin.take() {
112 stdin.write_all(email_content.as_bytes()).await?;
113 }
114 let output = child.wait_with_output().await?;
115 if !output.status.success() {
116 let stderr = String::from_utf8_lossy(&output.stderr);
117 return Err(SendError::SendmailFailed(stderr.to_string()));
118 }
119 Ok(())
120 }
121}
122pub struct DiscordSender {
123 webhook_url: String,
124 http_client: Client,
125}
126impl DiscordSender {
127 pub fn new(webhook_url: String) -> Self {
128 Self {
129 webhook_url,
130 http_client: create_http_client(),
131 }
132 }
133 pub fn from_env() -> Option<Self> {
134 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
135 Some(Self::new(webhook_url))
136 }
137}
138#[async_trait]
139impl NotificationSender for DiscordSender {
140 fn channel(&self) -> NotificationChannel {
141 NotificationChannel::Discord
142 }
143 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
144 let subject = notification.subject.as_deref().unwrap_or("Notification");
145 let content = format!("**{}**\n\n{}", subject, notification.body);
146 let payload = json!({
147 "content": content,
148 "username": "BSPDS"
149 });
150 let mut last_error = None;
151 for attempt in 0..MAX_RETRIES {
152 let result = self
153 .http_client
154 .post(&self.webhook_url)
155 .json(&payload)
156 .send()
157 .await;
158 match result {
159 Ok(response) => {
160 if response.status().is_success() {
161 return Ok(());
162 }
163 let status = response.status();
164 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
165 last_error = Some(format!("Discord webhook returned {}", status));
166 retry_delay(attempt).await;
167 continue;
168 }
169 let body = response.text().await.unwrap_or_default();
170 return Err(SendError::ExternalService(format!(
171 "Discord webhook returned {}: {}",
172 status, body
173 )));
174 }
175 Err(e) => {
176 if e.is_timeout() {
177 if attempt < MAX_RETRIES - 1 {
178 last_error = Some(format!("Discord request timed out"));
179 retry_delay(attempt).await;
180 continue;
181 }
182 return Err(SendError::Timeout);
183 }
184 return Err(SendError::ExternalService(format!(
185 "Discord request failed: {}",
186 e
187 )));
188 }
189 }
190 }
191 Err(SendError::MaxRetriesExceeded(
192 last_error.unwrap_or_else(|| "Unknown error".to_string()),
193 ))
194 }
195}
196pub struct TelegramSender {
197 bot_token: String,
198 http_client: Client,
199}
200impl TelegramSender {
201 pub fn new(bot_token: String) -> Self {
202 Self {
203 bot_token,
204 http_client: create_http_client(),
205 }
206 }
207 pub fn from_env() -> Option<Self> {
208 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
209 Some(Self::new(bot_token))
210 }
211}
212#[async_trait]
213impl NotificationSender for TelegramSender {
214 fn channel(&self) -> NotificationChannel {
215 NotificationChannel::Telegram
216 }
217 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
218 let chat_id = ¬ification.recipient;
219 let subject = notification.subject.as_deref().unwrap_or("Notification");
220 let text = format!("*{}*\n\n{}", subject, notification.body);
221 let url = format!(
222 "https://api.telegram.org/bot{}/sendMessage",
223 self.bot_token
224 );
225 let payload = json!({
226 "chat_id": chat_id,
227 "text": text,
228 "parse_mode": "Markdown"
229 });
230 let mut last_error = None;
231 for attempt in 0..MAX_RETRIES {
232 let result = self
233 .http_client
234 .post(&url)
235 .json(&payload)
236 .send()
237 .await;
238 match result {
239 Ok(response) => {
240 if response.status().is_success() {
241 return Ok(());
242 }
243 let status = response.status();
244 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
245 last_error = Some(format!("Telegram API returned {}", status));
246 retry_delay(attempt).await;
247 continue;
248 }
249 let body = response.text().await.unwrap_or_default();
250 return Err(SendError::ExternalService(format!(
251 "Telegram API returned {}: {}",
252 status, body
253 )));
254 }
255 Err(e) => {
256 if e.is_timeout() {
257 if attempt < MAX_RETRIES - 1 {
258 last_error = Some(format!("Telegram request timed out"));
259 retry_delay(attempt).await;
260 continue;
261 }
262 return Err(SendError::Timeout);
263 }
264 return Err(SendError::ExternalService(format!(
265 "Telegram request failed: {}",
266 e
267 )));
268 }
269 }
270 }
271 Err(SendError::MaxRetriesExceeded(
272 last_error.unwrap_or_else(|| "Unknown error".to_string()),
273 ))
274 }
275}
/// Sends notifications by invoking a `signal-cli` executable.
pub struct SignalSender {
    /// Path of the `signal-cli` executable to run.
    signal_cli_path: String,
    /// Number passed to `signal-cli` via `-u`.
    sender_number: String,
}
280impl SignalSender {
281 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
282 Self {
283 signal_cli_path,
284 sender_number,
285 }
286 }
287 pub fn from_env() -> Option<Self> {
288 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
289 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
290 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
291 Some(Self::new(signal_cli_path, sender_number))
292 }
293}
294#[async_trait]
295impl NotificationSender for SignalSender {
296 fn channel(&self) -> NotificationChannel {
297 NotificationChannel::Signal
298 }
299 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> {
300 let recipient = ¬ification.recipient;
301 if !is_valid_phone_number(recipient) {
302 return Err(SendError::InvalidRecipient(format!(
303 "Invalid phone number format: {}",
304 recipient
305 )));
306 }
307 let subject = notification.subject.as_deref().unwrap_or("Notification");
308 let message = format!("{}\n\n{}", subject, notification.body);
309 let output = Command::new(&self.signal_cli_path)
310 .arg("-u")
311 .arg(&self.sender_number)
312 .arg("send")
313 .arg("-m")
314 .arg(&message)
315 .arg(recipient)
316 .output()
317 .await?;
318 if !output.status.success() {
319 let stderr = String::from_utf8_lossy(&output.stderr);
320 return Err(SendError::ExternalService(format!(
321 "signal-cli failed: {}",
322 stderr
323 )));
324 }
325 Ok(())
326 }
327}