this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
/// Canonicalizes a login identifier before it is looked up in the database.
///
/// * Surrounding whitespace is always removed.
/// * Anything containing `@` or starting with `did:` is returned unchanged
///   (apart from the trim).
/// * A value without any `.` is lowercased and suffixed with
///   `.{pds_hostname}`.
/// * Any other value is simply lowercased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let trimmed = identifier.trim();

    let passes_through = trimmed.contains('@') || trimmed.starts_with("did:");
    if passes_through {
        return trimmed.to_owned();
    }

    let lowered = trimmed.to_lowercase();
    if trimmed.contains('.') {
        lowered
    } else {
        format!("{lowered}.{pds_hostname}")
    }
}
40
/// Returns the handle to present to clients.
///
/// The stored handle is returned as an owned copy; the hostname argument is
/// accepted but currently ignored.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    String::from(stored_handle)
}
44
45#[derive(Deserialize)]
46#[serde(rename_all = "camelCase")]
47pub struct CreateSessionInput {
48 pub identifier: String,
49 pub password: String,
50 #[serde(default)]
51 pub allow_takendown: bool,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateSessionOutput {
57 pub access_jwt: String,
58 pub refresh_jwt: String,
59 pub handle: String,
60 pub did: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub did_doc: Option<serde_json::Value>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub email: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub email_confirmed: Option<bool>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub active: Option<bool>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub status: Option<String>,
71}
72
73pub async fn create_session(
74 State(state): State<AppState>,
75 headers: HeaderMap,
76 Json(input): Json<CreateSessionInput>,
77) -> Response {
78 info!(
79 "create_session called with identifier: {}",
80 input.identifier
81 );
82 let client_ip = extract_client_ip(&headers);
83 if !state
84 .check_rate_limit(RateLimitKind::Login, &client_ip)
85 .await
86 {
87 warn!(ip = %client_ip, "Login rate limit exceeded");
88 return (
89 StatusCode::TOO_MANY_REQUESTS,
90 Json(json!({
91 "error": "RateLimitExceeded",
92 "message": "Too many login attempts. Please try again later."
93 })),
94 )
95 .into_response();
96 }
97 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
98 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
99 info!(
100 "Normalized identifier: {} -> {}",
101 input.identifier, normalized_identifier
102 );
103 let row = match sqlx::query!(
104 r#"SELECT
105 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
106 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
107 u.allow_legacy_login, u.migrated_to_pds,
108 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
109 k.key_bytes, k.encryption_version,
110 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
111 FROM users u
112 JOIN user_keys k ON u.id = k.user_id
113 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
114 normalized_identifier
115 )
116 .fetch_optional(&state.db)
117 .await
118 {
119 Ok(Some(row)) => row,
120 Ok(None) => {
121 let _ = verify(
122 &input.password,
123 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
124 );
125 warn!("User not found for login attempt");
126 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
127 .into_response();
128 }
129 Err(e) => {
130 error!("Database error fetching user: {:?}", e);
131 return ApiError::InternalError.into_response();
132 }
133 };
134 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
135 Ok(k) => k,
136 Err(e) => {
137 error!("Failed to decrypt user key: {:?}", e);
138 return ApiError::InternalError.into_response();
139 }
140 };
141 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
142 .password_hash
143 .as_ref()
144 .map(|h| verify(&input.password, h).unwrap_or(false))
145 .unwrap_or(false)
146 {
147 (true, None, None, None)
148 } else {
149 let app_passwords = sqlx::query!(
150 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
151 row.id
152 )
153 .fetch_all(&state.db)
154 .await
155 .unwrap_or_default();
156 let matched = app_passwords
157 .iter()
158 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
159 match matched {
160 Some(app) => (
161 true,
162 Some(app.name.clone()),
163 app.scopes.clone(),
164 app.created_by_controller_did.clone(),
165 ),
166 None => (false, None, None, None),
167 }
168 };
169 if !password_valid {
170 warn!("Password verification failed for login attempt");
171 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
172 .into_response();
173 }
174 let is_takendown = row.takedown_ref.is_some();
175 if is_takendown && !input.allow_takendown {
176 warn!("Login attempt for takendown account: {}", row.did);
177 return (
178 StatusCode::UNAUTHORIZED,
179 Json(json!({
180 "error": "AccountTakedown",
181 "message": "Account has been taken down"
182 })),
183 )
184 .into_response();
185 }
186 let is_verified =
187 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
188 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
189 .await
190 .unwrap_or(false);
191 if !is_verified && !is_delegated {
192 warn!("Login attempt for unverified account: {}", row.did);
193 return (
194 StatusCode::FORBIDDEN,
195 Json(json!({
196 "error": "AccountNotVerified",
197 "message": "Please verify your account before logging in",
198 "did": row.did
199 })),
200 )
201 .into_response();
202 }
203 let has_totp = row.totp_enabled.unwrap_or(false);
204 let is_legacy_login = has_totp;
205 if has_totp && !row.allow_legacy_login {
206 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
207 return (
208 StatusCode::FORBIDDEN,
209 Json(json!({
210 "error": "MfaRequired",
211 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
212 "did": row.did
213 })),
214 )
215 .into_response();
216 }
217 let access_meta = match crate::auth::create_access_token_with_delegation(
218 &row.did,
219 &key_bytes,
220 app_password_scopes.as_deref(),
221 app_password_controller.as_deref(),
222 ) {
223 Ok(m) => m,
224 Err(e) => {
225 error!("Failed to create access token: {:?}", e);
226 return ApiError::InternalError.into_response();
227 }
228 };
229 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
230 Ok(m) => m,
231 Err(e) => {
232 error!("Failed to create refresh token: {:?}", e);
233 return ApiError::InternalError.into_response();
234 }
235 };
236 let did_for_doc = row.did.clone();
237 let did_resolver = state.did_resolver.clone();
238 let (insert_result, did_doc) = tokio::join!(
239 sqlx::query!(
240 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
241 row.did,
242 access_meta.jti,
243 refresh_meta.jti,
244 access_meta.expires_at,
245 refresh_meta.expires_at,
246 is_legacy_login,
247 false,
248 app_password_scopes,
249 app_password_controller,
250 app_password_name
251 )
252 .execute(&state.db),
253 did_resolver.resolve_did_document(&did_for_doc)
254 );
255 if let Err(e) = insert_result {
256 error!("Failed to insert session: {:?}", e);
257 return ApiError::InternalError.into_response();
258 }
259 if is_legacy_login {
260 warn!(
261 did = %row.did,
262 ip = %client_ip,
263 "Legacy login on TOTP-enabled account - sending notification"
264 );
265 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
266 if let Err(e) = crate::comms::queue_legacy_login_notification(
267 &state.db,
268 row.id,
269 &hostname,
270 &client_ip,
271 row.preferred_comms_channel,
272 )
273 .await
274 {
275 error!("Failed to queue legacy login notification: {:?}", e);
276 }
277 }
278 let handle = full_handle(&row.handle, &pds_hostname);
279 let is_migrated = row.deactivated_at.is_some() && row.migrated_to_pds.is_some();
280 let is_active = row.deactivated_at.is_none() && !is_takendown;
281 let status = if is_takendown {
282 Some("takendown".to_string())
283 } else if is_migrated {
284 Some("migrated".to_string())
285 } else if row.deactivated_at.is_some() {
286 Some("deactivated".to_string())
287 } else {
288 None
289 };
290 Json(CreateSessionOutput {
291 access_jwt: access_meta.token,
292 refresh_jwt: refresh_meta.token,
293 handle,
294 did: row.did,
295 did_doc,
296 email: row.email,
297 email_confirmed: Some(row.email_verified),
298 active: Some(is_active),
299 status,
300 })
301 .into_response()
302}
303
304pub async fn get_session(
305 State(state): State<AppState>,
306 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
307) -> Response {
308 let permissions = auth_user.permissions();
309 let can_read_email = permissions.allows_email_read();
310
311 let did_for_doc = auth_user.did.clone();
312 let did_resolver = state.did_resolver.clone();
313 let (db_result, did_doc) = tokio::join!(
314 sqlx::query!(
315 r#"SELECT
316 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
317 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
318 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at
319 FROM users WHERE did = $1"#,
320 auth_user.did
321 )
322 .fetch_optional(&state.db),
323 did_resolver.resolve_did_document(&did_for_doc)
324 );
325 match db_result {
326 Ok(Some(row)) => {
327 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
328 crate::comms::CommsChannel::Email => ("email", row.email_verified),
329 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
330 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
331 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
332 };
333 let pds_hostname =
334 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
335 let handle = full_handle(&row.handle, &pds_hostname);
336 let is_takendown = row.takedown_ref.is_some();
337 let is_migrated =
338 row.deactivated_at.is_some() && row.migrated_to_pds.is_some();
339 let is_active = row.deactivated_at.is_none() && !is_takendown;
340 let email_value = if can_read_email {
341 row.email.clone()
342 } else {
343 None
344 };
345 let email_confirmed_value = can_read_email && row.email_verified;
346 let mut response = json!({
347 "handle": handle,
348 "did": auth_user.did,
349 "active": is_active,
350 "preferredChannel": preferred_channel,
351 "preferredChannelVerified": preferred_channel_verified,
352 "preferredLocale": row.preferred_locale,
353 "isAdmin": row.is_admin
354 });
355 if can_read_email {
356 response["email"] = json!(email_value);
357 response["emailConfirmed"] = json!(email_confirmed_value);
358 }
359 if is_takendown {
360 response["status"] = json!("takendown");
361 } else if is_migrated {
362 response["status"] = json!("migrated");
363 response["migratedToPds"] = json!(row.migrated_to_pds);
364 response["migratedAt"] = json!(row.migrated_at);
365 } else if row.deactivated_at.is_some() {
366 response["status"] = json!("deactivated");
367 }
368 if let Some(doc) = did_doc {
369 response["didDoc"] = doc;
370 }
371 Json(response)
372 .into_response()
373 }
374 Ok(None) => ApiError::AuthenticationFailed.into_response(),
375 Err(e) => {
376 error!("Database error in get_session: {:?}", e);
377 ApiError::InternalError.into_response()
378 }
379 }
380}
381
382pub async fn delete_session(
383 State(state): State<AppState>,
384 headers: axum::http::HeaderMap,
385) -> Response {
386 let token = match crate::auth::extract_bearer_token_from_header(
387 headers.get("Authorization").and_then(|h| h.to_str().ok()),
388 ) {
389 Some(t) => t,
390 None => return ApiError::AuthenticationRequired.into_response(),
391 };
392 let jti = match crate::auth::get_jti_from_token(&token) {
393 Ok(jti) => jti,
394 Err(_) => return ApiError::AuthenticationFailed.into_response(),
395 };
396 let did = crate::auth::get_did_from_token(&token).ok();
397 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
398 .execute(&state.db)
399 .await
400 {
401 Ok(res) if res.rows_affected() > 0 => {
402 if let Some(did) = did {
403 let session_cache_key = format!("auth:session:{}:{}", did, jti);
404 let _ = state.cache.delete(&session_cache_key).await;
405 }
406 Json(json!({})).into_response()
407 }
408 Ok(_) => ApiError::AuthenticationFailed.into_response(),
409 Err(e) => {
410 error!("Database error in delete_session: {:?}", e);
411 ApiError::AuthenticationFailed.into_response()
412 }
413 }
414}
415
416pub async fn refresh_session(
417 State(state): State<AppState>,
418 headers: axum::http::HeaderMap,
419) -> Response {
420 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
421 if !state
422 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
423 .await
424 {
425 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
426 return (
427 axum::http::StatusCode::TOO_MANY_REQUESTS,
428 axum::Json(serde_json::json!({
429 "error": "RateLimitExceeded",
430 "message": "Too many requests. Please try again later."
431 })),
432 )
433 .into_response();
434 }
435 let refresh_token = match crate::auth::extract_bearer_token_from_header(
436 headers.get("Authorization").and_then(|h| h.to_str().ok()),
437 ) {
438 Some(t) => t,
439 None => return ApiError::AuthenticationRequired.into_response(),
440 };
441 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
442 Ok(jti) => jti,
443 Err(_) => {
444 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
445 .into_response();
446 }
447 };
448 let mut tx = match state.db.begin().await {
449 Ok(tx) => tx,
450 Err(e) => {
451 error!("Failed to begin transaction: {:?}", e);
452 return ApiError::InternalError.into_response();
453 }
454 };
455 if let Ok(Some(session_id)) = sqlx::query_scalar!(
456 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
457 refresh_jti
458 )
459 .fetch_optional(&mut *tx)
460 .await
461 {
462 warn!(
463 "Refresh token reuse detected! Revoking token family for session_id: {}",
464 session_id
465 );
466 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
467 .execute(&mut *tx)
468 .await;
469 let _ = tx.commit().await;
470 return ApiError::ExpiredTokenMsg(
471 "Refresh token has been revoked due to suspected compromise".into(),
472 )
473 .into_response();
474 }
475 let session_row = match sqlx::query!(
476 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
477 FROM session_tokens st
478 JOIN users u ON st.did = u.did
479 JOIN user_keys k ON u.id = k.user_id
480 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
481 FOR UPDATE OF st"#,
482 refresh_jti
483 )
484 .fetch_optional(&mut *tx)
485 .await
486 {
487 Ok(Some(row)) => row,
488 Ok(None) => {
489 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
490 .into_response();
491 }
492 Err(e) => {
493 error!("Database error fetching session: {:?}", e);
494 return ApiError::InternalError.into_response();
495 }
496 };
497 let key_bytes =
498 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
499 Ok(k) => k,
500 Err(e) => {
501 error!("Failed to decrypt user key: {:?}", e);
502 return ApiError::InternalError.into_response();
503 }
504 };
505 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
506 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
507 }
508 let new_access_meta = match crate::auth::create_access_token_with_delegation(
509 &session_row.did,
510 &key_bytes,
511 session_row.scope.as_deref(),
512 session_row.controller_did.as_deref(),
513 ) {
514 Ok(m) => m,
515 Err(e) => {
516 error!("Failed to create access token: {:?}", e);
517 return ApiError::InternalError.into_response();
518 }
519 };
520 let new_refresh_meta =
521 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
522 Ok(m) => m,
523 Err(e) => {
524 error!("Failed to create refresh token: {:?}", e);
525 return ApiError::InternalError.into_response();
526 }
527 };
528 match sqlx::query!(
529 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
530 refresh_jti,
531 session_row.id
532 )
533 .execute(&mut *tx)
534 .await
535 {
536 Ok(result) if result.rows_affected() == 0 => {
537 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
538 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
539 .execute(&mut *tx)
540 .await;
541 let _ = tx.commit().await;
542 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
543 }
544 Err(e) => {
545 error!("Failed to record used refresh token: {:?}", e);
546 return ApiError::InternalError.into_response();
547 }
548 Ok(_) => {}
549 }
550 if let Err(e) = sqlx::query!(
551 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
552 new_access_meta.jti,
553 new_refresh_meta.jti,
554 new_access_meta.expires_at,
555 new_refresh_meta.expires_at,
556 session_row.id
557 )
558 .execute(&mut *tx)
559 .await
560 {
561 error!("Database error updating session: {:?}", e);
562 return ApiError::InternalError.into_response();
563 }
564 if let Err(e) = tx.commit().await {
565 error!("Failed to commit transaction: {:?}", e);
566 return ApiError::InternalError.into_response();
567 }
568 let did_for_doc = session_row.did.clone();
569 let did_resolver = state.did_resolver.clone();
570 let (db_result, did_doc) = tokio::join!(
571 sqlx::query!(
572 r#"SELECT
573 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
574 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
575 discord_verified, telegram_verified, signal_verified
576 FROM users WHERE did = $1"#,
577 session_row.did
578 )
579 .fetch_optional(&state.db),
580 did_resolver.resolve_did_document(&did_for_doc)
581 );
582 match db_result {
583 Ok(Some(u)) => {
584 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
585 crate::comms::CommsChannel::Email => ("email", u.email_verified),
586 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
587 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
588 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
589 };
590 let pds_hostname =
591 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
592 let handle = full_handle(&u.handle, &pds_hostname);
593 let is_takendown = u.takedown_ref.is_some();
594 let is_active = u.deactivated_at.is_none() && !is_takendown;
595 let mut response = json!({
596 "accessJwt": new_access_meta.token,
597 "refreshJwt": new_refresh_meta.token,
598 "handle": handle,
599 "did": session_row.did,
600 "email": u.email,
601 "emailConfirmed": u.email_verified,
602 "preferredChannel": preferred_channel,
603 "preferredChannelVerified": preferred_channel_verified,
604 "preferredLocale": u.preferred_locale,
605 "isAdmin": u.is_admin,
606 "active": is_active
607 });
608 if let Some(doc) = did_doc {
609 response["didDoc"] = doc;
610 }
611 if is_takendown {
612 response["status"] = json!("takendown");
613 } else if u.deactivated_at.is_some() {
614 response["status"] = json!("deactivated");
615 }
616 Json(response)
617 .into_response()
618 }
619 Ok(None) => {
620 error!("User not found for existing session: {}", session_row.did);
621 ApiError::InternalError.into_response()
622 }
623 Err(e) => {
624 error!("Database error fetching user: {:?}", e);
625 ApiError::InternalError.into_response()
626 }
627 }
628}
629
630#[derive(Deserialize)]
631#[serde(rename_all = "camelCase")]
632pub struct ConfirmSignupInput {
633 pub did: String,
634 pub verification_code: String,
635}
636
637#[derive(Serialize)]
638#[serde(rename_all = "camelCase")]
639pub struct ConfirmSignupOutput {
640 pub access_jwt: String,
641 pub refresh_jwt: String,
642 pub handle: String,
643 pub did: String,
644 pub email: Option<String>,
645 pub email_verified: bool,
646 pub preferred_channel: String,
647 pub preferred_channel_verified: bool,
648}
649
650pub async fn confirm_signup(
651 State(state): State<AppState>,
652 Json(input): Json<ConfirmSignupInput>,
653) -> Response {
654 info!("confirm_signup called for DID: {}", input.did);
655 let row = match sqlx::query!(
656 r#"SELECT
657 u.id, u.did, u.handle, u.email,
658 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
659 u.discord_id, u.telegram_username, u.signal_number,
660 k.key_bytes, k.encryption_version
661 FROM users u
662 JOIN user_keys k ON u.id = k.user_id
663 WHERE u.did = $1"#,
664 input.did
665 )
666 .fetch_optional(&state.db)
667 .await
668 {
669 Ok(Some(row)) => row,
670 Ok(None) => {
671 warn!("User not found for confirm_signup: {}", input.did);
672 return ApiError::InvalidRequest("Invalid DID or verification code".into())
673 .into_response();
674 }
675 Err(e) => {
676 error!("Database error in confirm_signup: {:?}", e);
677 return ApiError::InternalError.into_response();
678 }
679 };
680
681 let (channel_str, identifier) = match row.channel {
682 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
683 crate::comms::CommsChannel::Discord => {
684 ("discord", row.discord_id.clone().unwrap_or_default())
685 }
686 crate::comms::CommsChannel::Telegram => (
687 "telegram",
688 row.telegram_username.clone().unwrap_or_default(),
689 ),
690 crate::comms::CommsChannel::Signal => {
691 ("signal", row.signal_number.clone().unwrap_or_default())
692 }
693 };
694
695 let normalized_token =
696 crate::auth::verification_token::normalize_token_input(&input.verification_code);
697 match crate::auth::verification_token::verify_signup_token(
698 &normalized_token,
699 channel_str,
700 &identifier,
701 ) {
702 Ok(token_data) => {
703 if token_data.did != input.did {
704 warn!(
705 "Token DID mismatch for confirm_signup: expected {}, got {}",
706 input.did, token_data.did
707 );
708 return ApiError::InvalidRequest("Invalid verification code".into())
709 .into_response();
710 }
711 }
712 Err(crate::auth::verification_token::VerifyError::Expired) => {
713 warn!("Verification code expired for user: {}", input.did);
714 return ApiError::ExpiredTokenMsg("Verification code has expired".into())
715 .into_response();
716 }
717 Err(e) => {
718 warn!("Invalid verification code for user {}: {:?}", input.did, e);
719 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
720 }
721 }
722
723 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
724 Ok(k) => k,
725 Err(e) => {
726 error!("Failed to decrypt user key: {:?}", e);
727 return ApiError::InternalError.into_response();
728 }
729 };
730 let verified_column = match row.channel {
731 crate::comms::CommsChannel::Email => "email_verified",
732 crate::comms::CommsChannel::Discord => "discord_verified",
733 crate::comms::CommsChannel::Telegram => "telegram_verified",
734 crate::comms::CommsChannel::Signal => "signal_verified",
735 };
736 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
737 if let Err(e) = sqlx::query(&update_query)
738 .bind(&input.did)
739 .execute(&state.db)
740 .await
741 {
742 error!("Failed to update verification status: {:?}", e);
743 return ApiError::InternalError.into_response();
744 }
745
746 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
747 Ok(m) => m,
748 Err(e) => {
749 error!("Failed to create access token: {:?}", e);
750 return ApiError::InternalError.into_response();
751 }
752 };
753 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
754 Ok(m) => m,
755 Err(e) => {
756 error!("Failed to create refresh token: {:?}", e);
757 return ApiError::InternalError.into_response();
758 }
759 };
760 let no_scope: Option<String> = None;
761 if let Err(e) = sqlx::query!(
762 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
763 row.did,
764 access_meta.jti,
765 refresh_meta.jti,
766 access_meta.expires_at,
767 refresh_meta.expires_at,
768 false,
769 false,
770 no_scope
771 )
772 .execute(&state.db)
773 .await
774 {
775 error!("Failed to insert session: {:?}", e);
776 return ApiError::InternalError.into_response();
777 }
778 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
779 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
780 warn!("Failed to enqueue welcome notification: {:?}", e);
781 }
782 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
783 let preferred_channel = match row.channel {
784 crate::comms::CommsChannel::Email => "email",
785 crate::comms::CommsChannel::Discord => "discord",
786 crate::comms::CommsChannel::Telegram => "telegram",
787 crate::comms::CommsChannel::Signal => "signal",
788 };
789 Json(ConfirmSignupOutput {
790 access_jwt: access_meta.token,
791 refresh_jwt: refresh_meta.token,
792 handle: row.handle,
793 did: row.did,
794 email: row.email,
795 email_verified,
796 preferred_channel: preferred_channel.to_string(),
797 preferred_channel_verified: true,
798 })
799 .into_response()
800}
801
802#[derive(Deserialize)]
803#[serde(rename_all = "camelCase")]
804pub struct ResendVerificationInput {
805 pub did: String,
806}
807
808pub async fn resend_verification(
809 State(state): State<AppState>,
810 Json(input): Json<ResendVerificationInput>,
811) -> Response {
812 info!("resend_verification called for DID: {}", input.did);
813 let row = match sqlx::query!(
814 r#"SELECT
815 id, handle, email,
816 preferred_comms_channel as "channel: crate::comms::CommsChannel",
817 discord_id, telegram_username, signal_number,
818 email_verified, discord_verified, telegram_verified, signal_verified
819 FROM users
820 WHERE did = $1"#,
821 input.did
822 )
823 .fetch_optional(&state.db)
824 .await
825 {
826 Ok(Some(row)) => row,
827 Ok(None) => {
828 return ApiError::InvalidRequest("User not found".into()).into_response();
829 }
830 Err(e) => {
831 error!("Database error in resend_verification: {:?}", e);
832 return ApiError::InternalError.into_response();
833 }
834 };
835 let is_verified =
836 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
837 if is_verified {
838 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
839 }
840
841 let (channel_str, recipient) = match row.channel {
842 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
843 crate::comms::CommsChannel::Discord => {
844 ("discord", row.discord_id.clone().unwrap_or_default())
845 }
846 crate::comms::CommsChannel::Telegram => (
847 "telegram",
848 row.telegram_username.clone().unwrap_or_default(),
849 ),
850 crate::comms::CommsChannel::Signal => {
851 ("signal", row.signal_number.clone().unwrap_or_default())
852 }
853 };
854
855 let verification_token =
856 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
857 let formatted_token =
858 crate::auth::verification_token::format_token_for_display(&verification_token);
859
860 if let Err(e) = crate::comms::enqueue_signup_verification(
861 &state.db,
862 row.id,
863 channel_str,
864 &recipient,
865 &formatted_token,
866 None,
867 )
868 .await
869 {
870 warn!("Failed to enqueue verification notification: {:?}", e);
871 }
872 Json(json!({"success": true})).into_response()
873}
874
875#[derive(Serialize)]
876#[serde(rename_all = "camelCase")]
877pub struct SessionInfo {
878 pub id: String,
879 pub session_type: String,
880 pub client_name: Option<String>,
881 pub created_at: String,
882 pub expires_at: String,
883 pub is_current: bool,
884}
885
886#[derive(Serialize)]
887#[serde(rename_all = "camelCase")]
888pub struct ListSessionsOutput {
889 pub sessions: Vec<SessionInfo>,
890}
891
892pub async fn list_sessions(
893 State(state): State<AppState>,
894 headers: HeaderMap,
895 auth: BearerAuth,
896) -> Response {
897 let current_jti = headers
898 .get("authorization")
899 .and_then(|v| v.to_str().ok())
900 .and_then(|v| v.strip_prefix("Bearer "))
901 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
902
903 let mut sessions: Vec<SessionInfo> = Vec::new();
904
905 let jwt_result = sqlx::query_as::<
906 _,
907 (
908 i32,
909 String,
910 chrono::DateTime<chrono::Utc>,
911 chrono::DateTime<chrono::Utc>,
912 ),
913 >(
914 r#"
915 SELECT id, access_jti, created_at, refresh_expires_at
916 FROM session_tokens
917 WHERE did = $1 AND refresh_expires_at > NOW()
918 ORDER BY created_at DESC
919 "#,
920 )
921 .bind(&auth.0.did)
922 .fetch_all(&state.db)
923 .await;
924
925 match jwt_result {
926 Ok(rows) => {
927 for (id, access_jti, created_at, expires_at) in rows {
928 sessions.push(SessionInfo {
929 id: format!("jwt:{}", id),
930 session_type: "legacy".to_string(),
931 client_name: None,
932 created_at: created_at.to_rfc3339(),
933 expires_at: expires_at.to_rfc3339(),
934 is_current: current_jti.as_ref() == Some(&access_jti),
935 });
936 }
937 }
938 Err(e) => {
939 error!("DB error fetching JWT sessions: {:?}", e);
940 return (
941 StatusCode::INTERNAL_SERVER_ERROR,
942 Json(json!({"error": "InternalError"})),
943 )
944 .into_response();
945 }
946 }
947
948 let oauth_result = sqlx::query_as::<
949 _,
950 (
951 i32,
952 String,
953 chrono::DateTime<chrono::Utc>,
954 chrono::DateTime<chrono::Utc>,
955 String,
956 ),
957 >(
958 r#"
959 SELECT id, token_id, created_at, expires_at, client_id
960 FROM oauth_token
961 WHERE did = $1 AND expires_at > NOW()
962 ORDER BY created_at DESC
963 "#,
964 )
965 .bind(&auth.0.did)
966 .fetch_all(&state.db)
967 .await;
968
969 match oauth_result {
970 Ok(rows) => {
971 for (id, token_id, created_at, expires_at, client_id) in rows {
972 let client_name = extract_client_name(&client_id);
973 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
974 sessions.push(SessionInfo {
975 id: format!("oauth:{}", id),
976 session_type: "oauth".to_string(),
977 client_name: Some(client_name),
978 created_at: created_at.to_rfc3339(),
979 expires_at: expires_at.to_rfc3339(),
980 is_current: is_current_oauth,
981 });
982 }
983 }
984 Err(e) => {
985 error!("DB error fetching OAuth sessions: {:?}", e);
986 return (
987 StatusCode::INTERNAL_SERVER_ERROR,
988 Json(json!({"error": "InternalError"})),
989 )
990 .into_response();
991 }
992 }
993
994 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
995
996 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
997}
998
999fn extract_client_name(client_id: &str) -> String {
1000 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
1001 "Localhost App".to_string()
1002 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
1003 parsed.host_str().unwrap_or("Unknown App").to_string()
1004 } else {
1005 client_id.to_string()
1006 }
1007}
1008
/// Request body accepted by `revoke_session`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// Identifier of the session to revoke, in the form `jwt:<id>` or `oauth:<id>`
    /// where `<id>` is an integer row id. Deserialized from the `sessionId` key.
    pub session_id: String,
}
1014
1015pub async fn revoke_session(
1016 State(state): State<AppState>,
1017 auth: BearerAuth,
1018 Json(input): Json<RevokeSessionInput>,
1019) -> Response {
1020 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
1021 let session_id: i32 = match jwt_id.parse() {
1022 Ok(id) => id,
1023 Err(_) => {
1024 return (
1025 StatusCode::BAD_REQUEST,
1026 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1027 )
1028 .into_response();
1029 }
1030 };
1031 let session = sqlx::query_as::<_, (String,)>(
1032 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
1033 )
1034 .bind(session_id)
1035 .bind(&auth.0.did)
1036 .fetch_optional(&state.db)
1037 .await;
1038 let access_jti = match session {
1039 Ok(Some((jti,))) => jti,
1040 Ok(None) => {
1041 return (
1042 StatusCode::NOT_FOUND,
1043 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1044 )
1045 .into_response();
1046 }
1047 Err(e) => {
1048 error!("DB error in revoke_session: {:?}", e);
1049 return (
1050 StatusCode::INTERNAL_SERVER_ERROR,
1051 Json(json!({"error": "InternalError"})),
1052 )
1053 .into_response();
1054 }
1055 };
1056 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
1057 .bind(session_id)
1058 .execute(&state.db)
1059 .await
1060 {
1061 error!("DB error deleting session: {:?}", e);
1062 return (
1063 StatusCode::INTERNAL_SERVER_ERROR,
1064 Json(json!({"error": "InternalError"})),
1065 )
1066 .into_response();
1067 }
1068 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
1069 if let Err(e) = state.cache.delete(&cache_key).await {
1070 warn!("Failed to invalidate session cache: {:?}", e);
1071 }
1072 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
1073 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
1074 let session_id: i32 = match oauth_id.parse() {
1075 Ok(id) => id,
1076 Err(_) => {
1077 return (
1078 StatusCode::BAD_REQUEST,
1079 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1080 )
1081 .into_response();
1082 }
1083 };
1084 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1085 .bind(session_id)
1086 .bind(&auth.0.did)
1087 .execute(&state.db)
1088 .await;
1089 match result {
1090 Ok(r) if r.rows_affected() == 0 => {
1091 return (
1092 StatusCode::NOT_FOUND,
1093 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1094 )
1095 .into_response();
1096 }
1097 Err(e) => {
1098 error!("DB error deleting OAuth session: {:?}", e);
1099 return (
1100 StatusCode::INTERNAL_SERVER_ERROR,
1101 Json(json!({"error": "InternalError"})),
1102 )
1103 .into_response();
1104 }
1105 _ => {}
1106 }
1107 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1108 } else {
1109 return (
1110 StatusCode::BAD_REQUEST,
1111 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1112 )
1113 .into_response();
1114 }
1115 (StatusCode::OK, Json(json!({}))).into_response()
1116}
1117
1118pub async fn revoke_all_sessions(
1119 State(state): State<AppState>,
1120 headers: HeaderMap,
1121 auth: BearerAuth,
1122) -> Response {
1123 let current_jti = headers
1124 .get("authorization")
1125 .and_then(|v| v.to_str().ok())
1126 .and_then(|v| v.strip_prefix("Bearer "))
1127 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1128
1129 if let Some(ref jti) = current_jti {
1130 if auth.0.is_oauth {
1131 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1132 .bind(&auth.0.did)
1133 .execute(&state.db)
1134 .await
1135 {
1136 error!("DB error revoking JWT sessions: {:?}", e);
1137 return (
1138 StatusCode::INTERNAL_SERVER_ERROR,
1139 Json(json!({"error": "InternalError"})),
1140 )
1141 .into_response();
1142 }
1143 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1144 .bind(&auth.0.did)
1145 .bind(jti)
1146 .execute(&state.db)
1147 .await
1148 {
1149 error!("DB error revoking OAuth sessions: {:?}", e);
1150 return (
1151 StatusCode::INTERNAL_SERVER_ERROR,
1152 Json(json!({"error": "InternalError"})),
1153 )
1154 .into_response();
1155 }
1156 } else {
1157 if let Err(e) =
1158 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1159 .bind(&auth.0.did)
1160 .bind(jti)
1161 .execute(&state.db)
1162 .await
1163 {
1164 error!("DB error revoking JWT sessions: {:?}", e);
1165 return (
1166 StatusCode::INTERNAL_SERVER_ERROR,
1167 Json(json!({"error": "InternalError"})),
1168 )
1169 .into_response();
1170 }
1171 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1172 .bind(&auth.0.did)
1173 .execute(&state.db)
1174 .await
1175 {
1176 error!("DB error revoking OAuth sessions: {:?}", e);
1177 return (
1178 StatusCode::INTERNAL_SERVER_ERROR,
1179 Json(json!({"error": "InternalError"})),
1180 )
1181 .into_response();
1182 }
1183 }
1184 } else {
1185 return (
1186 StatusCode::BAD_REQUEST,
1187 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1188 )
1189 .into_response();
1190 }
1191
1192 info!(did = %auth.0.did, "All other sessions revoked");
1193 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1194}
1195
/// Response body returned by `get_legacy_login_preference`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LegacyLoginPreferenceOutput {
    /// Value of the account's `allow_legacy_login` column (serialized as
    /// `allowLegacyLogin`).
    pub allow_legacy_login: bool,
    /// Whether the account has a verified TOTP entry or at least one passkey
    /// (serialized as `hasMfa`).
    pub has_mfa: bool,
}
1202
1203pub async fn get_legacy_login_preference(
1204 State(state): State<AppState>,
1205 auth: BearerAuth,
1206) -> Response {
1207 let result = sqlx::query!(
1208 r#"SELECT
1209 u.allow_legacy_login,
1210 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1211 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1212 FROM users u WHERE u.did = $1"#,
1213 auth.0.did
1214 )
1215 .fetch_optional(&state.db)
1216 .await;
1217
1218 match result {
1219 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1220 allow_legacy_login: row.allow_legacy_login,
1221 has_mfa: row.has_mfa,
1222 })
1223 .into_response(),
1224 Ok(None) => (
1225 StatusCode::NOT_FOUND,
1226 Json(json!({"error": "AccountNotFound"})),
1227 )
1228 .into_response(),
1229 Err(e) => {
1230 error!("DB error: {:?}", e);
1231 (
1232 StatusCode::INTERNAL_SERVER_ERROR,
1233 Json(json!({"error": "InternalError"})),
1234 )
1235 .into_response()
1236 }
1237 }
1238}
1239
/// Request body accepted by `update_legacy_login_preference`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateLegacyLoginInput {
    /// New value for the account's `allow_legacy_login` column. Deserialized from
    /// the `allowLegacyLogin` key.
    pub allow_legacy_login: bool,
}
1245
1246pub async fn update_legacy_login_preference(
1247 State(state): State<AppState>,
1248 auth: BearerAuth,
1249 Json(input): Json<UpdateLegacyLoginInput>,
1250) -> Response {
1251 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1252 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1253 .await;
1254 }
1255
1256 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1257 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1258 }
1259
1260 let result = sqlx::query!(
1261 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1262 input.allow_legacy_login,
1263 auth.0.did
1264 )
1265 .fetch_optional(&state.db)
1266 .await;
1267
1268 match result {
1269 Ok(Some(_)) => {
1270 info!(
1271 did = %auth.0.did,
1272 allow_legacy_login = input.allow_legacy_login,
1273 "Legacy login preference updated"
1274 );
1275 Json(json!({
1276 "allowLegacyLogin": input.allow_legacy_login
1277 }))
1278 .into_response()
1279 }
1280 Ok(None) => (
1281 StatusCode::NOT_FOUND,
1282 Json(json!({"error": "AccountNotFound"})),
1283 )
1284 .into_response(),
1285 Err(e) => {
1286 error!("DB error: {:?}", e);
1287 (
1288 StatusCode::INTERNAL_SERVER_ERROR,
1289 Json(json!({"error": "InternalError"})),
1290 )
1291 .into_response()
1292 }
1293 }
1294}
1295
1296use crate::comms::locale::VALID_LOCALES;
1297
/// Request body accepted by `update_locale`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateLocaleInput {
    /// Locale to store for the account; `update_locale` rejects values that are not
    /// listed in `VALID_LOCALES`. Deserialized from the `preferredLocale` key.
    pub preferred_locale: String,
}
1303
1304pub async fn update_locale(
1305 State(state): State<AppState>,
1306 auth: BearerAuth,
1307 Json(input): Json<UpdateLocaleInput>,
1308) -> Response {
1309 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1310 return (
1311 StatusCode::BAD_REQUEST,
1312 Json(json!({
1313 "error": "InvalidRequest",
1314 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1315 })),
1316 )
1317 .into_response();
1318 }
1319
1320 let result = sqlx::query!(
1321 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1322 input.preferred_locale,
1323 auth.0.did
1324 )
1325 .fetch_optional(&state.db)
1326 .await;
1327
1328 match result {
1329 Ok(Some(_)) => {
1330 info!(
1331 did = %auth.0.did,
1332 locale = %input.preferred_locale,
1333 "User locale preference updated"
1334 );
1335 Json(json!({
1336 "preferredLocale": input.preferred_locale
1337 }))
1338 .into_response()
1339 }
1340 Ok(None) => (
1341 StatusCode::NOT_FOUND,
1342 Json(json!({"error": "AccountNotFound"})),
1343 )
1344 .into_response(),
1345 Err(e) => {
1346 error!("DB error updating locale: {:?}", e);
1347 (
1348 StatusCode::INTERNAL_SERVER_ERROR,
1349 Json(json!({"error": "InternalError"})),
1350 )
1351 .into_response()
1352 }
1353 }
1354}