this repo has no description
1use crate::auth::validate_bearer_token;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::State,
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use chrono::{Duration, Utc};
10use rand::Rng;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use sqlx::Row;
14use tracing::info;
15
16fn generate_verification_code() -> String {
17 rand::thread_rng()
18 .sample_iter(&rand::distributions::Uniform::new(0, 10))
19 .take(6)
20 .map(|x| x.to_string())
21 .collect()
22}
23
24#[derive(Serialize)]
25#[serde(rename_all = "camelCase")]
26pub struct NotificationPrefsResponse {
27 pub preferred_channel: String,
28 pub email: String,
29 pub discord_id: Option<String>,
30 pub discord_verified: bool,
31 pub telegram_username: Option<String>,
32 pub telegram_verified: bool,
33 pub signal_number: Option<String>,
34 pub signal_verified: bool,
35}
36
37pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response {
38 let token = match crate::auth::extract_bearer_token_from_header(
39 headers.get("Authorization").and_then(|h| h.to_str().ok()),
40 ) {
41 Some(t) => t,
42 None => return (
43 StatusCode::UNAUTHORIZED,
44 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
45 )
46 .into_response(),
47 };
48 let user = match validate_bearer_token(&state.db, &token).await {
49 Ok(u) => u,
50 Err(_) => {
51 return (
52 StatusCode::UNAUTHORIZED,
53 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
54 )
55 .into_response();
56 }
57 };
58 let row =
59 match sqlx::query(
60 r#"
61 SELECT
62 email,
63 preferred_comms_channel::text as channel,
64 discord_id,
65 discord_verified,
66 telegram_username,
67 telegram_verified,
68 signal_number,
69 signal_verified
70 FROM users
71 WHERE did = $1
72 "#,
73 )
74 .bind(&user.did)
75 .fetch_one(&state.db)
76 .await
77 {
78 Ok(r) => r,
79 Err(e) => return (
80 StatusCode::INTERNAL_SERVER_ERROR,
81 Json(
82 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
83 ),
84 )
85 .into_response(),
86 };
87 let email: String = row.get("email");
88 let channel: String = row.get("channel");
89 let discord_id: Option<String> = row.get("discord_id");
90 let discord_verified: bool = row.get("discord_verified");
91 let telegram_username: Option<String> = row.get("telegram_username");
92 let telegram_verified: bool = row.get("telegram_verified");
93 let signal_number: Option<String> = row.get("signal_number");
94 let signal_verified: bool = row.get("signal_verified");
95 Json(NotificationPrefsResponse {
96 preferred_channel: channel,
97 email,
98 discord_id,
99 discord_verified,
100 telegram_username,
101 telegram_verified,
102 signal_number,
103 signal_verified,
104 })
105 .into_response()
106}
107
108#[derive(Serialize)]
109#[serde(rename_all = "camelCase")]
110pub struct NotificationHistoryEntry {
111 pub created_at: String,
112 pub channel: String,
113 pub comms_type: String,
114 pub status: String,
115 pub subject: Option<String>,
116 pub body: String,
117}
118
119#[derive(Serialize)]
120#[serde(rename_all = "camelCase")]
121pub struct GetNotificationHistoryResponse {
122 pub notifications: Vec<NotificationHistoryEntry>,
123}
124
125pub async fn get_notification_history(
126 State(state): State<AppState>,
127 headers: HeaderMap,
128) -> Response {
129 let token = match crate::auth::extract_bearer_token_from_header(
130 headers.get("Authorization").and_then(|h| h.to_str().ok()),
131 ) {
132 Some(t) => t,
133 None => return (
134 StatusCode::UNAUTHORIZED,
135 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
136 )
137 .into_response(),
138 };
139 let user = match validate_bearer_token(&state.db, &token).await {
140 Ok(u) => u,
141 Err(_) => {
142 return (
143 StatusCode::UNAUTHORIZED,
144 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
145 )
146 .into_response();
147 }
148 };
149
150 let user_id: uuid::Uuid =
151 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did)
152 .fetch_one(&state.db)
153 .await
154 {
155 Ok(id) => id,
156 Err(e) => return (
157 StatusCode::INTERNAL_SERVER_ERROR,
158 Json(
159 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
160 ),
161 )
162 .into_response(),
163 };
164
165 let rows =
166 match sqlx::query!(
167 r#"
168 SELECT
169 created_at,
170 channel as "channel: String",
171 comms_type as "comms_type: String",
172 status as "status: String",
173 subject,
174 body
175 FROM comms_queue
176 WHERE user_id = $1
177 ORDER BY created_at DESC
178 LIMIT 50
179 "#,
180 user_id
181 )
182 .fetch_all(&state.db)
183 .await
184 {
185 Ok(r) => r,
186 Err(e) => return (
187 StatusCode::INTERNAL_SERVER_ERROR,
188 Json(
189 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
190 ),
191 )
192 .into_response(),
193 };
194
195 let notifications = rows
196 .iter()
197 .map(|row| NotificationHistoryEntry {
198 created_at: row.created_at.to_rfc3339(),
199 channel: row.channel.clone(),
200 comms_type: row.comms_type.clone(),
201 status: row.status.clone(),
202 subject: row.subject.clone(),
203 body: row.body.clone(),
204 })
205 .collect();
206
207 Json(GetNotificationHistoryResponse { notifications }).into_response()
208}
209
210#[derive(Deserialize)]
211#[serde(rename_all = "camelCase")]
212pub struct UpdateNotificationPrefsInput {
213 pub preferred_channel: Option<String>,
214 pub email: Option<String>,
215 pub discord_id: Option<String>,
216 pub telegram_username: Option<String>,
217 pub signal_number: Option<String>,
218}
219
220#[derive(Serialize)]
221#[serde(rename_all = "camelCase")]
222pub struct UpdateNotificationPrefsResponse {
223 pub success: bool,
224 #[serde(skip_serializing_if = "Vec::is_empty")]
225 pub verification_required: Vec<String>,
226}
227
228pub async fn request_channel_verification(
229 db: &sqlx::PgPool,
230 user_id: uuid::Uuid,
231 channel: &str,
232 identifier: &str,
233 handle: Option<&str>,
234) -> Result<String, String> {
235 let code = generate_verification_code();
236 let expires_at = Utc::now() + Duration::minutes(10);
237
238 sqlx::query!(
239 r#"
240 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at)
241 VALUES ($1, $2::comms_channel, $3, $4, $5)
242 ON CONFLICT (user_id, channel) DO UPDATE
243 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW()
244 "#,
245 user_id,
246 channel as _,
247 code,
248 identifier,
249 expires_at
250 )
251 .execute(db)
252 .await
253 .map_err(|e| format!("Database error: {}", e))?;
254
255 if channel == "email" {
256 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
257 let handle_str = handle.unwrap_or("user");
258 crate::comms::enqueue_email_update(db, user_id, identifier, handle_str, &code, &hostname)
259 .await
260 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
261 } else {
262 sqlx::query!(
263 r#"
264 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata)
265 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
266 "#,
267 user_id,
268 channel as _,
269 identifier,
270 format!("Your verification code is: {}", code),
271 json!({"code": code})
272 )
273 .execute(db)
274 .await
275 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
276 }
277
278 Ok(code)
279}
280
281pub async fn update_notification_prefs(
282 State(state): State<AppState>,
283 headers: HeaderMap,
284 Json(input): Json<UpdateNotificationPrefsInput>,
285) -> Response {
286 let token = match crate::auth::extract_bearer_token_from_header(
287 headers.get("Authorization").and_then(|h| h.to_str().ok()),
288 ) {
289 Some(t) => t,
290 None => return (
291 StatusCode::UNAUTHORIZED,
292 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
293 )
294 .into_response(),
295 };
296 let user = match validate_bearer_token(&state.db, &token).await {
297 Ok(u) => u,
298 Err(_) => {
299 return (
300 StatusCode::UNAUTHORIZED,
301 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
302 )
303 .into_response();
304 }
305 };
306
307 let user_row =
308 match sqlx::query!(
309 "SELECT id, handle, email FROM users WHERE did = $1",
310 user.did
311 )
312 .fetch_one(&state.db)
313 .await
314 {
315 Ok(row) => row,
316 Err(e) => return (
317 StatusCode::INTERNAL_SERVER_ERROR,
318 Json(
319 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
320 ),
321 )
322 .into_response(),
323 };
324
325 let user_id = user_row.id;
326 let handle = user_row.handle;
327 let current_email = user_row.email;
328
329 let mut verification_required: Vec<String> = Vec::new();
330
331 if let Some(ref channel) = input.preferred_channel {
332 let valid_channels = ["email", "discord", "telegram", "signal"];
333 if !valid_channels.contains(&channel.as_str()) {
334 return (
335 StatusCode::BAD_REQUEST,
336 Json(json!({
337 "error": "InvalidRequest",
338 "message": "Invalid channel. Must be one of: email, discord, telegram, signal"
339 })),
340 )
341 .into_response();
342 }
343 if let Err(e) = sqlx::query(
344 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"#
345 )
346 .bind(channel)
347 .bind(&user.did)
348 .execute(&state.db)
349 .await
350 {
351 return (
352 StatusCode::INTERNAL_SERVER_ERROR,
353 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
354 )
355 .into_response();
356 }
357 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
358 }
359
360 if let Some(ref new_email) = input.email {
361 let email_clean = new_email.trim().to_lowercase();
362 if email_clean.is_empty() {
363 return (
364 StatusCode::BAD_REQUEST,
365 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})),
366 )
367 .into_response();
368 }
369
370 if !crate::api::validation::is_valid_email(&email_clean) {
371 return (
372 StatusCode::BAD_REQUEST,
373 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
374 )
375 .into_response();
376 }
377
378 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
379 info!(did = %user.did, "Email unchanged, skipping");
380 } else {
381 let exists = sqlx::query!(
382 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
383 email_clean,
384 user_id
385 )
386 .fetch_optional(&state.db)
387 .await;
388
389 if let Ok(Some(_)) = exists {
390 return (
391 StatusCode::BAD_REQUEST,
392 Json(json!({"error": "EmailTaken", "message": "Email already in use"})),
393 )
394 .into_response();
395 }
396
397 if let Err(e) = request_channel_verification(
398 &state.db,
399 user_id,
400 "email",
401 &email_clean,
402 Some(&handle),
403 )
404 .await
405 {
406 return (
407 StatusCode::INTERNAL_SERVER_ERROR,
408 Json(json!({"error": "InternalError", "message": e})),
409 )
410 .into_response();
411 }
412 verification_required.push("email".to_string());
413 info!(did = %user.did, "Requested email verification");
414 }
415 }
416
417 if let Some(ref discord_id) = input.discord_id {
418 if discord_id.is_empty() {
419 if let Err(e) = sqlx::query!(
420 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
421 user_id
422 )
423 .execute(&state.db)
424 .await
425 {
426 return (
427 StatusCode::INTERNAL_SERVER_ERROR,
428 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
429 )
430 .into_response();
431 }
432 let _ = sqlx::query!(
433 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'discord'",
434 user_id
435 )
436 .execute(&state.db)
437 .await;
438 info!(did = %user.did, "Cleared Discord ID");
439 } else {
440 if let Err(e) =
441 request_channel_verification(&state.db, user_id, "discord", discord_id, None).await
442 {
443 return (
444 StatusCode::INTERNAL_SERVER_ERROR,
445 Json(json!({"error": "InternalError", "message": e})),
446 )
447 .into_response();
448 }
449 verification_required.push("discord".to_string());
450 info!(did = %user.did, "Requested Discord verification");
451 }
452 }
453
454 if let Some(ref telegram) = input.telegram_username {
455 let telegram_clean = telegram.trim_start_matches('@');
456 if telegram_clean.is_empty() {
457 if let Err(e) = sqlx::query!(
458 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
459 user_id
460 )
461 .execute(&state.db)
462 .await
463 {
464 return (
465 StatusCode::INTERNAL_SERVER_ERROR,
466 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
467 )
468 .into_response();
469 }
470 let _ = sqlx::query!(
471 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'telegram'",
472 user_id
473 )
474 .execute(&state.db)
475 .await;
476 info!(did = %user.did, "Cleared Telegram username");
477 } else {
478 if let Err(e) =
479 request_channel_verification(&state.db, user_id, "telegram", telegram_clean, None)
480 .await
481 {
482 return (
483 StatusCode::INTERNAL_SERVER_ERROR,
484 Json(json!({"error": "InternalError", "message": e})),
485 )
486 .into_response();
487 }
488 verification_required.push("telegram".to_string());
489 info!(did = %user.did, "Requested Telegram verification");
490 }
491 }
492
493 if let Some(ref signal) = input.signal_number {
494 if signal.is_empty() {
495 if let Err(e) = sqlx::query!(
496 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
497 user_id
498 )
499 .execute(&state.db)
500 .await
501 {
502 return (
503 StatusCode::INTERNAL_SERVER_ERROR,
504 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
505 )
506 .into_response();
507 }
508 let _ = sqlx::query!(
509 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'signal'",
510 user_id
511 )
512 .execute(&state.db)
513 .await;
514 info!(did = %user.did, "Cleared Signal number");
515 } else {
516 if let Err(e) =
517 request_channel_verification(&state.db, user_id, "signal", signal, None).await
518 {
519 return (
520 StatusCode::INTERNAL_SERVER_ERROR,
521 Json(json!({"error": "InternalError", "message": e})),
522 )
523 .into_response();
524 }
525 verification_required.push("signal".to_string());
526 info!(did = %user.did, "Requested Signal verification");
527 }
528 }
529
530 Json(UpdateNotificationPrefsResponse {
531 success: true,
532 verification_required,
533 })
534 .into_response()
535}