this repo has no description
1use crate::auth::validate_bearer_token; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::State, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use serde::{Deserialize, Serialize}; 10use serde_json::json; 11use sqlx::Row; 12use tracing::info; 13 14#[derive(Serialize)] 15#[serde(rename_all = "camelCase")] 16pub struct NotificationPrefsResponse { 17 pub preferred_channel: String, 18 pub email: String, 19 pub discord_id: Option<String>, 20 pub discord_verified: bool, 21 pub telegram_username: Option<String>, 22 pub telegram_verified: bool, 23 pub signal_number: Option<String>, 24 pub signal_verified: bool, 25} 26 27pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response { 28 let token = match crate::auth::extract_bearer_token_from_header( 29 headers.get("Authorization").and_then(|h| h.to_str().ok()), 30 ) { 31 Some(t) => t, 32 None => return ( 33 StatusCode::UNAUTHORIZED, 34 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 35 ) 36 .into_response(), 37 }; 38 let user = match validate_bearer_token(&state.db, &token).await { 39 Ok(u) => u, 40 Err(_) => { 41 return ( 42 StatusCode::UNAUTHORIZED, 43 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 44 ) 45 .into_response(); 46 } 47 }; 48 let row = 49 match sqlx::query( 50 r#" 51 SELECT 52 email, 53 preferred_comms_channel::text as channel, 54 discord_id, 55 discord_verified, 56 telegram_username, 57 telegram_verified, 58 signal_number, 59 signal_verified 60 FROM users 61 WHERE did = $1 62 "#, 63 ) 64 .bind(&user.did) 65 .fetch_one(&state.db) 66 .await 67 { 68 Ok(r) => r, 69 Err(e) => return ( 70 StatusCode::INTERNAL_SERVER_ERROR, 71 Json( 72 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 73 ), 74 ) 75 .into_response(), 76 }; 77 let email: String = row.get("email"); 78 let channel: String = row.get("channel"); 79 let discord_id: Option<String> = row.get("discord_id"); 80 let discord_verified: bool = row.get("discord_verified"); 81 let telegram_username: Option<String> = row.get("telegram_username"); 82 let telegram_verified: bool = row.get("telegram_verified"); 83 let signal_number: Option<String> = row.get("signal_number"); 84 let signal_verified: bool = row.get("signal_verified"); 85 Json(NotificationPrefsResponse { 86 preferred_channel: channel, 87 email, 88 discord_id, 89 discord_verified, 90 telegram_username, 91 telegram_verified, 92 signal_number, 93 signal_verified, 94 }) 95 .into_response() 96} 97 98#[derive(Serialize)] 99#[serde(rename_all = "camelCase")] 100pub struct NotificationHistoryEntry { 101 pub created_at: String, 102 pub channel: String, 103 pub comms_type: String, 104 pub status: String, 105 pub subject: Option<String>, 106 pub body: String, 107} 108 109#[derive(Serialize)] 110#[serde(rename_all = "camelCase")] 111pub struct GetNotificationHistoryResponse { 112 pub notifications: Vec<NotificationHistoryEntry>, 113} 114 115pub async fn get_notification_history( 116 State(state): State<AppState>, 117 headers: HeaderMap, 118) -> Response { 119 let token = match crate::auth::extract_bearer_token_from_header( 120 headers.get("Authorization").and_then(|h| h.to_str().ok()), 121 ) { 122 Some(t) => t, 123 None => return ( 124 StatusCode::UNAUTHORIZED, 125 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 126 ) 127 .into_response(), 128 }; 129 let user = match validate_bearer_token(&state.db, &token).await { 130 Ok(u) => u, 131 Err(_) => { 132 return ( 133 StatusCode::UNAUTHORIZED, 134 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 135 ) 136 .into_response(); 137 } 138 }; 139 140 let user_id: uuid::Uuid = 141 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", user.did) 142 .fetch_one(&state.db) 143 .await 144 { 145 Ok(id) => id, 146 Err(e) => return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json( 149 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 150 ), 151 ) 152 .into_response(), 153 }; 154 155 let rows = 156 match sqlx::query!( 157 r#" 158 SELECT 159 created_at, 160 channel as "channel: String", 161 comms_type as "comms_type: String", 162 status as "status: String", 163 subject, 164 body 165 FROM comms_queue 166 WHERE user_id = $1 167 ORDER BY created_at DESC 168 LIMIT 50 169 "#, 170 user_id 171 ) 172 .fetch_all(&state.db) 173 .await 174 { 175 Ok(r) => r, 176 Err(e) => return ( 177 StatusCode::INTERNAL_SERVER_ERROR, 178 Json( 179 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 180 ), 181 ) 182 .into_response(), 183 }; 184 185 let notifications = rows 186 .iter() 187 .map(|row| NotificationHistoryEntry { 188 created_at: row.created_at.to_rfc3339(), 189 channel: row.channel.clone(), 190 comms_type: row.comms_type.clone(), 191 status: row.status.clone(), 192 subject: row.subject.clone(), 193 body: row.body.clone(), 194 }) 195 .collect(); 196 197 Json(GetNotificationHistoryResponse { notifications }).into_response() 198} 199 200#[derive(Deserialize)] 201#[serde(rename_all = "camelCase")] 202pub struct UpdateNotificationPrefsInput { 203 pub preferred_channel: Option<String>, 204 pub email: Option<String>, 205 pub discord_id: Option<String>, 206 pub telegram_username: Option<String>, 207 pub signal_number: Option<String>, 208} 209 210#[derive(Serialize)] 211#[serde(rename_all = "camelCase")] 212pub struct UpdateNotificationPrefsResponse { 213 pub success: bool, 214 #[serde(skip_serializing_if = "Vec::is_empty")] 215 pub verification_required: Vec<String>, 216} 217 218pub async fn request_channel_verification( 219 db: &sqlx::PgPool, 220 user_id: uuid::Uuid, 221 did: &str, 222 channel: &str, 223 identifier: &str, 224 handle: Option<&str>, 225) -> Result<String, String> { 226 let token = 227 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier); 228 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 229 230 if channel == "email" { 231 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 232 let handle_str = handle.unwrap_or("user"); 233 crate::comms::enqueue_email_update( 234 db, 235 user_id, 236 identifier, 237 handle_str, 238 &formatted_token, 239 &hostname, 240 ) 241 .await 242 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 243 } else { 244 sqlx::query!( 245 r#" 246 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 247 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 248 "#, 249 user_id, 250 channel as _, 251 identifier, 252 format!("Your verification code is: {}", formatted_token), 253 json!({"code": formatted_token}) 254 ) 255 .execute(db) 256 .await 257 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 258 } 259 260 Ok(token) 261} 262 263pub async fn update_notification_prefs( 264 State(state): State<AppState>, 265 headers: HeaderMap, 266 Json(input): Json<UpdateNotificationPrefsInput>, 267) -> Response { 268 let token = match crate::auth::extract_bearer_token_from_header( 269 headers.get("Authorization").and_then(|h| h.to_str().ok()), 270 ) { 271 Some(t) => t, 272 None => return ( 273 StatusCode::UNAUTHORIZED, 274 Json(json!({"error": "AuthenticationRequired", "message": "Authentication required"})), 275 ) 276 .into_response(), 277 }; 278 let user = match validate_bearer_token(&state.db, &token).await { 279 Ok(u) => u, 280 Err(_) => { 281 return ( 282 StatusCode::UNAUTHORIZED, 283 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"})), 284 ) 285 .into_response(); 286 } 287 }; 288 289 let user_row = 290 match sqlx::query!( 291 "SELECT id, handle, email FROM users WHERE did = $1", 292 user.did 293 ) 294 .fetch_one(&state.db) 295 .await 296 { 297 Ok(row) => row, 298 Err(e) => return ( 299 StatusCode::INTERNAL_SERVER_ERROR, 300 Json( 301 json!({"error": "InternalError", "message": format!("Database error: {}", e)}), 302 ), 303 ) 304 .into_response(), 305 }; 306 307 let user_id = user_row.id; 308 let handle = user_row.handle; 309 let current_email = user_row.email; 310 311 let mut verification_required: Vec<String> = Vec::new(); 312 313 if let Some(ref channel) = input.preferred_channel { 314 let valid_channels = ["email", "discord", "telegram", "signal"]; 315 if !valid_channels.contains(&channel.as_str()) { 316 return ( 317 StatusCode::BAD_REQUEST, 318 Json(json!({ 319 "error": "InvalidRequest", 320 "message": "Invalid channel. Must be one of: email, discord, telegram, signal" 321 })), 322 ) 323 .into_response(); 324 } 325 if let Err(e) = sqlx::query( 326 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 327 ) 328 .bind(channel) 329 .bind(&user.did) 330 .execute(&state.db) 331 .await 332 { 333 return ( 334 StatusCode::INTERNAL_SERVER_ERROR, 335 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 336 ) 337 .into_response(); 338 } 339 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 340 } 341 342 if let Some(ref new_email) = input.email { 343 let email_clean = new_email.trim().to_lowercase(); 344 if email_clean.is_empty() { 345 return ( 346 StatusCode::BAD_REQUEST, 347 Json(json!({"error": "InvalidRequest", "message": "Email cannot be empty"})), 348 ) 349 .into_response(); 350 } 351 352 if !crate::api::validation::is_valid_email(&email_clean) { 353 return ( 354 StatusCode::BAD_REQUEST, 355 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 356 ) 357 .into_response(); 358 } 359 360 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 361 info!(did = %user.did, "Email unchanged, skipping"); 362 } else { 363 let exists = sqlx::query!( 364 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 365 email_clean, 366 user_id 367 ) 368 .fetch_optional(&state.db) 369 .await; 370 371 if let Ok(Some(_)) = exists { 372 return ( 373 StatusCode::BAD_REQUEST, 374 Json(json!({"error": "EmailTaken", "message": "Email already in use"})), 375 ) 376 .into_response(); 377 } 378 379 if let Err(e) = request_channel_verification( 380 &state.db, 381 user_id, 382 &user.did, 383 "email", 384 &email_clean, 385 Some(&handle), 386 ) 387 .await 388 { 389 return ( 390 StatusCode::INTERNAL_SERVER_ERROR, 391 Json(json!({"error": "InternalError", "message": e})), 392 ) 393 .into_response(); 394 } 395 verification_required.push("email".to_string()); 396 info!(did = %user.did, "Requested email verification"); 397 } 398 } 399 400 if let Some(ref discord_id) = input.discord_id { 401 if discord_id.is_empty() { 402 if let Err(e) = sqlx::query!( 403 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 404 user_id 405 ) 406 .execute(&state.db) 407 .await 408 { 409 return ( 410 StatusCode::INTERNAL_SERVER_ERROR, 411 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 412 ) 413 .into_response(); 414 } 415 info!(did = %user.did, "Cleared Discord ID"); 416 } else { 417 if let Err(e) = request_channel_verification( 418 &state.db, user_id, &user.did, "discord", discord_id, None, 419 ) 420 .await 421 { 422 return ( 423 StatusCode::INTERNAL_SERVER_ERROR, 424 Json(json!({"error": "InternalError", "message": e})), 425 ) 426 .into_response(); 427 } 428 verification_required.push("discord".to_string()); 429 info!(did = %user.did, "Requested Discord verification"); 430 } 431 } 432 433 if let Some(ref telegram) = input.telegram_username { 434 let telegram_clean = telegram.trim_start_matches('@'); 435 if telegram_clean.is_empty() { 436 if let Err(e) = sqlx::query!( 437 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 438 user_id 439 ) 440 .execute(&state.db) 441 .await 442 { 443 return ( 444 StatusCode::INTERNAL_SERVER_ERROR, 445 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 446 ) 447 .into_response(); 448 } 449 info!(did = %user.did, "Cleared Telegram username"); 450 } else { 451 if let Err(e) = request_channel_verification( 452 &state.db, 453 user_id, 454 &user.did, 455 "telegram", 456 telegram_clean, 457 None, 458 ) 459 .await 460 { 461 return ( 462 StatusCode::INTERNAL_SERVER_ERROR, 463 Json(json!({"error": "InternalError", "message": e})), 464 ) 465 .into_response(); 466 } 467 verification_required.push("telegram".to_string()); 468 info!(did = %user.did, "Requested Telegram verification"); 469 } 470 } 471 472 if let Some(ref signal) = input.signal_number { 473 if signal.is_empty() { 474 if let Err(e) = sqlx::query!( 475 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 476 user_id 477 ) 478 .execute(&state.db) 479 .await 480 { 481 return ( 482 StatusCode::INTERNAL_SERVER_ERROR, 483 Json(json!({"error": "InternalError", "message": format!("Database error: {}", e)})), 484 ) 485 .into_response(); 486 } 487 info!(did = %user.did, "Cleared Signal number"); 488 } else { 489 if let Err(e) = 490 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None) 491 .await 492 { 493 return ( 494 StatusCode::INTERNAL_SERVER_ERROR, 495 Json(json!({"error": "InternalError", "message": e})), 496 ) 497 .into_response(); 498 } 499 verification_required.push("signal".to_string()); 500 info!(did = %user.did, "Requested Signal verification"); 501 } 502 } 503 504 Json(UpdateNotificationPrefsResponse { 505 success: true, 506 verification_required, 507 }) 508 .into_response() 509}