this repo has no description
1use async_trait::async_trait;
2use reqwest::Client;
3use serde_json::json;
4use std::process::Stdio;
5use std::time::Duration;
6use tokio::io::AsyncWriteExt;
7use tokio::process::Command;
8
9use super::types::{CommsChannel, QueuedComms};
10
11const HTTP_TIMEOUT_SECS: u64 = 30;
12const MAX_RETRIES: u32 = 3;
13const INITIAL_RETRY_DELAY_MS: u64 = 500;
14
15#[async_trait]
16pub trait CommsSender: Send + Sync {
17 fn channel(&self) -> CommsChannel;
18 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError>;
19}
20
21#[derive(Debug, thiserror::Error)]
22pub enum SendError {
23 #[error("Failed to spawn sendmail process: {0}")]
24 ProcessSpawn(#[from] std::io::Error),
25 #[error("Sendmail exited with non-zero status: {0}")]
26 SendmailFailed(String),
27 #[error("Channel not configured: {0:?}")]
28 NotConfigured(CommsChannel),
29 #[error("External service error: {0}")]
30 ExternalService(String),
31 #[error("Invalid recipient format: {0}")]
32 InvalidRecipient(String),
33 #[error("Request timeout")]
34 Timeout,
35 #[error("Max retries exceeded: {0}")]
36 MaxRetriesExceeded(String),
37}
38
39fn create_http_client() -> Client {
40 Client::builder()
41 .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
42 .connect_timeout(Duration::from_secs(10))
43 .build()
44 .unwrap_or_else(|_| Client::new())
45}
46
47fn is_retryable_status(status: reqwest::StatusCode) -> bool {
48 status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS
49}
50
51async fn retry_delay(attempt: u32) {
52 let delay_ms = INITIAL_RETRY_DELAY_MS * 2u64.pow(attempt);
53 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
54}
55
56pub fn sanitize_header_value(value: &str) -> String {
57 value.replace(['\r', '\n'], " ").trim().to_string()
58}
59
60pub fn is_valid_phone_number(number: &str) -> bool {
61 if number.len() < 2 || number.len() > 20 {
62 return false;
63 }
64 let mut chars = number.chars();
65 if chars.next() != Some('+') {
66 return false;
67 }
68 let remaining: String = chars.collect();
69 !remaining.is_empty() && remaining.chars().all(|c| c.is_ascii_digit())
70}
71
72pub struct EmailSender {
73 from_address: String,
74 from_name: String,
75 sendmail_path: String,
76}
77
78impl EmailSender {
79 pub fn new(from_address: String, from_name: String) -> Self {
80 Self {
81 from_address,
82 from_name,
83 sendmail_path: std::env::var("SENDMAIL_PATH")
84 .unwrap_or_else(|_| "/usr/sbin/sendmail".to_string()),
85 }
86 }
87
88 pub fn from_env() -> Option<Self> {
89 let from_address = std::env::var("MAIL_FROM_ADDRESS").ok()?;
90 let from_name = std::env::var("MAIL_FROM_NAME").unwrap_or_else(|_| "BSPDS".to_string());
91 Some(Self::new(from_address, from_name))
92 }
93
94 pub fn format_email(&self, notification: &QueuedComms) -> String {
95 let subject =
96 sanitize_header_value(notification.subject.as_deref().unwrap_or("Notification"));
97 let recipient = sanitize_header_value(¬ification.recipient);
98 let from_header = if self.from_name.is_empty() {
99 self.from_address.clone()
100 } else {
101 format!(
102 "{} <{}>",
103 sanitize_header_value(&self.from_name),
104 self.from_address
105 )
106 };
107 format!(
108 "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
109 from_header, recipient, subject, notification.body
110 )
111 }
112}
113
114#[async_trait]
115impl CommsSender for EmailSender {
116 fn channel(&self) -> CommsChannel {
117 CommsChannel::Email
118 }
119
120 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
121 let email_content = self.format_email(notification);
122 let mut child = Command::new(&self.sendmail_path)
123 .arg("-t")
124 .arg("-oi")
125 .stdin(Stdio::piped())
126 .stdout(Stdio::piped())
127 .stderr(Stdio::piped())
128 .spawn()?;
129 if let Some(mut stdin) = child.stdin.take() {
130 stdin.write_all(email_content.as_bytes()).await?;
131 }
132 let output = child.wait_with_output().await?;
133 if !output.status.success() {
134 let stderr = String::from_utf8_lossy(&output.stderr);
135 return Err(SendError::SendmailFailed(stderr.to_string()));
136 }
137 Ok(())
138 }
139}
140
141pub struct DiscordSender {
142 webhook_url: String,
143 http_client: Client,
144}
145
146impl DiscordSender {
147 pub fn new(webhook_url: String) -> Self {
148 Self {
149 webhook_url,
150 http_client: create_http_client(),
151 }
152 }
153
154 pub fn from_env() -> Option<Self> {
155 let webhook_url = std::env::var("DISCORD_WEBHOOK_URL").ok()?;
156 Some(Self::new(webhook_url))
157 }
158}
159
160#[async_trait]
161impl CommsSender for DiscordSender {
162 fn channel(&self) -> CommsChannel {
163 CommsChannel::Discord
164 }
165
166 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
167 let subject = notification.subject.as_deref().unwrap_or("Notification");
168 let content = format!("**{}**\n\n{}", subject, notification.body);
169 let payload = json!({
170 "content": content,
171 "username": "BSPDS"
172 });
173 let mut last_error = None;
174 for attempt in 0..MAX_RETRIES {
175 let result = self
176 .http_client
177 .post(&self.webhook_url)
178 .json(&payload)
179 .send()
180 .await;
181 match result {
182 Ok(response) => {
183 if response.status().is_success() {
184 return Ok(());
185 }
186 let status = response.status();
187 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
188 last_error = Some(format!("Discord webhook returned {}", status));
189 retry_delay(attempt).await;
190 continue;
191 }
192 let body = response.text().await.unwrap_or_default();
193 return Err(SendError::ExternalService(format!(
194 "Discord webhook returned {}: {}",
195 status, body
196 )));
197 }
198 Err(e) => {
199 if e.is_timeout() {
200 if attempt < MAX_RETRIES - 1 {
201 last_error = Some("Discord request timed out".to_string());
202 retry_delay(attempt).await;
203 continue;
204 }
205 return Err(SendError::Timeout);
206 }
207 return Err(SendError::ExternalService(format!(
208 "Discord request failed: {}",
209 e
210 )));
211 }
212 }
213 }
214 Err(SendError::MaxRetriesExceeded(
215 last_error.unwrap_or_else(|| "Unknown error".to_string()),
216 ))
217 }
218}
219
220pub struct TelegramSender {
221 bot_token: String,
222 http_client: Client,
223}
224
225impl TelegramSender {
226 pub fn new(bot_token: String) -> Self {
227 Self {
228 bot_token,
229 http_client: create_http_client(),
230 }
231 }
232
233 pub fn from_env() -> Option<Self> {
234 let bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok()?;
235 Some(Self::new(bot_token))
236 }
237}
238
239#[async_trait]
240impl CommsSender for TelegramSender {
241 fn channel(&self) -> CommsChannel {
242 CommsChannel::Telegram
243 }
244
245 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
246 let chat_id = ¬ification.recipient;
247 let subject = notification.subject.as_deref().unwrap_or("Notification");
248 let text = format!("*{}*\n\n{}", subject, notification.body);
249 let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token);
250 let payload = json!({
251 "chat_id": chat_id,
252 "text": text,
253 "parse_mode": "Markdown"
254 });
255 let mut last_error = None;
256 for attempt in 0..MAX_RETRIES {
257 let result = self.http_client.post(&url).json(&payload).send().await;
258 match result {
259 Ok(response) => {
260 if response.status().is_success() {
261 return Ok(());
262 }
263 let status = response.status();
264 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
265 last_error = Some(format!("Telegram API returned {}", status));
266 retry_delay(attempt).await;
267 continue;
268 }
269 let body = response.text().await.unwrap_or_default();
270 return Err(SendError::ExternalService(format!(
271 "Telegram API returned {}: {}",
272 status, body
273 )));
274 }
275 Err(e) => {
276 if e.is_timeout() {
277 if attempt < MAX_RETRIES - 1 {
278 last_error = Some("Telegram request timed out".to_string());
279 retry_delay(attempt).await;
280 continue;
281 }
282 return Err(SendError::Timeout);
283 }
284 return Err(SendError::ExternalService(format!(
285 "Telegram request failed: {}",
286 e
287 )));
288 }
289 }
290 }
291 Err(SendError::MaxRetriesExceeded(
292 last_error.unwrap_or_else(|| "Unknown error".to_string()),
293 ))
294 }
295}
296
297pub struct SignalSender {
298 signal_cli_path: String,
299 sender_number: String,
300}
301
302impl SignalSender {
303 pub fn new(signal_cli_path: String, sender_number: String) -> Self {
304 Self {
305 signal_cli_path,
306 sender_number,
307 }
308 }
309
310 pub fn from_env() -> Option<Self> {
311 let signal_cli_path = std::env::var("SIGNAL_CLI_PATH")
312 .unwrap_or_else(|_| "/usr/local/bin/signal-cli".to_string());
313 let sender_number = std::env::var("SIGNAL_SENDER_NUMBER").ok()?;
314 Some(Self::new(signal_cli_path, sender_number))
315 }
316}
317
318#[async_trait]
319impl CommsSender for SignalSender {
320 fn channel(&self) -> CommsChannel {
321 CommsChannel::Signal
322 }
323
324 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
325 let recipient = ¬ification.recipient;
326 if !is_valid_phone_number(recipient) {
327 return Err(SendError::InvalidRecipient(format!(
328 "Invalid phone number format: {}",
329 recipient
330 )));
331 }
332 let subject = notification.subject.as_deref().unwrap_or("Notification");
333 let message = format!("{}\n\n{}", subject, notification.body);
334 let output = Command::new(&self.signal_cli_path)
335 .arg("-u")
336 .arg(&self.sender_number)
337 .arg("send")
338 .arg("-m")
339 .arg(&message)
340 .arg(recipient)
341 .output()
342 .await?;
343 if !output.status.success() {
344 let stderr = String::from_utf8_lossy(&output.stderr);
345 return Err(SendError::ExternalService(format!(
346 "signal-cli failed: {}",
347 stderr
348 )));
349 }
350 Ok(())
351 }
352}