this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::BearerAuth;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 response::{IntoResponse, Response},
8};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use sqlx::Row;
12use tracing::info;
13
14#[derive(Serialize)]
15#[serde(rename_all = "camelCase")]
16pub struct NotificationPrefsResponse {
17 pub preferred_channel: String,
18 pub email: String,
19 pub discord_id: Option<String>,
20 pub discord_verified: bool,
21 pub telegram_username: Option<String>,
22 pub telegram_verified: bool,
23 pub signal_number: Option<String>,
24 pub signal_verified: bool,
25}
26
27pub async fn get_notification_prefs(State(state): State<AppState>, auth: BearerAuth) -> Response {
28 let user = auth.0;
29 let row = match sqlx::query(
30 r#"
31 SELECT
32 email,
33 preferred_comms_channel::text as channel,
34 discord_id,
35 discord_verified,
36 telegram_username,
37 telegram_verified,
38 signal_number,
39 signal_verified
40 FROM users
41 WHERE did = $1
42 "#,
43 )
44 .bind(&user.did)
45 .fetch_one(&state.db)
46 .await
47 {
48 Ok(r) => r,
49 Err(e) => {
50 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
51 }
52 };
53 let email: String = row.get("email");
54 let channel: String = row.get("channel");
55 let discord_id: Option<String> = row.get("discord_id");
56 let discord_verified: bool = row.get("discord_verified");
57 let telegram_username: Option<String> = row.get("telegram_username");
58 let telegram_verified: bool = row.get("telegram_verified");
59 let signal_number: Option<String> = row.get("signal_number");
60 let signal_verified: bool = row.get("signal_verified");
61 Json(NotificationPrefsResponse {
62 preferred_channel: channel,
63 email,
64 discord_id,
65 discord_verified,
66 telegram_username,
67 telegram_verified,
68 signal_number,
69 signal_verified,
70 })
71 .into_response()
72}
73
74#[derive(Serialize)]
75#[serde(rename_all = "camelCase")]
76pub struct NotificationHistoryEntry {
77 pub created_at: String,
78 pub channel: String,
79 pub comms_type: String,
80 pub status: String,
81 pub subject: Option<String>,
82 pub body: String,
83}
84
85#[derive(Serialize)]
86#[serde(rename_all = "camelCase")]
87pub struct GetNotificationHistoryResponse {
88 pub notifications: Vec<NotificationHistoryEntry>,
89}
90
91pub async fn get_notification_history(State(state): State<AppState>, auth: BearerAuth) -> Response {
92 let user = auth.0;
93
94 let user_id: uuid::Uuid =
95 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &user.did)
96 .fetch_one(&state.db)
97 .await
98 {
99 Ok(id) => id,
100 Err(e) => {
101 return ApiError::InternalError(Some(format!("Database error: {}", e)))
102 .into_response();
103 }
104 };
105
106 let rows = match sqlx::query!(
107 r#"
108 SELECT
109 created_at,
110 channel as "channel: String",
111 comms_type as "comms_type: String",
112 status as "status: String",
113 subject,
114 body
115 FROM comms_queue
116 WHERE user_id = $1
117 ORDER BY created_at DESC
118 LIMIT 50
119 "#,
120 user_id
121 )
122 .fetch_all(&state.db)
123 .await
124 {
125 Ok(r) => r,
126 Err(e) => {
127 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
128 }
129 };
130
131 let sensitive_types = [
132 "email_verification",
133 "password_reset",
134 "email_update",
135 "two_factor_code",
136 "passkey_recovery",
137 "migration_verification",
138 "plc_operation",
139 "channel_verification",
140 "signup_verification",
141 ];
142
143 let notifications = rows
144 .iter()
145 .map(|row| {
146 let body = if sensitive_types.contains(&row.comms_type.as_str()) {
147 "[Code redacted for security]".to_string()
148 } else {
149 row.body.clone()
150 };
151 NotificationHistoryEntry {
152 created_at: row.created_at.to_rfc3339(),
153 channel: row.channel.clone(),
154 comms_type: row.comms_type.clone(),
155 status: row.status.clone(),
156 subject: row.subject.clone(),
157 body,
158 }
159 })
160 .collect();
161
162 Json(GetNotificationHistoryResponse { notifications }).into_response()
163}
164
165#[derive(Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub struct UpdateNotificationPrefsInput {
168 pub preferred_channel: Option<String>,
169 pub email: Option<String>,
170 pub discord_id: Option<String>,
171 pub telegram_username: Option<String>,
172 pub signal_number: Option<String>,
173}
174
175#[derive(Serialize)]
176#[serde(rename_all = "camelCase")]
177pub struct UpdateNotificationPrefsResponse {
178 pub success: bool,
179 #[serde(skip_serializing_if = "Vec::is_empty")]
180 pub verification_required: Vec<String>,
181}
182
183pub async fn request_channel_verification(
184 db: &sqlx::PgPool,
185 user_id: uuid::Uuid,
186 did: &str,
187 channel: &str,
188 identifier: &str,
189 handle: Option<&str>,
190) -> Result<String, String> {
191 let token =
192 crate::auth::verification_token::generate_channel_update_token(did, channel, identifier);
193 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
194
195 if channel == "email" {
196 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
197 let handle_str = handle.unwrap_or("user");
198 crate::comms::enqueue_email_update(
199 db,
200 user_id,
201 identifier,
202 handle_str,
203 &formatted_token,
204 &hostname,
205 )
206 .await
207 .map_err(|e| format!("Failed to enqueue email notification: {}", e))?;
208 } else {
209 sqlx::query!(
210 r#"
211 INSERT INTO comms_queue (user_id, channel, comms_type, recipient, subject, body, metadata)
212 VALUES ($1, $2::comms_channel, 'channel_verification', $3, 'Verify your channel', $4, $5)
213 "#,
214 user_id,
215 channel as _,
216 identifier,
217 format!("Your verification code is: {}", formatted_token),
218 json!({"code": formatted_token})
219 )
220 .execute(db)
221 .await
222 .map_err(|e| format!("Failed to enqueue notification: {}", e))?;
223 }
224
225 Ok(token)
226}
227
228pub async fn update_notification_prefs(
229 State(state): State<AppState>,
230 auth: BearerAuth,
231 Json(input): Json<UpdateNotificationPrefsInput>,
232) -> Response {
233 let user = auth.0;
234
235 let user_row = match sqlx::query!(
236 "SELECT id, handle, email FROM users WHERE did = $1",
237 &user.did
238 )
239 .fetch_one(&state.db)
240 .await
241 {
242 Ok(row) => row,
243 Err(e) => {
244 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
245 }
246 };
247
248 let user_id = user_row.id;
249 let handle = user_row.handle;
250 let current_email = user_row.email;
251
252 let mut verification_required: Vec<String> = Vec::new();
253
254 if let Some(ref channel) = input.preferred_channel {
255 let valid_channels = ["email", "discord", "telegram", "signal"];
256 if !valid_channels.contains(&channel.as_str()) {
257 return ApiError::InvalidRequest(
258 "Invalid channel. Must be one of: email, discord, telegram, signal".into(),
259 )
260 .into_response();
261 }
262 if let Err(e) = sqlx::query(
263 r#"UPDATE users SET preferred_comms_channel = $1::comms_channel, updated_at = NOW() WHERE did = $2"#
264 )
265 .bind(channel)
266 .bind(&user.did)
267 .execute(&state.db)
268 .await
269 {
270 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
271 }
272 info!(did = %user.did, channel = %channel, "Updated preferred notification channel");
273 }
274
275 if let Some(ref new_email) = input.email {
276 let email_clean = new_email.trim().to_lowercase();
277 if email_clean.is_empty() {
278 return ApiError::InvalidRequest("Email cannot be empty".into()).into_response();
279 }
280
281 if !crate::api::validation::is_valid_email(&email_clean) {
282 return ApiError::InvalidEmail.into_response();
283 }
284
285 if current_email.as_ref().map(|e| e.to_lowercase()) == Some(email_clean.clone()) {
286 info!(did = %user.did, "Email unchanged, skipping");
287 } else {
288 let exists = sqlx::query!(
289 "SELECT 1 as one FROM users WHERE LOWER(email) = $1 AND id != $2",
290 email_clean,
291 user_id
292 )
293 .fetch_optional(&state.db)
294 .await;
295
296 if let Ok(Some(_)) = exists {
297 return ApiError::EmailTaken.into_response();
298 }
299
300 if let Err(e) = request_channel_verification(
301 &state.db,
302 user_id,
303 &user.did,
304 "email",
305 &email_clean,
306 Some(&handle),
307 )
308 .await
309 {
310 return ApiError::InternalError(Some(e)).into_response();
311 }
312 verification_required.push("email".to_string());
313 info!(did = %user.did, "Requested email verification");
314 }
315 }
316
317 if let Some(ref discord_id) = input.discord_id {
318 if discord_id.is_empty() {
319 if let Err(e) = sqlx::query!(
320 "UPDATE users SET discord_id = NULL, discord_verified = FALSE, updated_at = NOW() WHERE id = $1",
321 user_id
322 )
323 .execute(&state.db)
324 .await
325 {
326 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
327 }
328 info!(did = %user.did, "Cleared Discord ID");
329 } else {
330 if let Err(e) = request_channel_verification(
331 &state.db, user_id, &user.did, "discord", discord_id, None,
332 )
333 .await
334 {
335 return ApiError::InternalError(Some(e)).into_response();
336 }
337 verification_required.push("discord".to_string());
338 info!(did = %user.did, "Requested Discord verification");
339 }
340 }
341
342 if let Some(ref telegram) = input.telegram_username {
343 let telegram_clean = telegram.trim_start_matches('@');
344 if telegram_clean.is_empty() {
345 if let Err(e) = sqlx::query!(
346 "UPDATE users SET telegram_username = NULL, telegram_verified = FALSE, updated_at = NOW() WHERE id = $1",
347 user_id
348 )
349 .execute(&state.db)
350 .await
351 {
352 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
353 }
354 info!(did = %user.did, "Cleared Telegram username");
355 } else {
356 if let Err(e) = request_channel_verification(
357 &state.db,
358 user_id,
359 &user.did,
360 "telegram",
361 telegram_clean,
362 None,
363 )
364 .await
365 {
366 return ApiError::InternalError(Some(e)).into_response();
367 }
368 verification_required.push("telegram".to_string());
369 info!(did = %user.did, "Requested Telegram verification");
370 }
371 }
372
373 if let Some(ref signal) = input.signal_number {
374 if signal.is_empty() {
375 if let Err(e) = sqlx::query!(
376 "UPDATE users SET signal_number = NULL, signal_verified = FALSE, updated_at = NOW() WHERE id = $1",
377 user_id
378 )
379 .execute(&state.db)
380 .await
381 {
382 return ApiError::InternalError(Some(format!("Database error: {}", e))).into_response();
383 }
384 info!(did = %user.did, "Cleared Signal number");
385 } else {
386 if let Err(e) =
387 request_channel_verification(&state.db, user_id, &user.did, "signal", signal, None)
388 .await
389 {
390 return ApiError::InternalError(Some(e)).into_response();
391 }
392 verification_required.push("signal".to_string());
393 info!(did = %user.did, "Requested Signal verification");
394 }
395 }
396
397 Json(UpdateNotificationPrefsResponse {
398 success: true,
399 verification_required,
400 })
401 .into_response()
402}