this repo has no description
1use async_trait::async_trait; 2use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; 3use reqwest::Client; 4use serde_json::json; 5use std::process::Stdio; 6use std::time::Duration; 7use tokio::io::AsyncWriteExt; 8use tokio::process::Command; 9 10use super::types::{CommsChannel, QueuedComms}; 11 12const HTTP_TIMEOUT_SECS: u64 = 30; 13const MAX_RETRIES: u32 = 3; 14const INITIAL_RETRY_DELAY_MS: u64 = 500; 15 16#[async_trait] 17pub trait CommsSender: Send + Sync { 18 fn channel(&self) -> CommsChannel; 19 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>; 20} 21 22#[derive(Debug, thiserror::Error)] 23pub enum SendError { 24 #[error("Failed to spawn sendmail process: {0}")] 25 ProcessSpawn(#[from] std::io::Error), 26 #[error("Sendmail exited with non-zero status: {0}")] 27 SendmailFailed(String), 28 #[error("Channel not configured: {0:?}")] 29 NotConfigured(CommsChannel), 30 #[error("External service error: {0}")] 31 ExternalService(String), 32 #[error("Invalid recipient format: {0}")] 33 InvalidRecipient(String), 34 #[error("Request timeout")] 35 Timeout, 36 #[error("Max retries exceeded: {0}")] 37 MaxRetriesExceeded(String), 38} 39 40fn create_http_client() -> Client { 41 Client::builder() 42 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 43 .connect_timeout(Duration::from_secs(10)) 44 .build() 45 .unwrap_or_else(|_| Client::new()) 46} 47 48fn is_retryable_status(status: reqwest::StatusCode) -> bool { 49 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 50} 51 52async fn retry_delay(attempt: u32) { 53 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 54 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 55} 56 57pub fn sanitize_header_value(value: &str) -> String { 58 value.replace(['\r', '\n'], " ").trim().to_string() 59} 60 61pub fn mime_encode_header(value: &str) -> String { 62 if value.is_ascii() { 63 sanitize_header_value(value) 64 } else { 65 let sanitized = sanitize_header_value(value); 66 format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes())) 67 } 68} 69 70pub fn is_valid_phone_number(number: &str) -> bool { 71 if number.len() < 2 || number.len() > 20 { 72 return false; 73 } 74 let mut chars = number.chars(); 75 if chars.next() != Some('+') { 76 return false; 77 } 78 let remaining: String = chars.collect(); 79 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 80} 81 82pub struct EmailSender { 83 from_address: String, 84 from_name: String, 85 sendmail_path: String, 86} 87 88impl EmailSender { 89 pub fn new(from_address: String, from_name: String) -> Self { 90 Self { 91 from_address, 92 from_name, 93 sendmail_path: std::env::var("SENDMAIL_PATH") 94 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 95 } 96 } 97 98 pub fn from_env() -> Option<Self> { 99 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 100 let from_name = 101 std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "Tranquil PDS".to_string()); 102 Some(Self::new(from_address, from_name)) 103 } 104 105 pub fn format_email(&self, notification: &QueuedComms) -> String { 106 let subject = 107 mime_encode_header(notification.subject.as_deref().unwrap_or("Notification")); 108 let recipient = sanitize_header_value(&notification.recipient); 109 let from_header = if self.from_name.is_empty() { 110 self.from_address.clone() 111 } else { 112 format!( 113 "{} <{}>", 114 sanitize_header_value(&self.from_name), 115 self.from_address 116 ) 117 }; 118 format!( 119 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 120 from_header, recipient, subject, notification.body 121 ) 122 } 123} 124 125#[async_trait] 126impl CommsSender for EmailSender { 127 fn channel(&self) -> CommsChannel { 128 CommsChannel::Email 129 } 130 131 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 132 let email_content = self.format_email(notification); 133 let mut child = Command::new(&self.sendmail_path) 134 .arg("-t") 135 .arg("-oi") 136 .stdin(Stdio::piped()) 137 .stdout(Stdio::piped()) 138 .stderr(Stdio::piped()) 139 .spawn()?; 140 if let Some(mut stdin) = child.stdin.take() { 141 stdin.write_all(email_content.as_bytes()).await?; 142 } 143 let output = child.wait_with_output().await?; 144 if !output.status.success() { 145 let stderr = String::from_utf8_lossy(&output.stderr); 146 return Err(SendError::SendmailFailed(stderr.to_string())); 147 } 148 Ok(()) 149 } 150} 151 152pub struct DiscordSender { 153 webhook_url: String, 154 http_client: Client, 155} 156 157impl DiscordSender { 158 pub fn new(webhook_url: String) -> Self { 159 Self { 160 webhook_url, 161 http_client: create_http_client(), 162 } 163 } 164 165 pub fn from_env() -> Option<Self> { 166 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 167 Some(Self::new(webhook_url)) 168 } 169} 170 171#[async_trait] 172impl CommsSender for DiscordSender { 173 fn channel(&self) -> CommsChannel { 174 CommsChannel::Discord 175 } 176 177 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 178 let subject = notification.subject.as_deref().unwrap_or("Notification"); 179 let content = format!("**{}**\n\n{}", subject, notification.body); 180 let payload = json!({ 181 "content": content, 182 "username": "Tranquil PDS" 183 }); 184 let mut last_error = None; 185 for attempt in 0..MAX_RETRIES { 186 let result = self 187 .http_client 188 .post(&self.webhook_url) 189 .json(&payload) 190 .send() 191 .await; 192 match result { 193 Ok(response) => { 194 if response.status().is_success() { 195 return Ok(()); 196 } 197 let status = response.status(); 198 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 199 last_error = Some(format!("Discord webhook returned {}", status)); 200 retry_delay(attempt).await; 201 continue; 202 } 203 let body = response.text().await.unwrap_or_default(); 204 return Err(SendError::ExternalService(format!( 205 "Discord webhook returned {}: {}", 206 status, body 207 ))); 208 } 209 Err(e) => { 210 if e.is_timeout() { 211 if attempt < MAX_RETRIES - 1 { 212 last_error = Some("Discord request timed out".to_string()); 213 retry_delay(attempt).await; 214 continue; 215 } 216 return Err(SendError::Timeout); 217 } 218 return Err(SendError::ExternalService(format!( 219 "Discord request failed: {}", 220 e 221 ))); 222 } 223 } 224 } 225 Err(SendError::MaxRetriesExceeded( 226 last_error.unwrap_or_else(|| "Unknown error".to_string()), 227 )) 228 } 229} 230 231pub struct TelegramSender { 232 bot_token: String, 233 http_client: Client, 234} 235 236impl TelegramSender { 237 pub fn new(bot_token: String) -> Self { 238 Self { 239 bot_token, 240 http_client: create_http_client(), 241 } 242 } 243 244 pub fn from_env() -> Option<Self> { 245 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 246 Some(Self::new(bot_token)) 247 } 248} 249 250#[async_trait] 251impl CommsSender for TelegramSender { 252 fn channel(&self) -> CommsChannel { 253 CommsChannel::Telegram 254 } 255 256 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 257 let chat_id = &notification.recipient; 258 let subject = notification.subject.as_deref().unwrap_or("Notification"); 259 let text = format!("*{}*\n\n{}", subject, notification.body); 260 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token); 261 let payload = json!({ 262 "chat_id": chat_id, 263 "text": text, 264 "parse_mode": "Markdown" 265 }); 266 let mut last_error = None; 267 for attempt in 0..MAX_RETRIES { 268 let result = self.http_client.post(&url).json(&payload).send().await; 269 match result { 270 Ok(response) => { 271 if response.status().is_success() { 272 return Ok(()); 273 } 274 let status = response.status(); 275 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 276 last_error = Some(format!("Telegram API returned {}", status)); 277 retry_delay(attempt).await; 278 continue; 279 } 280 let body = response.text().await.unwrap_or_default(); 281 return Err(SendError::ExternalService(format!( 282 "Telegram API returned {}: {}", 283 status, body 284 ))); 285 } 286 Err(e) => { 287 if e.is_timeout() { 288 if attempt < MAX_RETRIES - 1 { 289 last_error = Some("Telegram request timed out".to_string()); 290 retry_delay(attempt).await; 291 continue; 292 } 293 return Err(SendError::Timeout); 294 } 295 return Err(SendError::ExternalService(format!( 296 "Telegram request failed: {}", 297 e 298 ))); 299 } 300 } 301 } 302 Err(SendError::MaxRetriesExceeded( 303 last_error.unwrap_or_else(|| "Unknown error".to_string()), 304 )) 305 } 306} 307 308pub struct SignalSender { 309 signal_cli_path: String, 310 sender_number: String, 311} 312 313impl SignalSender { 314 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 315 Self { 316 signal_cli_path, 317 sender_number, 318 } 319 } 320 321 pub fn from_env() -> Option<Self> { 322 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 323 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 324 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 325 Some(Self::new(signal_cli_path, sender_number)) 326 } 327} 328 329#[async_trait] 330impl CommsSender for SignalSender { 331 fn channel(&self) -> CommsChannel { 332 CommsChannel::Signal 333 } 334 335 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 336 let recipient = &notification.recipient; 337 if !is_valid_phone_number(recipient) { 338 return Err(SendError::InvalidRecipient(format!( 339 "Invalid phone number format: {}", 340 recipient 341 ))); 342 } 343 let subject = notification.subject.as_deref().unwrap_or("Notification"); 344 let message = format!("{}\n\n{}", subject, notification.body); 345 let output = Command::new(&self.signal_cli_path) 346 .arg("-u") 347 .arg(&self.sender_number) 348 .arg("send") 349 .arg("-m") 350 .arg(&message) 351 .arg(recipient) 352 .output() 353 .await?; 354 if !output.status.success() { 355 let stderr = String::from_utf8_lossy(&output.stderr); 356 return Err(SendError::ExternalService(format!( 357 "signal-cli failed: {}", 358 stderr 359 ))); 360 } 361 Ok(()) 362 } 363}