this repo has no description
1use async_trait::async_trait; 2use reqwest::Client; 3use serde_json::json; 4use std::process::Stdio; 5use std::time::Duration; 6use tokio::io::AsyncWriteExt; 7use tokio::process::Command; 8use super::types::{NotificationChannel, QueuedNotification}; 9const HTTP_TIMEOUT_SECS: u64 = 30; 10const MAX_RETRIES: u32 = 3; 11const INITIAL_RETRY_DELAY_MS: u64 = 500; 12#[async_trait] 13pub trait NotificationSender: Send + Sync { 14 fn channel(&self) -> NotificationChannel; 15 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError>; 16} 17#[derive(Debug, thiserror::Error)] 18pub enum SendError { 19 #[error("Failed to spawn sendmail process: {0}")] 20 ProcessSpawn(#[from] std::io::Error), 21 #[error("Sendmail exited with non-zero status: {0}")] 22 SendmailFailed(String), 23 #[error("Channel not configured: {0:?}")] 24 NotConfigured(NotificationChannel), 25 #[error("External service error: {0}")] 26 ExternalService(String), 27 #[error("Invalid recipient format: {0}")] 28 InvalidRecipient(String), 29 #[error("Request timeout")] 30 Timeout, 31 #[error("Max retries exceeded: {0}")] 32 MaxRetriesExceeded(String), 33} 34fn create_http_client() -> Client { 35 Client::builder() 36 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 37 .connect_timeout(Duration::from_secs(10)) 38 .build() 39 .unwrap_or_else(|_| Client::new()) 40} 41fn is_retryable_status(status: reqwest::StatusCode) -> bool { 42 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 43} 44async fn retry_delay(attempt: u32) { 45 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 46 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 47} 48pub fn sanitize_header_value(value: &str) -> String { 49 value.replace(['\r', '\n'], " ").trim().to_string() 50} 51pub fn is_valid_phone_number(number: &str) -> bool { 52 if number.len() < 2 || number.len() > 20 { 53 return false; 54 } 55 let mut chars = number.chars(); 56 if chars.next() != Some('+') { 57 return false; 58 } 59 let remaining: String = chars.collect(); 60 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 61} 62pub struct EmailSender { 63 from_address: String, 64 from_name: String, 65 sendmail_path: String, 66} 67impl EmailSender { 68 pub fn new(from_address: String, from_name: String) -> Self { 69 Self { 70 from_address, 71 from_name, 72 sendmail_path: std::env::var("SENDMAIL_PATH").unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 73 } 74 } 75 pub fn from_env() -> Option<Self> { 76 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 77 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string()); 78 Some(Self::new(from_address, from_name)) 79 } 80 pub fn format_email(&self, notification: &QueuedNotification) -> String { 81 let subject = sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification")); 82 let recipient = sanitize_header_value(&notification.recipient); 83 let from_header = if self.from_name.is_empty() { 84 self.from_address.clone() 85 } else { 86 format!("{} <{}>", sanitize_header_value(&self.from_name), self.from_address) 87 }; 88 format!( 89 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 90 from_header, 91 recipient, 92 subject, 93 notification.body 94 ) 95 } 96} 97#[async_trait] 98impl NotificationSender for EmailSender { 99 fn channel(&self) -> NotificationChannel { 100 NotificationChannel::Email 101 } 102 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 103 let email_content = self.format_email(notification); 104 let mut child = Command::new(&self.sendmail_path) 105 .arg("-t") 106 .arg("-oi") 107 .stdin(Stdio::piped()) 108 .stdout(Stdio::piped()) 109 .stderr(Stdio::piped()) 110 .spawn()?; 111 if let Some(mut stdin) = child.stdin.take() { 112 stdin.write_all(email_content.as_bytes()).await?; 113 } 114 let output = child.wait_with_output().await?; 115 if !output.status.success() { 116 let stderr = String::from_utf8_lossy(&output.stderr); 117 return Err(SendError::SendmailFailed(stderr.to_string())); 118 } 119 Ok(()) 120 } 121} 122pub struct DiscordSender { 123 webhook_url: String, 124 http_client: Client, 125} 126impl DiscordSender { 127 pub fn new(webhook_url: String) -> Self { 128 Self { 129 webhook_url, 130 http_client: create_http_client(), 131 } 132 } 133 pub fn from_env() -> Option<Self> { 134 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 135 Some(Self::new(webhook_url)) 136 } 137} 138#[async_trait] 139impl NotificationSender for DiscordSender { 140 fn channel(&self) -> NotificationChannel { 141 NotificationChannel::Discord 142 } 143 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 144 let subject = notification.subject.as_deref().unwrap_or("Notification"); 145 let content = format!("**{}**\n\n{}", subject, notification.body); 146 let payload = json!({ 147 "content": content, 148 "username": "BSPDS" 149 }); 150 let mut last_error = None; 151 for attempt in 0..MAX_RETRIES { 152 let result = self 153 .http_client 154 .post(&self.webhook_url) 155 .json(&payload) 156 .send() 157 .await; 158 match result { 159 Ok(response) => { 160 if response.status().is_success() { 161 return Ok(()); 162 } 163 let status = response.status(); 164 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 165 last_error = Some(format!("Discord webhook returned {}", status)); 166 retry_delay(attempt).await; 167 continue; 168 } 169 let body = response.text().await.unwrap_or_default(); 170 return Err(SendError::ExternalService(format!( 171 "Discord webhook returned {}: {}", 172 status, body 173 ))); 174 } 175 Err(e) => { 176 if e.is_timeout() { 177 if attempt < MAX_RETRIES - 1 { 178 last_error = Some(format!("Discord request timed out")); 179 retry_delay(attempt).await; 180 continue; 181 } 182 return Err(SendError::Timeout); 183 } 184 return Err(SendError::ExternalService(format!( 185 "Discord request failed: {}", 186 e 187 ))); 188 } 189 } 190 } 191 Err(SendError::MaxRetriesExceeded( 192 last_error.unwrap_or_else(|| "Unknown error".to_string()), 193 )) 194 } 195} 196pub struct TelegramSender { 197 bot_token: String, 198 http_client: Client, 199} 200impl TelegramSender { 201 pub fn new(bot_token: String) -> Self { 202 Self { 203 bot_token, 204 http_client: create_http_client(), 205 } 206 } 207 pub fn from_env() -> Option<Self> { 208 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 209 Some(Self::new(bot_token)) 210 } 211} 212#[async_trait] 213impl NotificationSender for TelegramSender { 214 fn channel(&self) -> NotificationChannel { 215 NotificationChannel::Telegram 216 } 217 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 218 let chat_id = &notification.recipient; 219 let subject = notification.subject.as_deref().unwrap_or("Notification"); 220 let text = format!("*{}*\n\n{}", subject, notification.body); 221 let url = format!( 222 "https://api.telegram.org/bot{}/sendMessage", 223 self.bot_token 224 ); 225 let payload = json!({ 226 "chat_id": chat_id, 227 "text": text, 228 "parse_mode": "Markdown" 229 }); 230 let mut last_error = None; 231 for attempt in 0..MAX_RETRIES { 232 let result = self 233 .http_client 234 .post(&url) 235 .json(&payload) 236 .send() 237 .await; 238 match result { 239 Ok(response) => { 240 if response.status().is_success() { 241 return Ok(()); 242 } 243 let status = response.status(); 244 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 245 last_error = Some(format!("Telegram API returned {}", status)); 246 retry_delay(attempt).await; 247 continue; 248 } 249 let body = response.text().await.unwrap_or_default(); 250 return Err(SendError::ExternalService(format!( 251 "Telegram API returned {}: {}", 252 status, body 253 ))); 254 } 255 Err(e) => { 256 if e.is_timeout() { 257 if attempt < MAX_RETRIES - 1 { 258 last_error = Some(format!("Telegram request timed out")); 259 retry_delay(attempt).await; 260 continue; 261 } 262 return Err(SendError::Timeout); 263 } 264 return Err(SendError::ExternalService(format!( 265 "Telegram request failed: {}", 266 e 267 ))); 268 } 269 } 270 } 271 Err(SendError::MaxRetriesExceeded( 272 last_error.unwrap_or_else(|| "Unknown error".to_string()), 273 )) 274 } 275} 276pub struct SignalSender { 277 signal_cli_path: String, 278 sender_number: String, 279} 280impl SignalSender { 281 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 282 Self { 283 signal_cli_path, 284 sender_number, 285 } 286 } 287 pub fn from_env() -> Option<Self> { 288 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 289 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 290 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 291 Some(Self::new(signal_cli_path, sender_number)) 292 } 293} 294#[async_trait] 295impl NotificationSender for SignalSender { 296 fn channel(&self) -> NotificationChannel { 297 NotificationChannel::Signal 298 } 299 async fn send(&self, notification: &QueuedNotification) -> Result<(), SendError> { 300 let recipient = &notification.recipient; 301 if !is_valid_phone_number(recipient) { 302 return Err(SendError::InvalidRecipient(format!( 303 "Invalid phone number format: {}", 304 recipient 305 ))); 306 } 307 let subject = notification.subject.as_deref().unwrap_or("Notification"); 308 let message = format!("{}\n\n{}", subject, notification.body); 309 let output = Command::new(&self.signal_cli_path) 310 .arg("-u") 311 .arg(&self.sender_number) 312 .arg("send") 313 .arg("-m") 314 .arg(&message) 315 .arg(recipient) 316 .output() 317 .await?; 318 if !output.status.success() { 319 let stderr = String::from_utf8_lossy(&output.stderr); 320 return Err(SendError::ExternalService(format!( 321 "signal-cli failed: {}", 322 stderr 323 ))); 324 } 325 Ok(()) 326 } 327}