this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::validate_bearer_token;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::HeaderMap,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use sqlx::Row;
13use tracing::info;
14
15#[derive(Serialize)]
16#[serde(rename_all = "camelCase")]
17pub struct NotificationPrefsResponse {
18 pub preferred_channel: String,
19 pub email: String,
20 pub discord_id: Option<String>,
21 pub discord_verified: bool,
22 pub telegram_username: Option<String>,
23 pub telegram_verified: bool,
24 pub signal_number: Option<String>,
25 pub signal_verified: bool,
26}
27
28pub async fn get_notification_prefs(State(state): State<AppState>, headers: HeaderMap) -> Response {
29 let token = match crate::auth::extract_bearer_token_from_header(
30 headers.get("Authorization").and_then(|h| h.to_str().ok()),
31 ) {
32 Some(t) => t,
33 None => return ApiError::AuthenticationRequired.into_response(),
34 };
35 let user = match validate_bearer_token(&state.db, &token).await {
36 Ok(u) => u,
37 Err(_) => {
38 return ApiError::AuthenticationFailed(None).into_response();
39 }
40 };
41 let row =
42 match sqlx::query(
43 r#"
44 SELECT
45 email,
46 preferred_comms_channel::text as channel,
47 discord_id,
48 discord_verified,
49 telegram_username,
50 telegram_verified,
51 signal_number,
52 signal_verified
53 FROM users
54 WHERE did = $1
55 "#,
56 )
57 .bind(&user.did)
58 .fetch_one(&state.db)
59 .await
60 {
61 Ok(r) => r,
62 Err(e) => {
63 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response()
64 }
65 };
66 let email: String = row.get("email");
67 let channel: String = row.get("channel");
68 let discord_id: Option<String> = row.get("discord_id");
69 let discord_verified: bool = row.get("discord_verified");
70 let telegram_username: Option<String> = row.get("telegram_username");
71 let telegram_verified: bool = row.get("telegram_verified");
72 let signal_number: Option<String> = row.get("signal_number");
73 let signal_verified: bool = row.get("signal_verified");
74 Json(NotificationPrefsResponse {
75 preferred_channel: channel,
76 email,
77 discord_id,
78 discord_verified,
79 telegram_username,
80 telegram_verified,
81 signal_number,
82 signal_verified,
83 })
84 .into_response()
85}
86
87#[derive(Serialize)]
88#[serde(rename_all = "camelCase")]
89pub struct NotificationHistoryEntry {
90 pub created_at: String,
91 pub channel: String,
92 pub comms_type: String,
93 pub status: String,
94 pub subject: Option<String>,
95 pub body: String,
96}
97
98#[derive(Serialize)]
99#[serde(rename_all = "camelCase")]
100pub struct GetNotificationHistoryResponse {
101 pub notifications: Vec<NotificationHistoryEntry>,
102}
103
104pub async fn get_notification_history(
105 State(state): State<AppState>,
106 headers: HeaderMap,
107) -> Response {
108 let token = match crate::auth::extract_bearer_token_from_header(
109 headers.get("Authorization").and_then(|h| h.to_str().ok()),
110 ) {
111 Some(t) => t,
112 None => return ApiError::AuthenticationRequired.into_response(),
113 };
114 let user = match validate_bearer_token(&state.db, &token).await {
115 Ok(u) => u,
116 Err(_) => {
117 return ApiError::AuthenticationFailed(None).into_response();
118 }
119 };
120
121 let user_id: uuid::Uuid =
122 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &user.did)
123 .fetch_one(&state.db)
124 .await
125 {
126 Ok(id) => id,
127 Err(e) => {
128 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response()
129 }
130 };
131
132 let rows =
133 match sqlx::query!(
134 r#"
135 SELECT
136 created_at,
137 channel as "channel: String",
138 comms_type as "comms_type: String",
139 status as "status: String",
140 subject,
141 body
142 FROM comms_queue
143 WHERE user_id = $1
144 ORDER BY created_at DESC
145 LIMIT 50
146 "#,
147 user_id
148 )
149 .fetch_all(&state.db)
150 .await
151 {
152 Ok(r) => r,
153 Err(e) => {
154 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response()
155 }
156 };
157
158 let sensitive_types = [
159 "email_verification",
160 "password_reset",
161 "email_update",
162 "two_factor_code",
163 "passkey_recovery",
164 "migration_verification",
165 "plc_operation",
166 "channel_verification",
167 "signup_verification",
168 ];
169
170 let notifications = rows
171 .iter()
172 .map(|row| {
173 let body = if sensitive_types.contains(&row.comms_type.as_str()) {
174 "[Code redacted for security]".to_string()
175 } else {
176 row.body.clone()
177 };
178 NotificationHistoryEntry {
179 created_at: row.created_at.to_rfc3339(),
180 channel: row.channel.clone(),
181 comms_type: row.comms_type.clone(),
182 status: row.status.clone(),
183 subject: row.subject.clone(),
184 body,
185 }
186 })
187 .collect();
188
189 Json(GetNotificationHistoryResponse { notifications }).into_response()
190}
191
192#[derive(Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub struct UpdateNotificationPrefsInput {
195 pub preferred_channel: Option<String>,
196 pub email: Option<String>,
197 pub discord_id: Option<String>,
198 pub telegram_username: Option<String>,
199 pub signal_number: Option<String>,
200}
201
202#[derive(Serialize)]
203#[serde(rename_all = "camelCase")]
204pub struct UpdateNotificationPrefsResponse {
205 pub success: bool,
206 #[serde(skip_serializing_if = "Vec::is_empty")]
207 pub verification_required: Vec<String>,
208}
209
210pub async fn request_channel_verification(
211 db: &sqlx::PgPool,
212 user_id: uuid::Uuid,
213 did: &str,
214 channel: &str,
215 identifier: &str,
216 handle: Option<&str>,
217) -> Result<String, String> {
218 let token =
219 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier);
220 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
221
222 if channel == "email" {
223 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
224 let handle_str = handle.unwrap_or("user");
225 crate::comms::enqueue_email_update(
226 db,
227 user_id,
228 identifier,
229 handle_str,
230 &formatted_token,
231 &hostname,
232 )
233 .await
234 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
235 } else {
236 sqlx::query!(
237 r#"
238 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata)
239 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
240 "#,
241 user_id,
242 channel as _,
243 identifier,
244 format!("Your verification code is: {}", formatted_token),
245 json!({"code": formatted_token})
246 )
247 .execute(db)
248 .await
249 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
250 }
251
252 Ok(token)
253}
254
255pub async fn update_notification_prefs(
256 State(state): State<AppState>,
257 headers: HeaderMap,
258 Json(input): Json<UpdateNotificationPrefsInput>,
259) -> Response {
260 let token = match crate::auth::extract_bearer_token_from_header(
261 headers.get("Authorization").and_then(|h| h.to_str().ok()),
262 ) {
263 Some(t) => t,
264 None => return ApiError::AuthenticationRequired.into_response(),
265 };
266 let user = match validate_bearer_token(&state.db, &token).await {
267 Ok(u) => u,
268 Err(_) => {
269 return ApiError::AuthenticationFailed(None).into_response();
270 }
271 };
272
273 let user_row =
274 match sqlx::query!(
275 "SELECT id, handle, email FROM users WHERE did = $1",
276 &user.did
277 )
278 .fetch_one(&state.db)
279 .await
280 {
281 Ok(row) => row,
282 Err(e) => {
283 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response()
284 }
285 };
286
287 let user_id = user_row.id;
288 let handle = user_row.handle;
289 let current_email = user_row.email;
290
291 let mut verification_required: Vec<String> = Vec::new();
292
293 if let Some(ref channel) = input.preferred_channel {
294 let valid_channels = ["email", "discord", "telegram", "signal"];
295 if !valid_channels.contains(&channel.as_str()) {
296 return ApiError::InvalidRequest(
297 "Invalid channel. Must be one of: email, discord, telegram, signal".into(),
298 )
299 .into_response();
300 }
301 if let Err(e) = sqlx::query(
302 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"#
303 )
304 .bind(channel)
305 .bind(&user.did)
306 .execute(&state.db)
307 .await
308 {
309 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
310 }
311 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
312 }
313
314 if let Some(ref new_email) = input.email {
315 let email_clean = new_email.trim().to_lowercase();
316 if email_clean.is_empty() {
317 return ApiError::InvalidRequest("Email cannot be empty".into()).into_response();
318 }
319
320 if !crate::api::validation::is_valid_email(&email_clean) {
321 return ApiError::InvalidEmail.into_response();
322 }
323
324 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
325 info!(did = %user.did, "Email unchanged, skipping");
326 } else {
327 let exists = sqlx::query!(
328 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
329 email_clean,
330 user_id
331 )
332 .fetch_optional(&state.db)
333 .await;
334
335 if let Ok(Some(_)) = exists {
336 return ApiError::EmailTaken.into_response();
337 }
338
339 if let Err(e) = request_channel_verification(
340 &state.db,
341 user_id,
342 &user.did,
343 "email",
344 &email_clean,
345 Some(&handle),
346 )
347 .await
348 {
349 return ApiError::InternalError(Some(e)).into_response();
350 }
351 verification_required.push("email".to_string());
352 info!(did = %user.did, "Requested email verification");
353 }
354 }
355
356 if let Some(ref discord_id) = input.discord_id {
357 if discord_id.is_empty() {
358 if let Err(e) = sqlx::query!(
359 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
360 user_id
361 )
362 .execute(&state.db)
363 .await
364 {
365 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
366 }
367 info!(did = %user.did, "Cleared Discord ID");
368 } else {
369 if let Err(e) = request_channel_verification(
370 &state.db, user_id, &user.did, "discord", discord_id, None,
371 )
372 .await
373 {
374 return ApiError::InternalError(Some(e)).into_response();
375 }
376 verification_required.push("discord".to_string());
377 info!(did = %user.did, "Requested Discord verification");
378 }
379 }
380
381 if let Some(ref telegram) = input.telegram_username {
382 let telegram_clean = telegram.trim_start_matches('@');
383 if telegram_clean.is_empty() {
384 if let Err(e) = sqlx::query!(
385 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
386 user_id
387 )
388 .execute(&state.db)
389 .await
390 {
391 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
392 }
393 info!(did = %user.did, "Cleared Telegram username");
394 } else {
395 if let Err(e) = request_channel_verification(
396 &state.db,
397 user_id,
398 &user.did,
399 "telegram",
400 telegram_clean,
401 None,
402 )
403 .await
404 {
405 return ApiError::InternalError(Some(e)).into_response();
406 }
407 verification_required.push("telegram".to_string());
408 info!(did = %user.did, "Requested Telegram verification");
409 }
410 }
411
412 if let Some(ref signal) = input.signal_number {
413 if signal.is_empty() {
414 if let Err(e) = sqlx::query!(
415 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
416 user_id
417 )
418 .execute(&state.db)
419 .await
420 {
421 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
422 }
423 info!(did = %user.did, "Cleared Signal number");
424 } else {
425 if let Err(e) =
426 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None)
427 .await
428 {
429 return ApiError::InternalError(Some(e)).into_response();
430 }
431 verification_required.push("signal".to_string());
432 info!(did = %user.did, "Requested Signal verification");
433 }
434 }
435
436 Json(UpdateNotificationPrefsResponse {
437 success: true,
438 verification_required,
439 })
440 .into_response()
441}