this repo has no description
1use async_trait::async_trait; 2use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; 3use reqwest::Client; 4use serde_json::json; 5use std::process::Stdio; 6use std::time::Duration; 7use tokio::io::AsyncWriteExt; 8use tokio::process::Command; 9 10use super::types::{CommsChannel, QueuedComms}; 11 12const HTTP_TIMEOUT_SECS: u64 = 30; 13const MAX_RETRIES: u32 = 3; 14const INITIAL_RETRY_DELAY_MS: u64 = 500; 15 16#[async_trait] 17pub trait CommsSender: Send + Sync { 18 fn channel(&self) -> CommsChannel; 19 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>; 20} 21 22#[derive(Debug, thiserror::Error)] 23pub enum SendError { 24 #[error("Failed to spawn sendmail process: {0}")] 25 ProcessSpawn(#[from] std::io::Error), 26 #[error("Sendmail exited with non-zero status: {0}")] 27 SendmailFailed(String), 28 #[error("Channel not configured: {0:?}")] 29 NotConfigured(CommsChannel), 30 #[error("External service error: {0}")] 31 ExternalService(String), 32 #[error("Invalid recipient format: {0}")] 33 InvalidRecipient(String), 34 #[error("Request timeout")] 35 Timeout, 36 #[error("Max retries exceeded: {0}")] 37 MaxRetriesExceeded(String), 38} 39 40fn create_http_client() -> Client { 41 Client::builder() 42 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 43 .connect_timeout(Duration::from_secs(10)) 44 .build() 45 .unwrap_or_else(|_| Client::new()) 46} 47 48fn is_retryable_status(status: reqwest::StatusCode) -> bool { 49 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 50} 51 52async fn retry_delay(attempt: u32) { 53 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 54 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 55} 56 57pub fn sanitize_header_value(value: &str) -> String { 58 value.replace(['\r', '\n'], " ").trim().to_string() 59} 60 61pub fn mime_encode_header(value: &str) -> String { 62 if value.is_ascii() { 63 sanitize_header_value(value) 64 } else { 65 let sanitized = sanitize_header_value(value); 66 format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes())) 67 } 68} 69 70pub fn is_valid_phone_number(number: &str) -> bool { 71 if number.len() < 2 || number.len() > 20 { 72 return false; 73 } 74 let mut chars = number.chars(); 75 if chars.next() != Some('+') { 76 return false; 77 } 78 let remaining: String = chars.collect(); 79 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 80} 81 82pub struct EmailSender { 83 from_address: String, 84 from_name: String, 85 sendmail_path: String, 86} 87 88impl EmailSender { 89 pub fn new(from_address: String, from_name: String) -> Self { 90 Self { 91 from_address, 92 from_name, 93 sendmail_path: std::env::var("SENDMAIL_PATH") 94 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 95 } 96 } 97 98 pub fn from_env() -> Option<Self> { 99 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 100 let from_name = 101 std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "Tranquil PDS".to_string()); 102 Some(Self::new(from_address, from_name)) 103 } 104 105 pub fn format_email(&self, notification: &QueuedComms) -> String { 106 let subject = mime_encode_header(notification.subject.as_deref().unwrap_or("Notification")); 107 let recipient = sanitize_header_value(&notification.recipient); 108 let from_header = if self.from_name.is_empty() { 109 self.from_address.clone() 110 } else { 111 format!( 112 "{} <{}>", 113 sanitize_header_value(&self.from_name), 114 self.from_address 115 ) 116 }; 117 format!( 118 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 119 from_header, recipient, subject, notification.body 120 ) 121 } 122} 123 124#[async_trait] 125impl CommsSender for EmailSender { 126 fn channel(&self) -> CommsChannel { 127 CommsChannel::Email 128 } 129 130 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 131 let email_content = self.format_email(notification); 132 let mut child = Command::new(&self.sendmail_path) 133 .arg("-t") 134 .arg("-oi") 135 .stdin(Stdio::piped()) 136 .stdout(Stdio::piped()) 137 .stderr(Stdio::piped()) 138 .spawn()?; 139 if let Some(mut stdin) = child.stdin.take() { 140 stdin.write_all(email_content.as_bytes()).await?; 141 } 142 let output = child.wait_with_output().await?; 143 if !output.status.success() { 144 let stderr = String::from_utf8_lossy(&output.stderr); 145 return Err(SendError::SendmailFailed(stderr.to_string())); 146 } 147 Ok(()) 148 } 149} 150 151pub struct DiscordSender { 152 webhook_url: String, 153 http_client: Client, 154} 155 156impl DiscordSender { 157 pub fn new(webhook_url: String) -> Self { 158 Self { 159 webhook_url, 160 http_client: create_http_client(), 161 } 162 } 163 164 pub fn from_env() -> Option<Self> { 165 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 166 Some(Self::new(webhook_url)) 167 } 168} 169 170#[async_trait] 171impl CommsSender for DiscordSender { 172 fn channel(&self) -> CommsChannel { 173 CommsChannel::Discord 174 } 175 176 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 177 let subject = notification.subject.as_deref().unwrap_or("Notification"); 178 let content = format!("**{}**\n\n{}", subject, notification.body); 179 let payload = json!({ 180 "content": content, 181 "username": "Tranquil PDS" 182 }); 183 let mut last_error = None; 184 for attempt in 0..MAX_RETRIES { 185 let result = self 186 .http_client 187 .post(&self.webhook_url) 188 .json(&payload) 189 .send() 190 .await; 191 match result { 192 Ok(response) => { 193 if response.status().is_success() { 194 return Ok(()); 195 } 196 let status = response.status(); 197 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 198 last_error = Some(format!("Discord webhook returned {}", status)); 199 retry_delay(attempt).await; 200 continue; 201 } 202 let body = response.text().await.unwrap_or_default(); 203 return Err(SendError::ExternalService(format!( 204 "Discord webhook returned {}: {}", 205 status, body 206 ))); 207 } 208 Err(e) => { 209 if e.is_timeout() { 210 if attempt < MAX_RETRIES - 1 { 211 last_error = Some("Discord request timed out".to_string()); 212 retry_delay(attempt).await; 213 continue; 214 } 215 return Err(SendError::Timeout); 216 } 217 return Err(SendError::ExternalService(format!( 218 "Discord request failed: {}", 219 e 220 ))); 221 } 222 } 223 } 224 Err(SendError::MaxRetriesExceeded( 225 last_error.unwrap_or_else(|| "Unknown error".to_string()), 226 )) 227 } 228} 229 230pub struct TelegramSender { 231 bot_token: String, 232 http_client: Client, 233} 234 235impl TelegramSender { 236 pub fn new(bot_token: String) -> Self { 237 Self { 238 bot_token, 239 http_client: create_http_client(), 240 } 241 } 242 243 pub fn from_env() -> Option<Self> { 244 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 245 Some(Self::new(bot_token)) 246 } 247} 248 249#[async_trait] 250impl CommsSender for TelegramSender { 251 fn channel(&self) -> CommsChannel { 252 CommsChannel::Telegram 253 } 254 255 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 256 let chat_id = &notification.recipient; 257 let subject = notification.subject.as_deref().unwrap_or("Notification"); 258 let text = format!("*{}*\n\n{}", subject, notification.body); 259 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token); 260 let payload = json!({ 261 "chat_id": chat_id, 262 "text": text, 263 "parse_mode": "Markdown" 264 }); 265 let mut last_error = None; 266 for attempt in 0..MAX_RETRIES { 267 let result = self.http_client.post(&url).json(&payload).send().await; 268 match result { 269 Ok(response) => { 270 if response.status().is_success() { 271 return Ok(()); 272 } 273 let status = response.status(); 274 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 275 last_error = Some(format!("Telegram API returned {}", status)); 276 retry_delay(attempt).await; 277 continue; 278 } 279 let body = response.text().await.unwrap_or_default(); 280 return Err(SendError::ExternalService(format!( 281 "Telegram API returned {}: {}", 282 status, body 283 ))); 284 } 285 Err(e) => { 286 if e.is_timeout() { 287 if attempt < MAX_RETRIES - 1 { 288 last_error = Some("Telegram request timed out".to_string()); 289 retry_delay(attempt).await; 290 continue; 291 } 292 return Err(SendError::Timeout); 293 } 294 return Err(SendError::ExternalService(format!( 295 "Telegram request failed: {}", 296 e 297 ))); 298 } 299 } 300 } 301 Err(SendError::MaxRetriesExceeded( 302 last_error.unwrap_or_else(|| "Unknown error".to_string()), 303 )) 304 } 305} 306 307pub struct SignalSender { 308 signal_cli_path: String, 309 sender_number: String, 310} 311 312impl SignalSender { 313 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 314 Self { 315 signal_cli_path, 316 sender_number, 317 } 318 } 319 320 pub fn from_env() -> Option<Self> { 321 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 322 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 323 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 324 Some(Self::new(signal_cli_path, sender_number)) 325 } 326} 327 328#[async_trait] 329impl CommsSender for SignalSender { 330 fn channel(&self) -> CommsChannel { 331 CommsChannel::Signal 332 } 333 334 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 335 let recipient = &notification.recipient; 336 if !is_valid_phone_number(recipient) { 337 return Err(SendError::InvalidRecipient(format!( 338 "Invalid phone number format: {}", 339 recipient 340 ))); 341 } 342 let subject = notification.subject.as_deref().unwrap_or("Notification"); 343 let message = format!("{}\n\n{}", subject, notification.body); 344 let output = Command::new(&self.signal_cli_path) 345 .arg("-u") 346 .arg(&self.sender_number) 347 .arg("send") 348 .arg("-m") 349 .arg(&message) 350 .arg(recipient) 351 .output() 352 .await?; 353 if !output.status.success() { 354 let stderr = String::from_utf8_lossy(&output.stderr); 355 return Err(SendError::ExternalService(format!( 356 "signal-cli failed: {}", 357 stderr 358 ))); 359 } 360 Ok(()) 361 } 362}