this repo has no description
1use crate::auth::validate_bearer_token;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::State,
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use chrono::{Duration, Utc};
10use rand::Rng;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use sqlx::Row;
14use tracing::info;
15
16fn generate_verification_code() -> String {
17 rand::thread_rng()
18 .sample_iter(&rand::distributions::Uniform::new(0, 10))
19 .take(6)
20 .map(|x| x.to_string())
21 .collect()
22}
23
24#[derive(Serialize)]
25#[serde(rename_all = "camelCase")]
26pub struct NotificationPrefsResponse {
27 pub preferred_channel: String,
28 pub email: String,
29 pub discord_id: Option<String>,
30 pub discord_verified: bool,
31 pub telegram_username: Option<String>,
32 pub telegram_verified: bool,
33 pub signal_number: Option<String>,
34 pub signal_verified: bool,
35}
36
37pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response {
38 let token = match crate::auth::extract_bearer_token_from_header(
39 headers.get("Authorization").and_then(|h| h.to_str().ok()),
40 ) {
41 Some(t) => t,
42 None => return (
43 StatusCode::UNAUTHORIZED,
44 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
45 )
46 .into_response(),
47 };
48 let user = match validate_bearer_token(&state.db, &token).await {
49 Ok(u) => u,
50 Err(_) => {
51 return (
52 StatusCode::UNAUTHORIZED,
53 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
54 )
55 .into_response();
56 }
57 };
58 let row =
59 match sqlx::query(
60 r#"
61 SELECT
62 email,
63 preferred_notification_channel::text as channel,
64 discord_id,
65 discord_verified,
66 telegram_username,
67 telegram_verified,
68 signal_number,
69 signal_verified
70 FROM users
71 WHERE did = $1
72 "#,
73 )
74 .bind(&user.did)
75 .fetch_one(&state.db)
76 .await
77 {
78 Ok(r) => r,
79 Err(e) => return (
80 StatusCode::INTERNAL_SERVER_ERROR,
81 Json(
82 json!({"error": "InternalError", "message": format!("Database error: {}", e)}),
83 ),
84 )
85 .into_response(),
86 };
87 let email: String = row.get("email");
88 let channel: String = row.get("channel");
89 let discord_id: Option<String> = row.get("discord_id");
90 let discord_verified: bool = row.get("discord_verified");
91 let telegram_username: Option<String> = row.get("telegram_username");
92 let telegram_verified: bool = row.get("telegram_verified");
93 let signal_number: Option<String> = row.get("signal_number");
94 let signal_verified: bool = row.get("signal_verified");
95 Json(NotificationPrefsResponse {
96 preferred_channel: channel,
97 email,
98 discord_id,
99 discord_verified,
100 telegram_username,
101 telegram_verified,
102 signal_number,
103 signal_verified,
104 })
105 .into_response()
106}
107
108#[derive(Serialize)]
109#[serde(rename_all = "camelCase")]
110pub struct NotificationHistoryEntry {
111 pub created_at: String,
112 pub channel: String,
113 pub notification_type: String,
114 pub status: String,
115 pub subject: Option<String>,
116 pub body: String,
117}
118
119#[derive(Serialize)]
120#[serde(rename_all = "camelCase")]
121pub struct GetNotificationHistoryResponse {
122 pub notifications: Vec<NotificationHistoryEntry>,
123}
124
125pub async fn get_notification_history(
126 State(state): State<AppState>,
127 headers: HeaderMap,
128) -> Response {
129 let token = match crate::auth::extract_bearer_token_from_header(
130 headers.get("Authorization").and_then(|h| h.to_str().ok()),
131 ) {
132 Some(t) => t,
133 None => return (
134 StatusCode::UNAUTHORIZED,
135 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
136 )
137 .into_response(),
138 };
139 let user = match validate_bearer_token(&state.db, &token).await {
140 Ok(u) => u,
141 Err(_) => {
142 return (
143 StatusCode::UNAUTHORIZED,
144 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
145 )
146 .into_response();
147 }
148 };
149
150 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did)
151 .fetch_one(&state.db)
152 .await
153 {
154 Ok(id) => id,
155 Err(e) => return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
158 )
159 .into_response(),
160 };
161
162 let rows = match sqlx::query!(
163 r#"
164 SELECT
165 created_at,
166 channel as "channel: String",
167 notification_type as "notification_type: String",
168 status as "status: String",
169 subject,
170 body
171 FROM notification_queue
172 WHERE user_id = $1
173 ORDER BY created_at DESC
174 LIMIT 50
175 "#,
176 user_id
177 )
178 .fetch_all(&state.db)
179 .await
180 {
181 Ok(r) => r,
182 Err(e) => return (
183 StatusCode::INTERNAL_SERVER_ERROR,
184 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
185 )
186 .into_response(),
187 };
188
189 let notifications = rows.iter().map(|row| {
190 NotificationHistoryEntry {
191 created_at: row.created_at.to_rfc3339(),
192 channel: row.channel.clone(),
193 notification_type: row.notification_type.clone(),
194 status: row.status.clone(),
195 subject: row.subject.clone(),
196 body: row.body.clone(),
197 }
198 }).collect();
199
200 Json(GetNotificationHistoryResponse { notifications }).into_response()
201}
202
203#[derive(Deserialize)]
204#[serde(rename_all = "camelCase")]
205pub struct UpdateNotificationPrefsInput {
206 pub preferred_channel: Option<String>,
207 pub email: Option<String>,
208 pub discord_id: Option<String>,
209 pub telegram_username: Option<String>,
210 pub signal_number: Option<String>,
211}
212
213#[derive(Serialize)]
214#[serde(rename_all = "camelCase")]
215pub struct UpdateNotificationPrefsResponse {
216 pub success: bool,
217 #[serde(skip_serializing_if = "Vec::is_empty")]
218 pub verification_required: Vec<String>,
219}
220
221pub async fn request_channel_verification(
222 db: &sqlx::PgPool,
223 user_id: uuid::Uuid,
224 channel: &str,
225 identifier: &str,
226 handle: Option<&str>,
227) -> Result<String, String> {
228 let code = generate_verification_code();
229 let expires_at = Utc::now() + Duration::minutes(10);
230
231 sqlx::query!(
232 r#"
233 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at)
234 VALUES ($1, $2::notification_channel, $3, $4, $5)
235 ON CONFLICT (user_id, channel) DO UPDATE
236 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW()
237 "#,
238 user_id,
239 channel as _,
240 code,
241 identifier,
242 expires_at
243 )
244 .execute(db)
245 .await
246 .map_err(|e| format!("Database error: {}", e))?;
247
248 if channel == "email" {
249 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
250 let handle_str = handle.unwrap_or("user");
251 crate::notifications::enqueue_email_update(db, user_id, identifier, handle_str, &code, &hostname)
252 .await
253 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
254 } else {
255 sqlx::query!(
256 r#"
257 INSERT INTO notification_queue (user_id, channel, notification_type, recipient, subject, body, metadata)
258 VALUES ($1, $2::notification_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
259 "#,
260 user_id,
261 channel as _,
262 identifier,
263 format!("Your verification code is: {}", code),
264 json!({"code": code})
265 )
266 .execute(db)
267 .await
268 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
269 }
270
271 Ok(code)
272}
273
274pub async fn update_notification_prefs(
275 State(state): State<AppState>,
276 headers: HeaderMap,
277 Json(input): Json<UpdateNotificationPrefsInput>,
278) -> Response {
279 let token = match crate::auth::extract_bearer_token_from_header(
280 headers.get("Authorization").and_then(|h| h.to_str().ok()),
281 ) {
282 Some(t) => t,
283 None => return (
284 StatusCode::UNAUTHORIZED,
285 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})),
286 )
287 .into_response(),
288 };
289 let user = match validate_bearer_token(&state.db, &token).await {
290 Ok(u) => u,
291 Err(_) => {
292 return (
293 StatusCode::UNAUTHORIZED,
294 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})),
295 )
296 .into_response();
297 }
298 };
299
300 let user_row = match sqlx::query!(
301 "SELECT id, handle, email FROM users WHERE did = $1",
302 user.did
303 )
304 .fetch_one(&state.db)
305 .await
306 {
307 Ok(row) => row,
308 Err(e) => return (
309 StatusCode::INTERNAL_SERVER_ERROR,
310 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
311 )
312 .into_response(),
313 };
314
315 let user_id = user_row.id;
316 let handle = user_row.handle;
317 let current_email = user_row.email;
318
319 let mut verification_required: Vec<String> = Vec::new();
320
321 if let Some(ref channel) = input.preferred_channel {
322 let valid_channels = ["email", "discord", "telegram", "signal"];
323 if !valid_channels.contains(&channel.as_str()) {
324 return (
325 StatusCode::BAD_REQUEST,
326 Json(json!({
327 "error": "InvalidRequest",
328 "message": "Invalid channel. Must be one of: email, discord, telegram, signal"
329 })),
330 )
331 .into_response();
332 }
333 if let Err(e) = sqlx::query(
334 r#"UPDATE users SET preferred_notification_channel = $1::notification_channel, updated_at = NOW() WHERE did = $2"#
335 )
336 .bind(channel)
337 .bind(&user.did)
338 .execute(&state.db)
339 .await
340 {
341 return (
342 StatusCode::INTERNAL_SERVER_ERROR,
343 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
344 )
345 .into_response();
346 }
347 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
348 }
349
350 if let Some(ref new_email) = input.email {
351 let email_clean = new_email.trim().to_lowercase();
352 if email_clean.is_empty() {
353 return (
354 StatusCode::BAD_REQUEST,
355 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})),
356 )
357 .into_response();
358 }
359
360 if !crate::api::validation::is_valid_email(&email_clean) {
361 return (
362 StatusCode::BAD_REQUEST,
363 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
364 )
365 .into_response();
366 }
367
368 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
369 info!(did = %user.did, "Email unchanged, skipping");
370 } else {
371 let exists = sqlx::query!(
372 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
373 email_clean,
374 user_id
375 )
376 .fetch_optional(&state.db)
377 .await;
378
379 if let Ok(Some(_)) = exists {
380 return (
381 StatusCode::BAD_REQUEST,
382 Json(json!({"error": "EmailTaken", "message": "Email already in use"})),
383 )
384 .into_response();
385 }
386
387 if let Err(e) = request_channel_verification(&state.db, user_id, "email", &email_clean, Some(&handle)).await {
388 return (
389 StatusCode::INTERNAL_SERVER_ERROR,
390 Json(json!({"error": "InternalError", "message": e})),
391 )
392 .into_response();
393 }
394 verification_required.push("email".to_string());
395 info!(did = %user.did, "Requested email verification");
396 }
397 }
398
399 if let Some(ref discord_id) = input.discord_id {
400 if discord_id.is_empty() {
401 if let Err(e) = sqlx::query!(
402 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
403 user_id
404 )
405 .execute(&state.db)
406 .await
407 {
408 return (
409 StatusCode::INTERNAL_SERVER_ERROR,
410 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
411 )
412 .into_response();
413 }
414 let _ = sqlx::query!(
415 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'discord'",
416 user_id
417 )
418 .execute(&state.db)
419 .await;
420 info!(did = %user.did, "Cleared Discord ID");
421 } else {
422 if let Err(e) = request_channel_verification(&state.db, user_id, "discord", discord_id, None).await {
423 return (
424 StatusCode::INTERNAL_SERVER_ERROR,
425 Json(json!({"error": "InternalError", "message": e})),
426 )
427 .into_response();
428 }
429 verification_required.push("discord".to_string());
430 info!(did = %user.did, "Requested Discord verification");
431 }
432 }
433
434 if let Some(ref telegram) = input.telegram_username {
435 let telegram_clean = telegram.trim_start_matches('@');
436 if telegram_clean.is_empty() {
437 if let Err(e) = sqlx::query!(
438 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
439 user_id
440 )
441 .execute(&state.db)
442 .await
443 {
444 return (
445 StatusCode::INTERNAL_SERVER_ERROR,
446 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
447 )
448 .into_response();
449 }
450 let _ = sqlx::query!(
451 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'telegram'",
452 user_id
453 )
454 .execute(&state.db)
455 .await;
456 info!(did = %user.did, "Cleared Telegram username");
457 } else {
458 if let Err(e) = request_channel_verification(&state.db, user_id, "telegram", telegram_clean, None).await {
459 return (
460 StatusCode::INTERNAL_SERVER_ERROR,
461 Json(json!({"error": "InternalError", "message": e})),
462 )
463 .into_response();
464 }
465 verification_required.push("telegram".to_string());
466 info!(did = %user.did, "Requested Telegram verification");
467 }
468 }
469
470 if let Some(ref signal) = input.signal_number {
471 if signal.is_empty() {
472 if let Err(e) = sqlx::query!(
473 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
474 user_id
475 )
476 .execute(&state.db)
477 .await
478 {
479 return (
480 StatusCode::INTERNAL_SERVER_ERROR,
481 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})),
482 )
483 .into_response();
484 }
485 let _ = sqlx::query!(
486 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'signal'",
487 user_id
488 )
489 .execute(&state.db)
490 .await;
491 info!(did = %user.did, "Cleared Signal number");
492 } else {
493 if let Err(e) = request_channel_verification(&state.db, user_id, "signal", signal, None).await {
494 return (
495 StatusCode::INTERNAL_SERVER_ERROR,
496 Json(json!({"error": "InternalError", "message": e})),
497 )
498 .into_response();
499 }
500 verification_required.push("signal".to_string());
501 info!(did = %user.did, "Requested Signal verification");
502 }
503 }
504
505 Json(UpdateNotificationPrefsResponse {
506 success: true,
507 verification_required,
508 }).into_response()
509}