this repo has no description
1use crate::auth::validate_bearer_token;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::State,
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use sqlx::Row;
12use tracing::info;
13
14#[derive(Serialize)]
15#[serde(rename_all = "camelCase")]
16pub struct NotificationPrefsResponse {
17 pub preferred_channel: String,
18 pub email: String,
19 pub discord_id: Option<String>,
20 pub discord_verified: bool,
21 pub telegram_username: Option<String>,
22 pub telegram_verified: bool,
23 pub signal_number: Option<String>,
24 pub signal_verified: bool,
25}
26
27pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response {
28 let token = match crate::auth::extract_bearer_token_from_header(
29 headers.get("Authorization").and_then(|h| h.to_str().ok()),
30 ) {
31 Some(t) => t,
32 None => return (
33 StatusCode::UNAUTHORIZED,
34 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
35 )
36 .into_response(),
37 };
38 let user = match validate_bearer_token(&state.db, &token).await {
39 Ok(u) => u,
40 Err(_) => {
41 return (
42 StatusCode::UNAUTHORIZED,
43 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
44 )
45 .into_response();
46 }
47 };
48 let row =
49 match sqlx::query(
50 r#"
51 SELECT
52 email,
53 preferred_comms_channel::text as channel,
54 discord_id,
55 discord_verified,
56 telegram_username,
57 telegram_verified,
58 signal_number,
59 signal_verified
60 FROM users
61 WHERE did = $1
62 "#,
63 )
64 .bind(&user.did)
65 .fetch_one(&state.db)
66 .await
67 {
68 Ok(r) => r,
69 Err(e) => return (
70 StatusCode::INTERNAL_SERVER_ERROR,
71 Json(
72 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
73 ),
74 )
75 .into_response(),
76 };
77 let email: String = row.get("email");
78 let channel: String = row.get("channel");
79 let discord_id: Option<String> = row.get("discord_id");
80 let discord_verified: bool = row.get("discord_verified");
81 let telegram_username: Option<String> = row.get("telegram_username");
82 let telegram_verified: bool = row.get("telegram_verified");
83 let signal_number: Option<String> = row.get("signal_number");
84 let signal_verified: bool = row.get("signal_verified");
85 Json(NotificationPrefsResponse {
86 preferred_channel: channel,
87 email,
88 discord_id,
89 discord_verified,
90 telegram_username,
91 telegram_verified,
92 signal_number,
93 signal_verified,
94 })
95 .into_response()
96}
97
98#[derive(Serialize)]
99#[serde(rename_all = "camelCase")]
100pub struct NotificationHistoryEntry {
101 pub created_at: String,
102 pub channel: String,
103 pub comms_type: String,
104 pub status: String,
105 pub subject: Option<String>,
106 pub body: String,
107}
108
109#[derive(Serialize)]
110#[serde(rename_all = "camelCase")]
111pub struct GetNotificationHistoryResponse {
112 pub notifications: Vec<NotificationHistoryEntry>,
113}
114
115pub async fn get_notification_history(
116 State(state): State<AppState>,
117 headers: HeaderMap,
118) -> Response {
119 let token = match crate::auth::extract_bearer_token_from_header(
120 headers.get("Authorization").and_then(|h| h.to_str().ok()),
121 ) {
122 Some(t) => t,
123 None => return (
124 StatusCode::UNAUTHORIZED,
125 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
126 )
127 .into_response(),
128 };
129 let user = match validate_bearer_token(&state.db, &token).await {
130 Ok(u) => u,
131 Err(_) => {
132 return (
133 StatusCode::UNAUTHORIZED,
134 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
135 )
136 .into_response();
137 }
138 };
139
140 let user_id: uuid::Uuid =
141 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did)
142 .fetch_one(&state.db)
143 .await
144 {
145 Ok(id) => id,
146 Err(e) => return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(
149 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
150 ),
151 )
152 .into_response(),
153 };
154
155 let rows =
156 match sqlx::query!(
157 r#"
158 SELECT
159 created_at,
160 channel as "channel: String",
161 comms_type as "comms_type: String",
162 status as "status: String",
163 subject,
164 body
165 FROM comms_queue
166 WHERE user_id = $1
167 ORDER BY created_at DESC
168 LIMIT 50
169 "#,
170 user_id
171 )
172 .fetch_all(&state.db)
173 .await
174 {
175 Ok(r) => r,
176 Err(e) => return (
177 StatusCode::INTERNAL_SERVER_ERROR,
178 Json(
179 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
180 ),
181 )
182 .into_response(),
183 };
184
185 let sensitive_types = [
186 "email_verification",
187 "password_reset",
188 "email_update",
189 "two_factor_code",
190 "passkey_recovery",
191 "migration_verification",
192 "plc_operation",
193 "channel_verification",
194 "signup_verification",
195 ];
196
197 let notifications = rows
198 .iter()
199 .map(|row| {
200 let body = if sensitive_types.contains(&row.comms_type.as_str()) {
201 "[Code redacted for security]".to_string()
202 } else {
203 row.body.clone()
204 };
205 NotificationHistoryEntry {
206 created_at: row.created_at.to_rfc3339(),
207 channel: row.channel.clone(),
208 comms_type: row.comms_type.clone(),
209 status: row.status.clone(),
210 subject: row.subject.clone(),
211 body,
212 }
213 })
214 .collect();
215
216 Json(GetNotificationHistoryResponse { notifications }).into_response()
217}
218
219#[derive(Deserialize)]
220#[serde(rename_all = "camelCase")]
221pub struct UpdateNotificationPrefsInput {
222 pub preferred_channel: Option<String>,
223 pub email: Option<String>,
224 pub discord_id: Option<String>,
225 pub telegram_username: Option<String>,
226 pub signal_number: Option<String>,
227}
228
229#[derive(Serialize)]
230#[serde(rename_all = "camelCase")]
231pub struct UpdateNotificationPrefsResponse {
232 pub success: bool,
233 #[serde(skip_serializing_if = "Vec::is_empty")]
234 pub verification_required: Vec<String>,
235}
236
237pub async fn request_channel_verification(
238 db: &sqlx::PgPool,
239 user_id: uuid::Uuid,
240 did: &str,
241 channel: &str,
242 identifier: &str,
243 handle: Option<&str>,
244) -> Result<String, String> {
245 let token =
246 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier);
247 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
248
249 if channel == "email" {
250 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
251 let handle_str = handle.unwrap_or("user");
252 crate::comms::enqueue_email_update(
253 db,
254 user_id,
255 identifier,
256 handle_str,
257 &formatted_token,
258 &hostname,
259 )
260 .await
261 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
262 } else {
263 sqlx::query!(
264 r#"
265 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata)
266 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
267 "#,
268 user_id,
269 channel as _,
270 identifier,
271 format!("Your verification code is: {}", formatted_token),
272 json!({"code": formatted_token})
273 )
274 .execute(db)
275 .await
276 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
277 }
278
279 Ok(token)
280}
281
282pub async fn update_notification_prefs(
283 State(state): State<AppState>,
284 headers: HeaderMap,
285 Json(input): Json<UpdateNotificationPrefsInput>,
286) -> Response {
287 let token = match crate::auth::extract_bearer_token_from_header(
288 headers.get("Authorization").and_then(|h| h.to_str().ok()),
289 ) {
290 Some(t) => t,
291 None => return (
292 StatusCode::UNAUTHORIZED,
293 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
294 )
295 .into_response(),
296 };
297 let user = match validate_bearer_token(&state.db, &token).await {
298 Ok(u) => u,
299 Err(_) => {
300 return (
301 StatusCode::UNAUTHORIZED,
302 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
303 )
304 .into_response();
305 }
306 };
307
308 let user_row =
309 match sqlx::query!(
310 "SELECT id, handle, email FROM users WHERE did = $1",
311 user.did
312 )
313 .fetch_one(&state.db)
314 .await
315 {
316 Ok(row) => row,
317 Err(e) => return (
318 StatusCode::INTERNAL_SERVER_ERROR,
319 Json(
320 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
321 ),
322 )
323 .into_response(),
324 };
325
326 let user_id = user_row.id;
327 let handle = user_row.handle;
328 let current_email = user_row.email;
329
330 let mut verification_required: Vec<String> = Vec::new();
331
332 if let Some(ref channel) = input.preferred_channel {
333 let valid_channels = ["email", "discord", "telegram", "signal"];
334 if !valid_channels.contains(&channel.as_str()) {
335 return (
336 StatusCode::BAD_REQUEST,
337 Json(json!({
338 "error": "InvalidRequest",
339 "message": "Invalid channel. Must be one of: email, discord, telegram, signal"
340 })),
341 )
342 .into_response();
343 }
344 if let Err(e) = sqlx::query(
345 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"#
346 )
347 .bind(channel)
348 .bind(&user.did)
349 .execute(&state.db)
350 .await
351 {
352 return (
353 StatusCode::INTERNAL_SERVER_ERROR,
354 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
355 )
356 .into_response();
357 }
358 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
359 }
360
361 if let Some(ref new_email) = input.email {
362 let email_clean = new_email.trim().to_lowercase();
363 if email_clean.is_empty() {
364 return (
365 StatusCode::BAD_REQUEST,
366 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})),
367 )
368 .into_response();
369 }
370
371 if !crate::api::validation::is_valid_email(&email_clean) {
372 return (
373 StatusCode::BAD_REQUEST,
374 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
375 )
376 .into_response();
377 }
378
379 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
380 info!(did = %user.did, "Email unchanged, skipping");
381 } else {
382 let exists = sqlx::query!(
383 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
384 email_clean,
385 user_id
386 )
387 .fetch_optional(&state.db)
388 .await;
389
390 if let Ok(Some(_)) = exists {
391 return (
392 StatusCode::BAD_REQUEST,
393 Json(json!({"error": "EmailTaken", "message": "Email already in use"})),
394 )
395 .into_response();
396 }
397
398 if let Err(e) = request_channel_verification(
399 &state.db,
400 user_id,
401 &user.did,
402 "email",
403 &email_clean,
404 Some(&handle),
405 )
406 .await
407 {
408 return (
409 StatusCode::INTERNAL_SERVER_ERROR,
410 Json(json!({"error": "InternalError", "message": e})),
411 )
412 .into_response();
413 }
414 verification_required.push("email".to_string());
415 info!(did = %user.did, "Requested email verification");
416 }
417 }
418
419 if let Some(ref discord_id) = input.discord_id {
420 if discord_id.is_empty() {
421 if let Err(e) = sqlx::query!(
422 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
423 user_id
424 )
425 .execute(&state.db)
426 .await
427 {
428 return (
429 StatusCode::INTERNAL_SERVER_ERROR,
430 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
431 )
432 .into_response();
433 }
434 info!(did = %user.did, "Cleared Discord ID");
435 } else {
436 if let Err(e) = request_channel_verification(
437 &state.db, user_id, &user.did, "discord", discord_id, None,
438 )
439 .await
440 {
441 return (
442 StatusCode::INTERNAL_SERVER_ERROR,
443 Json(json!({"error": "InternalError", "message": e})),
444 )
445 .into_response();
446 }
447 verification_required.push("discord".to_string());
448 info!(did = %user.did, "Requested Discord verification");
449 }
450 }
451
452 if let Some(ref telegram) = input.telegram_username {
453 let telegram_clean = telegram.trim_start_matches('@');
454 if telegram_clean.is_empty() {
455 if let Err(e) = sqlx::query!(
456 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
457 user_id
458 )
459 .execute(&state.db)
460 .await
461 {
462 return (
463 StatusCode::INTERNAL_SERVER_ERROR,
464 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
465 )
466 .into_response();
467 }
468 info!(did = %user.did, "Cleared Telegram username");
469 } else {
470 if let Err(e) = request_channel_verification(
471 &state.db,
472 user_id,
473 &user.did,
474 "telegram",
475 telegram_clean,
476 None,
477 )
478 .await
479 {
480 return (
481 StatusCode::INTERNAL_SERVER_ERROR,
482 Json(json!({"error": "InternalError", "message": e})),
483 )
484 .into_response();
485 }
486 verification_required.push("telegram".to_string());
487 info!(did = %user.did, "Requested Telegram verification");
488 }
489 }
490
491 if let Some(ref signal) = input.signal_number {
492 if signal.is_empty() {
493 if let Err(e) = sqlx::query!(
494 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
495 user_id
496 )
497 .execute(&state.db)
498 .await
499 {
500 return (
501 StatusCode::INTERNAL_SERVER_ERROR,
502 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
503 )
504 .into_response();
505 }
506 info!(did = %user.did, "Cleared Signal number");
507 } else {
508 if let Err(e) =
509 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None)
510 .await
511 {
512 return (
513 StatusCode::INTERNAL_SERVER_ERROR,
514 Json(json!({"error": "InternalError", "message": e})),
515 )
516 .into_response();
517 }
518 verification_required.push("signal".to_string());
519 info!(did = %user.did, "Requested Signal verification");
520 }
521 }
522
523 Json(UpdateNotificationPrefsResponse {
524 success: true,
525 verification_required,
526 })
527 .into_response()
528}