this repo has no description
1use crate::auth::validate_bearer_token; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use serde::{Deserialize, Serialize}; 10use serde_json::json; 11use sqlx::Row; 12use tracing::info; 13 14#[derive(Serialize)] 15#[serde(rename_all = "camelCase")] 16pub struct NotificationPrefsResponse { 17 pub preferred_channel: String, 18 pub email: String, 19 pub discord_id: Option<String>, 20 pub discord_verified: bool, 21 pub telegram_username: Option<String>, 22 pub telegram_verified: bool, 23 pub signal_number: Option<String>, 24 pub signal_verified: bool, 25} 26 27pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response { 28 let token = match crate::auth::extract_bearer_token_from_header( 29 headers.get("Authorization").and_then(|h| h.to_str().ok()), 30 ) { 31 Some(t) => t, 32 None => return ( 33 StatusCode::UNAUTHORIZED, 34 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 35 ) 36 .into_response(), 37 }; 38 let user = match validate_bearer_token(&state.db, &token).await { 39 Ok(u) => u, 40 Err(_) => { 41 return ( 42 StatusCode::UNAUTHORIZED, 43 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 44 ) 45 .into_response(); 46 } 47 }; 48 let row = 49 match sqlx::query( 50 r#" 51 SELECT 52 email, 53 preferred_comms_channel::text as channel, 54 discord_id, 55 discord_verified, 56 telegram_username, 57 telegram_verified, 58 signal_number, 59 signal_verified 60 FROM users 61 WHERE did = $1 62 "#, 63 ) 64 .bind(&user.did) 65 .fetch_one(&state.db) 66 .await 67 { 68 Ok(r) => r, 69 Err(e) => return ( 70 StatusCode::INTERNAL_SERVER_ERROR, 71 Json( 72 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 73 ), 74 ) 75 .into_response(), 76 }; 77 let email: String = row.get("email"); 78 let channel: String = row.get("channel"); 79 let discord_id: Option<String> = row.get("discord_id"); 80 let discord_verified: bool = row.get("discord_verified"); 81 let telegram_username: Option<String> = row.get("telegram_username"); 82 let telegram_verified: bool = row.get("telegram_verified"); 83 let signal_number: Option<String> = row.get("signal_number"); 84 let signal_verified: bool = row.get("signal_verified"); 85 Json(NotificationPrefsResponse { 86 preferred_channel: channel, 87 email, 88 discord_id, 89 discord_verified, 90 telegram_username, 91 telegram_verified, 92 signal_number, 93 signal_verified, 94 }) 95 .into_response() 96} 97 98#[derive(Serialize)] 99#[serde(rename_all = "camelCase")] 100pub struct NotificationHistoryEntry { 101 pub created_at: String, 102 pub channel: String, 103 pub comms_type: String, 104 pub status: String, 105 pub subject: Option<String>, 106 pub body: String, 107} 108 109#[derive(Serialize)] 110#[serde(rename_all = "camelCase")] 111pub struct GetNotificationHistoryResponse { 112 pub notifications: Vec<NotificationHistoryEntry>, 113} 114 115pub async fn get_notification_history( 116 State(state): State<AppState>, 117 headers: HeaderMap, 118) -> Response { 119 let token = match crate::auth::extract_bearer_token_from_header( 120 headers.get("Authorization").and_then(|h| h.to_str().ok()), 121 ) { 122 Some(t) => t, 123 None => return ( 124 StatusCode::UNAUTHORIZED, 125 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 126 ) 127 .into_response(), 128 }; 129 let user = match validate_bearer_token(&state.db, &token).await { 130 Ok(u) => u, 131 Err(_) => { 132 return ( 133 StatusCode::UNAUTHORIZED, 134 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 135 ) 136 .into_response(); 137 } 138 }; 139 140 let user_id: uuid::Uuid = 141 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did) 142 .fetch_one(&state.db) 143 .await 144 { 145 Ok(id) => id, 146 Err(e) => return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json( 149 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 150 ), 151 ) 152 .into_response(), 153 }; 154 155 let rows = 156 match sqlx::query!( 157 r#" 158 SELECT 159 created_at, 160 channel as "channel: String", 161 comms_type as "comms_type: String", 162 status as "status: String", 163 subject, 164 body 165 FROM comms_queue 166 WHERE user_id = $1 167 ORDER BY created_at DESC 168 LIMIT 50 169 "#, 170 user_id 171 ) 172 .fetch_all(&state.db) 173 .await 174 { 175 Ok(r) => r, 176 Err(e) => return ( 177 StatusCode::INTERNAL_SERVER_ERROR, 178 Json( 179 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 180 ), 181 ) 182 .into_response(), 183 }; 184 185 let sensitive_types = [ 186 "email_verification", 187 "password_reset", 188 "email_update", 189 "two_factor_code", 190 "passkey_recovery", 191 "migration_verification", 192 "plc_operation", 193 "channel_verification", 194 "signup_verification", 195 ]; 196 197 let notifications = rows 198 .iter() 199 .map(|row| { 200 let body = if sensitive_types.contains(&row.comms_type.as_str()) { 201 "[Code redacted for security]".to_string() 202 } else { 203 row.body.clone() 204 }; 205 NotificationHistoryEntry { 206 created_at: row.created_at.to_rfc3339(), 207 channel: row.channel.clone(), 208 comms_type: row.comms_type.clone(), 209 status: row.status.clone(), 210 subject: row.subject.clone(), 211 body, 212 } 213 }) 214 .collect(); 215 216 Json(GetNotificationHistoryResponse { notifications }).into_response() 217} 218 219#[derive(Deserialize)] 220#[serde(rename_all = "camelCase")] 221pub struct UpdateNotificationPrefsInput { 222 pub preferred_channel: Option<String>, 223 pub email: Option<String>, 224 pub discord_id: Option<String>, 225 pub telegram_username: Option<String>, 226 pub signal_number: Option<String>, 227} 228 229#[derive(Serialize)] 230#[serde(rename_all = "camelCase")] 231pub struct UpdateNotificationPrefsResponse { 232 pub success: bool, 233 #[serde(skip_serializing_if = "Vec::is_empty")] 234 pub verification_required: Vec<String>, 235} 236 237pub async fn request_channel_verification( 238 db: &sqlx::PgPool, 239 user_id: uuid::Uuid, 240 did: &str, 241 channel: &str, 242 identifier: &str, 243 handle: Option<&str>, 244) -> Result<String, String> { 245 let token = 246 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier); 247 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 248 249 if channel == "email" { 250 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 251 let handle_str = handle.unwrap_or("user"); 252 crate::comms::enqueue_email_update( 253 db, 254 user_id, 255 identifier, 256 handle_str, 257 &formatted_token, 258 &hostname, 259 ) 260 .await 261 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 262 } else { 263 sqlx::query!( 264 r#" 265 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 266 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 267 "#, 268 user_id, 269 channel as _, 270 identifier, 271 format!("Your verification code is: {}", formatted_token), 272 json!({"code": formatted_token}) 273 ) 274 .execute(db) 275 .await 276 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 277 } 278 279 Ok(token) 280} 281 282pub async fn update_notification_prefs( 283 State(state): State<AppState>, 284 headers: HeaderMap, 285 Json(input): Json<UpdateNotificationPrefsInput>, 286) -> Response { 287 let token = match crate::auth::extract_bearer_token_from_header( 288 headers.get("Authorization").and_then(|h| h.to_str().ok()), 289 ) { 290 Some(t) => t, 291 None => return ( 292 StatusCode::UNAUTHORIZED, 293 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 294 ) 295 .into_response(), 296 }; 297 let user = match validate_bearer_token(&state.db, &token).await { 298 Ok(u) => u, 299 Err(_) => { 300 return ( 301 StatusCode::UNAUTHORIZED, 302 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 303 ) 304 .into_response(); 305 } 306 }; 307 308 let user_row = 309 match sqlx::query!( 310 "SELECT id, handle, email FROM users WHERE did = $1", 311 user.did 312 ) 313 .fetch_one(&state.db) 314 .await 315 { 316 Ok(row) => row, 317 Err(e) => return ( 318 StatusCode::INTERNAL_SERVER_ERROR, 319 Json( 320 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 321 ), 322 ) 323 .into_response(), 324 }; 325 326 let user_id = user_row.id; 327 let handle = user_row.handle; 328 let current_email = user_row.email; 329 330 let mut verification_required: Vec<String> = Vec::new(); 331 332 if let Some(ref channel) = input.preferred_channel { 333 let valid_channels = ["email", "discord", "telegram", "signal"]; 334 if !valid_channels.contains(&channel.as_str()) { 335 return ( 336 StatusCode::BAD_REQUEST, 337 Json(json!({ 338 "error": "InvalidRequest", 339 "message": "Invalid channel. Must be one of: email, discord, telegram, signal" 340 })), 341 ) 342 .into_response(); 343 } 344 if let Err(e) = sqlx::query( 345 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 346 ) 347 .bind(channel) 348 .bind(&user.did) 349 .execute(&state.db) 350 .await 351 { 352 return ( 353 StatusCode::INTERNAL_SERVER_ERROR, 354 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 355 ) 356 .into_response(); 357 } 358 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 359 } 360 361 if let Some(ref new_email) = input.email { 362 let email_clean = new_email.trim().to_lowercase(); 363 if email_clean.is_empty() { 364 return ( 365 StatusCode::BAD_REQUEST, 366 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})), 367 ) 368 .into_response(); 369 } 370 371 if !crate::api::validation::is_valid_email(&email_clean) { 372 return ( 373 StatusCode::BAD_REQUEST, 374 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 375 ) 376 .into_response(); 377 } 378 379 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 380 info!(did = %user.did, "Email unchanged, skipping"); 381 } else { 382 let exists = sqlx::query!( 383 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 384 email_clean, 385 user_id 386 ) 387 .fetch_optional(&state.db) 388 .await; 389 390 if let Ok(Some(_)) = exists { 391 return ( 392 StatusCode::BAD_REQUEST, 393 Json(json!({"error": "EmailTaken", "message": "Email already in use"})), 394 ) 395 .into_response(); 396 } 397 398 if let Err(e) = request_channel_verification( 399 &state.db, 400 user_id, 401 &user.did, 402 "email", 403 &email_clean, 404 Some(&handle), 405 ) 406 .await 407 { 408 return ( 409 StatusCode::INTERNAL_SERVER_ERROR, 410 Json(json!({"error": "InternalError", "message": e})), 411 ) 412 .into_response(); 413 } 414 verification_required.push("email".to_string()); 415 info!(did = %user.did, "Requested email verification"); 416 } 417 } 418 419 if let Some(ref discord_id) = input.discord_id { 420 if discord_id.is_empty() { 421 if let Err(e) = sqlx::query!( 422 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 423 user_id 424 ) 425 .execute(&state.db) 426 .await 427 { 428 return ( 429 StatusCode::INTERNAL_SERVER_ERROR, 430 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 431 ) 432 .into_response(); 433 } 434 info!(did = %user.did, "Cleared Discord ID"); 435 } else { 436 if let Err(e) = request_channel_verification( 437 &state.db, user_id, &user.did, "discord", discord_id, None, 438 ) 439 .await 440 { 441 return ( 442 StatusCode::INTERNAL_SERVER_ERROR, 443 Json(json!({"error": "InternalError", "message": e})), 444 ) 445 .into_response(); 446 } 447 verification_required.push("discord".to_string()); 448 info!(did = %user.did, "Requested Discord verification"); 449 } 450 } 451 452 if let Some(ref telegram) = input.telegram_username { 453 let telegram_clean = telegram.trim_start_matches('@'); 454 if telegram_clean.is_empty() { 455 if let Err(e) = sqlx::query!( 456 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 457 user_id 458 ) 459 .execute(&state.db) 460 .await 461 { 462 return ( 463 StatusCode::INTERNAL_SERVER_ERROR, 464 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 465 ) 466 .into_response(); 467 } 468 info!(did = %user.did, "Cleared Telegram username"); 469 } else { 470 if let Err(e) = request_channel_verification( 471 &state.db, 472 user_id, 473 &user.did, 474 "telegram", 475 telegram_clean, 476 None, 477 ) 478 .await 479 { 480 return ( 481 StatusCode::INTERNAL_SERVER_ERROR, 482 Json(json!({"error": "InternalError", "message": e})), 483 ) 484 .into_response(); 485 } 486 verification_required.push("telegram".to_string()); 487 info!(did = %user.did, "Requested Telegram verification"); 488 } 489 } 490 491 if let Some(ref signal) = input.signal_number { 492 if signal.is_empty() { 493 if let Err(e) = sqlx::query!( 494 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 495 user_id 496 ) 497 .execute(&state.db) 498 .await 499 { 500 return ( 501 StatusCode::INTERNAL_SERVER_ERROR, 502 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 503 ) 504 .into_response(); 505 } 506 info!(did = %user.did, "Cleared Signal number"); 507 } else { 508 if let Err(e) = 509 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None) 510 .await 511 { 512 return ( 513 StatusCode::INTERNAL_SERVER_ERROR, 514 Json(json!({"error": "InternalError", "message": e})), 515 ) 516 .into_response(); 517 } 518 verification_required.push("signal".to_string()); 519 info!(did = %user.did, "Requested Signal verification"); 520 } 521 } 522 523 Json(UpdateNotificationPrefsResponse { 524 success: true, 525 verification_required, 526 }) 527 .into_response() 528}