this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::validate_bearer_token; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::HeaderMap, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use sqlx::Row; 13use tracing::info; 14 15#[derive(Serialize)] 16#[serde(rename_all = "camelCase")] 17pub struct NotificationPrefsResponse { 18 pub preferred_channel: String, 19 pub email: String, 20 pub discord_id: Option<String>, 21 pub discord_verified: bool, 22 pub telegram_username: Option<String>, 23 pub telegram_verified: bool, 24 pub signal_number: Option<String>, 25 pub signal_verified: bool, 26} 27 28pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response { 29 let token = match crate::auth::extract_bearer_token_from_header( 30 headers.get("Authorization").and_then(|h| h.to_str().ok()), 31 ) { 32 Some(t) => t, 33 None => return ApiError::AuthenticationRequired.into_response(), 34 }; 35 let user = match validate_bearer_token(&state.db, &token).await { 36 Ok(u) => u, 37 Err(_) => { 38 return ApiError::AuthenticationFailed(None).into_response(); 39 } 40 }; 41 let row = match sqlx::query( 42 r#" 43 SELECT 44 email, 45 preferred_comms_channel::text as channel, 46 discord_id, 47 discord_verified, 48 telegram_username, 49 telegram_verified, 50 signal_number, 51 signal_verified 52 FROM users 53 WHERE did = $1 54 "#, 55 ) 56 .bind(&user.did) 57 .fetch_one(&state.db) 58 .await 59 { 60 Ok(r) => r, 61 Err(e) => { 62 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 63 } 64 }; 65 let email: String = row.get("email"); 66 let channel: String = row.get("channel"); 67 let discord_id: Option<String> = row.get("discord_id"); 68 let discord_verified: bool = row.get("discord_verified"); 69 let telegram_username: Option<String> = row.get("telegram_username"); 70 let telegram_verified: bool = row.get("telegram_verified"); 71 let signal_number: Option<String> = row.get("signal_number"); 72 let signal_verified: bool = row.get("signal_verified"); 73 Json(NotificationPrefsResponse { 74 preferred_channel: channel, 75 email, 76 discord_id, 77 discord_verified, 78 telegram_username, 79 telegram_verified, 80 signal_number, 81 signal_verified, 82 }) 83 .into_response() 84} 85 86#[derive(Serialize)] 87#[serde(rename_all = "camelCase")] 88pub struct NotificationHistoryEntry { 89 pub created_at: String, 90 pub channel: String, 91 pub comms_type: String, 92 pub status: String, 93 pub subject: Option<String>, 94 pub body: String, 95} 96 97#[derive(Serialize)] 98#[serde(rename_all = "camelCase")] 99pub struct GetNotificationHistoryResponse { 100 pub notifications: Vec<NotificationHistoryEntry>, 101} 102 103pub async fn get_notification_history( 104 State(state): State<AppState>, 105 headers: HeaderMap, 106) -> Response { 107 let token = match crate::auth::extract_bearer_token_from_header( 108 headers.get("Authorization").and_then(|h| h.to_str().ok()), 109 ) { 110 Some(t) => t, 111 None => return ApiError::AuthenticationRequired.into_response(), 112 }; 113 let user = match validate_bearer_token(&state.db, &token).await { 114 Ok(u) => u, 115 Err(_) => { 116 return ApiError::AuthenticationFailed(None).into_response(); 117 } 118 }; 119 120 let user_id: uuid::Uuid = 121 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &user.did) 122 .fetch_one(&state.db) 123 .await 124 { 125 Ok(id) => id, 126 Err(e) => { 127 return ApiError::InternalError(Some(format!("Database error: {}", e))) 128 .into_response(); 129 } 130 }; 131 132 let rows = match sqlx::query!( 133 r#" 134 SELECT 135 created_at, 136 channel as "channel: String", 137 comms_type as "comms_type: String", 138 status as "status: String", 139 subject, 140 body 141 FROM comms_queue 142 WHERE user_id = $1 143 ORDER BY created_at DESC 144 LIMIT 50 145 "#, 146 user_id 147 ) 148 .fetch_all(&state.db) 149 .await 150 { 151 Ok(r) => r, 152 Err(e) => { 153 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 154 } 155 }; 156 157 let sensitive_types = [ 158 "email_verification", 159 "password_reset", 160 "email_update", 161 "two_factor_code", 162 "passkey_recovery", 163 "migration_verification", 164 "plc_operation", 165 "channel_verification", 166 "signup_verification", 167 ]; 168 169 let notifications = rows 170 .iter() 171 .map(|row| { 172 let body = if sensitive_types.contains(&row.comms_type.as_str()) { 173 "[Code redacted for security]".to_string() 174 } else { 175 row.body.clone() 176 }; 177 NotificationHistoryEntry { 178 created_at: row.created_at.to_rfc3339(), 179 channel: row.channel.clone(), 180 comms_type: row.comms_type.clone(), 181 status: row.status.clone(), 182 subject: row.subject.clone(), 183 body, 184 } 185 }) 186 .collect(); 187 188 Json(GetNotificationHistoryResponse { notifications }).into_response() 189} 190 191#[derive(Deserialize)] 192#[serde(rename_all = "camelCase")] 193pub struct UpdateNotificationPrefsInput { 194 pub preferred_channel: Option<String>, 195 pub email: Option<String>, 196 pub discord_id: Option<String>, 197 pub telegram_username: Option<String>, 198 pub signal_number: Option<String>, 199} 200 201#[derive(Serialize)] 202#[serde(rename_all = "camelCase")] 203pub struct UpdateNotificationPrefsResponse { 204 pub success: bool, 205 #[serde(skip_serializing_if = "Vec::is_empty")] 206 pub verification_required: Vec<String>, 207} 208 209pub async fn request_channel_verification( 210 db: &sqlx::PgPool, 211 user_id: uuid::Uuid, 212 did: &str, 213 channel: &str, 214 identifier: &str, 215 handle: Option<&str>, 216) -> Result<String, String> { 217 let token = 218 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier); 219 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 220 221 if channel == "email" { 222 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 223 let handle_str = handle.unwrap_or("user"); 224 crate::comms::enqueue_email_update( 225 db, 226 user_id, 227 identifier, 228 handle_str, 229 &formatted_token, 230 &hostname, 231 ) 232 .await 233 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 234 } else { 235 sqlx::query!( 236 r#" 237 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 238 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 239 "#, 240 user_id, 241 channel as _, 242 identifier, 243 format!("Your verification code is: {}", formatted_token), 244 json!({"code": formatted_token}) 245 ) 246 .execute(db) 247 .await 248 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 249 } 250 251 Ok(token) 252} 253 254pub async fn update_notification_prefs( 255 State(state): State<AppState>, 256 headers: HeaderMap, 257 Json(input): Json<UpdateNotificationPrefsInput>, 258) -> Response { 259 let token = match crate::auth::extract_bearer_token_from_header( 260 headers.get("Authorization").and_then(|h| h.to_str().ok()), 261 ) { 262 Some(t) => t, 263 None => return ApiError::AuthenticationRequired.into_response(), 264 }; 265 let user = match validate_bearer_token(&state.db, &token).await { 266 Ok(u) => u, 267 Err(_) => { 268 return ApiError::AuthenticationFailed(None).into_response(); 269 } 270 }; 271 272 let user_row = match sqlx::query!( 273 "SELECT id, handle, email FROM users WHERE did = $1", 274 &user.did 275 ) 276 .fetch_one(&state.db) 277 .await 278 { 279 Ok(row) => row, 280 Err(e) => { 281 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 282 } 283 }; 284 285 let user_id = user_row.id; 286 let handle = user_row.handle; 287 let current_email = user_row.email; 288 289 let mut verification_required: Vec<String> = Vec::new(); 290 291 if let Some(ref channel) = input.preferred_channel { 292 let valid_channels = ["email", "discord", "telegram", "signal"]; 293 if !valid_channels.contains(&channel.as_str()) { 294 return ApiError::InvalidRequest( 295 "Invalid channel. Must be one of: email, discord, telegram, signal".into(), 296 ) 297 .into_response(); 298 } 299 if let Err(e) = sqlx::query( 300 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 301 ) 302 .bind(channel) 303 .bind(&user.did) 304 .execute(&state.db) 305 .await 306 { 307 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 308 } 309 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 310 } 311 312 if let Some(ref new_email) = input.email { 313 let email_clean = new_email.trim().to_lowercase(); 314 if email_clean.is_empty() { 315 return ApiError::InvalidRequest("Email cannot be empty".into()).into_response(); 316 } 317 318 if !crate::api::validation::is_valid_email(&email_clean) { 319 return ApiError::InvalidEmail.into_response(); 320 } 321 322 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 323 info!(did = %user.did, "Email unchanged, skipping"); 324 } else { 325 let exists = sqlx::query!( 326 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 327 email_clean, 328 user_id 329 ) 330 .fetch_optional(&state.db) 331 .await; 332 333 if let Ok(Some(_)) = exists { 334 return ApiError::EmailTaken.into_response(); 335 } 336 337 if let Err(e) = request_channel_verification( 338 &state.db, 339 user_id, 340 &user.did, 341 "email", 342 &email_clean, 343 Some(&handle), 344 ) 345 .await 346 { 347 return ApiError::InternalError(Some(e)).into_response(); 348 } 349 verification_required.push("email".to_string()); 350 info!(did = %user.did, "Requested email verification"); 351 } 352 } 353 354 if let Some(ref discord_id) = input.discord_id { 355 if discord_id.is_empty() { 356 if let Err(e) = sqlx::query!( 357 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 358 user_id 359 ) 360 .execute(&state.db) 361 .await 362 { 363 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 364 } 365 info!(did = %user.did, "Cleared Discord ID"); 366 } else { 367 if let Err(e) = request_channel_verification( 368 &state.db, user_id, &user.did, "discord", discord_id, None, 369 ) 370 .await 371 { 372 return ApiError::InternalError(Some(e)).into_response(); 373 } 374 verification_required.push("discord".to_string()); 375 info!(did = %user.did, "Requested Discord verification"); 376 } 377 } 378 379 if let Some(ref telegram) = input.telegram_username { 380 let telegram_clean = telegram.trim_start_matches('@'); 381 if telegram_clean.is_empty() { 382 if let Err(e) = sqlx::query!( 383 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 384 user_id 385 ) 386 .execute(&state.db) 387 .await 388 { 389 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 390 } 391 info!(did = %user.did, "Cleared Telegram username"); 392 } else { 393 if let Err(e) = request_channel_verification( 394 &state.db, 395 user_id, 396 &user.did, 397 "telegram", 398 telegram_clean, 399 None, 400 ) 401 .await 402 { 403 return ApiError::InternalError(Some(e)).into_response(); 404 } 405 verification_required.push("telegram".to_string()); 406 info!(did = %user.did, "Requested Telegram verification"); 407 } 408 } 409 410 if let Some(ref signal) = input.signal_number { 411 if signal.is_empty() { 412 if let Err(e) = sqlx::query!( 413 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 414 user_id 415 ) 416 .execute(&state.db) 417 .await 418 { 419 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 420 } 421 info!(did = %user.did, "Cleared Signal number"); 422 } else { 423 if let Err(e) = 424 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None) 425 .await 426 { 427 return ApiError::InternalError(Some(e)).into_response(); 428 } 429 verification_required.push("signal".to_string()); 430 info!(did = %user.did, "Requested Signal verification"); 431 } 432 } 433 434 Json(UpdateNotificationPrefsResponse { 435 success: true, 436 verification_required, 437 }) 438 .into_response() 439}