this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::BearerAuth; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::State, 7 response::{IntoResponse, Response}, 8}; 9use serde::{Deserialize, Serialize}; 10use serde_json::json; 11use sqlx::Row; 12use tracing::info; 13 14#[derive(Serialize)] 15#[serde(rename_all = "camelCase")] 16pub struct NotificationPrefsResponse { 17 pub preferred_channel: String, 18 pub email: String, 19 pub discord_id: Option<String>, 20 pub discord_verified: bool, 21 pub telegram_username: Option<String>, 22 pub telegram_verified: bool, 23 pub signal_number: Option<String>, 24 pub signal_verified: bool, 25} 26 27pub async fn get_notification_prefs(State(state): State<AppState>, auth: BearerAuth) -> Response { 28 let user = auth.0; 29 let row = match sqlx::query( 30 r#" 31 SELECT 32 email, 33 preferred_comms_channel::text as channel, 34 discord_id, 35 discord_verified, 36 telegram_username, 37 telegram_verified, 38 signal_number, 39 signal_verified 40 FROM users 41 WHERE did = $1 42 "#, 43 ) 44 .bind(&user.did) 45 .fetch_one(&state.db) 46 .await 47 { 48 Ok(r) => r, 49 Err(e) => { 50 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 51 } 52 }; 53 let email: String = row.get("email"); 54 let channel: String = row.get("channel"); 55 let discord_id: Option<String> = row.get("discord_id"); 56 let discord_verified: bool = row.get("discord_verified"); 57 let telegram_username: Option<String> = row.get("telegram_username"); 58 let telegram_verified: bool = row.get("telegram_verified"); 59 let signal_number: Option<String> = row.get("signal_number"); 60 let signal_verified: bool = row.get("signal_verified"); 61 Json(NotificationPrefsResponse { 62 preferred_channel: channel, 63 email, 64 discord_id, 65 discord_verified, 66 telegram_username, 67 telegram_verified, 68 signal_number, 69 signal_verified, 70 }) 71 .into_response() 72} 73 74#[derive(Serialize)] 75#[serde(rename_all = "camelCase")] 76pub struct NotificationHistoryEntry { 77 pub created_at: String, 78 pub channel: String, 79 pub comms_type: String, 80 pub status: String, 81 pub subject: Option<String>, 82 pub body: String, 83} 84 85#[derive(Serialize)] 86#[serde(rename_all = "camelCase")] 87pub struct GetNotificationHistoryResponse { 88 pub notifications: Vec<NotificationHistoryEntry>, 89} 90 91pub async fn get_notification_history(State(state): State<AppState>, auth: BearerAuth) -> Response { 92 let user = auth.0; 93 94 let user_id: uuid::Uuid = 95 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &user.did) 96 .fetch_one(&state.db) 97 .await 98 { 99 Ok(id) => id, 100 Err(e) => { 101 return ApiError::InternalError(Some(format!("Database error: {}", e))) 102 .into_response(); 103 } 104 }; 105 106 let rows = match sqlx::query!( 107 r#" 108 SELECT 109 created_at, 110 channel as "channel: String", 111 comms_type as "comms_type: String", 112 status as "status: String", 113 subject, 114 body 115 FROM comms_queue 116 WHERE user_id = $1 117 ORDER BY created_at DESC 118 LIMIT 50 119 "#, 120 user_id 121 ) 122 .fetch_all(&state.db) 123 .await 124 { 125 Ok(r) => r, 126 Err(e) => { 127 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 128 } 129 }; 130 131 let sensitive_types = [ 132 "email_verification", 133 "password_reset", 134 "email_update", 135 "two_factor_code", 136 "passkey_recovery", 137 "migration_verification", 138 "plc_operation", 139 "channel_verification", 140 "signup_verification", 141 ]; 142 143 let notifications = rows 144 .iter() 145 .map(|row| { 146 let body = if sensitive_types.contains(&row.comms_type.as_str()) { 147 "[Code redacted for security]".to_string() 148 } else { 149 row.body.clone() 150 }; 151 NotificationHistoryEntry { 152 created_at: row.created_at.to_rfc3339(), 153 channel: row.channel.clone(), 154 comms_type: row.comms_type.clone(), 155 status: row.status.clone(), 156 subject: row.subject.clone(), 157 body, 158 } 159 }) 160 .collect(); 161 162 Json(GetNotificationHistoryResponse { notifications }).into_response() 163} 164 165#[derive(Deserialize)] 166#[serde(rename_all = "camelCase")] 167pub struct UpdateNotificationPrefsInput { 168 pub preferred_channel: Option<String>, 169 pub email: Option<String>, 170 pub discord_id: Option<String>, 171 pub telegram_username: Option<String>, 172 pub signal_number: Option<String>, 173} 174 175#[derive(Serialize)] 176#[serde(rename_all = "camelCase")] 177pub struct UpdateNotificationPrefsResponse { 178 pub success: bool, 179 #[serde(skip_serializing_if = "Vec::is_empty")] 180 pub verification_required: Vec<String>, 181} 182 183pub async fn request_channel_verification( 184 db: &sqlx::PgPool, 185 user_id: uuid::Uuid, 186 did: &str, 187 channel: &str, 188 identifier: &str, 189 handle: Option<&str>, 190) -> Result<String, String> { 191 let token = 192 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier); 193 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 194 195 if channel == "email" { 196 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 197 let handle_str = handle.unwrap_or("user"); 198 crate::comms::enqueue_email_update( 199 db, 200 user_id, 201 identifier, 202 handle_str, 203 &formatted_token, 204 &hostname, 205 ) 206 .await 207 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?; 208 } else { 209 sqlx::query!( 210 r#" 211 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata) 212 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5) 213 "#, 214 user_id, 215 channel as _, 216 identifier, 217 format!("Your verification code is: {}", formatted_token), 218 json!({"code": formatted_token}) 219 ) 220 .execute(db) 221 .await 222 .map_err(|e| format!("Failed to enqueue notification: {}", e))?; 223 } 224 225 Ok(token) 226} 227 228pub async fn update_notification_prefs( 229 State(state): State<AppState>, 230 auth: BearerAuth, 231 Json(input): Json<UpdateNotificationPrefsInput>, 232) -> Response { 233 let user = auth.0; 234 235 let user_row = match sqlx::query!( 236 "SELECT id, handle, email FROM users WHERE did = $1", 237 &user.did 238 ) 239 .fetch_one(&state.db) 240 .await 241 { 242 Ok(row) => row, 243 Err(e) => { 244 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 245 } 246 }; 247 248 let user_id = user_row.id; 249 let handle = user_row.handle; 250 let current_email = user_row.email; 251 252 let mut verification_required: Vec<String> = Vec::new(); 253 254 if let Some(ref channel) = input.preferred_channel { 255 let valid_channels = ["email", "discord", "telegram", "signal"]; 256 if !valid_channels.contains(&channel.as_str()) { 257 return ApiError::InvalidRequest( 258 "Invalid channel. Must be one of: email, discord, telegram, signal".into(), 259 ) 260 .into_response(); 261 } 262 if let Err(e) = sqlx::query( 263 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"# 264 ) 265 .bind(channel) 266 .bind(&user.did) 267 .execute(&state.db) 268 .await 269 { 270 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 271 } 272 info!(did = %user.did, channel = %channel, "Updated preferred notification channel"); 273 } 274 275 if let Some(ref new_email) = input.email { 276 let email_clean = new_email.trim().to_lowercase(); 277 if email_clean.is_empty() { 278 return ApiError::InvalidRequest("Email cannot be empty".into()).into_response(); 279 } 280 281 if !crate::api::validation::is_valid_email(&email_clean) { 282 return ApiError::InvalidEmail.into_response(); 283 } 284 285 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) { 286 info!(did = %user.did, "Email unchanged, skipping"); 287 } else { 288 let exists = sqlx::query!( 289 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2", 290 email_clean, 291 user_id 292 ) 293 .fetch_optional(&state.db) 294 .await; 295 296 if let Ok(Some(_)) = exists { 297 return ApiError::EmailTaken.into_response(); 298 } 299 300 if let Err(e) = request_channel_verification( 301 &state.db, 302 user_id, 303 &user.did, 304 "email", 305 &email_clean, 306 Some(&handle), 307 ) 308 .await 309 { 310 return ApiError::InternalError(Some(e)).into_response(); 311 } 312 verification_required.push("email".to_string()); 313 info!(did = %user.did, "Requested email verification"); 314 } 315 } 316 317 if let Some(ref discord_id) = input.discord_id { 318 if discord_id.is_empty() { 319 if let Err(e) = sqlx::query!( 320 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1", 321 user_id 322 ) 323 .execute(&state.db) 324 .await 325 { 326 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 327 } 328 info!(did = %user.did, "Cleared Discord ID"); 329 } else { 330 if let Err(e) = request_channel_verification( 331 &state.db, user_id, &user.did, "discord", discord_id, None, 332 ) 333 .await 334 { 335 return ApiError::InternalError(Some(e)).into_response(); 336 } 337 verification_required.push("discord".to_string()); 338 info!(did = %user.did, "Requested Discord verification"); 339 } 340 } 341 342 if let Some(ref telegram) = input.telegram_username { 343 let telegram_clean = telegram.trim_start_matches('@'); 344 if telegram_clean.is_empty() { 345 if let Err(e) = sqlx::query!( 346 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1", 347 user_id 348 ) 349 .execute(&state.db) 350 .await 351 { 352 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 353 } 354 info!(did = %user.did, "Cleared Telegram username"); 355 } else { 356 if let Err(e) = request_channel_verification( 357 &state.db, 358 user_id, 359 &user.did, 360 "telegram", 361 telegram_clean, 362 None, 363 ) 364 .await 365 { 366 return ApiError::InternalError(Some(e)).into_response(); 367 } 368 verification_required.push("telegram".to_string()); 369 info!(did = %user.did, "Requested Telegram verification"); 370 } 371 } 372 373 if let Some(ref signal) = input.signal_number { 374 if signal.is_empty() { 375 if let Err(e) = sqlx::query!( 376 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1", 377 user_id 378 ) 379 .execute(&state.db) 380 .await 381 { 382 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response(); 383 } 384 info!(did = %user.did, "Cleared Signal number"); 385 } else { 386 if let Err(e) = 387 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None) 388 .await 389 { 390 return ApiError::InternalError(Some(e)).into_response(); 391 } 392 verification_required.push("signal".to_string()); 393 info!(did = %user.did, "Requested Signal verification"); 394 } 395 } 396 397 Json(UpdateNotificationPrefsResponse { 398 success: true, 399 verification_required, 400 }) 401 .into_response() 402}