this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::validate_bearer_token; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 http::HeaderMap, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use sqlx::Row; 13use tracing::info; 14 15#[derive(Serialize)] 16#[serde(rename_all = "camelCase")] 17pub struct NotificationPrefsResponse { 18 pub preferred_channel: String, 19 pub email: String, 20 pub discord_id: Option<String>, 21 pub discord_verified: bool, 22 pub telegram_username: Option<String>, 23 pub telegram_verified: bool, 24 pub signal_number: Option<String>, 25 pub signal_verified: bool, 26} 27 28pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response { 29 let token = match crate::auth::extract_bearer_token_from_header( 30 headers.get("Authorization").and_then(|h| h.to_str().ok()), 31 ) { 32 Some(t) => t, 33 None => return ApiError::AuthenticationRequired.into_response(), 34 }; 35 let user = match validate_bearer_token(&state.db, &token).await { 36 Ok(u) => u, 37 Err(_) => { 38 return ApiError::AuthenticationFailed(None).into_response(); 39 } 40 }; 41 let row = 42 match sqlx::query( 43 r#" 44 SELECT 45 email, 46 preferred_comms_channel::text as channel, 47 discord_id, 48 discord_verified, 49 telegram_username, 50 telegram_verified, 51 signal_number, 52 signal_verified 53 FROM users 54 WHERE did = $1 55 "#, 56 ) 57 .bind(&user.did) 58 .fetch_one(&state.db) 59 .await 60 { 61 Ok(r) => r, 62 Err(e) => { 63 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response() 64 } 65 }; 66 let email: String = row.get("email"); 67 let channel: String = row.get("channel"); 68 let discord_id: Option<String> = row.get("discord_id"); 69 let discord_verified: bool = row.get("discord_verified"); 70 let telegram_username: Option<String> = row.get("telegram_username"); 71 let telegram_verified: bool = row.get("telegram_verified"); 72 let signal_number: Option<String> = row.get("signal_number"); 73 let signal_verified: bool = row.get("signal_verified"); 74 Json(NotificationPrefsResponse { 75 preferred_channel: channel, 76 email, 77 discord_id, 78 discord_verified, 79 telegram_username, 80 telegram_verified, 81 signal_number, 82 signal_verified, 83 }) 84 .into_response() 85} 86 87#[derive(Serialize)] 88#[serde(rename_all = "camelCase")] 89pub struct NotificationHistoryEntry { 90 pub created_at: String, 91 pub channel: String, 92 pub comms_type: String, 93 pub status: String, 94 pub subject: Option<String>, 95 pub body: String, 96} 97 98#[derive(Serialize)] 99#[serde(rename_all = "camelCase")] 100pub struct GetNotificationHistoryResponse { 101 pub notifications: Vec<NotificationHistoryEntry>, 102} 103 104pub async fn get_notification_history( 105 State(state): State<AppState>, 106 headers: HeaderMap, 107) -> Response { 108 let token = match crate::auth::extract_bearer_token_from_header( 109 headers.get("Authorization").and_then(|h| h.to_str().ok()), 110 ) { 111 Some(t) => t, 112 None => return ApiError::AuthenticationRequired.into_response(), 113 }; 114 let user = match validate_bearer_token(&state.db, &token).await { 115 Ok(u) => u, 116 Err(_) => { 117 return ApiError::AuthenticationFailed(None).into_response(); 118 } 119 }; 120 121 let user_id: uuid::Uuid = 122 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &user.did) 123 .fetch_one(&state.db) 124 .await 125 { 126 Ok(id) => id, 127 Err(e) => { 128 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response() 129 } 130 }; 131 132 let rows = 133 match sqlx::query!( 134 r#" 135 SELECT 136 created_at, 137 channel as "channel: String", 138 comms_type as "comms_type: String", 139 status as "status: String", 140 subject, 141 body 142 FROM comms_queue 143 WHERE user_id = $1 144 ORDER BY created_at DESC 145 LIMIT 50 146 "#, 147 user_id 148 ) 149 .fetch_all(&state.db) 150 .await 151 { 152 Ok(r) => r, 153 Err(e) => { 154 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response() 155 } 156 }; 157 158 let sensitive_types = [ 159 "email_verification", 160 "password_reset", 161 "email_update", 162 "two_factor_code", 163 "passkey_recovery", 164 "migration_verification", 165 "plc_operation", 166 "channel_verification", 167 "signup_verification", 168 ]; 169 170 let notifications = rows 171 .iter() 172 .map(|row| { 173 let body = if sensitive_types.contains(&row.comms_type.as_str()) { 174 "[Code redacted for security]".to_string() 175 } else { 176 row.body.clone() 177 }; 178 NotificationHistoryEntry { 179 created_at: row.created_at.to_rfc3339(), 180 channel: row.channel.clone(), 181 comms_type: row.comms_type.clone(), 182 status: row.status.clone(), 183 subject: row.subject.clone(), 184 body, 185 } 186 }) 187 .collect(); 188 189 Json(GetNotificationHistoryResponse { notifications }).into_response() 190} 191 192#[derive(Deserialize)] 193#[serde(rename_all = "camelCase")] 194pub struct UpdateNotificationPrefsInput { 195 pub preferred_channel: Option<String>, 196 pub email: Option<String>, 197 pub discord_id: Option<String>, 198 pub telegram_username: Option<String>, 199 pub signal_number: Option<String>, 200} 201 202#[derive(Serialize)] 203#[serde(rename_all = "camelCase")] 204pub struct UpdateNotificationPrefsResponse { 205 pub success: bool, 206 #[serde(skip_serializing_if = "Vec::is_empty")] 207 pub verification_required: Vec<String>, 208} 209 210pub async fn request_channel_verification( 211 db: &sqlx::PgPool, 212 user_id: uuid::Uuid, 213 did: &str, 214 channel: &str, 215 identifier: &str, 216 handle: Option<&str>, 217) -> Result<String, String> { 218 let token = 219 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier); 220 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 221 222 if channel == "email" { 223 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 224 let handle_str = handle.unwrap_or("user"); 225 crate::comms::enqueue_email_update( 226 db, 227 user_id, 228 identifier, 229 handle_str, 230 &formatted_token, 231 &hostname, 232 ) 233 .await 234 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 235 } else { 236 sqlx::query!( 237 r#" 238 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 239 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 240 "#, 241 user_id, 242 channel as _, 243 identifier, 244 format!("Your verification code is: {}", formatted_token), 245 json!({"code": formatted_token}) 246 ) 247 .execute(db) 248 .await 249 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 250 } 251 252 Ok(token) 253} 254 255pub async fn update_notification_prefs( 256 State(state): State<AppState>, 257 headers: HeaderMap, 258 Json(input): Json<UpdateNotificationPrefsInput>, 259) -> Response { 260 let token = match crate::auth::extract_bearer_token_from_header( 261 headers.get("Authorization").and_then(|h| h.to_str().ok()), 262 ) { 263 Some(t) => t, 264 None => return ApiError::AuthenticationRequired.into_response(), 265 }; 266 let user = match validate_bearer_token(&state.db, &token).await { 267 Ok(u) => u, 268 Err(_) => { 269 return ApiError::AuthenticationFailed(None).into_response(); 270 } 271 }; 272 273 let user_row = 274 match sqlx::query!( 275 "SELECT id, handle, email FROM users WHERE did = $1", 276 &user.did 277 ) 278 .fetch_one(&state.db) 279 .await 280 { 281 Ok(row) => row, 282 Err(e) => { 283 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response() 284 } 285 }; 286 287 let user_id = user_row.id; 288 let handle = user_row.handle; 289 let current_email = user_row.email; 290 291 let mut verification_required: Vec<String> = Vec::new(); 292 293 if let Some(ref channel) = input.preferred_channel { 294 let valid_channels = ["email", "discord", "telegram", "signal"]; 295 if !valid_channels.contains(&channel.as_str()) { 296 return ApiError::InvalidRequest( 297 "Invalid channel. Must be one of: email, discord, telegram, signal".into(), 298 ) 299 .into_response(); 300 } 301 if let Err(e) = sqlx::query( 302 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 303 ) 304 .bind(channel) 305 .bind(&user.did) 306 .execute(&state.db) 307 .await 308 { 309 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 310 } 311 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 312 } 313 314 if let Some(ref new_email) = input.email { 315 let email_clean = new_email.trim().to_lowercase(); 316 if email_clean.is_empty() { 317 return ApiError::InvalidRequest("Email cannot be empty".into()).into_response(); 318 } 319 320 if !crate::api::validation::is_valid_email(&email_clean) { 321 return ApiError::InvalidEmail.into_response(); 322 } 323 324 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 325 info!(did = %user.did, "Email unchanged, skipping"); 326 } else { 327 let exists = sqlx::query!( 328 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 329 email_clean, 330 user_id 331 ) 332 .fetch_optional(&state.db) 333 .await; 334 335 if let Ok(Some(_)) = exists { 336 return ApiError::EmailTaken.into_response(); 337 } 338 339 if let Err(e) = request_channel_verification( 340 &state.db, 341 user_id, 342 &user.did, 343 "email", 344 &email_clean, 345 Some(&handle), 346 ) 347 .await 348 { 349 return ApiError::InternalError(Some(e)).into_response(); 350 } 351 verification_required.push("email".to_string()); 352 info!(did = %user.did, "Requested email verification"); 353 } 354 } 355 356 if let Some(ref discord_id) = input.discord_id { 357 if discord_id.is_empty() { 358 if let Err(e) = sqlx::query!( 359 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 360 user_id 361 ) 362 .execute(&state.db) 363 .await 364 { 365 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 366 } 367 info!(did = %user.did, "Cleared Discord ID"); 368 } else { 369 if let Err(e) = request_channel_verification( 370 &state.db, user_id, &user.did, "discord", discord_id, None, 371 ) 372 .await 373 { 374 return ApiError::InternalError(Some(e)).into_response(); 375 } 376 verification_required.push("discord".to_string()); 377 info!(did = %user.did, "Requested Discord verification"); 378 } 379 } 380 381 if let Some(ref telegram) = input.telegram_username { 382 let telegram_clean = telegram.trim_start_matches('@'); 383 if telegram_clean.is_empty() { 384 if let Err(e) = sqlx::query!( 385 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 386 user_id 387 ) 388 .execute(&state.db) 389 .await 390 { 391 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 392 } 393 info!(did = %user.did, "Cleared Telegram username"); 394 } else { 395 if let Err(e) = request_channel_verification( 396 &state.db, 397 user_id, 398 &user.did, 399 "telegram", 400 telegram_clean, 401 None, 402 ) 403 .await 404 { 405 return ApiError::InternalError(Some(e)).into_response(); 406 } 407 verification_required.push("telegram".to_string()); 408 info!(did = %user.did, "Requested Telegram verification"); 409 } 410 } 411 412 if let Some(ref signal) = input.signal_number { 413 if signal.is_empty() { 414 if let Err(e) = sqlx::query!( 415 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 416 user_id 417 ) 418 .execute(&state.db) 419 .await 420 { 421 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 422 } 423 info!(did = %user.did, "Cleared Signal number"); 424 } else { 425 if let Err(e) = 426 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None) 427 .await 428 { 429 return ApiError::InternalError(Some(e)).into_response(); 430 } 431 verification_required.push("signal".to_string()); 432 info!(did = %user.did, "Requested Signal verification"); 433 } 434 } 435 436 Json(UpdateNotificationPrefsResponse { 437 success: true, 438 verification_required, 439 }) 440 .into_response() 441}