this repo has no description
1use async_trait::async_trait; 2use reqwest::Client; 3use serde_json::json; 4use std::process::Stdio; 5use std::time::Duration; 6use tokio::io::AsyncWriteExt; 7use tokio::process::Command; 8 9use super::types::{NotificationChannel, QueuedNotification}; 10 11const HTTP_TIMEOUT_SECS: u64 = 30; 12const MAX_RETRIES: u32 = 3; 13const INITIAL_RETRY_DELAY_MS: u64 = 500; 14 15#[async_trait] 16pub trait NotificationSender: Send + Sync { 17 fn channel(&self) -> NotificationChannel; 18 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>; 19} 20 21#[derive(Debug, thiserror::Error)] 22pub enum SendError { 23 #[error("Failed to spawn sendmail process: {0}")] 24 ProcessSpawn(#[from] std::io::Error), 25 #[error("Sendmail exited with non-zero status: {0}")] 26 SendmailFailed(String), 27 #[error("Channel not configured: {0:?}")] 28 NotConfigured(NotificationChannel), 29 #[error("External service error: {0}")] 30 ExternalService(String), 31 #[error("Invalid recipient format: {0}")] 32 InvalidRecipient(String), 33 #[error("Request timeout")] 34 Timeout, 35 #[error("Max retries exceeded: {0}")] 36 MaxRetriesExceeded(String), 37} 38 39fn create_http_client() -> Client { 40 Client::builder() 41 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 42 .connect_timeout(Duration::from_secs(10)) 43 .build() 44 .unwrap_or_else(|_| Client::new()) 45} 46 47fn is_retryable_status(status: reqwest::StatusCode) -> bool { 48 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 49} 50 51async fn retry_delay(attempt: u32) { 52 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 53 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 54} 55 56pub fn sanitize_header_value(value: &str) -> String { 57 value.replace(['\r', '\n'], " ").trim().to_string() 58} 59 60pub fn is_valid_phone_number(number: &str) -> bool { 61 if number.len() < 2 || number.len() > 20 { 62 return false; 63 } 64 let mut chars = number.chars(); 65 if chars.next() != Some('+') { 66 return false; 67 } 68 let remaining: String = chars.collect(); 69 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 70} 71 72pub struct EmailSender { 73 from_address: String, 74 from_name: String, 75 sendmail_path: String, 76} 77 78impl EmailSender { 79 pub fn new(from_address: String, from_name: String) -> Self { 80 Self { 81 from_address, 82 from_name, 83 sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 84 } 85 } 86 87 pub fn from_env() -> Option<Self> { 88 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 89 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string()); 90 Some(Self::new(from_address, from_name)) 91 } 92 93 pub fn format_email(&self, notification: &QueuedNotification) -> String { 94 let subject = sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification")); 95 let recipient = sanitize_header_value(&notification.recipient); 96 let from_header = if self.from_name.is_empty() { 97 self.from_address.clone() 98 } else { 99 format!("{} <{}>", sanitize_header_value(&self.from_name), self.from_address) 100 }; 101 format!( 102 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 103 from_header, 104 recipient, 105 subject, 106 notification.body 107 ) 108 } 109} 110 111#[async_trait] 112impl NotificationSender for EmailSender { 113 fn channel(&self) -> NotificationChannel { 114 NotificationChannel::Email 115 } 116 117 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 118 let email_content = self.format_email(notification); 119 let mut child = Command::new(&self.sendmail_path) 120 .arg("-t") 121 .arg("-oi") 122 .stdin(Stdio::piped()) 123 .stdout(Stdio::piped()) 124 .stderr(Stdio::piped()) 125 .spawn()?; 126 if let Some(mut stdin) = child.stdin.take() { 127 stdin.write_all(email_content.as_bytes()).await?; 128 } 129 let output = child.wait_with_output().await?; 130 if !output.status.success() { 131 let stderr = String::from_utf8_lossy(&output.stderr); 132 return Err(SendError::SendmailFailed(stderr.to_string())); 133 } 134 Ok(()) 135 } 136} 137 138pub struct DiscordSender { 139 webhook_url: String, 140 http_client: Client, 141} 142 143impl DiscordSender { 144 pub fn new(webhook_url: String) -> Self { 145 Self { 146 webhook_url, 147 http_client: create_http_client(), 148 } 149 } 150 151 pub fn from_env() -> Option<Self> { 152 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 153 Some(Self::new(webhook_url)) 154 } 155} 156 157#[async_trait] 158impl NotificationSender for DiscordSender { 159 fn channel(&self) -> NotificationChannel { 160 NotificationChannel::Discord 161 } 162 163 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 164 let subject = notification.subject.as_deref().unwrap_or("Notification"); 165 let content = format!("**{}**\n\n{}", subject, notification.body); 166 let payload = json!({ 167 "content": content, 168 "username": "BSPDS" 169 }); 170 let mut last_error = None; 171 for attempt in 0..MAX_RETRIES { 172 let result = self 173 .http_client 174 .post(&self.webhook_url) 175 .json(&payload) 176 .send() 177 .await; 178 match result { 179 Ok(response) => { 180 if response.status().is_success() { 181 return Ok(()); 182 } 183 let status = response.status(); 184 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 185 last_error = Some(format!("Discord webhook returned {}", status)); 186 retry_delay(attempt).await; 187 continue; 188 } 189 let body = response.text().await.unwrap_or_default(); 190 return Err(SendError::ExternalService(format!( 191 "Discord webhook returned {}: {}", 192 status, body 193 ))); 194 } 195 Err(e) => { 196 if e.is_timeout() { 197 if attempt < MAX_RETRIES - 1 { 198 last_error = Some(format!("Discord request timed out")); 199 retry_delay(attempt).await; 200 continue; 201 } 202 return Err(SendError::Timeout); 203 } 204 return Err(SendError::ExternalService(format!( 205 "Discord request failed: {}", 206 e 207 ))); 208 } 209 } 210 } 211 Err(SendError::MaxRetriesExceeded( 212 last_error.unwrap_or_else(|| "Unknown error".to_string()), 213 )) 214 } 215} 216 217pub struct TelegramSender { 218 bot_token: String, 219 http_client: Client, 220} 221 222impl TelegramSender { 223 pub fn new(bot_token: String) -> Self { 224 Self { 225 bot_token, 226 http_client: create_http_client(), 227 } 228 } 229 230 pub fn from_env() -> Option<Self> { 231 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 232 Some(Self::new(bot_token)) 233 } 234} 235 236#[async_trait] 237impl NotificationSender for TelegramSender { 238 fn channel(&self) -> NotificationChannel { 239 NotificationChannel::Telegram 240 } 241 242 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 243 let chat_id = &notification.recipient; 244 let subject = notification.subject.as_deref().unwrap_or("Notification"); 245 let text = format!("*{}*\n\n{}", subject, notification.body); 246 let url = format!( 247 "https://api.telegram.org/bot{}/sendMessage", 248 self.bot_token 249 ); 250 let payload = json!({ 251 "chat_id": chat_id, 252 "text": text, 253 "parse_mode": "Markdown" 254 }); 255 let mut last_error = None; 256 for attempt in 0..MAX_RETRIES { 257 let result = self 258 .http_client 259 .post(&url) 260 .json(&payload) 261 .send() 262 .await; 263 match result { 264 Ok(response) => { 265 if response.status().is_success() { 266 return Ok(()); 267 } 268 let status = response.status(); 269 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 270 last_error = Some(format!("Telegram API returned {}", status)); 271 retry_delay(attempt).await; 272 continue; 273 } 274 let body = response.text().await.unwrap_or_default(); 275 return Err(SendError::ExternalService(format!( 276 "Telegram API returned {}: {}", 277 status, body 278 ))); 279 } 280 Err(e) => { 281 if e.is_timeout() { 282 if attempt < MAX_RETRIES - 1 { 283 last_error = Some(format!("Telegram request timed out")); 284 retry_delay(attempt).await; 285 continue; 286 } 287 return Err(SendError::Timeout); 288 } 289 return Err(SendError::ExternalService(format!( 290 "Telegram request failed: {}", 291 e 292 ))); 293 } 294 } 295 } 296 Err(SendError::MaxRetriesExceeded( 297 last_error.unwrap_or_else(|| "Unknown error".to_string()), 298 )) 299 } 300} 301 302pub struct SignalSender { 303 signal_cli_path: String, 304 sender_number: String, 305} 306 307impl SignalSender { 308 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 309 Self { 310 signal_cli_path, 311 sender_number, 312 } 313 } 314 315 pub fn from_env() -> Option<Self> { 316 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 317 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 318 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 319 Some(Self::new(signal_cli_path, sender_number)) 320 } 321} 322 323#[async_trait] 324impl NotificationSender for SignalSender { 325 fn channel(&self) -> NotificationChannel { 326 NotificationChannel::Signal 327 } 328 329 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 330 let recipient = &notification.recipient; 331 if !is_valid_phone_number(recipient) { 332 return Err(SendError::InvalidRecipient(format!( 333 "Invalid phone number format: {}", 334 recipient 335 ))); 336 } 337 let subject = notification.subject.as_deref().unwrap_or("Notification"); 338 let message = format!("{}\n\n{}", subject, notification.body); 339 let output = Command::new(&self.signal_cli_path) 340 .arg("-u") 341 .arg(&self.sender_number) 342 .arg("send") 343 .arg("-m") 344 .arg(&message) 345 .arg(recipient) 346 .output() 347 .await?; 348 if !output.status.success() { 349 let stderr = String::from_utf8_lossy(&output.stderr); 350 return Err(SendError::ExternalService(format!( 351 "signal-cli failed: {}", 352 stderr 353 ))); 354 } 355 Ok(()) 356 } 357}