this repo has no description
use crate::auth::validate_bearer_token;
use crate::state::AppState;
use axum::{
    Json,
    extract::State,
    http::{HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use tracing::{error, info};
13
14#[derive(Serialize)]
15#[serde(rename_all = "camelCase")]
16pub struct NotificationPrefsResponse {
17 pub preferred_channel: String,
18 pub email: String,
19 pub discord_id: Option<String>,
20 pub discord_verified: bool,
21 pub telegram_username: Option<String>,
22 pub telegram_verified: bool,
23 pub signal_number: Option<String>,
24 pub signal_verified: bool,
25}
26
27pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response {
28 let token = match crate::auth::extract_bearer_token_from_header(
29 headers.get("Authorization").and_then(|h| h.to_str().ok()),
30 ) {
31 Some(t) => t,
32 None => return (
33 StatusCode::UNAUTHORIZED,
34 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
35 )
36 .into_response(),
37 };
38 let user = match validate_bearer_token(&state.db, &token).await {
39 Ok(u) => u,
40 Err(_) => {
41 return (
42 StatusCode::UNAUTHORIZED,
43 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
44 )
45 .into_response();
46 }
47 };
48 let row =
49 match sqlx::query(
50 r#"
51 SELECT
52 email,
53 preferred_comms_channel::text as channel,
54 discord_id,
55 discord_verified,
56 telegram_username,
57 telegram_verified,
58 signal_number,
59 signal_verified
60 FROM users
61 WHERE did = $1
62 "#,
63 )
64 .bind(&user.did)
65 .fetch_one(&state.db)
66 .await
67 {
68 Ok(r) => r,
69 Err(e) => return (
70 StatusCode::INTERNAL_SERVER_ERROR,
71 Json(
72 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
73 ),
74 )
75 .into_response(),
76 };
77 let email: String = row.get("email");
78 let channel: String = row.get("channel");
79 let discord_id: Option<String> = row.get("discord_id");
80 let discord_verified: bool = row.get("discord_verified");
81 let telegram_username: Option<String> = row.get("telegram_username");
82 let telegram_verified: bool = row.get("telegram_verified");
83 let signal_number: Option<String> = row.get("signal_number");
84 let signal_verified: bool = row.get("signal_verified");
85 Json(NotificationPrefsResponse {
86 preferred_channel: channel,
87 email,
88 discord_id,
89 discord_verified,
90 telegram_username,
91 telegram_verified,
92 signal_number,
93 signal_verified,
94 })
95 .into_response()
96}
97
98#[derive(Serialize)]
99#[serde(rename_all = "camelCase")]
100pub struct NotificationHistoryEntry {
101 pub created_at: String,
102 pub channel: String,
103 pub comms_type: String,
104 pub status: String,
105 pub subject: Option<String>,
106 pub body: String,
107}
108
109#[derive(Serialize)]
110#[serde(rename_all = "camelCase")]
111pub struct GetNotificationHistoryResponse {
112 pub notifications: Vec<NotificationHistoryEntry>,
113}
114
115pub async fn get_notification_history(
116 State(state): State<AppState>,
117 headers: HeaderMap,
118) -> Response {
119 let token = match crate::auth::extract_bearer_token_from_header(
120 headers.get("Authorization").and_then(|h| h.to_str().ok()),
121 ) {
122 Some(t) => t,
123 None => return (
124 StatusCode::UNAUTHORIZED,
125 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
126 )
127 .into_response(),
128 };
129 let user = match validate_bearer_token(&state.db, &token).await {
130 Ok(u) => u,
131 Err(_) => {
132 return (
133 StatusCode::UNAUTHORIZED,
134 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
135 )
136 .into_response();
137 }
138 };
139
140 let user_id: uuid::Uuid =
141 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did)
142 .fetch_one(&state.db)
143 .await
144 {
145 Ok(id) => id,
146 Err(e) => return (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(
149 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
150 ),
151 )
152 .into_response(),
153 };
154
155 let rows =
156 match sqlx::query!(
157 r#"
158 SELECT
159 created_at,
160 channel as "channel: String",
161 comms_type as "comms_type: String",
162 status as "status: String",
163 subject,
164 body
165 FROM comms_queue
166 WHERE user_id = $1
167 ORDER BY created_at DESC
168 LIMIT 50
169 "#,
170 user_id
171 )
172 .fetch_all(&state.db)
173 .await
174 {
175 Ok(r) => r,
176 Err(e) => return (
177 StatusCode::INTERNAL_SERVER_ERROR,
178 Json(
179 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
180 ),
181 )
182 .into_response(),
183 };
184
185 let notifications = rows
186 .iter()
187 .map(|row| NotificationHistoryEntry {
188 created_at: row.created_at.to_rfc3339(),
189 channel: row.channel.clone(),
190 comms_type: row.comms_type.clone(),
191 status: row.status.clone(),
192 subject: row.subject.clone(),
193 body: row.body.clone(),
194 })
195 .collect();
196
197 Json(GetNotificationHistoryResponse { notifications }).into_response()
198}
199
200#[derive(Deserialize)]
201#[serde(rename_all = "camelCase")]
202pub struct UpdateNotificationPrefsInput {
203 pub preferred_channel: Option<String>,
204 pub email: Option<String>,
205 pub discord_id: Option<String>,
206 pub telegram_username: Option<String>,
207 pub signal_number: Option<String>,
208}
209
210#[derive(Serialize)]
211#[serde(rename_all = "camelCase")]
212pub struct UpdateNotificationPrefsResponse {
213 pub success: bool,
214 #[serde(skip_serializing_if = "Vec::is_empty")]
215 pub verification_required: Vec<String>,
216}
217
218pub async fn request_channel_verification(
219 db: &sqlx::PgPool,
220 user_id: uuid::Uuid,
221 did: &str,
222 channel: &str,
223 identifier: &str,
224 handle: Option<&str>,
225) -> Result<String, String> {
226 let token =
227 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier);
228 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
229
230 if channel == "email" {
231 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
232 let handle_str = handle.unwrap_or("user");
233 crate::comms::enqueue_email_update(
234 db,
235 user_id,
236 identifier,
237 handle_str,
238 &formatted_token,
239 &hostname,
240 )
241 .await
242 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
243 } else {
244 sqlx::query!(
245 r#"
246 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata)
247 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
248 "#,
249 user_id,
250 channel as _,
251 identifier,
252 format!("Your verification code is: {}", formatted_token),
253 json!({"code": formatted_token})
254 )
255 .execute(db)
256 .await
257 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
258 }
259
260 Ok(token)
261}
262
263pub async fn update_notification_prefs(
264 State(state): State<AppState>,
265 headers: HeaderMap,
266 Json(input): Json<UpdateNotificationPrefsInput>,
267) -> Response {
268 let token = match crate::auth::extract_bearer_token_from_header(
269 headers.get("Authorization").and_then(|h| h.to_str().ok()),
270 ) {
271 Some(t) => t,
272 None => return (
273 StatusCode::UNAUTHORIZED,
274 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
275 )
276 .into_response(),
277 };
278 let user = match validate_bearer_token(&state.db, &token).await {
279 Ok(u) => u,
280 Err(_) => {
281 return (
282 StatusCode::UNAUTHORIZED,
283 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
284 )
285 .into_response();
286 }
287 };
288
289 let user_row =
290 match sqlx::query!(
291 "SELECT id, handle, email FROM users WHERE did = $1",
292 user.did
293 )
294 .fetch_one(&state.db)
295 .await
296 {
297 Ok(row) => row,
298 Err(e) => return (
299 StatusCode::INTERNAL_SERVER_ERROR,
300 Json(
301 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
302 ),
303 )
304 .into_response(),
305 };
306
307 let user_id = user_row.id;
308 let handle = user_row.handle;
309 let current_email = user_row.email;
310
311 let mut verification_required: Vec<String> = Vec::new();
312
313 if let Some(ref channel) = input.preferred_channel {
314 let valid_channels = ["email", "discord", "telegram", "signal"];
315 if !valid_channels.contains(&channel.as_str()) {
316 return (
317 StatusCode::BAD_REQUEST,
318 Json(json!({
319 "error": "InvalidRequest",
320 "message": "Invalid channel. Must be one of: email, discord, telegram, signal"
321 })),
322 )
323 .into_response();
324 }
325 if let Err(e) = sqlx::query(
326 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"#
327 )
328 .bind(channel)
329 .bind(&user.did)
330 .execute(&state.db)
331 .await
332 {
333 return (
334 StatusCode::INTERNAL_SERVER_ERROR,
335 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
336 )
337 .into_response();
338 }
339 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
340 }
341
342 if let Some(ref new_email) = input.email {
343 let email_clean = new_email.trim().to_lowercase();
344 if email_clean.is_empty() {
345 return (
346 StatusCode::BAD_REQUEST,
347 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})),
348 )
349 .into_response();
350 }
351
352 if !crate::api::validation::is_valid_email(&email_clean) {
353 return (
354 StatusCode::BAD_REQUEST,
355 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
356 )
357 .into_response();
358 }
359
360 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
361 info!(did = %user.did, "Email unchanged, skipping");
362 } else {
363 let exists = sqlx::query!(
364 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
365 email_clean,
366 user_id
367 )
368 .fetch_optional(&state.db)
369 .await;
370
371 if let Ok(Some(_)) = exists {
372 return (
373 StatusCode::BAD_REQUEST,
374 Json(json!({"error": "EmailTaken", "message": "Email already in use"})),
375 )
376 .into_response();
377 }
378
379 if let Err(e) = request_channel_verification(
380 &state.db,
381 user_id,
382 &user.did,
383 "email",
384 &email_clean,
385 Some(&handle),
386 )
387 .await
388 {
389 return (
390 StatusCode::INTERNAL_SERVER_ERROR,
391 Json(json!({"error": "InternalError", "message": e})),
392 )
393 .into_response();
394 }
395 verification_required.push("email".to_string());
396 info!(did = %user.did, "Requested email verification");
397 }
398 }
399
400 if let Some(ref discord_id) = input.discord_id {
401 if discord_id.is_empty() {
402 if let Err(e) = sqlx::query!(
403 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
404 user_id
405 )
406 .execute(&state.db)
407 .await
408 {
409 return (
410 StatusCode::INTERNAL_SERVER_ERROR,
411 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
412 )
413 .into_response();
414 }
415 info!(did = %user.did, "Cleared Discord ID");
416 } else {
417 if let Err(e) = request_channel_verification(
418 &state.db, user_id, &user.did, "discord", discord_id, None,
419 )
420 .await
421 {
422 return (
423 StatusCode::INTERNAL_SERVER_ERROR,
424 Json(json!({"error": "InternalError", "message": e})),
425 )
426 .into_response();
427 }
428 verification_required.push("discord".to_string());
429 info!(did = %user.did, "Requested Discord verification");
430 }
431 }
432
433 if let Some(ref telegram) = input.telegram_username {
434 let telegram_clean = telegram.trim_start_matches('@');
435 if telegram_clean.is_empty() {
436 if let Err(e) = sqlx::query!(
437 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
438 user_id
439 )
440 .execute(&state.db)
441 .await
442 {
443 return (
444 StatusCode::INTERNAL_SERVER_ERROR,
445 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
446 )
447 .into_response();
448 }
449 info!(did = %user.did, "Cleared Telegram username");
450 } else {
451 if let Err(e) = request_channel_verification(
452 &state.db,
453 user_id,
454 &user.did,
455 "telegram",
456 telegram_clean,
457 None,
458 )
459 .await
460 {
461 return (
462 StatusCode::INTERNAL_SERVER_ERROR,
463 Json(json!({"error": "InternalError", "message": e})),
464 )
465 .into_response();
466 }
467 verification_required.push("telegram".to_string());
468 info!(did = %user.did, "Requested Telegram verification");
469 }
470 }
471
472 if let Some(ref signal) = input.signal_number {
473 if signal.is_empty() {
474 if let Err(e) = sqlx::query!(
475 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
476 user_id
477 )
478 .execute(&state.db)
479 .await
480 {
481 return (
482 StatusCode::INTERNAL_SERVER_ERROR,
483 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
484 )
485 .into_response();
486 }
487 info!(did = %user.did, "Cleared Signal number");
488 } else {
489 if let Err(e) =
490 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None)
491 .await
492 {
493 return (
494 StatusCode::INTERNAL_SERVER_ERROR,
495 Json(json!({"error": "InternalError", "message": e})),
496 )
497 .into_response();
498 }
499 verification_required.push("signal".to_string());
500 info!(did = %user.did, "Requested Signal verification");
501 }
502 }
503
504 Json(UpdateNotificationPrefsResponse {
505 success: true,
506 verification_required,
507 })
508 .into_response()
509}