this repo has no description
1use async_trait::async_trait;
2use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
3use reqwest::Client;
4use serde_json::json;
5use std::process::Stdio;
6use std::time::Duration;
7use tokio::io::AsyncWriteExt;
8use tokio::process::Command;
9
10use super::types::{CommsChannel, QueuedComms};
11
/// Overall per-request timeout, in seconds, configured on the HTTP client.
const HTTP_TIMEOUT_SECS: u64 = 30;
/// Total number of attempts the HTTP senders make for one notification.
const MAX_RETRIES: u32 = 3;
/// Base back-off delay in milliseconds; doubled for each subsequent attempt.
const INITIAL_RETRY_DELAY_MS: u64 = 500;
15
16#[async_trait]
17pub trait CommsSender: Send + Sync {
18 fn channel(&self) -> CommsChannel;
19 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>;
20}
21
22#[derive(Debug, thiserror::Error)]
23pub enum SendError {
24 #[error("Failed to spawn sendmail process: {0}")]
25 ProcessSpawn(#[from] std::io::Error),
26 #[error("Sendmail exited with non-zero status: {0}")]
27 SendmailFailed(String),
28 #[error("Channel not configured: {0:?}")]
29 NotConfigured(CommsChannel),
30 #[error("External service error: {0}")]
31 ExternalService(String),
32 #[error("Invalid recipient format: {0}")]
33 InvalidRecipient(String),
34 #[error("Request timeout")]
35 Timeout,
36 #[error("Max retries exceeded: {0}")]
37 MaxRetriesExceeded(String),
38}
39
40fn create_http_client() -> Client {
41 Client::builder()
42 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
43 .connect_timeout(Duration::from_secs(10))
44 .build()
45 .unwrap_or_else(|_| Client::new())
46}
47
48fn is_retryable_status(status: reqwest::StatusCode) -> bool {
49 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
50}
51
52async fn retry_delay(attempt: u32) {
53 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
54 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
55}
56
/// Makes `value` safe to place on a single header line.
///
/// Every carriage return and line feed is replaced by a space (so a value
/// cannot start a new header line), then surrounding whitespace is trimmed.
pub fn sanitize_header_value(value: &str) -> String {
    let single_line = value.replace(['\r', '\n'], " ");
    single_line.trim().to_owned()
}
60
61pub fn mime_encode_header(value: &str) -> String {
62 if value.is_ascii() {
63 sanitize_header_value(value)
64 } else {
65 let sanitized = sanitize_header_value(value);
66 format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes()))
67 }
68}
69
/// Checks that `number` is a `+` followed only by ASCII digits.
///
/// The whole string must be between 2 and 20 bytes long, i.e. a leading `+`
/// and 1 to 19 digits. Spaces, dashes and non-ASCII digits are rejected.
pub fn is_valid_phone_number(number: &str) -> bool {
    if !(2..=20).contains(&number.len()) {
        return false;
    }
    match number.strip_prefix('+') {
        Some(digits) => !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
81
/// Sends notifications as e-mail by piping a message into a sendmail binary.
pub struct EmailSender {
    /// Address placed in the `From:` header.
    from_address: String,
    /// Display name placed before the address; may be empty.
    from_name: String,
    /// Path of the sendmail executable to run.
    sendmail_path: String,
}
87
88impl EmailSender {
89 pub fn new(from_address: String, from_name: String) -> Self {
90 Self {
91 from_address,
92 from_name,
93 sendmail_path: std::env::var("SENDMAIL_PATH")
94 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
95 }
96 }
97
98 pub fn from_env() -> Option<Self> {
99 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
100 let from_name =
101 std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "Tranquil PDS".to_string());
102 Some(Self::new(from_address, from_name))
103 }
104
105 pub fn format_email(&self, notification: &QueuedComms) -> String {
106 let subject =
107 mime_encode_header(notification.subject.as_deref().unwrap_or("Notification"));
108 let recipient = sanitize_header_value(¬ification.recipient);
109 let from_header = if self.from_name.is_empty() {
110 self.from_address.clone()
111 } else {
112 format!(
113 "{} <{}>",
114 sanitize_header_value(&self.from_name),
115 self.from_address
116 )
117 };
118 format!(
119 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
120 from_header, recipient, subject, notification.body
121 )
122 }
123}
124
125#[async_trait]
126impl CommsSender for EmailSender {
127 fn channel(&self) -> CommsChannel {
128 CommsChannel::Email
129 }
130
131 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
132 let email_content = self.format_email(notification);
133 let mut child = Command::new(&self.sendmail_path)
134 .arg("-t")
135 .arg("-oi")
136 .stdin(Stdio::piped())
137 .stdout(Stdio::piped())
138 .stderr(Stdio::piped())
139 .spawn()?;
140 if let Some(mut stdin) = child.stdin.take() {
141 stdin.write_all(email_content.as_bytes()).await?;
142 }
143 let output = child.wait_with_output().await?;
144 if !output.status.success() {
145 let stderr = String::from_utf8_lossy(&output.stderr);
146 return Err(SendError::SendmailFailed(stderr.to_string()));
147 }
148 Ok(())
149 }
150}
151
152pub struct DiscordSender {
153 webhook_url: String,
154 http_client: Client,
155}
156
157impl DiscordSender {
158 pub fn new(webhook_url: String) -> Self {
159 Self {
160 webhook_url,
161 http_client: create_http_client(),
162 }
163 }
164
165 pub fn from_env() -> Option<Self> {
166 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
167 Some(Self::new(webhook_url))
168 }
169}
170
171#[async_trait]
172impl CommsSender for DiscordSender {
173 fn channel(&self) -> CommsChannel {
174 CommsChannel::Discord
175 }
176
177 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
178 let subject = notification.subject.as_deref().unwrap_or("Notification");
179 let content = format!("**{}**\n\n{}", subject, notification.body);
180 let payload = json!({
181 "content": content,
182 "username": "Tranquil PDS"
183 });
184 let mut last_error = None;
185 for attempt in 0..MAX_RETRIES {
186 let result = self
187 .http_client
188 .post(&self.webhook_url)
189 .json(&payload)
190 .send()
191 .await;
192 match result {
193 Ok(response) => {
194 if response.status().is_success() {
195 return Ok(());
196 }
197 let status = response.status();
198 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
199 last_error = Some(format!("Discord webhook returned {}", status));
200 retry_delay(attempt).await;
201 continue;
202 }
203 let body = response.text().await.unwrap_or_default();
204 return Err(SendError::ExternalService(format!(
205 "Discord webhook returned {}: {}",
206 status, body
207 )));
208 }
209 Err(e) => {
210 if e.is_timeout() {
211 if attempt < MAX_RETRIES - 1 {
212 last_error = Some("Discord request timed out".to_string());
213 retry_delay(attempt).await;
214 continue;
215 }
216 return Err(SendError::Timeout);
217 }
218 return Err(SendError::ExternalService(format!(
219 "Discord request failed: {}",
220 e
221 )));
222 }
223 }
224 }
225 Err(SendError::MaxRetriesExceeded(
226 last_error.unwrap_or_else(|| "Unknown error".to_string()),
227 ))
228 }
229}
230
231pub struct TelegramSender {
232 bot_token: String,
233 http_client: Client,
234}
235
236impl TelegramSender {
237 pub fn new(bot_token: String) -> Self {
238 Self {
239 bot_token,
240 http_client: create_http_client(),
241 }
242 }
243
244 pub fn from_env() -> Option<Self> {
245 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
246 Some(Self::new(bot_token))
247 }
248}
249
250#[async_trait]
251impl CommsSender for TelegramSender {
252 fn channel(&self) -> CommsChannel {
253 CommsChannel::Telegram
254 }
255
256 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
257 let chat_id = ¬ification.recipient;
258 let subject = notification.subject.as_deref().unwrap_or("Notification");
259 let text = format!("*{}*\n\n{}", subject, notification.body);
260 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token);
261 let payload = json!({
262 "chat_id": chat_id,
263 "text": text,
264 "parse_mode": "Markdown"
265 });
266 let mut last_error = None;
267 for attempt in 0..MAX_RETRIES {
268 let result = self.http_client.post(&url).json(&payload).send().await;
269 match result {
270 Ok(response) => {
271 if response.status().is_success() {
272 return Ok(());
273 }
274 let status = response.status();
275 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
276 last_error = Some(format!("Telegram API returned {}", status));
277 retry_delay(attempt).await;
278 continue;
279 }
280 let body = response.text().await.unwrap_or_default();
281 return Err(SendError::ExternalService(format!(
282 "Telegram API returned {}: {}",
283 status, body
284 )));
285 }
286 Err(e) => {
287 if e.is_timeout() {
288 if attempt < MAX_RETRIES - 1 {
289 last_error = Some("Telegram request timed out".to_string());
290 retry_delay(attempt).await;
291 continue;
292 }
293 return Err(SendError::Timeout);
294 }
295 return Err(SendError::ExternalService(format!(
296 "Telegram request failed: {}",
297 e
298 )));
299 }
300 }
301 }
302 Err(SendError::MaxRetriesExceeded(
303 last_error.unwrap_or_else(|| "Unknown error".to_string()),
304 ))
305 }
306}
307
/// Sends notifications by invoking the `signal-cli` executable.
pub struct SignalSender {
    /// Path of the `signal-cli` executable to run.
    signal_cli_path: String,
    /// Value passed to `signal-cli` via `-u`.
    sender_number: String,
}
312
313impl SignalSender {
314 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
315 Self {
316 signal_cli_path,
317 sender_number,
318 }
319 }
320
321 pub fn from_env() -> Option<Self> {
322 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
323 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
324 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
325 Some(Self::new(signal_cli_path, sender_number))
326 }
327}
328
329#[async_trait]
330impl CommsSender for SignalSender {
331 fn channel(&self) -> CommsChannel {
332 CommsChannel::Signal
333 }
334
335 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
336 let recipient = ¬ification.recipient;
337 if !is_valid_phone_number(recipient) {
338 return Err(SendError::InvalidRecipient(format!(
339 "Invalid phone number format: {}",
340 recipient
341 )));
342 }
343 let subject = notification.subject.as_deref().unwrap_or("Notification");
344 let message = format!("{}\n\n{}", subject, notification.body);
345 let output = Command::new(&self.signal_cli_path)
346 .arg("-u")
347 .arg(&self.sender_number)
348 .arg("send")
349 .arg("-m")
350 .arg(&message)
351 .arg(recipient)
352 .output()
353 .await?;
354 if !output.status.success() {
355 let stderr = String::from_utf8_lossy(&output.stderr);
356 return Err(SendError::ExternalService(format!(
357 "signal-cli failed: {}",
358 stderr
359 )));
360 }
361 Ok(())
362 }
363}