this repo has no description
1use async_trait::async_trait;
2use reqwest::Client;
3use serde_json::json;
4use std::process::Stdio;
5use std::time::Duration;
6use tokio::io::AsyncWriteExt;
7use tokio::process::Command;
8
9use super::types::{CommsChannel, QueuedComms};
10
11const HTTP_TIMEOUT_SECS: u64 = 30;
12const MAX_RETRIES: u32 = 3;
13const INITIAL_RETRY_DELAY_MS: u64 = 500;
14
15#[async_trait]
16pub trait CommsSender: Send + Sync {
17 fn channel(&self) -> CommsChannel;
18 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>;
19}
20
21#[derive(Debug, thiserror::Error)]
22pub enum SendError {
23 #[error("Failed to spawn sendmail process: {0}")]
24 ProcessSpawn(#[from] std::io::Error),
25 #[error("Sendmail exited with non-zero status: {0}")]
26 SendmailFailed(String),
27 #[error("Channel not configured: {0:?}")]
28 NotConfigured(CommsChannel),
29 #[error("External service error: {0}")]
30 ExternalService(String),
31 #[error("Invalid recipient format: {0}")]
32 InvalidRecipient(String),
33 #[error("Request timeout")]
34 Timeout,
35 #[error("Max retries exceeded: {0}")]
36 MaxRetriesExceeded(String),
37}
38
39fn create_http_client() -> Client {
40 Client::builder()
41 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
42 .connect_timeout(Duration::from_secs(10))
43 .build()
44 .unwrap_or_else(|_| Client::new())
45}
46
47fn is_retryable_status(status: reqwest::StatusCode) -> bool {
48 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
49}
50
51async fn retry_delay(attempt: u32) {
52 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
53 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
54}
55
56pub fn sanitize_header_value(value: &str) -> String {
57 value.replace(['\r', '\n'], " ").trim().to_string()
58}
59
60pub fn is_valid_phone_number(number: &str) -> bool {
61 if number.len() < 2 || number.len() > 20 {
62 return false;
63 }
64 let mut chars = number.chars();
65 if chars.next() != Some('+') {
66 return false;
67 }
68 let remaining: String = chars.collect();
69 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit())
70}
71
72pub struct EmailSender {
73 from_address: String,
74 from_name: String,
75 sendmail_path: String,
76}
77
78impl EmailSender {
79 pub fn new(from_address: String, from_name: String) -> Self {
80 Self {
81 from_address,
82 from_name,
83 sendmail_path: std::env::var("SENDMAIL_PATH")
84 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
85 }
86 }
87
88 pub fn from_env() -> Option<Self> {
89 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
90 let from_name =
91 std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "Tranquil PDS".to_string());
92 Some(Self::new(from_address, from_name))
93 }
94
95 pub fn format_email(&self, notification: &QueuedComms) -> String {
96 let subject =
97 sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification"));
98 let recipient = sanitize_header_value(¬ification.recipient);
99 let from_header = if self.from_name.is_empty() {
100 self.from_address.clone()
101 } else {
102 format!(
103 "{} <{}>",
104 sanitize_header_value(&self.from_name),
105 self.from_address
106 )
107 };
108 format!(
109 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
110 from_header, recipient, subject, notification.body
111 )
112 }
113}
114
115#[async_trait]
116impl CommsSender for EmailSender {
117 fn channel(&self) -> CommsChannel {
118 CommsChannel::Email
119 }
120
121 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
122 let email_content = self.format_email(notification);
123 let mut child = Command::new(&self.sendmail_path)
124 .arg("-t")
125 .arg("-oi")
126 .stdin(Stdio::piped())
127 .stdout(Stdio::piped())
128 .stderr(Stdio::piped())
129 .spawn()?;
130 if let Some(mut stdin) = child.stdin.take() {
131 stdin.write_all(email_content.as_bytes()).await?;
132 }
133 let output = child.wait_with_output().await?;
134 if !output.status.success() {
135 let stderr = String::from_utf8_lossy(&output.stderr);
136 return Err(SendError::SendmailFailed(stderr.to_string()));
137 }
138 Ok(())
139 }
140}
141
142pub struct DiscordSender {
143 webhook_url: String,
144 http_client: Client,
145}
146
147impl DiscordSender {
148 pub fn new(webhook_url: String) -> Self {
149 Self {
150 webhook_url,
151 http_client: create_http_client(),
152 }
153 }
154
155 pub fn from_env() -> Option<Self> {
156 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
157 Some(Self::new(webhook_url))
158 }
159}
160
161#[async_trait]
162impl CommsSender for DiscordSender {
163 fn channel(&self) -> CommsChannel {
164 CommsChannel::Discord
165 }
166
167 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
168 let subject = notification.subject.as_deref().unwrap_or("Notification");
169 let content = format!("**{}**\n\n{}", subject, notification.body);
170 let payload = json!({
171 "content": content,
172 "username": "Tranquil PDS"
173 });
174 let mut last_error = None;
175 for attempt in 0..MAX_RETRIES {
176 let result = self
177 .http_client
178 .post(&self.webhook_url)
179 .json(&payload)
180 .send()
181 .await;
182 match result {
183 Ok(response) => {
184 if response.status().is_success() {
185 return Ok(());
186 }
187 let status = response.status();
188 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
189 last_error = Some(format!("Discord webhook returned {}", status));
190 retry_delay(attempt).await;
191 continue;
192 }
193 let body = response.text().await.unwrap_or_default();
194 return Err(SendError::ExternalService(format!(
195 "Discord webhook returned {}: {}",
196 status, body
197 )));
198 }
199 Err(e) => {
200 if e.is_timeout() {
201 if attempt < MAX_RETRIES - 1 {
202 last_error = Some("Discord request timed out".to_string());
203 retry_delay(attempt).await;
204 continue;
205 }
206 return Err(SendError::Timeout);
207 }
208 return Err(SendError::ExternalService(format!(
209 "Discord request failed: {}",
210 e
211 )));
212 }
213 }
214 }
215 Err(SendError::MaxRetriesExceeded(
216 last_error.unwrap_or_else(|| "Unknown error".to_string()),
217 ))
218 }
219}
220
221pub struct TelegramSender {
222 bot_token: String,
223 http_client: Client,
224}
225
226impl TelegramSender {
227 pub fn new(bot_token: String) -> Self {
228 Self {
229 bot_token,
230 http_client: create_http_client(),
231 }
232 }
233
234 pub fn from_env() -> Option<Self> {
235 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
236 Some(Self::new(bot_token))
237 }
238}
239
240#[async_trait]
241impl CommsSender for TelegramSender {
242 fn channel(&self) -> CommsChannel {
243 CommsChannel::Telegram
244 }
245
246 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
247 let chat_id = ¬ification.recipient;
248 let subject = notification.subject.as_deref().unwrap_or("Notification");
249 let text = format!("*{}*\n\n{}", subject, notification.body);
250 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token);
251 let payload = json!({
252 "chat_id": chat_id,
253 "text": text,
254 "parse_mode": "Markdown"
255 });
256 let mut last_error = None;
257 for attempt in 0..MAX_RETRIES {
258 let result = self.http_client.post(&url).json(&payload).send().await;
259 match result {
260 Ok(response) => {
261 if response.status().is_success() {
262 return Ok(());
263 }
264 let status = response.status();
265 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
266 last_error = Some(format!("Telegram API returned {}", status));
267 retry_delay(attempt).await;
268 continue;
269 }
270 let body = response.text().await.unwrap_or_default();
271 return Err(SendError::ExternalService(format!(
272 "Telegram API returned {}: {}",
273 status, body
274 )));
275 }
276 Err(e) => {
277 if e.is_timeout() {
278 if attempt < MAX_RETRIES - 1 {
279 last_error = Some("Telegram request timed out".to_string());
280 retry_delay(attempt).await;
281 continue;
282 }
283 return Err(SendError::Timeout);
284 }
285 return Err(SendError::ExternalService(format!(
286 "Telegram request failed: {}",
287 e
288 )));
289 }
290 }
291 }
292 Err(SendError::MaxRetriesExceeded(
293 last_error.unwrap_or_else(|| "Unknown error".to_string()),
294 ))
295 }
296}
297
298pub struct SignalSender {
299 signal_cli_path: String,
300 sender_number: String,
301}
302
303impl SignalSender {
304 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
305 Self {
306 signal_cli_path,
307 sender_number,
308 }
309 }
310
311 pub fn from_env() -> Option<Self> {
312 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
313 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
314 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
315 Some(Self::new(signal_cli_path, sender_number))
316 }
317}
318
319#[async_trait]
320impl CommsSender for SignalSender {
321 fn channel(&self) -> CommsChannel {
322 CommsChannel::Signal
323 }
324
325 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
326 let recipient = ¬ification.recipient;
327 if !is_valid_phone_number(recipient) {
328 return Err(SendError::InvalidRecipient(format!(
329 "Invalid phone number format: {}",
330 recipient
331 )));
332 }
333 let subject = notification.subject.as_deref().unwrap_or("Notification");
334 let message = format!("{}\n\n{}", subject, notification.body);
335 let output = Command::new(&self.signal_cli_path)
336 .arg("-u")
337 .arg(&self.sender_number)
338 .arg("send")
339 .arg("-m")
340 .arg(&message)
341 .arg(recipient)
342 .output()
343 .await?;
344 if !output.status.success() {
345 let stderr = String::from_utf8_lossy(&output.stderr);
346 return Err(SendError::ExternalService(format!(
347 "signal-cli failed: {}",
348 stderr
349 )));
350 }
351 Ok(())
352 }
353}