this repo has no description
1use async_trait::async_trait;
2use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
3use reqwest::Client;
4use serde_json::json;
5use std::process::Stdio;
6use std::time::Duration;
7use tokio::io::AsyncWriteExt;
8use tokio::process::Command;
9
10use super::types::{CommsChannel, QueuedComms};
11
/// Overall per-request timeout, in seconds, for clients built by `create_http_client`.
const HTTP_TIMEOUT_SECS: u64 = 30;
/// Number of attempts the HTTP senders make; their loops iterate `0..MAX_RETRIES`.
const MAX_RETRIES: u32 = 3;
/// Base back-off delay in milliseconds; `retry_delay` doubles it per attempt.
const INITIAL_RETRY_DELAY_MS: u64 = 500;
15
/// A delivery backend that sends queued notifications over a single channel.
#[async_trait]
pub trait CommsSender: Send + Sync {
    /// Returns the channel this sender delivers on.
    fn channel(&self) -> CommsChannel;
    /// Attempts to deliver `notification`.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] describing why delivery failed.
    async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>;
}
21
/// Errors produced by the [`CommsSender`] implementations in this file.
#[derive(Debug, thiserror::Error)]
pub enum SendError {
    /// An I/O error from a child process. Because of `#[from]`, every
    /// `std::io::Error` propagated with `?` lands here: spawning sendmail,
    /// writing to its stdin, waiting on it, and running signal-cli.
    // NOTE(review): the message names sendmail although signal-cli I/O errors
    // are reported through this variant too; consider a neutral wording.
    #[error("Failed to spawn sendmail process: {0}")]
    ProcessSpawn(#[from] std::io::Error),
    /// sendmail exited unsuccessfully; the payload is its captured stderr.
    // NOTE(review): the message says "status" but `{0}` is the stderr text.
    #[error("Sendmail exited with non-zero status: {0}")]
    SendmailFailed(String),
    /// The given channel has no configured sender.
    // Not constructed in this file; presumably used by the dispatcher (verify).
    #[error("Channel not configured: {0:?}")]
    NotConfigured(CommsChannel),
    /// A remote service or external tool reported a failure.
    #[error("External service error: {0}")]
    ExternalService(String),
    /// The recipient failed validation for the channel.
    #[error("Invalid recipient format: {0}")]
    InvalidRecipient(String),
    /// The final attempt of an HTTP request timed out.
    #[error("Request timeout")]
    Timeout,
    /// The retry loop ended without a success; the payload is the last
    /// recorded failure description.
    #[error("Max retries exceeded: {0}")]
    MaxRetriesExceeded(String),
}
39
40fn create_http_client() -> Client {
41 Client::builder()
42 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
43 .connect_timeout(Duration::from_secs(10))
44 .build()
45 .unwrap_or_else(|_| Client::new())
46}
47
48fn is_retryable_status(status: reqwest::StatusCode) -> bool {
49 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
50}
51
52async fn retry_delay(attempt: u32) {
53 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
54 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
55}
56
/// Makes `value` safe to place on a single header line.
///
/// Every carriage return and line feed is replaced by a space (so the value
/// cannot start a new header line), then surrounding whitespace is trimmed.
pub fn sanitize_header_value(value: &str) -> String {
    let single_line: String = value
        .chars()
        .map(|ch| if matches!(ch, '\r' | '\n') { ' ' } else { ch })
        .collect();
    single_line.trim().to_owned()
}
60
61pub fn mime_encode_header(value: &str) -> String {
62 if value.is_ascii() {
63 sanitize_header_value(value)
64 } else {
65 let sanitized = sanitize_header_value(value);
66 format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes()))
67 }
68}
69
/// Checks that `number` is a `+` followed only by ASCII digits.
///
/// The whole string must be between 2 and 20 bytes long (inclusive), start
/// with `+`, and contain at least one digit and nothing but ASCII digits
/// after the `+`.
pub fn is_valid_phone_number(number: &str) -> bool {
    if !(2..=20).contains(&number.len()) {
        return false;
    }
    match number.strip_prefix('+') {
        Some(digits) => !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
81
/// Sends e-mail notifications by piping a formatted message to a sendmail binary.
pub struct EmailSender {
    /// Address placed in the `From:` header.
    from_address: String,
    /// Display name for the `From:` header; when empty, only the address is used.
    from_name: String,
    /// Path of the sendmail executable to spawn.
    sendmail_path: String,
}
87
88impl EmailSender {
89 pub fn new(from_address: String, from_name: String) -> Self {
90 Self {
91 from_address,
92 from_name,
93 sendmail_path: std::env::var("SENDMAIL_PATH")
94 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
95 }
96 }
97
98 pub fn from_env() -> Option<Self> {
99 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
100 let from_name =
101 std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "Tranquil PDS".to_string());
102 Some(Self::new(from_address, from_name))
103 }
104
105 pub fn format_email(&self, notification: &QueuedComms) -> String {
106 let subject = mime_encode_header(notification.subject.as_deref().unwrap_or("Notification"));
107 let recipient = sanitize_header_value(¬ification.recipient);
108 let from_header = if self.from_name.is_empty() {
109 self.from_address.clone()
110 } else {
111 format!(
112 "{} <{}>",
113 sanitize_header_value(&self.from_name),
114 self.from_address
115 )
116 };
117 format!(
118 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
119 from_header, recipient, subject, notification.body
120 )
121 }
122}
123
124#[async_trait]
125impl CommsSender for EmailSender {
126 fn channel(&self) -> CommsChannel {
127 CommsChannel::Email
128 }
129
130 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
131 let email_content = self.format_email(notification);
132 let mut child = Command::new(&self.sendmail_path)
133 .arg("-t")
134 .arg("-oi")
135 .stdin(Stdio::piped())
136 .stdout(Stdio::piped())
137 .stderr(Stdio::piped())
138 .spawn()?;
139 if let Some(mut stdin) = child.stdin.take() {
140 stdin.write_all(email_content.as_bytes()).await?;
141 }
142 let output = child.wait_with_output().await?;
143 if !output.status.success() {
144 let stderr = String::from_utf8_lossy(&output.stderr);
145 return Err(SendError::SendmailFailed(stderr.to_string()));
146 }
147 Ok(())
148 }
149}
150
/// Sends notifications to a Discord webhook.
pub struct DiscordSender {
    /// Webhook URL every notification is POSTed to.
    webhook_url: String,
    /// HTTP client used for the webhook requests.
    http_client: Client,
}
155
156impl DiscordSender {
157 pub fn new(webhook_url: String) -> Self {
158 Self {
159 webhook_url,
160 http_client: create_http_client(),
161 }
162 }
163
164 pub fn from_env() -> Option<Self> {
165 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
166 Some(Self::new(webhook_url))
167 }
168}
169
170#[async_trait]
171impl CommsSender for DiscordSender {
172 fn channel(&self) -> CommsChannel {
173 CommsChannel::Discord
174 }
175
176 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
177 let subject = notification.subject.as_deref().unwrap_or("Notification");
178 let content = format!("**{}**\n\n{}", subject, notification.body);
179 let payload = json!({
180 "content": content,
181 "username": "Tranquil PDS"
182 });
183 let mut last_error = None;
184 for attempt in 0..MAX_RETRIES {
185 let result = self
186 .http_client
187 .post(&self.webhook_url)
188 .json(&payload)
189 .send()
190 .await;
191 match result {
192 Ok(response) => {
193 if response.status().is_success() {
194 return Ok(());
195 }
196 let status = response.status();
197 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
198 last_error = Some(format!("Discord webhook returned {}", status));
199 retry_delay(attempt).await;
200 continue;
201 }
202 let body = response.text().await.unwrap_or_default();
203 return Err(SendError::ExternalService(format!(
204 "Discord webhook returned {}: {}",
205 status, body
206 )));
207 }
208 Err(e) => {
209 if e.is_timeout() {
210 if attempt < MAX_RETRIES - 1 {
211 last_error = Some("Discord request timed out".to_string());
212 retry_delay(attempt).await;
213 continue;
214 }
215 return Err(SendError::Timeout);
216 }
217 return Err(SendError::ExternalService(format!(
218 "Discord request failed: {}",
219 e
220 )));
221 }
222 }
223 }
224 Err(SendError::MaxRetriesExceeded(
225 last_error.unwrap_or_else(|| "Unknown error".to_string()),
226 ))
227 }
228}
229
/// Sends notifications through the Telegram Bot API.
pub struct TelegramSender {
    /// Bot token; it is interpolated into the API request URL.
    bot_token: String,
    /// HTTP client used for the API requests.
    http_client: Client,
}
234
235impl TelegramSender {
236 pub fn new(bot_token: String) -> Self {
237 Self {
238 bot_token,
239 http_client: create_http_client(),
240 }
241 }
242
243 pub fn from_env() -> Option<Self> {
244 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
245 Some(Self::new(bot_token))
246 }
247}
248
249#[async_trait]
250impl CommsSender for TelegramSender {
251 fn channel(&self) -> CommsChannel {
252 CommsChannel::Telegram
253 }
254
255 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
256 let chat_id = ¬ification.recipient;
257 let subject = notification.subject.as_deref().unwrap_or("Notification");
258 let text = format!("*{}*\n\n{}", subject, notification.body);
259 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token);
260 let payload = json!({
261 "chat_id": chat_id,
262 "text": text,
263 "parse_mode": "Markdown"
264 });
265 let mut last_error = None;
266 for attempt in 0..MAX_RETRIES {
267 let result = self.http_client.post(&url).json(&payload).send().await;
268 match result {
269 Ok(response) => {
270 if response.status().is_success() {
271 return Ok(());
272 }
273 let status = response.status();
274 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
275 last_error = Some(format!("Telegram API returned {}", status));
276 retry_delay(attempt).await;
277 continue;
278 }
279 let body = response.text().await.unwrap_or_default();
280 return Err(SendError::ExternalService(format!(
281 "Telegram API returned {}: {}",
282 status, body
283 )));
284 }
285 Err(e) => {
286 if e.is_timeout() {
287 if attempt < MAX_RETRIES - 1 {
288 last_error = Some("Telegram request timed out".to_string());
289 retry_delay(attempt).await;
290 continue;
291 }
292 return Err(SendError::Timeout);
293 }
294 return Err(SendError::ExternalService(format!(
295 "Telegram request failed: {}",
296 e
297 )));
298 }
299 }
300 }
301 Err(SendError::MaxRetriesExceeded(
302 last_error.unwrap_or_else(|| "Unknown error".to_string()),
303 ))
304 }
305}
306
/// Sends notifications by invoking the `signal-cli` command-line tool.
pub struct SignalSender {
    /// Path of the signal-cli executable to run.
    signal_cli_path: String,
    /// Value passed to signal-cli's `-u` option (the sending account).
    sender_number: String,
}
311
312impl SignalSender {
313 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
314 Self {
315 signal_cli_path,
316 sender_number,
317 }
318 }
319
320 pub fn from_env() -> Option<Self> {
321 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
322 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
323 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
324 Some(Self::new(signal_cli_path, sender_number))
325 }
326}
327
328#[async_trait]
329impl CommsSender for SignalSender {
330 fn channel(&self) -> CommsChannel {
331 CommsChannel::Signal
332 }
333
334 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
335 let recipient = ¬ification.recipient;
336 if !is_valid_phone_number(recipient) {
337 return Err(SendError::InvalidRecipient(format!(
338 "Invalid phone number format: {}",
339 recipient
340 )));
341 }
342 let subject = notification.subject.as_deref().unwrap_or("Notification");
343 let message = format!("{}\n\n{}", subject, notification.body);
344 let output = Command::new(&self.signal_cli_path)
345 .arg("-u")
346 .arg(&self.sender_number)
347 .arg("send")
348 .arg("-m")
349 .arg(&message)
350 .arg(recipient)
351 .output()
352 .await?;
353 if !output.status.success() {
354 let stderr = String::from_utf8_lossy(&output.stderr);
355 return Err(SendError::ExternalService(format!(
356 "signal-cli failed: {}",
357 stderr
358 )));
359 }
360 Ok(())
361 }
362}