this repo has no description
1use async_trait::async_trait; 2use reqwest::Client; 3use serde_json::json; 4use std::process::Stdio; 5use std::time::Duration; 6use tokio::io::AsyncWriteExt; 7use tokio::process::Command; 8 9use super::types::{NotificationChannel, QueuedNotification}; 10 11const HTTP_TIMEOUT_SECS: u64 = 30; 12const MAX_RETRIES: u32 = 3; 13const INITIAL_RETRY_DELAY_MS: u64 = 500; 14 15#[async_trait] 16pub trait NotificationSender: Send + Sync { 17 fn channel(&self) -> NotificationChannel; 18 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>; 19} 20 21#[derive(Debug, thiserror::Error)] 22pub enum SendError { 23 #[error("Failed to spawn sendmail process: {0}")] 24 ProcessSpawn(#[from] std::io::Error), 25 26 #[error("Sendmail exited with non-zero status: {0}")] 27 SendmailFailed(String), 28 29 #[error("Channel not configured: {0:?}")] 30 NotConfigured(NotificationChannel), 31 32 #[error("External service error: {0}")] 33 ExternalService(String), 34 35 #[error("Invalid recipient format: {0}")] 36 InvalidRecipient(String), 37 38 #[error("Request timeout")] 39 Timeout, 40 41 #[error("Max retries exceeded: {0}")] 42 MaxRetriesExceeded(String), 43} 44 45fn create_http_client() -> Client { 46 Client::builder() 47 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 48 .connect_timeout(Duration::from_secs(10)) 49 .build() 50 .unwrap_or_else(|_| Client::new()) 51} 52 53fn is_retryable_status(status: reqwest::StatusCode) -> bool { 54 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 55} 56 57async fn retry_delay(attempt: u32) { 58 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 59 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 60} 61 62pub fn sanitize_header_value(value: &str) -> String { 63 value.replace(['\r', '\n'], " ").trim().to_string() 64} 65 66pub fn is_valid_phone_number(number: &str) -> bool { 67 if number.len() < 2 || number.len() > 20 { 68 return false; 69 } 70 let mut chars = number.chars(); 71 if chars.next() != Some('+') { 72 return false; 73 } 74 let remaining: String = chars.collect(); 75 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 76} 77 78pub struct EmailSender { 79 from_address: String, 80 from_name: String, 81 sendmail_path: String, 82} 83 84impl EmailSender { 85 pub fn new(from_address: String, from_name: String) -> Self { 86 Self { 87 from_address, 88 from_name, 89 sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 90 } 91 } 92 93 pub fn from_env() -> Option<Self> { 94 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 95 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string()); 96 Some(Self::new(from_address, from_name)) 97 } 98 99 pub fn format_email(&self, notification: &QueuedNotification) -> String { 100 let subject = sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification")); 101 let recipient = sanitize_header_value(&notification.recipient); 102 let from_header = if self.from_name.is_empty() { 103 self.from_address.clone() 104 } else { 105 format!("{} <{}>", sanitize_header_value(&self.from_name), self.from_address) 106 }; 107 108 format!( 109 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 110 from_header, 111 recipient, 112 subject, 113 notification.body 114 ) 115 } 116} 117 118#[async_trait] 119impl NotificationSender for EmailSender { 120 fn channel(&self) -> NotificationChannel { 121 NotificationChannel::Email 122 } 123 124 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 125 let email_content = self.format_email(notification); 126 127 let mut child = Command::new(&self.sendmail_path) 128 .arg("-t") 129 .arg("-oi") 130 .stdin(Stdio::piped()) 131 .stdout(Stdio::piped()) 132 .stderr(Stdio::piped()) 133 .spawn()?; 134 135 if let Some(mut stdin) = child.stdin.take() { 136 stdin.write_all(email_content.as_bytes()).await?; 137 } 138 139 let output = child.wait_with_output().await?; 140 141 if !output.status.success() { 142 let stderr = String::from_utf8_lossy(&output.stderr); 143 return Err(SendError::SendmailFailed(stderr.to_string())); 144 } 145 146 Ok(()) 147 } 148} 149 150pub struct DiscordSender { 151 webhook_url: String, 152 http_client: Client, 153} 154 155impl DiscordSender { 156 pub fn new(webhook_url: String) -> Self { 157 Self { 158 webhook_url, 159 http_client: create_http_client(), 160 } 161 } 162 163 pub fn from_env() -> Option<Self> { 164 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 165 Some(Self::new(webhook_url)) 166 } 167} 168 169#[async_trait] 170impl NotificationSender for DiscordSender { 171 fn channel(&self) -> NotificationChannel { 172 NotificationChannel::Discord 173 } 174 175 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 176 let subject = notification.subject.as_deref().unwrap_or("Notification"); 177 let content = format!("**{}**\n\n{}", subject, notification.body); 178 179 let payload = json!({ 180 "content": content, 181 "username": "BSPDS" 182 }); 183 184 let mut last_error = None; 185 for attempt in 0..MAX_RETRIES { 186 let result = self 187 .http_client 188 .post(&self.webhook_url) 189 .json(&payload) 190 .send() 191 .await; 192 193 match result { 194 Ok(response) => { 195 if response.status().is_success() { 196 return Ok(()); 197 } 198 199 let status = response.status(); 200 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 201 last_error = Some(format!("Discord webhook returned {}", status)); 202 retry_delay(attempt).await; 203 continue; 204 } 205 206 let body = response.text().await.unwrap_or_default(); 207 return Err(SendError::ExternalService(format!( 208 "Discord webhook returned {}: {}", 209 status, body 210 ))); 211 } 212 Err(e) => { 213 if e.is_timeout() { 214 if attempt < MAX_RETRIES - 1 { 215 last_error = Some(format!("Discord request timed out")); 216 retry_delay(attempt).await; 217 continue; 218 } 219 return Err(SendError::Timeout); 220 } 221 return Err(SendError::ExternalService(format!( 222 "Discord request failed: {}", 223 e 224 ))); 225 } 226 } 227 } 228 229 Err(SendError::MaxRetriesExceeded( 230 last_error.unwrap_or_else(|| "Unknown error".to_string()), 231 )) 232 } 233} 234 235pub struct TelegramSender { 236 bot_token: String, 237 http_client: Client, 238} 239 240impl TelegramSender { 241 pub fn new(bot_token: String) -> Self { 242 Self { 243 bot_token, 244 http_client: create_http_client(), 245 } 246 } 247 248 pub fn from_env() -> Option<Self> { 249 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 250 Some(Self::new(bot_token)) 251 } 252} 253 254#[async_trait] 255impl NotificationSender for TelegramSender { 256 fn channel(&self) -> NotificationChannel { 257 NotificationChannel::Telegram 258 } 259 260 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 261 let chat_id = &notification.recipient; 262 let subject = notification.subject.as_deref().unwrap_or("Notification"); 263 let text = format!("*{}*\n\n{}", subject, notification.body); 264 265 let url = format!( 266 "https://api.telegram.org/bot{}/sendMessage", 267 self.bot_token 268 ); 269 270 let payload = json!({ 271 "chat_id": chat_id, 272 "text": text, 273 "parse_mode": "Markdown" 274 }); 275 276 let mut last_error = None; 277 for attempt in 0..MAX_RETRIES { 278 let result = self 279 .http_client 280 .post(&url) 281 .json(&payload) 282 .send() 283 .await; 284 285 match result { 286 Ok(response) => { 287 if response.status().is_success() { 288 return Ok(()); 289 } 290 291 let status = response.status(); 292 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 293 last_error = Some(format!("Telegram API returned {}", status)); 294 retry_delay(attempt).await; 295 continue; 296 } 297 298 let body = response.text().await.unwrap_or_default(); 299 return Err(SendError::ExternalService(format!( 300 "Telegram API returned {}: {}", 301 status, body 302 ))); 303 } 304 Err(e) => { 305 if e.is_timeout() { 306 if attempt < MAX_RETRIES - 1 { 307 last_error = Some(format!("Telegram request timed out")); 308 retry_delay(attempt).await; 309 continue; 310 } 311 return Err(SendError::Timeout); 312 } 313 return Err(SendError::ExternalService(format!( 314 "Telegram request failed: {}", 315 e 316 ))); 317 } 318 } 319 } 320 321 Err(SendError::MaxRetriesExceeded( 322 last_error.unwrap_or_else(|| "Unknown error".to_string()), 323 )) 324 } 325} 326 327pub struct SignalSender { 328 signal_cli_path: String, 329 sender_number: String, 330} 331 332impl SignalSender { 333 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 334 Self { 335 signal_cli_path, 336 sender_number, 337 } 338 } 339 340 pub fn from_env() -> Option<Self> { 341 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 342 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 343 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 344 Some(Self::new(signal_cli_path, sender_number)) 345 } 346} 347 348#[async_trait] 349impl NotificationSender for SignalSender { 350 fn channel(&self) -> NotificationChannel { 351 NotificationChannel::Signal 352 } 353 354 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 355 let recipient = &notification.recipient; 356 357 if !is_valid_phone_number(recipient) { 358 return Err(SendError::InvalidRecipient(format!( 359 "Invalid phone number format: {}", 360 recipient 361 ))); 362 } 363 364 let subject = notification.subject.as_deref().unwrap_or("Notification"); 365 let message = format!("{}\n\n{}", subject, notification.body); 366 367 let output = Command::new(&self.signal_cli_path) 368 .arg("-u") 369 .arg(&self.sender_number) 370 .arg("send") 371 .arg("-m") 372 .arg(&message) 373 .arg(recipient) 374 .output() 375 .await?; 376 377 if !output.status.success() { 378 let stderr = String::from_utf8_lossy(&output.stderr); 379 return Err(SendError::ExternalService(format!( 380 "signal-cli failed: {}", 381 stderr 382 ))); 383 } 384 385 Ok(()) 386 } 387}