this repo has no description
1use crate::auth::validate_bearer_token; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use chrono::{Duration, Utc}; 10use rand::Rng; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use sqlx::Row; 14use tracing::info; 15 16fn generate_verification_code() -> String { 17 rand::thread_rng() 18 .sample_iter(&rand::distributions::Uniform::new(0, 10)) 19 .take(6) 20 .map(|x| x.to_string()) 21 .collect() 22} 23 24#[derive(Serialize)] 25#[serde(rename_all = "camelCase")] 26pub struct NotificationPrefsResponse { 27 pub preferred_channel: String, 28 pub email: String, 29 pub discord_id: Option<String>, 30 pub discord_verified: bool, 31 pub telegram_username: Option<String>, 32 pub telegram_verified: bool, 33 pub signal_number: Option<String>, 34 pub signal_verified: bool, 35} 36 37pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response { 38 let token = match crate::auth::extract_bearer_token_from_header( 39 headers.get("Authorization").and_then(|h| h.to_str().ok()), 40 ) { 41 Some(t) => t, 42 None => return ( 43 StatusCode::UNAUTHORIZED, 44 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 45 ) 46 .into_response(), 47 }; 48 let user = match validate_bearer_token(&state.db, &token).await { 49 Ok(u) => u, 50 Err(_) => { 51 return ( 52 StatusCode::UNAUTHORIZED, 53 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 54 ) 55 .into_response(); 56 } 57 }; 58 let row = 59 match sqlx::query( 60 r#" 61 SELECT 62 email, 63 preferred_comms_channel::text as channel, 64 discord_id, 65 discord_verified, 66 telegram_username, 67 telegram_verified, 68 signal_number, 69 signal_verified 70 FROM users 71 WHERE did = $1 72 "#, 73 ) 74 .bind(&user.did) 75 .fetch_one(&state.db) 76 .await 77 { 78 Ok(r) => r, 79 Err(e) => return ( 80 StatusCode::INTERNAL_SERVER_ERROR, 81 Json( 82 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 83 ), 84 ) 85 .into_response(), 86 }; 87 let email: String = row.get("email"); 88 let channel: String = row.get("channel"); 89 let discord_id: Option<String> = row.get("discord_id"); 90 let discord_verified: bool = row.get("discord_verified"); 91 let telegram_username: Option<String> = row.get("telegram_username"); 92 let telegram_verified: bool = row.get("telegram_verified"); 93 let signal_number: Option<String> = row.get("signal_number"); 94 let signal_verified: bool = row.get("signal_verified"); 95 Json(NotificationPrefsResponse { 96 preferred_channel: channel, 97 email, 98 discord_id, 99 discord_verified, 100 telegram_username, 101 telegram_verified, 102 signal_number, 103 signal_verified, 104 }) 105 .into_response() 106} 107 108#[derive(Serialize)] 109#[serde(rename_all = "camelCase")] 110pub struct NotificationHistoryEntry { 111 pub created_at: String, 112 pub channel: String, 113 pub comms_type: String, 114 pub status: String, 115 pub subject: Option<String>, 116 pub body: String, 117} 118 119#[derive(Serialize)] 120#[serde(rename_all = "camelCase")] 121pub struct GetNotificationHistoryResponse { 122 pub notifications: Vec<NotificationHistoryEntry>, 123} 124 125pub async fn get_notification_history( 126 State(state): State<AppState>, 127 headers: HeaderMap, 128) -> Response { 129 let token = match crate::auth::extract_bearer_token_from_header( 130 headers.get("Authorization").and_then(|h| h.to_str().ok()), 131 ) { 132 Some(t) => t, 133 None => return ( 134 StatusCode::UNAUTHORIZED, 135 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 136 ) 137 .into_response(), 138 }; 139 let user = match validate_bearer_token(&state.db, &token).await { 140 Ok(u) => u, 141 Err(_) => { 142 return ( 143 StatusCode::UNAUTHORIZED, 144 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 145 ) 146 .into_response(); 147 } 148 }; 149 150 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did) 151 .fetch_one(&state.db) 152 .await 153 { 154 Ok(id) => id, 155 Err(e) => return ( 156 StatusCode::INTERNAL_SERVER_ERROR, 157 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 158 ) 159 .into_response(), 160 }; 161 162 let rows = match sqlx::query!( 163 r#" 164 SELECT 165 created_at, 166 channel as "channel: String", 167 comms_type as "comms_type: String", 168 status as "status: String", 169 subject, 170 body 171 FROM comms_queue 172 WHERE user_id = $1 173 ORDER BY created_at DESC 174 LIMIT 50 175 "#, 176 user_id 177 ) 178 .fetch_all(&state.db) 179 .await 180 { 181 Ok(r) => r, 182 Err(e) => return ( 183 StatusCode::INTERNAL_SERVER_ERROR, 184 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 185 ) 186 .into_response(), 187 }; 188 189 let notifications = rows.iter().map(|row| { 190 NotificationHistoryEntry { 191 created_at: row.created_at.to_rfc3339(), 192 channel: row.channel.clone(), 193 comms_type: row.comms_type.clone(), 194 status: row.status.clone(), 195 subject: row.subject.clone(), 196 body: row.body.clone(), 197 } 198 }).collect(); 199 200 Json(GetNotificationHistoryResponse { notifications }).into_response() 201} 202 203#[derive(Deserialize)] 204#[serde(rename_all = "camelCase")] 205pub struct UpdateNotificationPrefsInput { 206 pub preferred_channel: Option<String>, 207 pub email: Option<String>, 208 pub discord_id: Option<String>, 209 pub telegram_username: Option<String>, 210 pub signal_number: Option<String>, 211} 212 213#[derive(Serialize)] 214#[serde(rename_all = "camelCase")] 215pub struct UpdateNotificationPrefsResponse { 216 pub success: bool, 217 #[serde(skip_serializing_if = "Vec::is_empty")] 218 pub verification_required: Vec<String>, 219} 220 221pub async fn request_channel_verification( 222 db: &sqlx::PgPool, 223 user_id: uuid::Uuid, 224 channel: &str, 225 identifier: &str, 226 handle: Option<&str>, 227) -> Result<String, String> { 228 let code = generate_verification_code(); 229 let expires_at = Utc::now() + Duration::minutes(10); 230 231 sqlx::query!( 232 r#" 233 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 234 VALUES ($1, $2::comms_channel, $3, $4, $5) 235 ON CONFLICT (user_id, channel) DO UPDATE 236 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW() 237 "#, 238 user_id, 239 channel as _, 240 code, 241 identifier, 242 expires_at 243 ) 244 .execute(db) 245 .await 246 .map_err(|e| format!("Database error: {}", e))?; 247 248 if channel == "email" { 249 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 250 let handle_str = handle.unwrap_or("user"); 251 crate::comms::enqueue_email_update(db, user_id, identifier, handle_str, &code, &hostname) 252 .await 253 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 254 } else { 255 sqlx::query!( 256 r#" 257 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 258 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 259 "#, 260 user_id, 261 channel as _, 262 identifier, 263 format!("Your verification code is: {}", code), 264 json!({"code": code}) 265 ) 266 .execute(db) 267 .await 268 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 269 } 270 271 Ok(code) 272} 273 274pub async fn update_notification_prefs( 275 State(state): State<AppState>, 276 headers: HeaderMap, 277 Json(input): Json<UpdateNotificationPrefsInput>, 278) -> Response { 279 let token = match crate::auth::extract_bearer_token_from_header( 280 headers.get("Authorization").and_then(|h| h.to_str().ok()), 281 ) { 282 Some(t) => t, 283 None => return ( 284 StatusCode::UNAUTHORIZED, 285 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 286 ) 287 .into_response(), 288 }; 289 let user = match validate_bearer_token(&state.db, &token).await { 290 Ok(u) => u, 291 Err(_) => { 292 return ( 293 StatusCode::UNAUTHORIZED, 294 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 295 ) 296 .into_response(); 297 } 298 }; 299 300 let user_row = match sqlx::query!( 301 "SELECT id, handle, email FROM users WHERE did = $1", 302 user.did 303 ) 304 .fetch_one(&state.db) 305 .await 306 { 307 Ok(row) => row, 308 Err(e) => return ( 309 StatusCode::INTERNAL_SERVER_ERROR, 310 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 311 ) 312 .into_response(), 313 }; 314 315 let user_id = user_row.id; 316 let handle = user_row.handle; 317 let current_email = user_row.email; 318 319 let mut verification_required: Vec<String> = Vec::new(); 320 321 if let Some(ref channel) = input.preferred_channel { 322 let valid_channels = ["email", "discord", "telegram", "signal"]; 323 if !valid_channels.contains(&channel.as_str()) { 324 return ( 325 StatusCode::BAD_REQUEST, 326 Json(json!({ 327 "error": "InvalidRequest", 328 "message": "Invalid channel. Must be one of: email, discord, telegram, signal" 329 })), 330 ) 331 .into_response(); 332 } 333 if let Err(e) = sqlx::query( 334 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 335 ) 336 .bind(channel) 337 .bind(&user.did) 338 .execute(&state.db) 339 .await 340 { 341 return ( 342 StatusCode::INTERNAL_SERVER_ERROR, 343 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 344 ) 345 .into_response(); 346 } 347 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 348 } 349 350 if let Some(ref new_email) = input.email { 351 let email_clean = new_email.trim().to_lowercase(); 352 if email_clean.is_empty() { 353 return ( 354 StatusCode::BAD_REQUEST, 355 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})), 356 ) 357 .into_response(); 358 } 359 360 if !crate::api::validation::is_valid_email(&email_clean) { 361 return ( 362 StatusCode::BAD_REQUEST, 363 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 364 ) 365 .into_response(); 366 } 367 368 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 369 info!(did = %user.did, "Email unchanged, skipping"); 370 } else { 371 let exists = sqlx::query!( 372 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 373 email_clean, 374 user_id 375 ) 376 .fetch_optional(&state.db) 377 .await; 378 379 if let Ok(Some(_)) = exists { 380 return ( 381 StatusCode::BAD_REQUEST, 382 Json(json!({"error": "EmailTaken", "message": "Email already in use"})), 383 ) 384 .into_response(); 385 } 386 387 if let Err(e) = request_channel_verification(&state.db, user_id, "email", &email_clean, Some(&handle)).await { 388 return ( 389 StatusCode::INTERNAL_SERVER_ERROR, 390 Json(json!({"error": "InternalError", "message": e})), 391 ) 392 .into_response(); 393 } 394 verification_required.push("email".to_string()); 395 info!(did = %user.did, "Requested email verification"); 396 } 397 } 398 399 if let Some(ref discord_id) = input.discord_id { 400 if discord_id.is_empty() { 401 if let Err(e) = sqlx::query!( 402 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 403 user_id 404 ) 405 .execute(&state.db) 406 .await 407 { 408 return ( 409 StatusCode::INTERNAL_SERVER_ERROR, 410 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 411 ) 412 .into_response(); 413 } 414 let _ = sqlx::query!( 415 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'discord'", 416 user_id 417 ) 418 .execute(&state.db) 419 .await; 420 info!(did = %user.did, "Cleared Discord ID"); 421 } else { 422 if let Err(e) = request_channel_verification(&state.db, user_id, "discord", discord_id, None).await { 423 return ( 424 StatusCode::INTERNAL_SERVER_ERROR, 425 Json(json!({"error": "InternalError", "message": e})), 426 ) 427 .into_response(); 428 } 429 verification_required.push("discord".to_string()); 430 info!(did = %user.did, "Requested Discord verification"); 431 } 432 } 433 434 if let Some(ref telegram) = input.telegram_username { 435 let telegram_clean = telegram.trim_start_matches('@'); 436 if telegram_clean.is_empty() { 437 if let Err(e) = sqlx::query!( 438 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 439 user_id 440 ) 441 .execute(&state.db) 442 .await 443 { 444 return ( 445 StatusCode::INTERNAL_SERVER_ERROR, 446 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 447 ) 448 .into_response(); 449 } 450 let _ = sqlx::query!( 451 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'telegram'", 452 user_id 453 ) 454 .execute(&state.db) 455 .await; 456 info!(did = %user.did, "Cleared Telegram username"); 457 } else { 458 if let Err(e) = request_channel_verification(&state.db, user_id, "telegram", telegram_clean, None).await { 459 return ( 460 StatusCode::INTERNAL_SERVER_ERROR, 461 Json(json!({"error": "InternalError", "message": e})), 462 ) 463 .into_response(); 464 } 465 verification_required.push("telegram".to_string()); 466 info!(did = %user.did, "Requested Telegram verification"); 467 } 468 } 469 470 if let Some(ref signal) = input.signal_number { 471 if signal.is_empty() { 472 if let Err(e) = sqlx::query!( 473 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 474 user_id 475 ) 476 .execute(&state.db) 477 .await 478 { 479 return ( 480 StatusCode::INTERNAL_SERVER_ERROR, 481 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 482 ) 483 .into_response(); 484 } 485 let _ = sqlx::query!( 486 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'signal'", 487 user_id 488 ) 489 .execute(&state.db) 490 .await; 491 info!(did = %user.did, "Cleared Signal number"); 492 } else { 493 if let Err(e) = request_channel_verification(&state.db, user_id, "signal", signal, None).await { 494 return ( 495 StatusCode::INTERNAL_SERVER_ERROR, 496 Json(json!({"error": "InternalError", "message": e})), 497 ) 498 .into_response(); 499 } 500 verification_required.push("signal".to_string()); 501 info!(did = %user.did, "Requested Signal verification"); 502 } 503 } 504 505 Json(UpdateNotificationPrefsResponse { 506 success: true, 507 verification_required, 508 }).into_response() 509}