this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
/// Canonicalises a login identifier before it is looked up.
///
/// After trimming surrounding whitespace:
/// - anything containing `@` or starting with `did:` is returned unchanged (case preserved);
/// - a name without a `.` is lowercased and suffixed with `.{pds_hostname}`;
/// - any other value is lowercased as-is.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let trimmed = identifier.trim();
    let is_email_or_did = trimmed.starts_with("did:") || trimmed.contains('@');
    if is_email_or_did {
        return trimmed.to_owned();
    }
    let lowered = trimmed.to_lowercase();
    if trimmed.contains('.') {
        lowered
    } else {
        // Bare local name: qualify it with this server's hostname.
        format!("{}.{}", lowered, pds_hostname)
    }
}
40
/// Returns the handle to expose in API responses.
///
/// The stored handle is returned verbatim; `_pds_hostname` is accepted but not used.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    String::from(stored_handle)
}
44
/// JSON request body accepted by `create_session`; field names are read as camelCase.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionInput {
    /// Login identifier; `create_session` normalises it and matches it against a user's
    /// handle, email, or DID.
    pub identifier: String,
    /// Secret checked against the account password hash and, failing that, the account's
    /// app passwords.
    pub password: String,
    /// When true, login is allowed even if the account has a takedown reference.
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub allow_takendown: bool,
}
53
/// JSON response body returned by `create_session`; field names are written as camelCase
/// and `None` fields are omitted from the output.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// Handle of the account that logged in.
    pub handle: String,
    /// DID of the account that logged in.
    pub did: String,
    /// Resolved DID document, when the resolver produced one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub did_doc: Option<serde_json::Value>,
    /// Account email, if one is stored.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email: Option<String>,
    /// Whether the email address has been verified.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email_confirmed: Option<bool>,
    /// True when the account is neither deactivated nor taken down.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active: Option<bool>,
    /// `"takendown"`, `"migrated"` or `"deactivated"` for inactive accounts; absent otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}
