this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::validate_bearer_token;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::HeaderMap,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use sqlx::Row;
13use tracing::info;
14
15#[derive(Serialize)]
16#[serde(rename_all = "camelCase")]
17pub struct NotificationPrefsResponse {
18 pub preferred_channel: String,
19 pub email: String,
20 pub discord_id: Option<String>,
21 pub discord_verified: bool,
22 pub telegram_username: Option<String>,
23 pub telegram_verified: bool,
24 pub signal_number: Option<String>,
25 pub signal_verified: bool,
26}
27
28pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response {
29 let token = match crate::auth::extract_bearer_token_from_header(
30 headers.get("Authorization").and_then(|h| h.to_str().ok()),
31 ) {
32 Some(t) => t,
33 None => return ApiError::AuthenticationRequired.into_response(),
34 };
35 let user = match validate_bearer_token(&state.db, &token).await {
36 Ok(u) => u,
37 Err(_) => {
38 return ApiError::AuthenticationFailed(None).into_response();
39 }
40 };
41 let row = match sqlx::query(
42 r#"
43 SELECT
44 email,
45 preferred_comms_channel::text as channel,
46 discord_id,
47 discord_verified,
48 telegram_username,
49 telegram_verified,
50 signal_number,
51 signal_verified
52 FROM users
53 WHERE did = $1
54 "#,
55 )
56 .bind(&user.did)
57 .fetch_one(&state.db)
58 .await
59 {
60 Ok(r) => r,
61 Err(e) => {
62 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
63 }
64 };
65 let email: String = row.get("email");
66 let channel: String = row.get("channel");
67 let discord_id: Option<String> = row.get("discord_id");
68 let discord_verified: bool = row.get("discord_verified");
69 let telegram_username: Option<String> = row.get("telegram_username");
70 let telegram_verified: bool = row.get("telegram_verified");
71 let signal_number: Option<String> = row.get("signal_number");
72 let signal_verified: bool = row.get("signal_verified");
73 Json(NotificationPrefsResponse {
74 preferred_channel: channel,
75 email,
76 discord_id,
77 discord_verified,
78 telegram_username,
79 telegram_verified,
80 signal_number,
81 signal_verified,
82 })
83 .into_response()
84}
85
86#[derive(Serialize)]
87#[serde(rename_all = "camelCase")]
88pub struct NotificationHistoryEntry {
89 pub created_at: String,
90 pub channel: String,
91 pub comms_type: String,
92 pub status: String,
93 pub subject: Option<String>,
94 pub body: String,
95}
96
97#[derive(Serialize)]
98#[serde(rename_all = "camelCase")]
99pub struct GetNotificationHistoryResponse {
100 pub notifications: Vec<NotificationHistoryEntry>,
101}
102
103pub async fn get_notification_history(
104 State(state): State<AppState>,
105 headers: HeaderMap,
106) -> Response {
107 let token = match crate::auth::extract_bearer_token_from_header(
108 headers.get("Authorization").and_then(|h| h.to_str().ok()),
109 ) {
110 Some(t) => t,
111 None => return ApiError::AuthenticationRequired.into_response(),
112 };
113 let user = match validate_bearer_token(&state.db, &token).await {
114 Ok(u) => u,
115 Err(_) => {
116 return ApiError::AuthenticationFailed(None).into_response();
117 }
118 };
119
120 let user_id: uuid::Uuid =
121 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &user.did)
122 .fetch_one(&state.db)
123 .await
124 {
125 Ok(id) => id,
126 Err(e) => {
127 return ApiError::InternalError(Some(format!("Database error: {}", e)))
128 .into_response();
129 }
130 };
131
132 let rows = match sqlx::query!(
133 r#"
134 SELECT
135 created_at,
136 channel as "channel: String",
137 comms_type as "comms_type: String",
138 status as "status: String",
139 subject,
140 body
141 FROM comms_queue
142 WHERE user_id = $1
143 ORDER BY created_at DESC
144 LIMIT 50
145 "#,
146 user_id
147 )
148 .fetch_all(&state.db)
149 .await
150 {
151 Ok(r) => r,
152 Err(e) => {
153 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
154 }
155 };
156
157 let sensitive_types = [
158 "email_verification",
159 "password_reset",
160 "email_update",
161 "two_factor_code",
162 "passkey_recovery",
163 "migration_verification",
164 "plc_operation",
165 "channel_verification",
166 "signup_verification",
167 ];
168
169 let notifications = rows
170 .iter()
171 .map(|row| {
172 let body = if sensitive_types.contains(&row.comms_type.as_str()) {
173 "[Code redacted for security]".to_string()
174 } else {
175 row.body.clone()
176 };
177 NotificationHistoryEntry {
178 created_at: row.created_at.to_rfc3339(),
179 channel: row.channel.clone(),
180 comms_type: row.comms_type.clone(),
181 status: row.status.clone(),
182 subject: row.subject.clone(),
183 body,
184 }
185 })
186 .collect();
187
188 Json(GetNotificationHistoryResponse { notifications }).into_response()
189}
190
191#[derive(Deserialize)]
192#[serde(rename_all = "camelCase")]
193pub struct UpdateNotificationPrefsInput {
194 pub preferred_channel: Option<String>,
195 pub email: Option<String>,
196 pub discord_id: Option<String>,
197 pub telegram_username: Option<String>,
198 pub signal_number: Option<String>,
199}
200
201#[derive(Serialize)]
202#[serde(rename_all = "camelCase")]
203pub struct UpdateNotificationPrefsResponse {
204 pub success: bool,
205 #[serde(skip_serializing_if = "Vec::is_empty")]
206 pub verification_required: Vec<String>,
207}
208
209pub async fn request_channel_verification(
210 db: &sqlx::PgPool,
211 user_id: uuid::Uuid,
212 did: &str,
213 channel: &str,
214 identifier: &str,
215 handle: Option<&str>,
216) -> Result<String, String> {
217 let token =
218 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier);
219 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
220
221 if channel == "email" {
222 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
223 let handle_str = handle.unwrap_or("user");
224 crate::comms::enqueue_email_update(
225 db,
226 user_id,
227 identifier,
228 handle_str,
229 &formatted_token,
230 &hostname,
231 )
232 .await
233 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
234 } else {
235 sqlx::query!(
236 r#"
237 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata)
238 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
239 "#,
240 user_id,
241 channel as _,
242 identifier,
243 format!("Your verification code is: {}", formatted_token),
244 json!({"code": formatted_token})
245 )
246 .execute(db)
247 .await
248 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
249 }
250
251 Ok(token)
252}
253
254pub async fn update_notification_prefs(
255 State(state): State<AppState>,
256 headers: HeaderMap,
257 Json(input): Json<UpdateNotificationPrefsInput>,
258) -> Response {
259 let token = match crate::auth::extract_bearer_token_from_header(
260 headers.get("Authorization").and_then(|h| h.to_str().ok()),
261 ) {
262 Some(t) => t,
263 None => return ApiError::AuthenticationRequired.into_response(),
264 };
265 let user = match validate_bearer_token(&state.db, &token).await {
266 Ok(u) => u,
267 Err(_) => {
268 return ApiError::AuthenticationFailed(None).into_response();
269 }
270 };
271
272 let user_row = match sqlx::query!(
273 "SELECT id, handle, email FROM users WHERE did = $1",
274 &user.did
275 )
276 .fetch_one(&state.db)
277 .await
278 {
279 Ok(row) => row,
280 Err(e) => {
281 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
282 }
283 };
284
285 let user_id = user_row.id;
286 let handle = user_row.handle;
287 let current_email = user_row.email;
288
289 let mut verification_required: Vec<String> = Vec::new();
290
291 if let Some(ref channel) = input.preferred_channel {
292 let valid_channels = ["email", "discord", "telegram", "signal"];
293 if !valid_channels.contains(&channel.as_str()) {
294 return ApiError::InvalidRequest(
295 "Invalid channel. Must be one of: email, discord, telegram, signal".into(),
296 )
297 .into_response();
298 }
299 if let Err(e) = sqlx::query(
300 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"#
301 )
302 .bind(channel)
303 .bind(&user.did)
304 .execute(&state.db)
305 .await
306 {
307 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
308 }
309 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
310 }
311
312 if let Some(ref new_email) = input.email {
313 let email_clean = new_email.trim().to_lowercase();
314 if email_clean.is_empty() {
315 return ApiError::InvalidRequest("Email cannot be empty".into()).into_response();
316 }
317
318 if !crate::api::validation::is_valid_email(&email_clean) {
319 return ApiError::InvalidEmail.into_response();
320 }
321
322 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
323 info!(did = %user.did, "Email unchanged, skipping");
324 } else {
325 let exists = sqlx::query!(
326 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
327 email_clean,
328 user_id
329 )
330 .fetch_optional(&state.db)
331 .await;
332
333 if let Ok(Some(_)) = exists {
334 return ApiError::EmailTaken.into_response();
335 }
336
337 if let Err(e) = request_channel_verification(
338 &state.db,
339 user_id,
340 &user.did,
341 "email",
342 &email_clean,
343 Some(&handle),
344 )
345 .await
346 {
347 return ApiError::InternalError(Some(e)).into_response();
348 }
349 verification_required.push("email".to_string());
350 info!(did = %user.did, "Requested email verification");
351 }
352 }
353
354 if let Some(ref discord_id) = input.discord_id {
355 if discord_id.is_empty() {
356 if let Err(e) = sqlx::query!(
357 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
358 user_id
359 )
360 .execute(&state.db)
361 .await
362 {
363 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
364 }
365 info!(did = %user.did, "Cleared Discord ID");
366 } else {
367 if let Err(e) = request_channel_verification(
368 &state.db, user_id, &user.did, "discord", discord_id, None,
369 )
370 .await
371 {
372 return ApiError::InternalError(Some(e)).into_response();
373 }
374 verification_required.push("discord".to_string());
375 info!(did = %user.did, "Requested Discord verification");
376 }
377 }
378
379 if let Some(ref telegram) = input.telegram_username {
380 let telegram_clean = telegram.trim_start_matches('@');
381 if telegram_clean.is_empty() {
382 if let Err(e) = sqlx::query!(
383 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
384 user_id
385 )
386 .execute(&state.db)
387 .await
388 {
389 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
390 }
391 info!(did = %user.did, "Cleared Telegram username");
392 } else {
393 if let Err(e) = request_channel_verification(
394 &state.db,
395 user_id,
396 &user.did,
397 "telegram",
398 telegram_clean,
399 None,
400 )
401 .await
402 {
403 return ApiError::InternalError(Some(e)).into_response();
404 }
405 verification_required.push("telegram".to_string());
406 info!(did = %user.did, "Requested Telegram verification");
407 }
408 }
409
410 if let Some(ref signal) = input.signal_number {
411 if signal.is_empty() {
412 if let Err(e) = sqlx::query!(
413 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
414 user_id
415 )
416 .execute(&state.db)
417 .await
418 {
419 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
420 }
421 info!(did = %user.did, "Cleared Signal number");
422 } else {
423 if let Err(e) =
424 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None)
425 .await
426 {
427 return ApiError::InternalError(Some(e)).into_response();
428 }
429 verification_required.push("signal".to_string());
430 info!(did = %user.did, "Requested Signal verification");
431 }
432 }
433
434 Json(UpdateNotificationPrefsResponse {
435 success: true,
436 verification_required,
437 })
438 .into_response()
439}