this repo has no description
1use async_trait::async_trait; 2use reqwest::Client; 3use serde_json::json; 4use std::process::Stdio; 5use std::time::Duration; 6use tokio::io::AsyncWriteExt; 7use tokio::process::Command; 8 9use super::types::{NotificationChannel, QueuedNotification}; 10 11const HTTP_TIMEOUT_SECS: u64 = 30; 12const MAX_RETRIES: u32 = 3; 13const INITIAL_RETRY_DELAY_MS: u64 = 500; 14 15#[async_trait] 16pub trait NotificationSender: Send + Sync { 17 fn channel(&self) -> NotificationChannel; 18 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>; 19} 20 21#[derive(Debug, thiserror::Error)] 22pub enum SendError { 23 #[error("Failed to spawn sendmail process: {0}")] 24 ProcessSpawn(#[from] std::io::Error), 25 #[error("Sendmail exited with non-zero status: {0}")] 26 SendmailFailed(String), 27 #[error("Channel not configured: {0:?}")] 28 NotConfigured(NotificationChannel), 29 #[error("External service error: {0}")] 30 ExternalService(String), 31 #[error("Invalid recipient format: {0}")] 32 InvalidRecipient(String), 33 #[error("Request timeout")] 34 Timeout, 35 #[error("Max retries exceeded: {0}")] 36 MaxRetriesExceeded(String), 37} 38 39fn create_http_client() -> Client { 40 Client::builder() 41 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 42 .connect_timeout(Duration::from_secs(10)) 43 .build() 44 .unwrap_or_else(|_| Client::new()) 45} 46 47fn is_retryable_status(status: reqwest::StatusCode) -> bool { 48 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 49} 50 51async fn retry_delay(attempt: u32) { 52 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 53 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 54} 55 56pub fn sanitize_header_value(value: &str) -> String { 57 value.replace(['\r', '\n'], " ").trim().to_string() 58} 59 60pub fn is_valid_phone_number(number: &str) -> bool { 61 if number.len() < 2 || number.len() > 20 { 62 return false; 63 } 64 let mut chars = number.chars(); 65 if chars.next() != Some('+') { 66 return false; 67 } 68 let remaining: String = chars.collect(); 69 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 70} 71 72pub struct EmailSender { 73 from_address: String, 74 from_name: String, 75 sendmail_path: String, 76} 77 78impl EmailSender { 79 pub fn new(from_address: String, from_name: String) -> Self { 80 Self { 81 from_address, 82 from_name, 83 sendmail_path: std::env::var("SENDMAIL_PATH") 84 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 85 } 86 } 87 88 pub fn from_env() -> Option<Self> { 89 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 90 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string()); 91 Some(Self::new(from_address, from_name)) 92 } 93 94 pub fn format_email(&self, notification: &QueuedNotification) -> String { 95 let subject = 96 sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification")); 97 let recipient = sanitize_header_value(&notification.recipient); 98 let from_header = if self.from_name.is_empty() { 99 self.from_address.clone() 100 } else { 101 format!( 102 "{} <{}>", 103 sanitize_header_value(&self.from_name), 104 self.from_address 105 ) 106 }; 107 format!( 108 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 109 from_header, recipient, subject, notification.body 110 ) 111 } 112} 113 114#[async_trait] 115impl NotificationSender for EmailSender { 116 fn channel(&self) -> NotificationChannel { 117 NotificationChannel::Email 118 } 119 120 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 121 let email_content = self.format_email(notification); 122 let mut child = Command::new(&self.sendmail_path) 123 .arg("-t") 124 .arg("-oi") 125 .stdin(Stdio::piped()) 126 .stdout(Stdio::piped()) 127 .stderr(Stdio::piped()) 128 .spawn()?; 129 if let Some(mut stdin) = child.stdin.take() { 130 stdin.write_all(email_content.as_bytes()).await?; 131 } 132 let output = child.wait_with_output().await?; 133 if !output.status.success() { 134 let stderr = String::from_utf8_lossy(&output.stderr); 135 return Err(SendError::SendmailFailed(stderr.to_string())); 136 } 137 Ok(()) 138 } 139} 140 141pub struct DiscordSender { 142 webhook_url: String, 143 http_client: Client, 144} 145 146impl DiscordSender { 147 pub fn new(webhook_url: String) -> Self { 148 Self { 149 webhook_url, 150 http_client: create_http_client(), 151 } 152 } 153 154 pub fn from_env() -> Option<Self> { 155 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 156 Some(Self::new(webhook_url)) 157 } 158} 159 160#[async_trait] 161impl NotificationSender for DiscordSender { 162 fn channel(&self) -> NotificationChannel { 163 NotificationChannel::Discord 164 } 165 166 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 167 let subject = notification.subject.as_deref().unwrap_or("Notification"); 168 let content = format!("**{}**\n\n{}", subject, notification.body); 169 let payload = json!({ 170 "content": content, 171 "username": "BSPDS" 172 }); 173 let mut last_error = None; 174 for attempt in 0..MAX_RETRIES { 175 let result = self 176 .http_client 177 .post(&self.webhook_url) 178 .json(&payload) 179 .send() 180 .await; 181 match result { 182 Ok(response) => { 183 if response.status().is_success() { 184 return Ok(()); 185 } 186 let status = response.status(); 187 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 188 last_error = Some(format!("Discord webhook returned {}", status)); 189 retry_delay(attempt).await; 190 continue; 191 } 192 let body = response.text().await.unwrap_or_default(); 193 return Err(SendError::ExternalService(format!( 194 "Discord webhook returned {}: {}", 195 status, body 196 ))); 197 } 198 Err(e) => { 199 if e.is_timeout() { 200 if attempt < MAX_RETRIES - 1 { 201 last_error = Some("Discord request timed out".to_string()); 202 retry_delay(attempt).await; 203 continue; 204 } 205 return Err(SendError::Timeout); 206 } 207 return Err(SendError::ExternalService(format!( 208 "Discord request failed: {}", 209 e 210 ))); 211 } 212 } 213 } 214 Err(SendError::MaxRetriesExceeded( 215 last_error.unwrap_or_else(|| "Unknown error".to_string()), 216 )) 217 } 218} 219 220pub struct TelegramSender { 221 bot_token: String, 222 http_client: Client, 223} 224 225impl TelegramSender { 226 pub fn new(bot_token: String) -> Self { 227 Self { 228 bot_token, 229 http_client: create_http_client(), 230 } 231 } 232 233 pub fn from_env() -> Option<Self> { 234 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 235 Some(Self::new(bot_token)) 236 } 237} 238 239#[async_trait] 240impl NotificationSender for TelegramSender { 241 fn channel(&self) -> NotificationChannel { 242 NotificationChannel::Telegram 243 } 244 245 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 246 let chat_id = &notification.recipient; 247 let subject = notification.subject.as_deref().unwrap_or("Notification"); 248 let text = format!("*{}*\n\n{}", subject, notification.body); 249 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token); 250 let payload = json!({ 251 "chat_id": chat_id, 252 "text": text, 253 "parse_mode": "Markdown" 254 }); 255 let mut last_error = None; 256 for attempt in 0..MAX_RETRIES { 257 let result = self.http_client.post(&url).json(&payload).send().await; 258 match result { 259 Ok(response) => { 260 if response.status().is_success() { 261 return Ok(()); 262 } 263 let status = response.status(); 264 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 265 last_error = Some(format!("Telegram API returned {}", status)); 266 retry_delay(attempt).await; 267 continue; 268 } 269 let body = response.text().await.unwrap_or_default(); 270 return Err(SendError::ExternalService(format!( 271 "Telegram API returned {}: {}", 272 status, body 273 ))); 274 } 275 Err(e) => { 276 if e.is_timeout() { 277 if attempt < MAX_RETRIES - 1 { 278 last_error = Some("Telegram request timed out".to_string()); 279 retry_delay(attempt).await; 280 continue; 281 } 282 return Err(SendError::Timeout); 283 } 284 return Err(SendError::ExternalService(format!( 285 "Telegram request failed: {}", 286 e 287 ))); 288 } 289 } 290 } 291 Err(SendError::MaxRetriesExceeded( 292 last_error.unwrap_or_else(|| "Unknown error".to_string()), 293 )) 294 } 295} 296 297pub struct SignalSender { 298 signal_cli_path: String, 299 sender_number: String, 300} 301 302impl SignalSender { 303 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 304 Self { 305 signal_cli_path, 306 sender_number, 307 } 308 } 309 310 pub fn from_env() -> Option<Self> { 311 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 312 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 313 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 314 Some(Self::new(signal_cli_path, sender_number)) 315 } 316} 317 318#[async_trait] 319impl NotificationSender for SignalSender { 320 fn channel(&self) -> NotificationChannel { 321 NotificationChannel::Signal 322 } 323 324 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 325 let recipient = &notification.recipient; 326 if !is_valid_phone_number(recipient) { 327 return Err(SendError::InvalidRecipient(format!( 328 "Invalid phone number format: {}", 329 recipient 330 ))); 331 } 332 let subject = notification.subject.as_deref().unwrap_or("Notification"); 333 let message = format!("{}\n\n{}", subject, notification.body); 334 let output = Command::new(&self.signal_cli_path) 335 .arg("-u") 336 .arg(&self.sender_number) 337 .arg("send") 338 .arg("-m") 339 .arg(&message) 340 .arg(recipient) 341 .output() 342 .await?; 343 if !output.status.success() { 344 let stderr = String::from_utf8_lossy(&output.stderr); 345 return Err(SendError::ExternalService(format!( 346 "signal-cli failed: {}", 347 stderr 348 ))); 349 } 350 Ok(()) 351 } 352}