72
/// Handles an identifier + password login and issues a new access/refresh token pair.
///
/// Flow, in order:
/// 1. rate-limit by client IP (`RateLimitKind::Login`);
/// 2. normalise the identifier and load the user (by handle, email or DID) with their key;
/// 3. check the password against the account hash, then against up to 20 app passwords;
/// 4. reject taken-down accounts (unless `allow_takendown`), unverified non-delegated
///    accounts, and TOTP-enabled accounts that do not allow legacy login;
/// 5. create tokens, record the session row while resolving the DID document concurrently;
/// 6. queue a notification when a TOTP-enabled account logged in through this legacy path.
///
/// Responses: 429 `RateLimitExceeded`, 401 `AccountTakedown`, 403 `AccountNotVerified`,
/// 403 `MfaRequired`, `ApiError::AuthenticationFailedMsg` for unknown user / bad password,
/// `ApiError::InternalError` for database, key-decryption or token-creation failures, and
/// otherwise a JSON `CreateSessionOutput`.
pub async fn create_session(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(input): Json<CreateSessionInput>,
) -> Response {
    // NOTE(review): the raw identifier (possibly an email address) is written to the log.
    info!(
        "create_session called with identifier: {}",
        input.identifier
    );
    let client_ip = extract_client_ip(&headers);
    if !state
        .check_rate_limit(RateLimitKind::Login, &client_ip)
        .await
    {
        warn!(ip = %client_ip, "Login rate limit exceeded");
        return (
            StatusCode::TOO_MANY_REQUESTS,
            Json(json!({
                "error": "RateLimitExceeded",
                "message": "Too many login attempts. Please try again later."
            })),
        )
            .into_response();
    }
    let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
    info!(
        "Normalized identifier: {} -> {}",
        input.identifier, normalized_identifier
    );
    // Load the account together with its signing key and TOTP flag in one query.
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
            u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
            u.allow_legacy_login, u.migrated_to_pds,
            u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
            k.key_bytes, k.encryption_version,
            (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
        normalized_identifier
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            // Verify against a fixed hash and discard the result — presumably so an unknown
            // identifier costs about the same time as a known one (TODO confirm intent).
            let _ = verify(
                &input.password,
                "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
            );
            warn!("User not found for login attempt");
            return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Try the main account password first; only on mismatch fall back to app passwords.
    // A verification error is treated the same as a mismatch.
    // NOTE(review): `verify` is called synchronously inside this async handler, potentially
    // many times in the fallback branch.
    let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
        .password_hash
        .as_ref()
        .map(|h| verify(&input.password, h).unwrap_or(false))
        .unwrap_or(false)
    {
        (true, None, None, None)
    } else {
        // NOTE(review): a database error here is swallowed and treated as "no app passwords",
        // so it surfaces to the caller as an authentication failure.
        let app_passwords = sqlx::query!(
            "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
            row.id
        )
        .fetch_all(&state.db)
        .await
        .unwrap_or_default();
        let matched = app_passwords
            .iter()
            .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
        match matched {
            Some(app) => (
                true,
                Some(app.name.clone()),
                app.scopes.clone(),
                app.created_by_controller_did.clone(),
            ),
            None => (false, None, None, None),
        }
    };
    if !password_valid {
        warn!("Password verification failed for login attempt");
        return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
            .into_response();
    }
    // Account-state gates run only after the credentials were accepted.
    let is_takendown = row.takedown_ref.is_some();
    if is_takendown && !input.allow_takendown {
        warn!("Login attempt for takendown account: {}", row.did);
        return (
            StatusCode::UNAUTHORIZED,
            Json(json!({
                "error": "AccountTakedown",
                "message": "Account has been taken down"
            })),
        )
            .into_response();
    }
    // Any single verified channel counts as a verified account.
    let is_verified =
        row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
    // A failed delegation lookup is treated as "not delegated".
    let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
        .await
        .unwrap_or(false);
    if !is_verified && !is_delegated {
        warn!("Login attempt for unverified account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "AccountNotVerified",
                "message": "Please verify your account before logging in",
                "did": row.did
            })),
        )
            .into_response();
    }
    // A password login on a TOTP-enabled account is recorded as a "legacy login" and is only
    // permitted when the account opted in via `allow_legacy_login`.
    let has_totp = row.totp_enabled.unwrap_or(false);
    let is_legacy_login = has_totp;
    if has_totp && !row.allow_legacy_login {
        warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
        return (
            StatusCode::FORBIDDEN,
            Json(json!({
                "error": "MfaRequired",
                "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
                "did": row.did
            })),
        )
            .into_response();
    }
    // Scopes and controller DID are only set when an app password matched.
    let access_meta = match crate::auth::create_access_token_with_delegation(
        &row.did,
        &key_bytes,
        app_password_scopes.as_deref(),
        app_password_controller.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let did_for_doc = row.did.clone();
    let did_resolver = state.did_resolver.clone();
    // Persist the session and resolve the DID document concurrently.
    let (insert_result, did_doc) = tokio::join!(
        sqlx::query!(
            "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
            row.did,
            access_meta.jti,
            refresh_meta.jti,
            access_meta.expires_at,
            refresh_meta.expires_at,
            is_legacy_login,
            false,
            app_password_scopes,
            app_password_controller,
            app_password_name
        )
        .execute(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    if let Err(e) = insert_result {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if is_legacy_login {
        warn!(
            did = %row.did,
            ip = %client_ip,
            "Legacy login on TOTP-enabled account - sending notification"
        );
        let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
        // Best effort: a failure to queue the notification is logged but does not fail login.
        if let Err(e) = crate::comms::queue_legacy_login_notification(
            &state.db,
            row.id,
            &hostname,
            &client_ip,
            row.preferred_comms_channel,
        )
        .await
        {
            error!("Failed to queue legacy login notification: {:?}", e);
        }
    }
    let handle = full_handle(&row.handle, &pds_hostname);
    // "migrated" means deactivated with a recorded destination PDS.
    let is_migrated = row.deactivated_at.is_some() && row.migrated_to_pds.is_some();
    let is_active = row.deactivated_at.is_none() && !is_takendown;
    // Takedown takes precedence over migrated, which takes precedence over deactivated.
    let status = if is_takendown {
        Some("takendown".to_string())
    } else if is_migrated {
        Some("migrated".to_string())
    } else if row.deactivated_at.is_some() {
        Some("deactivated".to_string())
    } else {
        None
    };
    Json(CreateSessionOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle,
        did: row.did,
        did_doc,
        email: row.email,
        email_confirmed: Some(row.email_verified),
        active: Some(is_active),
        status,
    })
    .into_response()
}
303
304pub async fn get_session(
305 State(state): State<AppState>,
306 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
307) -> Response {
308 let permissions = auth_user.permissions();
309 let can_read_email = permissions.allows_email_read();
310
311 let did_for_doc = auth_user.did.clone();
312 let did_resolver = state.did_resolver.clone();
313 let (db_result, did_doc) = tokio::join!(
314 sqlx::query!(
315 r#"SELECT
316 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
317 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
318 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at
319 FROM users WHERE did = $1"#,
320 auth_user.did
321 )
322 .fetch_optional(&state.db),
323 did_resolver.resolve_did_document(&did_for_doc)
324 );
325 match db_result {
326 Ok(Some(row)) => {
327 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
328 crate::comms::CommsChannel::Email => ("email", row.email_verified),
329 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
330 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
331 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
332 };
333 let pds_hostname =
334 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
335 let handle = full_handle(&row.handle, &pds_hostname);
336 let is_takendown = row.takedown_ref.is_some();
337 let is_migrated = row.deactivated_at.is_some() && row.migrated_to_pds.is_some();
338 let is_active = row.deactivated_at.is_none() && !is_takendown;
339 let email_value = if can_read_email {
340 row.email.clone()
341 } else {
342 None
343 };
344 let email_confirmed_value = can_read_email && row.email_verified;
345 let mut response = json!({
346 "handle": handle,
347 "did": auth_user.did,
348 "active": is_active,
349 "preferredChannel": preferred_channel,
350 "preferredChannelVerified": preferred_channel_verified,
351 "preferredLocale": row.preferred_locale,
352 "isAdmin": row.is_admin
353 });
354 if can_read_email {
355 response["email"] = json!(email_value);
356 response["emailConfirmed"] = json!(email_confirmed_value);
357 }
358 if is_takendown {
359 response["status"] = json!("takendown");
360 } else if is_migrated {
361 response["status"] = json!("migrated");
362 response["migratedToPds"] = json!(row.migrated_to_pds);
363 response["migratedAt"] = json!(row.migrated_at);
364 } else if row.deactivated_at.is_some() {
365 response["status"] = json!("deactivated");
366 }
367 if let Some(doc) = did_doc {
368 response["didDoc"] = doc;
369 }
370 Json(response).into_response()
371 }
372 Ok(None) => ApiError::AuthenticationFailed.into_response(),
373 Err(e) => {
374 error!("Database error in get_session: {:?}", e);
375 ApiError::InternalError.into_response()
376 }
377 }
378}
379
380pub async fn delete_session(
381 State(state): State<AppState>,
382 headers: axum::http::HeaderMap,
383) -> Response {
384 let token = match crate::auth::extract_bearer_token_from_header(
385 headers.get("Authorization").and_then(|h| h.to_str().ok()),
386 ) {
387 Some(t) => t,
388 None => return ApiError::AuthenticationRequired.into_response(),
389 };
390 let jti = match crate::auth::get_jti_from_token(&token) {
391 Ok(jti) => jti,
392 Err(_) => return ApiError::AuthenticationFailed.into_response(),
393 };
394 let did = crate::auth::get_did_from_token(&token).ok();
395 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
396 .execute(&state.db)
397 .await
398 {
399 Ok(res) if res.rows_affected() > 0 => {
400 if let Some(did) = did {
401 let session_cache_key = format!("auth:session:{}:{}", did, jti);
402 let _ = state.cache.delete(&session_cache_key).await;
403 }
404 Json(json!({})).into_response()
405 }
406 Ok(_) => ApiError::AuthenticationFailed.into_response(),
407 Err(e) => {
408 error!("Database error in delete_session: {:?}", e);
409 ApiError::AuthenticationFailed.into_response()
410 }
411 }
412}
413
/// Rotates a session: exchanges a valid refresh token for a new access/refresh token pair.
///
/// Flow, in order:
/// 1. rate-limit by client IP (`RateLimitKind::RefreshSession`);
/// 2. read the bearer refresh token and its `jti`;
/// 3. inside one database transaction: detect reuse of an already-consumed refresh token
///    (which deletes the whole session), lock and load the session row, verify the token
///    signature, record the old `jti` as used, and store the new `jti`s and expiries;
/// 4. after commit, load the user row and resolve the DID document concurrently and return
///    the new tokens with profile information.
///
/// Responses: 429 `RateLimitExceeded`, `ApiError::AuthenticationRequired` without a token,
/// `ApiError::AuthenticationFailedMsg` for a malformed / unknown / expired / badly signed
/// token, `ApiError::ExpiredTokenMsg` when reuse is detected, `ApiError::InternalError` for
/// database, key or token-creation failures.
///
/// Early returns leave `tx` uncommitted; sqlx rolls such a transaction back when dropped.
pub async fn refresh_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
    if !state
        .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
        .await
    {
        tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
        return (
            axum::http::StatusCode::TOO_MANY_REQUESTS,
            axum::Json(serde_json::json!({
                "error": "RateLimitExceeded",
                "message": "Too many requests. Please try again later."
            })),
        )
            .into_response();
    }
    let refresh_token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
        Ok(jti) => jti,
        Err(_) => {
            return ApiError::AuthenticationFailedMsg("Invalid token format".into())
                .into_response();
        }
    };
    let mut tx = match state.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            error!("Failed to begin transaction: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Reuse detection: a jti already recorded in `used_refresh_tokens` was rotated before, so
    // presenting it again revokes the session it belonged to.
    // NOTE(review): a database error from this lookup is not handled here; execution simply
    // continues to the next query.
    if let Ok(Some(session_id)) = sqlx::query_scalar!(
        "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        warn!(
            "Refresh token reuse detected! Revoking token family for session_id: {}",
            session_id
        );
        // NOTE(review): errors from the revocation delete and commit are ignored.
        let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
            .execute(&mut *tx)
            .await;
        let _ = tx.commit().await;
        return ApiError::ExpiredTokenMsg(
            "Refresh token has been revoked due to suspected compromise".into(),
        )
        .into_response();
    }
    // Load and row-lock the live, unexpired session together with the user's signing key.
    let session_row = match sqlx::query!(
        r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
        FROM session_tokens st
        JOIN users u ON st.did = u.did
        JOIN user_keys k ON u.id = k.user_id
        WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
        FOR UPDATE OF st"#,
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching session: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes =
        match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
            Ok(k) => k,
            Err(e) => {
                error!("Failed to decrypt user key: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // The jti lookup alone is not proof of possession; the token itself must also verify.
    if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
        return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
    }
    // The new access token keeps the scope and controller DID stored on the session.
    let new_access_meta = match crate::auth::create_access_token_with_delegation(
        &session_row.did,
        &key_bytes,
        session_row.scope.as_deref(),
        session_row.controller_did.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let new_refresh_meta =
        match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
            Ok(m) => m,
            Err(e) => {
                error!("Failed to create refresh token: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // Mark the presented jti as consumed. If the insert affected no rows, the jti was already
    // recorded, which is handled as reuse: the session is revoked.
    match sqlx::query!(
        "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
        refresh_jti,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        Ok(result) if result.rows_affected() == 0 => {
            warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
            let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
                .execute(&mut *tx)
                .await;
            let _ = tx.commit().await;
            return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
        }
        Err(e) => {
            error!("Failed to record used refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
        Ok(_) => {}
    }
    // Swap the session row over to the new token identifiers and expiries.
    if let Err(e) = sqlx::query!(
        "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
        new_access_meta.jti,
        new_refresh_meta.jti,
        new_access_meta.expires_at,
        new_refresh_meta.expires_at,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        error!("Database error updating session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if let Err(e) = tx.commit().await {
        error!("Failed to commit transaction: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    // Rotation is committed; the remaining work only gathers data for the response body.
    let did_for_doc = session_row.did.clone();
    let did_resolver = state.did_resolver.clone();
    let (db_result, did_doc) = tokio::join!(
        sqlx::query!(
            r#"SELECT
                handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
                preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
                discord_verified, telegram_verified, signal_verified
            FROM users WHERE did = $1"#,
            session_row.did
        )
        .fetch_optional(&state.db),
        did_resolver.resolve_did_document(&did_for_doc)
    );
    match db_result {
        Ok(Some(u)) => {
            let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", u.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&u.handle, &pds_hostname);
            let is_takendown = u.takedown_ref.is_some();
            let is_active = u.deactivated_at.is_none() && !is_takendown;
            let mut response = json!({
                "accessJwt": new_access_meta.token,
                "refreshJwt": new_refresh_meta.token,
                "handle": handle,
                "did": session_row.did,
                "email": u.email,
                "emailConfirmed": u.email_verified,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": u.preferred_locale,
                "isAdmin": u.is_admin,
                "active": is_active
            });
            if let Some(doc) = did_doc {
                response["didDoc"] = doc;
            }
            // Only "takendown" and "deactivated" are reported here; this query does not
            // select the migration columns.
            if is_takendown {
                response["status"] = json!("takendown");
            } else if u.deactivated_at.is_some() {
                response["status"] = json!("deactivated");
            }
            Json(response).into_response()
        }
        Ok(None) => {
            error!("User not found for existing session: {}", session_row.did);
            ApiError::InternalError.into_response()
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}
626
/// JSON request body accepted by `confirm_signup`; field names are read as camelCase.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    /// DID of the account being confirmed.
    pub did: String,
    /// Verification code as entered by the user; it is normalised before being checked.
    pub verification_code: String,
}
633
/// JSON response body returned by `confirm_signup`; field names are written as camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// Handle of the confirmed account.
    pub handle: String,
    /// DID of the confirmed account.
    pub did: String,
    /// Account email, if one is stored (serialized as `null` when absent).
    pub email: Option<String>,
    /// Set by `confirm_signup` to whether the confirmed channel is email.
    pub email_verified: bool,
    /// Name of the user's preferred channel: `email`, `discord`, `telegram` or `signal`.
    pub preferred_channel: String,
    /// Whether the preferred channel is verified; `confirm_signup` always sets this to true.
    pub preferred_channel_verified: bool,
}
646
/// Confirms a signup verification code and logs the user in.
///
/// Flow, in order:
/// 1. load the user and their key by DID;
/// 2. pick the identifier for the user's preferred channel and verify the (normalised)
///    code against it, also requiring the DID embedded in the token to match the request;
/// 3. set the matching `*_verified` column to TRUE;
/// 4. create an access/refresh token pair and insert a session row;
/// 5. enqueue a welcome notification (best effort).
///
/// Responses: `ApiError::InvalidRequest` for an unknown DID or an invalid code,
/// `ApiError::ExpiredTokenMsg` for an expired code, `ApiError::InternalError` for database,
/// key or token-creation failures, otherwise a JSON `ConfirmSignupOutput`.
pub async fn confirm_signup(
    State(state): State<AppState>,
    Json(input): Json<ConfirmSignupInput>,
) -> Response {
    info!("confirm_signup called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.email,
            u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
            u.discord_id, u.telegram_username, u.signal_number,
            k.key_bytes, k.encryption_version
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            warn!("User not found for confirm_signup: {}", input.did);
            // The message does not say whether the DID or the code was wrong.
            return ApiError::InvalidRequest("Invalid DID or verification code".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error in confirm_signup: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };

    // The token is checked against the contact identifier of the preferred channel; a missing
    // identifier becomes an empty string.
    let (channel_str, identifier) = match row.channel {
        crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
        crate::comms::CommsChannel::Discord => {
            ("discord", row.discord_id.clone().unwrap_or_default())
        }
        crate::comms::CommsChannel::Telegram => (
            "telegram",
            row.telegram_username.clone().unwrap_or_default(),
        ),
        crate::comms::CommsChannel::Signal => {
            ("signal", row.signal_number.clone().unwrap_or_default())
        }
    };

    let normalized_token =
        crate::auth::verification_token::normalize_token_input(&input.verification_code);
    match crate::auth::verification_token::verify_signup_token(
        &normalized_token,
        channel_str,
        &identifier,
    ) {
        Ok(token_data) => {
            // A valid token issued for a different account must not confirm this one.
            if token_data.did != input.did {
                warn!(
                    "Token DID mismatch for confirm_signup: expected {}, got {}",
                    input.did, token_data.did
                );
                return ApiError::InvalidRequest("Invalid verification code".into())
                    .into_response();
            }
        }
        Err(crate::auth::verification_token::VerifyError::Expired) => {
            warn!("Verification code expired for user: {}", input.did);
            return ApiError::ExpiredTokenMsg("Verification code has expired".into())
                .into_response();
        }
        Err(e) => {
            warn!("Invalid verification code for user {}: {:?}", input.did, e);
            return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
        }
    }

    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // The column name comes from this fixed match, never from request data, so interpolating
    // it into the SQL text below is safe; the DID itself is still bound as a parameter.
    let verified_column = match row.channel {
        crate::comms::CommsChannel::Email => "email_verified",
        crate::comms::CommsChannel::Discord => "discord_verified",
        crate::comms::CommsChannel::Telegram => "telegram_verified",
        crate::comms::CommsChannel::Signal => "signal_verified",
    };
    let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
    if let Err(e) = sqlx::query(&update_query)
        .bind(&input.did)
        .execute(&state.db)
        .await
    {
        error!("Failed to update verification status: {:?}", e);
        return ApiError::InternalError.into_response();
    }

    let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Typed `None` so the macro can infer the parameter type for the nullable scope column.
    let no_scope: Option<String> = None;
    if let Err(e) = sqlx::query!(
        "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        row.did,
        access_meta.jti,
        refresh_meta.jti,
        access_meta.expires_at,
        refresh_meta.expires_at,
        false,
        false,
        no_scope
    )
    .execute(&state.db)
    .await
    {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    // Best effort: a failed welcome notification does not fail the confirmation.
    if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
        warn!("Failed to enqueue welcome notification: {:?}", e);
    }
    // NOTE(review): this reflects only the channel confirmed just now; the stored
    // `email_verified` flag is not selected by the query above.
    let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
    let preferred_channel = match row.channel {
        crate::comms::CommsChannel::Email => "email",
        crate::comms::CommsChannel::Discord => "discord",
        crate::comms::CommsChannel::Telegram => "telegram",
        crate::comms::CommsChannel::Signal => "signal",
    };
    Json(ConfirmSignupOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle: row.handle,
        did: row.did,
        email: row.email,
        email_verified,
        preferred_channel: preferred_channel.to_string(),
        preferred_channel_verified: true,
    })
    .into_response()
}
798
/// JSON request body accepted by `resend_verification`; field names are read as camelCase.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    /// DID of the account that should receive a new verification code.
    pub did: String,
}
804
805pub async fn resend_verification(
806 State(state): State<AppState>,
807 Json(input): Json<ResendVerificationInput>,
808) -> Response {
809 info!("resend_verification called for DID: {}", input.did);
810 let row = match sqlx::query!(
811 r#"SELECT
812 id, handle, email,
813 preferred_comms_channel as "channel: crate::comms::CommsChannel",
814 discord_id, telegram_username, signal_number,
815 email_verified, discord_verified, telegram_verified, signal_verified
816 FROM users
817 WHERE did = $1"#,
818 input.did
819 )
820 .fetch_optional(&state.db)
821 .await
822 {
823 Ok(Some(row)) => row,
824 Ok(None) => {
825 return ApiError::InvalidRequest("User not found".into()).into_response();
826 }
827 Err(e) => {
828 error!("Database error in resend_verification: {:?}", e);
829 return ApiError::InternalError.into_response();
830 }
831 };
832 let is_verified =
833 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
834 if is_verified {
835 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
836 }
837
838 let (channel_str, recipient) = match row.channel {
839 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
840 crate::comms::CommsChannel::Discord => {
841 ("discord", row.discord_id.clone().unwrap_or_default())
842 }
843 crate::comms::CommsChannel::Telegram => (
844 "telegram",
845 row.telegram_username.clone().unwrap_or_default(),
846 ),
847 crate::comms::CommsChannel::Signal => {
848 ("signal", row.signal_number.clone().unwrap_or_default())
849 }
850 };
851
852 let verification_token =
853 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
854 let formatted_token =
855 crate::auth::verification_token::format_token_for_display(&verification_token);
856
857 if let Err(e) = crate::comms::enqueue_signup_verification(
858 &state.db,
859 row.id,
860 channel_str,
861 &recipient,
862 &formatted_token,
863 None,
864 )
865 .await
866 {
867 warn!("Failed to enqueue verification notification: {:?}", e);
868 }
869 Json(json!({"success": true})).into_response()
870}
871
/// One entry in the list returned by `list_sessions`; field names are written as camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// Prefixed identifier: `jwt:<id>` for `session_tokens` rows, `oauth:<id>` for
    /// `oauth_token` rows.
    pub id: String,
    /// `"legacy"` for JWT sessions, `"oauth"` for OAuth sessions.
    pub session_type: String,
    /// Display name derived from the OAuth client id; `None` for JWT sessions.
    pub client_name: Option<String>,
    /// Creation time as an RFC 3339 string.
    pub created_at: String,
    /// Expiry time as an RFC 3339 string.
    pub expires_at: String,
    /// True when this entry corresponds to the token used for the request.
    pub is_current: bool,
}
882
/// JSON response body returned by `list_sessions`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    /// JWT and OAuth sessions combined, sorted newest first.
    pub sessions: Vec<SessionInfo>,
}
888
889pub async fn list_sessions(
890 State(state): State<AppState>,
891 headers: HeaderMap,
892 auth: BearerAuth,
893) -> Response {
894 let current_jti = headers
895 .get("authorization")
896 .and_then(|v| v.to_str().ok())
897 .and_then(|v| v.strip_prefix("Bearer "))
898 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
899
900 let mut sessions: Vec<SessionInfo> = Vec::new();
901
902 let jwt_result = sqlx::query_as::<
903 _,
904 (
905 i32,
906 String,
907 chrono::DateTime<chrono::Utc>,
908 chrono::DateTime<chrono::Utc>,
909 ),
910 >(
911 r#"
912 SELECT id, access_jti, created_at, refresh_expires_at
913 FROM session_tokens
914 WHERE did = $1 AND refresh_expires_at > NOW()
915 ORDER BY created_at DESC
916 "#,
917 )
918 .bind(&auth.0.did)
919 .fetch_all(&state.db)
920 .await;
921
922 match jwt_result {
923 Ok(rows) => {
924 for (id, access_jti, created_at, expires_at) in rows {
925 sessions.push(SessionInfo {
926 id: format!("jwt:{}", id),
927 session_type: "legacy".to_string(),
928 client_name: None,
929 created_at: created_at.to_rfc3339(),
930 expires_at: expires_at.to_rfc3339(),
931 is_current: current_jti.as_ref() == Some(&access_jti),
932 });
933 }
934 }
935 Err(e) => {
936 error!("DB error fetching JWT sessions: {:?}", e);
937 return (
938 StatusCode::INTERNAL_SERVER_ERROR,
939 Json(json!({"error": "InternalError"})),
940 )
941 .into_response();
942 }
943 }
944
945 let oauth_result = sqlx::query_as::<
946 _,
947 (
948 i32,
949 String,
950 chrono::DateTime<chrono::Utc>,
951 chrono::DateTime<chrono::Utc>,
952 String,
953 ),
954 >(
955 r#"
956 SELECT id, token_id, created_at, expires_at, client_id
957 FROM oauth_token
958 WHERE did = $1 AND expires_at > NOW()
959 ORDER BY created_at DESC
960 "#,
961 )
962 .bind(&auth.0.did)
963 .fetch_all(&state.db)
964 .await;
965
966 match oauth_result {
967 Ok(rows) => {
968 for (id, token_id, created_at, expires_at, client_id) in rows {
969 let client_name = extract_client_name(&client_id);
970 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
971 sessions.push(SessionInfo {
972 id: format!("oauth:{}", id),
973 session_type: "oauth".to_string(),
974 client_name: Some(client_name),
975 created_at: created_at.to_rfc3339(),
976 expires_at: expires_at.to_rfc3339(),
977 is_current: is_current_oauth,
978 });
979 }
980 }
981 Err(e) => {
982 error!("DB error fetching OAuth sessions: {:?}", e);
983 return (
984 StatusCode::INTERNAL_SERVER_ERROR,
985 Json(json!({"error": "InternalError"})),
986 )
987 .into_response();
988 }
989 }
990
991 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
992
993 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
994}
995
996fn extract_client_name(client_id: &str) -> String {
997 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
998 "Localhost App".to_string()
999 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
1000 parsed.host_str().unwrap_or("Unknown App").to_string()
1001 } else {
1002 client_id.to_string()
1003 }
1004}
1005
1006#[derive(Deserialize)]
1007#[serde(rename_all = "camelCase")]
1008pub struct RevokeSessionInput {
1009 pub session_id: String,
1010}
1011
1012pub async fn revoke_session(
1013 State(state): State<AppState>,
1014 auth: BearerAuth,
1015 Json(input): Json<RevokeSessionInput>,
1016) -> Response {
1017 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
1018 let session_id: i32 = match jwt_id.parse() {
1019 Ok(id) => id,
1020 Err(_) => {
1021 return (
1022 StatusCode::BAD_REQUEST,
1023 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1024 )
1025 .into_response();
1026 }
1027 };
1028 let session = sqlx::query_as::<_, (String,)>(
1029 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
1030 )
1031 .bind(session_id)
1032 .bind(&auth.0.did)
1033 .fetch_optional(&state.db)
1034 .await;
1035 let access_jti = match session {
1036 Ok(Some((jti,))) => jti,
1037 Ok(None) => {
1038 return (
1039 StatusCode::NOT_FOUND,
1040 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1041 )
1042 .into_response();
1043 }
1044 Err(e) => {
1045 error!("DB error in revoke_session: {:?}", e);
1046 return (
1047 StatusCode::INTERNAL_SERVER_ERROR,
1048 Json(json!({"error": "InternalError"})),
1049 )
1050 .into_response();
1051 }
1052 };
1053 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
1054 .bind(session_id)
1055 .execute(&state.db)
1056 .await
1057 {
1058 error!("DB error deleting session: {:?}", e);
1059 return (
1060 StatusCode::INTERNAL_SERVER_ERROR,
1061 Json(json!({"error": "InternalError"})),
1062 )
1063 .into_response();
1064 }
1065 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
1066 if let Err(e) = state.cache.delete(&cache_key).await {
1067 warn!("Failed to invalidate session cache: {:?}", e);
1068 }
1069 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
1070 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
1071 let session_id: i32 = match oauth_id.parse() {
1072 Ok(id) => id,
1073 Err(_) => {
1074 return (
1075 StatusCode::BAD_REQUEST,
1076 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1077 )
1078 .into_response();
1079 }
1080 };
1081 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1082 .bind(session_id)
1083 .bind(&auth.0.did)
1084 .execute(&state.db)
1085 .await;
1086 match result {
1087 Ok(r) if r.rows_affected() == 0 => {
1088 return (
1089 StatusCode::NOT_FOUND,
1090 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1091 )
1092 .into_response();
1093 }
1094 Err(e) => {
1095 error!("DB error deleting OAuth session: {:?}", e);
1096 return (
1097 StatusCode::INTERNAL_SERVER_ERROR,
1098 Json(json!({"error": "InternalError"})),
1099 )
1100 .into_response();
1101 }
1102 _ => {}
1103 }
1104 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1105 } else {
1106 return (
1107 StatusCode::BAD_REQUEST,
1108 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1109 )
1110 .into_response();
1111 }
1112 (StatusCode::OK, Json(json!({}))).into_response()
1113}
1114
1115pub async fn revoke_all_sessions(
1116 State(state): State<AppState>,
1117 headers: HeaderMap,
1118 auth: BearerAuth,
1119) -> Response {
1120 let current_jti = headers
1121 .get("authorization")
1122 .and_then(|v| v.to_str().ok())
1123 .and_then(|v| v.strip_prefix("Bearer "))
1124 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1125
1126 if let Some(ref jti) = current_jti {
1127 if auth.0.is_oauth {
1128 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1129 .bind(&auth.0.did)
1130 .execute(&state.db)
1131 .await
1132 {
1133 error!("DB error revoking JWT sessions: {:?}", e);
1134 return (
1135 StatusCode::INTERNAL_SERVER_ERROR,
1136 Json(json!({"error": "InternalError"})),
1137 )
1138 .into_response();
1139 }
1140 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1141 .bind(&auth.0.did)
1142 .bind(jti)
1143 .execute(&state.db)
1144 .await
1145 {
1146 error!("DB error revoking OAuth sessions: {:?}", e);
1147 return (
1148 StatusCode::INTERNAL_SERVER_ERROR,
1149 Json(json!({"error": "InternalError"})),
1150 )
1151 .into_response();
1152 }
1153 } else {
1154 if let Err(e) =
1155 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1156 .bind(&auth.0.did)
1157 .bind(jti)
1158 .execute(&state.db)
1159 .await
1160 {
1161 error!("DB error revoking JWT sessions: {:?}", e);
1162 return (
1163 StatusCode::INTERNAL_SERVER_ERROR,
1164 Json(json!({"error": "InternalError"})),
1165 )
1166 .into_response();
1167 }
1168 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1169 .bind(&auth.0.did)
1170 .execute(&state.db)
1171 .await
1172 {
1173 error!("DB error revoking OAuth sessions: {:?}", e);
1174 return (
1175 StatusCode::INTERNAL_SERVER_ERROR,
1176 Json(json!({"error": "InternalError"})),
1177 )
1178 .into_response();
1179 }
1180 }
1181 } else {
1182 return (
1183 StatusCode::BAD_REQUEST,
1184 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1185 )
1186 .into_response();
1187 }
1188
1189 info!(did = %auth.0.did, "All other sessions revoked");
1190 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1191}
1192
1193#[derive(Serialize)]
1194#[serde(rename_all = "camelCase")]
1195pub struct LegacyLoginPreferenceOutput {
1196 pub allow_legacy_login: bool,
1197 pub has_mfa: bool,
1198}
1199
1200pub async fn get_legacy_login_preference(
1201 State(state): State<AppState>,
1202 auth: BearerAuth,
1203) -> Response {
1204 let result = sqlx::query!(
1205 r#"SELECT
1206 u.allow_legacy_login,
1207 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1208 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1209 FROM users u WHERE u.did = $1"#,
1210 auth.0.did
1211 )
1212 .fetch_optional(&state.db)
1213 .await;
1214
1215 match result {
1216 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1217 allow_legacy_login: row.allow_legacy_login,
1218 has_mfa: row.has_mfa,
1219 })
1220 .into_response(),
1221 Ok(None) => (
1222 StatusCode::NOT_FOUND,
1223 Json(json!({"error": "AccountNotFound"})),
1224 )
1225 .into_response(),
1226 Err(e) => {
1227 error!("DB error: {:?}", e);
1228 (
1229 StatusCode::INTERNAL_SERVER_ERROR,
1230 Json(json!({"error": "InternalError"})),
1231 )
1232 .into_response()
1233 }
1234 }
1235}
1236
1237#[derive(Deserialize)]
1238#[serde(rename_all = "camelCase")]
1239pub struct UpdateLegacyLoginInput {
1240 pub allow_legacy_login: bool,
1241}
1242
1243pub async fn update_legacy_login_preference(
1244 State(state): State<AppState>,
1245 auth: BearerAuth,
1246 Json(input): Json<UpdateLegacyLoginInput>,
1247) -> Response {
1248 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1249 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1250 .await;
1251 }
1252
1253 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1254 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1255 }
1256
1257 let result = sqlx::query!(
1258 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1259 input.allow_legacy_login,
1260 auth.0.did
1261 )
1262 .fetch_optional(&state.db)
1263 .await;
1264
1265 match result {
1266 Ok(Some(_)) => {
1267 info!(
1268 did = %auth.0.did,
1269 allow_legacy_login = input.allow_legacy_login,
1270 "Legacy login preference updated"
1271 );
1272 Json(json!({
1273 "allowLegacyLogin": input.allow_legacy_login
1274 }))
1275 .into_response()
1276 }
1277 Ok(None) => (
1278 StatusCode::NOT_FOUND,
1279 Json(json!({"error": "AccountNotFound"})),
1280 )
1281 .into_response(),
1282 Err(e) => {
1283 error!("DB error: {:?}", e);
1284 (
1285 StatusCode::INTERNAL_SERVER_ERROR,
1286 Json(json!({"error": "InternalError"})),
1287 )
1288 .into_response()
1289 }
1290 }
1291}
1292
1293use crate::comms::locale::VALID_LOCALES;
1294
1295#[derive(Deserialize)]
1296#[serde(rename_all = "camelCase")]
1297pub struct UpdateLocaleInput {
1298 pub preferred_locale: String,
1299}
1300
1301pub async fn update_locale(
1302 State(state): State<AppState>,
1303 auth: BearerAuth,
1304 Json(input): Json<UpdateLocaleInput>,
1305) -> Response {
1306 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1307 return (
1308 StatusCode::BAD_REQUEST,
1309 Json(json!({
1310 "error": "InvalidRequest",
1311 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1312 })),
1313 )
1314 .into_response();
1315 }
1316
1317 let result = sqlx::query!(
1318 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1319 input.preferred_locale,
1320 auth.0.did
1321 )
1322 .fetch_optional(&state.db)
1323 .await;
1324
1325 match result {
1326 Ok(Some(_)) => {
1327 info!(
1328 did = %auth.0.did,
1329 locale = %input.preferred_locale,
1330 "User locale preference updated"
1331 );
1332 Json(json!({
1333 "preferredLocale": input.preferred_locale
1334 }))
1335 .into_response()
1336 }
1337 Ok(None) => (
1338 StatusCode::NOT_FOUND,
1339 Json(json!({"error": "AccountNotFound"})),
1340 )
1341 .into_response(),
1342 Err(e) => {
1343 error!("DB error updating locale: {:?}", e);
1344 (
1345 StatusCode::INTERNAL_SERVER_ERROR,
1346 Json(json!({"error": "InternalError"})),
1347 )
1348 .into_response()
1349 }
1350 }
1351}