this repo has no description
1use async_trait::async_trait; 2use reqwest::Client; 3use serde_json::json; 4use std::process::Stdio; 5use std::time::Duration; 6use tokio::io::AsyncWriteExt; 7use tokio::process::Command; 8 9use super::types::{CommsChannel, QueuedComms}; 10 11const HTTP_TIMEOUT_SECS: u64 = 30; 12const MAX_RETRIES: u32 = 3; 13const INITIAL_RETRY_DELAY_MS: u64 = 500; 14 15#[async_trait] 16pub trait CommsSender: Send + Sync { 17 fn channel(&self) -> CommsChannel; 18 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>; 19} 20 21#[derive(Debug, thiserror::Error)] 22pub enum SendError { 23 #[error("Failed to spawn sendmail process: {0}")] 24 ProcessSpawn(#[from] std::io::Error), 25 #[error("Sendmail exited with non-zero status: {0}")] 26 SendmailFailed(String), 27 #[error("Channel not configured: {0:?}")] 28 NotConfigured(CommsChannel), 29 #[error("External service error: {0}")] 30 ExternalService(String), 31 #[error("Invalid recipient format: {0}")] 32 InvalidRecipient(String), 33 #[error("Request timeout")] 34 Timeout, 35 #[error("Max retries exceeded: {0}")] 36 MaxRetriesExceeded(String), 37} 38 39fn create_http_client() -> Client { 40 Client::builder() 41 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) 42 .connect_timeout(Duration::from_secs(10)) 43 .build() 44 .unwrap_or_else(|_| Client::new()) 45} 46 47fn is_retryable_status(status: reqwest::StatusCode) -> bool { 48 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS 49} 50 51async fn retry_delay(attempt: u32) { 52 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt); 53 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 54} 55 56pub fn sanitize_header_value(value: &str) -> String { 57 value.replace(['\r', '\n'], " ").trim().to_string() 58} 59 60pub fn is_valid_phone_number(number: &str) -> bool { 61 if number.len() < 2 || number.len() > 20 { 62 return false; 63 } 64 let mut chars = number.chars(); 65 if chars.next() != Some('+') { 66 return false; 67 } 68 let remaining: String = chars.collect(); 69 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit()) 70} 71 72pub struct EmailSender { 73 from_address: String, 74 from_name: String, 75 sendmail_path: String, 76} 77 78impl EmailSender { 79 pub fn new(from_address: String, from_name: String) -> Self { 80 Self { 81 from_address, 82 from_name, 83 sendmail_path: std::env::var("SENDMAIL_PATH") 84 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()), 85 } 86 } 87 88 pub fn from_env() -> Option<Self> { 89 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?; 90 let from_name = 91 std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "Tranquil PDS".to_string()); 92 Some(Self::new(from_address, from_name)) 93 } 94 95 pub fn format_email(&self, notification: &QueuedComms) -> String { 96 let subject = 97 sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification")); 98 let recipient = sanitize_header_value(&notification.recipient); 99 let from_header = if self.from_name.is_empty() { 100 self.from_address.clone() 101 } else { 102 format!( 103 "{} <{}>", 104 sanitize_header_value(&self.from_name), 105 self.from_address 106 ) 107 }; 108 format!( 109 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", 110 from_header, recipient, subject, notification.body 111 ) 112 } 113} 114 115#[async_trait] 116impl CommsSender for EmailSender { 117 fn channel(&self) -> CommsChannel { 118 CommsChannel::Email 119 } 120 121 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 122 let email_content = self.format_email(notification); 123 let mut child = Command::new(&self.sendmail_path) 124 .arg("-t") 125 .arg("-oi") 126 .stdin(Stdio::piped()) 127 .stdout(Stdio::piped()) 128 .stderr(Stdio::piped()) 129 .spawn()?; 130 if let Some(mut stdin) = child.stdin.take() { 131 stdin.write_all(email_content.as_bytes()).await?; 132 } 133 let output = child.wait_with_output().await?; 134 if !output.status.success() { 135 let stderr = String::from_utf8_lossy(&output.stderr); 136 return Err(SendError::SendmailFailed(stderr.to_string())); 137 } 138 Ok(()) 139 } 140} 141 142pub struct DiscordSender { 143 webhook_url: String, 144 http_client: Client, 145} 146 147impl DiscordSender { 148 pub fn new(webhook_url: String) -> Self { 149 Self { 150 webhook_url, 151 http_client: create_http_client(), 152 } 153 } 154 155 pub fn from_env() -> Option<Self> { 156 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?; 157 Some(Self::new(webhook_url)) 158 } 159} 160 161#[async_trait] 162impl CommsSender for DiscordSender { 163 fn channel(&self) -> CommsChannel { 164 CommsChannel::Discord 165 } 166 167 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 168 let subject = notification.subject.as_deref().unwrap_or("Notification"); 169 let content = format!("**{}**\n\n{}", subject, notification.body); 170 let payload = json!({ 171 "content": content, 172 "username": "Tranquil PDS" 173 }); 174 let mut last_error = None; 175 for attempt in 0..MAX_RETRIES { 176 let result = self 177 .http_client 178 .post(&self.webhook_url) 179 .json(&payload) 180 .send() 181 .await; 182 match result { 183 Ok(response) => { 184 if response.status().is_success() { 185 return Ok(()); 186 } 187 let status = response.status(); 188 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 189 last_error = Some(format!("Discord webhook returned {}", status)); 190 retry_delay(attempt).await; 191 continue; 192 } 193 let body = response.text().await.unwrap_or_default(); 194 return Err(SendError::ExternalService(format!( 195 "Discord webhook returned {}: {}", 196 status, body 197 ))); 198 } 199 Err(e) => { 200 if e.is_timeout() { 201 if attempt < MAX_RETRIES - 1 { 202 last_error = Some("Discord request timed out".to_string()); 203 retry_delay(attempt).await; 204 continue; 205 } 206 return Err(SendError::Timeout); 207 } 208 return Err(SendError::ExternalService(format!( 209 "Discord request failed: {}", 210 e 211 ))); 212 } 213 } 214 } 215 Err(SendError::MaxRetriesExceeded( 216 last_error.unwrap_or_else(|| "Unknown error".to_string()), 217 )) 218 } 219} 220 221pub struct TelegramSender { 222 bot_token: String, 223 http_client: Client, 224} 225 226impl TelegramSender { 227 pub fn new(bot_token: String) -> Self { 228 Self { 229 bot_token, 230 http_client: create_http_client(), 231 } 232 } 233 234 pub fn from_env() -> Option<Self> { 235 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?; 236 Some(Self::new(bot_token)) 237 } 238} 239 240#[async_trait] 241impl CommsSender for TelegramSender { 242 fn channel(&self) -> CommsChannel { 243 CommsChannel::Telegram 244 } 245 246 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 247 let chat_id = &notification.recipient; 248 let subject = notification.subject.as_deref().unwrap_or("Notification"); 249 let text = format!("*{}*\n\n{}", subject, notification.body); 250 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token); 251 let payload = json!({ 252 "chat_id": chat_id, 253 "text": text, 254 "parse_mode": "Markdown" 255 }); 256 let mut last_error = None; 257 for attempt in 0..MAX_RETRIES { 258 let result = self.http_client.post(&url).json(&payload).send().await; 259 match result { 260 Ok(response) => { 261 if response.status().is_success() { 262 return Ok(()); 263 } 264 let status = response.status(); 265 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 266 last_error = Some(format!("Telegram API returned {}", status)); 267 retry_delay(attempt).await; 268 continue; 269 } 270 let body = response.text().await.unwrap_or_default(); 271 return Err(SendError::ExternalService(format!( 272 "Telegram API returned {}: {}", 273 status, body 274 ))); 275 } 276 Err(e) => { 277 if e.is_timeout() { 278 if attempt < MAX_RETRIES - 1 { 279 last_error = Some("Telegram request timed out".to_string()); 280 retry_delay(attempt).await; 281 continue; 282 } 283 return Err(SendError::Timeout); 284 } 285 return Err(SendError::ExternalService(format!( 286 "Telegram request failed: {}", 287 e 288 ))); 289 } 290 } 291 } 292 Err(SendError::MaxRetriesExceeded( 293 last_error.unwrap_or_else(|| "Unknown error".to_string()), 294 )) 295 } 296} 297 298pub struct SignalSender { 299 signal_cli_path: String, 300 sender_number: String, 301} 302 303impl SignalSender { 304 pub fn new(signal_cli_path: String, sender_number: String) -> Self { 305 Self { 306 signal_cli_path, 307 sender_number, 308 } 309 } 310 311 pub fn from_env() -> Option<Self> { 312 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH") 313 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string()); 314 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?; 315 Some(Self::new(signal_cli_path, sender_number)) 316 } 317} 318 319#[async_trait] 320impl CommsSender for SignalSender { 321 fn channel(&self) -> CommsChannel { 322 CommsChannel::Signal 323 } 324 325 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 326 let recipient = &notification.recipient; 327 if !is_valid_phone_number(recipient) { 328 return Err(SendError::InvalidRecipient(format!( 329 "Invalid phone number format: {}", 330 recipient 331 ))); 332 } 333 let subject = notification.subject.as_deref().unwrap_or("Notification"); 334 let message = format!("{}\n\n{}", subject, notification.body); 335 let output = Command::new(&self.signal_cli_path) 336 .arg("-u") 337 .arg(&self.sender_number) 338 .arg("send") 339 .arg("-m") 340 .arg(&message) 341 .arg(recipient) 342 .output() 343 .await?; 344 if !output.status.success() { 345 let stderr = String::from_utf8_lossy(&output.stderr); 346 return Err(SendError::ExternalService(format!( 347 "signal-cli failed: {}", 348 stderr 349 ))); 350 } 351 Ok(()) 352 } 353}