this repo has no description
1use crate::auth::validate_bearer_token; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use chrono::{Duration, Utc}; 10use rand::Rng; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use sqlx::Row; 14use tracing::info; 15 16fn generate_verification_code() -> String { 17 rand::thread_rng() 18 .sample_iter(&rand::distributions::Uniform::new(0, 10)) 19 .take(6) 20 .map(|x| x.to_string()) 21 .collect() 22} 23 24#[derive(Serialize)] 25#[serde(rename_all = "camelCase")] 26pub struct NotificationPrefsResponse { 27 pub preferred_channel: String, 28 pub email: String, 29 pub discord_id: Option<String>, 30 pub discord_verified: bool, 31 pub telegram_username: Option<String>, 32 pub telegram_verified: bool, 33 pub signal_number: Option<String>, 34 pub signal_verified: bool, 35} 36 37pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response { 38 let token = match crate::auth::extract_bearer_token_from_header( 39 headers.get("Authorization").and_then(|h| h.to_str().ok()), 40 ) { 41 Some(t) => t, 42 None => return ( 43 StatusCode::UNAUTHORIZED, 44 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 45 ) 46 .into_response(), 47 }; 48 let user = match validate_bearer_token(&state.db, &token).await { 49 Ok(u) => u, 50 Err(_) => { 51 return ( 52 StatusCode::UNAUTHORIZED, 53 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 54 ) 55 .into_response(); 56 } 57 }; 58 let row = 59 match sqlx::query( 60 r#" 61 SELECT 62 email, 63 preferred_comms_channel::text as channel, 64 discord_id, 65 discord_verified, 66 telegram_username, 67 telegram_verified, 68 signal_number, 69 signal_verified 70 FROM users 71 WHERE did = $1 72 "#, 73 ) 74 .bind(&user.did) 75 .fetch_one(&state.db) 76 .await 77 { 78 Ok(r) => r, 79 Err(e) => return ( 80 StatusCode::INTERNAL_SERVER_ERROR, 81 Json( 82 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 83 ), 84 ) 85 .into_response(), 86 }; 87 let email: String = row.get("email"); 88 let channel: String = row.get("channel"); 89 let discord_id: Option<String> = row.get("discord_id"); 90 let discord_verified: bool = row.get("discord_verified"); 91 let telegram_username: Option<String> = row.get("telegram_username"); 92 let telegram_verified: bool = row.get("telegram_verified"); 93 let signal_number: Option<String> = row.get("signal_number"); 94 let signal_verified: bool = row.get("signal_verified"); 95 Json(NotificationPrefsResponse { 96 preferred_channel: channel, 97 email, 98 discord_id, 99 discord_verified, 100 telegram_username, 101 telegram_verified, 102 signal_number, 103 signal_verified, 104 }) 105 .into_response() 106} 107 108#[derive(Serialize)] 109#[serde(rename_all = "camelCase")] 110pub struct NotificationHistoryEntry { 111 pub created_at: String, 112 pub channel: String, 113 pub comms_type: String, 114 pub status: String, 115 pub subject: Option<String>, 116 pub body: String, 117} 118 119#[derive(Serialize)] 120#[serde(rename_all = "camelCase")] 121pub struct GetNotificationHistoryResponse { 122 pub notifications: Vec<NotificationHistoryEntry>, 123} 124 125pub async fn get_notification_history( 126 State(state): State<AppState>, 127 headers: HeaderMap, 128) -> Response { 129 let token = match crate::auth::extract_bearer_token_from_header( 130 headers.get("Authorization").and_then(|h| h.to_str().ok()), 131 ) { 132 Some(t) => t, 133 None => return ( 134 StatusCode::UNAUTHORIZED, 135 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 136 ) 137 .into_response(), 138 }; 139 let user = match validate_bearer_token(&state.db, &token).await { 140 Ok(u) => u, 141 Err(_) => { 142 return ( 143 StatusCode::UNAUTHORIZED, 144 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 145 ) 146 .into_response(); 147 } 148 }; 149 150 let user_id: uuid::Uuid = 151 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did) 152 .fetch_one(&state.db) 153 .await 154 { 155 Ok(id) => id, 156 Err(e) => return ( 157 StatusCode::INTERNAL_SERVER_ERROR, 158 Json( 159 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 160 ), 161 ) 162 .into_response(), 163 }; 164 165 let rows = 166 match sqlx::query!( 167 r#" 168 SELECT 169 created_at, 170 channel as "channel: String", 171 comms_type as "comms_type: String", 172 status as "status: String", 173 subject, 174 body 175 FROM comms_queue 176 WHERE user_id = $1 177 ORDER BY created_at DESC 178 LIMIT 50 179 "#, 180 user_id 181 ) 182 .fetch_all(&state.db) 183 .await 184 { 185 Ok(r) => r, 186 Err(e) => return ( 187 StatusCode::INTERNAL_SERVER_ERROR, 188 Json( 189 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 190 ), 191 ) 192 .into_response(), 193 }; 194 195 let notifications = rows 196 .iter() 197 .map(|row| NotificationHistoryEntry { 198 created_at: row.created_at.to_rfc3339(), 199 channel: row.channel.clone(), 200 comms_type: row.comms_type.clone(), 201 status: row.status.clone(), 202 subject: row.subject.clone(), 203 body: row.body.clone(), 204 }) 205 .collect(); 206 207 Json(GetNotificationHistoryResponse { notifications }).into_response() 208} 209 210#[derive(Deserialize)] 211#[serde(rename_all = "camelCase")] 212pub struct UpdateNotificationPrefsInput { 213 pub preferred_channel: Option<String>, 214 pub email: Option<String>, 215 pub discord_id: Option<String>, 216 pub telegram_username: Option<String>, 217 pub signal_number: Option<String>, 218} 219 220#[derive(Serialize)] 221#[serde(rename_all = "camelCase")] 222pub struct UpdateNotificationPrefsResponse { 223 pub success: bool, 224 #[serde(skip_serializing_if = "Vec::is_empty")] 225 pub verification_required: Vec<String>, 226} 227 228pub async fn request_channel_verification( 229 db: &sqlx::PgPool, 230 user_id: uuid::Uuid, 231 channel: &str, 232 identifier: &str, 233 handle: Option<&str>, 234) -> Result<String, String> { 235 let code = generate_verification_code(); 236 let expires_at = Utc::now() + Duration::minutes(10); 237 238 sqlx::query!( 239 r#" 240 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 241 VALUES ($1, $2::comms_channel, $3, $4, $5) 242 ON CONFLICT (user_id, channel) DO UPDATE 243 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW() 244 "#, 245 user_id, 246 channel as _, 247 code, 248 identifier, 249 expires_at 250 ) 251 .execute(db) 252 .await 253 .map_err(|e| format!("Database error: {}", e))?; 254 255 if channel == "email" { 256 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 257 let handle_str = handle.unwrap_or("user"); 258 crate::comms::enqueue_email_update(db, user_id, identifier, handle_str, &code, &hostname) 259 .await 260 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 261 } else { 262 sqlx::query!( 263 r#" 264 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 265 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 266 "#, 267 user_id, 268 channel as _, 269 identifier, 270 format!("Your verification code is: {}", code), 271 json!({"code": code}) 272 ) 273 .execute(db) 274 .await 275 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 276 } 277 278 Ok(code) 279} 280 281pub async fn update_notification_prefs( 282 State(state): State<AppState>, 283 headers: HeaderMap, 284 Json(input): Json<UpdateNotificationPrefsInput>, 285) -> Response { 286 let token = match crate::auth::extract_bearer_token_from_header( 287 headers.get("Authorization").and_then(|h| h.to_str().ok()), 288 ) { 289 Some(t) => t, 290 None => return ( 291 StatusCode::UNAUTHORIZED, 292 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 293 ) 294 .into_response(), 295 }; 296 let user = match validate_bearer_token(&state.db, &token).await { 297 Ok(u) => u, 298 Err(_) => { 299 return ( 300 StatusCode::UNAUTHORIZED, 301 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 302 ) 303 .into_response(); 304 } 305 }; 306 307 let user_row = 308 match sqlx::query!( 309 "SELECT id, handle, email FROM users WHERE did = $1", 310 user.did 311 ) 312 .fetch_one(&state.db) 313 .await 314 { 315 Ok(row) => row, 316 Err(e) => return ( 317 StatusCode::INTERNAL_SERVER_ERROR, 318 Json( 319 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 320 ), 321 ) 322 .into_response(), 323 }; 324 325 let user_id = user_row.id; 326 let handle = user_row.handle; 327 let current_email = user_row.email; 328 329 let mut verification_required: Vec<String> = Vec::new(); 330 331 if let Some(ref channel) = input.preferred_channel { 332 let valid_channels = ["email", "discord", "telegram", "signal"]; 333 if !valid_channels.contains(&channel.as_str()) { 334 return ( 335 StatusCode::BAD_REQUEST, 336 Json(json!({ 337 "error": "InvalidRequest", 338 "message": "Invalid channel. Must be one of: email, discord, telegram, signal" 339 })), 340 ) 341 .into_response(); 342 } 343 if let Err(e) = sqlx::query( 344 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 345 ) 346 .bind(channel) 347 .bind(&user.did) 348 .execute(&state.db) 349 .await 350 { 351 return ( 352 StatusCode::INTERNAL_SERVER_ERROR, 353 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 354 ) 355 .into_response(); 356 } 357 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 358 } 359 360 if let Some(ref new_email) = input.email { 361 let email_clean = new_email.trim().to_lowercase(); 362 if email_clean.is_empty() { 363 return ( 364 StatusCode::BAD_REQUEST, 365 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})), 366 ) 367 .into_response(); 368 } 369 370 if !crate::api::validation::is_valid_email(&email_clean) { 371 return ( 372 StatusCode::BAD_REQUEST, 373 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 374 ) 375 .into_response(); 376 } 377 378 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 379 info!(did = %user.did, "Email unchanged, skipping"); 380 } else { 381 let exists = sqlx::query!( 382 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 383 email_clean, 384 user_id 385 ) 386 .fetch_optional(&state.db) 387 .await; 388 389 if let Ok(Some(_)) = exists { 390 return ( 391 StatusCode::BAD_REQUEST, 392 Json(json!({"error": "EmailTaken", "message": "Email already in use"})), 393 ) 394 .into_response(); 395 } 396 397 if let Err(e) = request_channel_verification( 398 &state.db, 399 user_id, 400 "email", 401 &email_clean, 402 Some(&handle), 403 ) 404 .await 405 { 406 return ( 407 StatusCode::INTERNAL_SERVER_ERROR, 408 Json(json!({"error": "InternalError", "message": e})), 409 ) 410 .into_response(); 411 } 412 verification_required.push("email".to_string()); 413 info!(did = %user.did, "Requested email verification"); 414 } 415 } 416 417 if let Some(ref discord_id) = input.discord_id { 418 if discord_id.is_empty() { 419 if let Err(e) = sqlx::query!( 420 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 421 user_id 422 ) 423 .execute(&state.db) 424 .await 425 { 426 return ( 427 StatusCode::INTERNAL_SERVER_ERROR, 428 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 429 ) 430 .into_response(); 431 } 432 let _ = sqlx::query!( 433 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'discord'", 434 user_id 435 ) 436 .execute(&state.db) 437 .await; 438 info!(did = %user.did, "Cleared Discord ID"); 439 } else { 440 if let Err(e) = 441 request_channel_verification(&state.db, user_id, "discord", discord_id, None).await 442 { 443 return ( 444 StatusCode::INTERNAL_SERVER_ERROR, 445 Json(json!({"error": "InternalError", "message": e})), 446 ) 447 .into_response(); 448 } 449 verification_required.push("discord".to_string()); 450 info!(did = %user.did, "Requested Discord verification"); 451 } 452 } 453 454 if let Some(ref telegram) = input.telegram_username { 455 let telegram_clean = telegram.trim_start_matches('@'); 456 if telegram_clean.is_empty() { 457 if let Err(e) = sqlx::query!( 458 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 459 user_id 460 ) 461 .execute(&state.db) 462 .await 463 { 464 return ( 465 StatusCode::INTERNAL_SERVER_ERROR, 466 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 467 ) 468 .into_response(); 469 } 470 let _ = sqlx::query!( 471 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'telegram'", 472 user_id 473 ) 474 .execute(&state.db) 475 .await; 476 info!(did = %user.did, "Cleared Telegram username"); 477 } else { 478 if let Err(e) = 479 request_channel_verification(&state.db, user_id, "telegram", telegram_clean, None) 480 .await 481 { 482 return ( 483 StatusCode::INTERNAL_SERVER_ERROR, 484 Json(json!({"error": "InternalError", "message": e})), 485 ) 486 .into_response(); 487 } 488 verification_required.push("telegram".to_string()); 489 info!(did = %user.did, "Requested Telegram verification"); 490 } 491 } 492 493 if let Some(ref signal) = input.signal_number { 494 if signal.is_empty() { 495 if let Err(e) = sqlx::query!( 496 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 497 user_id 498 ) 499 .execute(&state.db) 500 .await 501 { 502 return ( 503 StatusCode::INTERNAL_SERVER_ERROR, 504 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 505 ) 506 .into_response(); 507 } 508 let _ = sqlx::query!( 509 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'signal'", 510 user_id 511 ) 512 .execute(&state.db) 513 .await; 514 info!(did = %user.did, "Cleared Signal number"); 515 } else { 516 if let Err(e) = 517 request_channel_verification(&state.db, user_id, "signal", signal, None).await 518 { 519 return ( 520 StatusCode::INTERNAL_SERVER_ERROR, 521 Json(json!({"error": "InternalError", "message": e})), 522 ) 523 .into_response(); 524 } 525 verification_required.push("signal".to_string()); 526 info!(did = %user.did, "Requested Signal verification"); 527 } 528 } 529 530 Json(UpdateNotificationPrefsResponse { 531 success: true, 532 verification_required, 533 }) 534 .into_response() 535}