// This repo has no description.
use crate::api::error::ApiError;
use crate::api::{EmptyResponse, SuccessResponse};
use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
use crate::state::{AppState, RateLimitKind};
use crate::types::{AccountState, Did, Handle, PlainPassword};
use axum::{
    Json,
    extract::State,
    http::{HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};
use bcrypt::verify;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, info, warn};
16
17fn extract_client_ip(headers: &HeaderMap) -> String {
18 if let Some(forwarded) = headers.get("x-forwarded-for")
19 && let Ok(value) = forwarded.to_str()
20 && let Some(first_ip) = value.split(',').next()
21 {
22 return first_ip.trim().to_string();
23 }
24 if let Some(real_ip) = headers.get("x-real-ip")
25 && let Ok(value) = real_ip.to_str()
26 {
27 return value.trim().to_string();
28 }
29 "unknown".to_string()
30}
31
/// Normalizes a login identifier before it is looked up in the database.
///
/// * Surrounding whitespace is always trimmed.
/// * Identifiers containing `@` or starting with `did:` are returned
///   otherwise unchanged (case is preserved).
/// * Identifiers without a `.` are lowercased and suffixed with
///   `.{pds_hostname}`.
/// * Anything else is lowercased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let identifier = identifier.trim();
    if identifier.contains('@') || identifier.starts_with("did:") {
        return identifier.to_string();
    }
    let lowered = identifier.to_lowercase();
    if lowered.contains('.') {
        lowered
    } else {
        format!("{lowered}.{pds_hostname}")
    }
}
42
/// Returns the handle to present to clients.
///
/// The stored handle is returned as-is; `_pds_hostname` is accepted but
/// currently unused.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    stored_handle.to_owned()
}
46
47#[derive(Deserialize)]
48#[serde(rename_all = "camelCase")]
49pub struct CreateSessionInput {
50 pub identifier: String,
51 pub password: PlainPassword,
52 #[serde(default)]
53 pub allow_takendown: bool,
54}
55
56#[derive(Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct CreateSessionOutput {
59 pub access_jwt: String,
60 pub refresh_jwt: String,
61 pub handle: Handle,
62 pub did: Did,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub did_doc: Option<serde_json::Value>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub email: Option<String>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub email_confirmed: Option<bool>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub active: Option<bool>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub status: Option<String>,
73}
74
75pub async fn create_session(
76 State(state): State<AppState>,
77 headers: HeaderMap,
78 Json(input): Json<CreateSessionInput>,
79) -> Response {
80 info!(
81 "create_session called with identifier: {}",
82 input.identifier
83 );
84 let client_ip = extract_client_ip(&headers);
85 if !state
86 .check_rate_limit(RateLimitKind::Login, &client_ip)
87 .await
88 {
89 warn!(ip = %client_ip, "Login rate limit exceeded");
90 return ApiError::RateLimitExceeded(None).into_response();
91 }
92 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
93 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
94 info!(
95 "Normalized identifier: {} -> {}",
96 input.identifier, normalized_identifier
97 );
98 let row = match sqlx::query!(
99 r#"SELECT
100 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
101 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
102 u.allow_legacy_login, u.migrated_to_pds,
103 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
104 k.key_bytes, k.encryption_version,
105 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
106 FROM users u
107 JOIN user_keys k ON u.id = k.user_id
108 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
109 normalized_identifier
110 )
111 .fetch_optional(&state.db)
112 .await
113 {
114 Ok(Some(row)) => row,
115 Ok(None) => {
116 let _ = verify(
117 &input.password,
118 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
119 );
120 warn!("User not found for login attempt");
121 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into()))
122 .into_response();
123 }
124 Err(e) => {
125 error!("Database error fetching user: {:?}", e);
126 return ApiError::InternalError(None).into_response();
127 }
128 };
129 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
130 Ok(k) => k,
131 Err(e) => {
132 error!("Failed to decrypt user key: {:?}", e);
133 return ApiError::InternalError(None).into_response();
134 }
135 };
136 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
137 .password_hash
138 .as_ref()
139 .map(|h| verify(&input.password, h).unwrap_or(false))
140 .unwrap_or(false)
141 {
142 (true, None, None, None)
143 } else {
144 let app_passwords = sqlx::query!(
145 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
146 row.id
147 )
148 .fetch_all(&state.db)
149 .await
150 .unwrap_or_default();
151 let matched = app_passwords
152 .iter()
153 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
154 match matched {
155 Some(app) => (
156 true,
157 Some(app.name.clone()),
158 app.scopes.clone(),
159 app.created_by_controller_did.clone(),
160 ),
161 None => (false, None, None, None),
162 }
163 };
164 if !password_valid {
165 warn!("Password verification failed for login attempt");
166 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into()))
167 .into_response();
168 }
169 let account_state = AccountState::from_db_fields(
170 row.deactivated_at,
171 row.takedown_ref.clone(),
172 row.migrated_to_pds.clone(),
173 None,
174 );
175 if account_state.is_takendown() && !input.allow_takendown {
176 warn!("Login attempt for takendown account: {}", row.did);
177 return ApiError::AccountTakedown.into_response();
178 }
179 let is_verified =
180 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
181 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
182 .await
183 .unwrap_or(false);
184 if !is_verified && !is_delegated {
185 warn!("Login attempt for unverified account: {}", row.did);
186 return (
187 StatusCode::FORBIDDEN,
188 Json(json!({
189 "error": "AccountNotVerified",
190 "message": "Please verify your account before logging in",
191 "did": row.did
192 })),
193 )
194 .into_response();
195 }
196 let has_totp = row.totp_enabled.unwrap_or(false);
197 let is_legacy_login = has_totp;
198 if has_totp && !row.allow_legacy_login {
199 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "MfaRequired",
204 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
205 "did": row.did
206 })),
207 )
208 .into_response();
209 }
210 let access_meta = match crate::auth::create_access_token_with_delegation(
211 &row.did,
212 &key_bytes,
213 app_password_scopes.as_deref(),
214 app_password_controller.as_deref(),
215 ) {
216 Ok(m) => m,
217 Err(e) => {
218 error!("Failed to create access token: {:?}", e);
219 return ApiError::InternalError(None).into_response();
220 }
221 };
222 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
223 Ok(m) => m,
224 Err(e) => {
225 error!("Failed to create refresh token: {:?}", e);
226 return ApiError::InternalError(None).into_response();
227 }
228 };
229 let did_for_doc = row.did.clone();
230 let did_resolver = state.did_resolver.clone();
231 let (insert_result, did_doc) = tokio::join!(
232 sqlx::query!(
233 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
234 row.did,
235 access_meta.jti,
236 refresh_meta.jti,
237 access_meta.expires_at,
238 refresh_meta.expires_at,
239 is_legacy_login,
240 false,
241 app_password_scopes,
242 app_password_controller,
243 app_password_name
244 )
245 .execute(&state.db),
246 did_resolver.resolve_did_document(&did_for_doc)
247 );
248 if let Err(e) = insert_result {
249 error!("Failed to insert session: {:?}", e);
250 return ApiError::InternalError(None).into_response();
251 }
252 if is_legacy_login {
253 warn!(
254 did = %row.did,
255 ip = %client_ip,
256 "Legacy login on TOTP-enabled account - sending notification"
257 );
258 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
259 if let Err(e) = crate::comms::queue_legacy_login_notification(
260 &state.db,
261 row.id,
262 &hostname,
263 &client_ip,
264 row.preferred_comms_channel,
265 )
266 .await
267 {
268 error!("Failed to queue legacy login notification: {:?}", e);
269 }
270 }
271 let handle = full_handle(&row.handle, &pds_hostname);
272 let is_active = account_state.is_active();
273 let status = account_state.status_for_session().map(String::from);
274 Json(CreateSessionOutput {
275 access_jwt: access_meta.token,
276 refresh_jwt: refresh_meta.token,
277 handle: handle.into(),
278 did: row.did.into(),
279 did_doc,
280 email: row.email,
281 email_confirmed: Some(row.email_verified),
282 active: Some(is_active),
283 status,
284 })
285 .into_response()
286}
287
288pub async fn get_session(
289 State(state): State<AppState>,
290 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
291) -> Response {
292 let permissions = auth_user.permissions();
293 let can_read_email = permissions.allows_email_read();
294
295 let did_for_doc = auth_user.did.clone();
296 let did_resolver = state.did_resolver.clone();
297 let (db_result, did_doc) = tokio::join!(
298 sqlx::query!(
299 r#"SELECT
300 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
301 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
302 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at
303 FROM users WHERE did = $1"#,
304 &auth_user.did
305 )
306 .fetch_optional(&state.db),
307 did_resolver.resolve_did_document(&did_for_doc)
308 );
309 match db_result {
310 Ok(Some(row)) => {
311 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
312 crate::comms::CommsChannel::Email => ("email", row.email_verified),
313 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
314 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
315 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
316 };
317 let pds_hostname =
318 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
319 let handle = full_handle(&row.handle, &pds_hostname);
320 let account_state = AccountState::from_db_fields(
321 row.deactivated_at,
322 row.takedown_ref.clone(),
323 row.migrated_to_pds.clone(),
324 row.migrated_at,
325 );
326 let email_value = if can_read_email {
327 row.email.clone()
328 } else {
329 None
330 };
331 let email_confirmed_value = can_read_email && row.email_verified;
332 let mut response = json!({
333 "handle": handle,
334 "did": &auth_user.did,
335 "active": account_state.is_active(),
336 "preferredChannel": preferred_channel,
337 "preferredChannelVerified": preferred_channel_verified,
338 "preferredLocale": row.preferred_locale,
339 "isAdmin": row.is_admin
340 });
341 if can_read_email {
342 response["email"] = json!(email_value);
343 response["emailConfirmed"] = json!(email_confirmed_value);
344 }
345 if let Some(status) = account_state.status_for_session() {
346 response["status"] = json!(status);
347 }
348 if let AccountState::Migrated { to_pds, at } = &account_state {
349 response["migratedToPds"] = json!(to_pds);
350 response["migratedAt"] = json!(at);
351 }
352 if let Some(doc) = did_doc {
353 response["didDoc"] = doc;
354 }
355 Json(response).into_response()
356 }
357 Ok(None) => ApiError::AuthenticationFailed(None).into_response(),
358 Err(e) => {
359 error!("Database error in get_session: {:?}", e);
360 ApiError::InternalError(None).into_response()
361 }
362 }
363}
364
365pub async fn delete_session(
366 State(state): State<AppState>,
367 headers: axum::http::HeaderMap,
368 _auth: BearerAuth,
369) -> Response {
370 let extracted = match crate::auth::extract_auth_token_from_header(
371 headers.get("Authorization").and_then(|h| h.to_str().ok()),
372 ) {
373 Some(t) => t,
374 None => return ApiError::AuthenticationRequired.into_response(),
375 };
376 let jti = match crate::auth::get_jti_from_token(&extracted.token) {
377 Ok(jti) => jti,
378 Err(_) => return ApiError::AuthenticationFailed(None).into_response(),
379 };
380 let did = crate::auth::get_did_from_token(&extracted.token).ok();
381 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
382 .execute(&state.db)
383 .await
384 {
385 Ok(res) if res.rows_affected() > 0 => {
386 if let Some(did) = did {
387 let session_cache_key = format!("auth:session:{}:{}", did, jti);
388 let _ = state.cache.delete(&session_cache_key).await;
389 }
390 EmptyResponse::ok().into_response()
391 }
392 Ok(_) => ApiError::AuthenticationFailed(None).into_response(),
393 Err(e) => {
394 error!("Database error in delete_session: {:?}", e);
395 ApiError::AuthenticationFailed(None).into_response()
396 }
397 }
398}
399
400pub async fn refresh_session(
401 State(state): State<AppState>,
402 headers: axum::http::HeaderMap,
403) -> Response {
404 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
405 if !state
406 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
407 .await
408 {
409 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
410 return ApiError::RateLimitExceeded(None).into_response();
411 }
412 let extracted = match crate::auth::extract_auth_token_from_header(
413 headers.get("Authorization").and_then(|h| h.to_str().ok()),
414 ) {
415 Some(t) => t,
416 None => return ApiError::AuthenticationRequired.into_response(),
417 };
418 let refresh_token = extracted.token;
419 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
420 Ok(jti) => jti,
421 Err(_) => {
422 return ApiError::AuthenticationFailed(Some("Invalid token format".into()))
423 .into_response();
424 }
425 };
426 let mut tx = match state.db.begin().await {
427 Ok(tx) => tx,
428 Err(e) => {
429 error!("Failed to begin transaction: {:?}", e);
430 return ApiError::InternalError(None).into_response();
431 }
432 };
433 if let Ok(Some(session_id)) = sqlx::query_scalar!(
434 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
435 refresh_jti
436 )
437 .fetch_optional(&mut *tx)
438 .await
439 {
440 warn!(
441 "Refresh token reuse detected! Revoking token family for session_id: {}",
442 session_id
443 );
444 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
445 .execute(&mut *tx)
446 .await;
447 let _ = tx.commit().await;
448 return ApiError::AuthenticationFailed(Some(
449 "Refresh token has been revoked due to suspected compromise".into(),
450 ))
451 .into_response();
452 }
453 let session_row = match sqlx::query!(
454 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
455 FROM session_tokens st
456 JOIN users u ON st.did = u.did
457 JOIN user_keys k ON u.id = k.user_id
458 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
459 FOR UPDATE OF st"#,
460 refresh_jti
461 )
462 .fetch_optional(&mut *tx)
463 .await
464 {
465 Ok(Some(row)) => row,
466 Ok(None) => {
467 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into()))
468 .into_response();
469 }
470 Err(e) => {
471 error!("Database error fetching session: {:?}", e);
472 return ApiError::InternalError(None).into_response();
473 }
474 };
475 let key_bytes =
476 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
477 Ok(k) => k,
478 Err(e) => {
479 error!("Failed to decrypt user key: {:?}", e);
480 return ApiError::InternalError(None).into_response();
481 }
482 };
483 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
484 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into()))
485 .into_response();
486 }
487 let new_access_meta = match crate::auth::create_access_token_with_delegation(
488 &session_row.did,
489 &key_bytes,
490 session_row.scope.as_deref(),
491 session_row.controller_did.as_deref(),
492 ) {
493 Ok(m) => m,
494 Err(e) => {
495 error!("Failed to create access token: {:?}", e);
496 return ApiError::InternalError(None).into_response();
497 }
498 };
499 let new_refresh_meta =
500 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
501 Ok(m) => m,
502 Err(e) => {
503 error!("Failed to create refresh token: {:?}", e);
504 return ApiError::InternalError(None).into_response();
505 }
506 };
507 match sqlx::query!(
508 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
509 refresh_jti,
510 session_row.id
511 )
512 .execute(&mut *tx)
513 .await
514 {
515 Ok(result) if result.rows_affected() == 0 => {
516 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
517 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
518 .execute(&mut *tx)
519 .await;
520 let _ = tx.commit().await;
521 return ApiError::AuthenticationFailed(Some("Refresh token has been revoked due to suspected compromise".into())).into_response();
522 }
523 Err(e) => {
524 error!("Failed to record used refresh token: {:?}", e);
525 return ApiError::InternalError(None).into_response();
526 }
527 Ok(_) => {}
528 }
529 if let Err(e) = sqlx::query!(
530 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
531 new_access_meta.jti,
532 new_refresh_meta.jti,
533 new_access_meta.expires_at,
534 new_refresh_meta.expires_at,
535 session_row.id
536 )
537 .execute(&mut *tx)
538 .await
539 {
540 error!("Database error updating session: {:?}", e);
541 return ApiError::InternalError(None).into_response();
542 }
543 if let Err(e) = tx.commit().await {
544 error!("Failed to commit transaction: {:?}", e);
545 return ApiError::InternalError(None).into_response();
546 }
547 let did_for_doc = session_row.did.clone();
548 let did_resolver = state.did_resolver.clone();
549 let (db_result, did_doc) = tokio::join!(
550 sqlx::query!(
551 r#"SELECT
552 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
553 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
554 discord_verified, telegram_verified, signal_verified
555 FROM users WHERE did = $1"#,
556 session_row.did
557 )
558 .fetch_optional(&state.db),
559 did_resolver.resolve_did_document(&did_for_doc)
560 );
561 match db_result {
562 Ok(Some(u)) => {
563 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
564 crate::comms::CommsChannel::Email => ("email", u.email_verified),
565 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
566 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
567 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
568 };
569 let pds_hostname =
570 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
571 let handle = full_handle(&u.handle, &pds_hostname);
572 let account_state =
573 AccountState::from_db_fields(u.deactivated_at, u.takedown_ref.clone(), None, None);
574 let mut response = json!({
575 "accessJwt": new_access_meta.token,
576 "refreshJwt": new_refresh_meta.token,
577 "handle": handle,
578 "did": session_row.did,
579 "email": u.email,
580 "emailConfirmed": u.email_verified,
581 "preferredChannel": preferred_channel,
582 "preferredChannelVerified": preferred_channel_verified,
583 "preferredLocale": u.preferred_locale,
584 "isAdmin": u.is_admin,
585 "active": account_state.is_active()
586 });
587 if let Some(doc) = did_doc {
588 response["didDoc"] = doc;
589 }
590 if let Some(status) = account_state.status_for_session() {
591 response["status"] = json!(status);
592 }
593 Json(response).into_response()
594 }
595 Ok(None) => {
596 error!("User not found for existing session: {}", session_row.did);
597 ApiError::InternalError(None).into_response()
598 }
599 Err(e) => {
600 error!("Database error fetching user: {:?}", e);
601 ApiError::InternalError(None).into_response()
602 }
603 }
604}
605
606#[derive(Deserialize)]
607#[serde(rename_all = "camelCase")]
608pub struct ConfirmSignupInput {
609 pub did: Did,
610 pub verification_code: String,
611}
612
613#[derive(Serialize)]
614#[serde(rename_all = "camelCase")]
615pub struct ConfirmSignupOutput {
616 pub access_jwt: String,
617 pub refresh_jwt: String,
618 pub handle: Handle,
619 pub did: Did,
620 pub email: Option<String>,
621 pub email_verified: bool,
622 pub preferred_channel: String,
623 pub preferred_channel_verified: bool,
624}
625
626pub async fn confirm_signup(
627 State(state): State<AppState>,
628 Json(input): Json<ConfirmSignupInput>,
629) -> Response {
630 info!("confirm_signup called for DID: {}", input.did);
631 let row = match sqlx::query!(
632 r#"SELECT
633 u.id, u.did, u.handle, u.email,
634 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
635 u.discord_id, u.telegram_username, u.signal_number,
636 k.key_bytes, k.encryption_version
637 FROM users u
638 JOIN user_keys k ON u.id = k.user_id
639 WHERE u.did = $1"#,
640 input.did.as_str()
641 )
642 .fetch_optional(&state.db)
643 .await
644 {
645 Ok(Some(row)) => row,
646 Ok(None) => {
647 warn!("User not found for confirm_signup: {}", input.did);
648 return ApiError::InvalidRequest("Invalid DID or verification code".into())
649 .into_response();
650 }
651 Err(e) => {
652 error!("Database error in confirm_signup: {:?}", e);
653 return ApiError::InternalError(None).into_response();
654 }
655 };
656
657 let (channel_str, identifier) = match row.channel {
658 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
659 crate::comms::CommsChannel::Discord => {
660 ("discord", row.discord_id.clone().unwrap_or_default())
661 }
662 crate::comms::CommsChannel::Telegram => (
663 "telegram",
664 row.telegram_username.clone().unwrap_or_default(),
665 ),
666 crate::comms::CommsChannel::Signal => {
667 ("signal", row.signal_number.clone().unwrap_or_default())
668 }
669 };
670
671 let normalized_token =
672 crate::auth::verification_token::normalize_token_input(&input.verification_code);
673 match crate::auth::verification_token::verify_signup_token(
674 &normalized_token,
675 channel_str,
676 &identifier,
677 ) {
678 Ok(token_data) => {
679 if token_data.did != input.did.as_str() {
680 warn!(
681 "Token DID mismatch for confirm_signup: expected {}, got {}",
682 input.did, token_data.did
683 );
684 return ApiError::InvalidRequest("Invalid verification code".into())
685 .into_response();
686 }
687 }
688 Err(crate::auth::verification_token::VerifyError::Expired) => {
689 warn!("Verification code expired for user: {}", input.did);
690 return ApiError::ExpiredToken(Some("Verification code has expired".into()))
691 .into_response();
692 }
693 Err(e) => {
694 warn!("Invalid verification code for user {}: {:?}", input.did, e);
695 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
696 }
697 }
698
699 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
700 Ok(k) => k,
701 Err(e) => {
702 error!("Failed to decrypt user key: {:?}", e);
703 return ApiError::InternalError(None).into_response();
704 }
705 };
706 let verified_column = match row.channel {
707 crate::comms::CommsChannel::Email => "email_verified",
708 crate::comms::CommsChannel::Discord => "discord_verified",
709 crate::comms::CommsChannel::Telegram => "telegram_verified",
710 crate::comms::CommsChannel::Signal => "signal_verified",
711 };
712 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
713 if let Err(e) = sqlx::query(&update_query)
714 .bind(input.did.as_str())
715 .execute(&state.db)
716 .await
717 {
718 error!("Failed to update verification status: {:?}", e);
719 return ApiError::InternalError(None).into_response();
720 }
721
722 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
723 Ok(m) => m,
724 Err(e) => {
725 error!("Failed to create access token: {:?}", e);
726 return ApiError::InternalError(None).into_response();
727 }
728 };
729 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
730 Ok(m) => m,
731 Err(e) => {
732 error!("Failed to create refresh token: {:?}", e);
733 return ApiError::InternalError(None).into_response();
734 }
735 };
736 let no_scope: Option<String> = None;
737 if let Err(e) = sqlx::query!(
738 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
739 row.did,
740 access_meta.jti,
741 refresh_meta.jti,
742 access_meta.expires_at,
743 refresh_meta.expires_at,
744 false,
745 false,
746 no_scope
747 )
748 .execute(&state.db)
749 .await
750 {
751 error!("Failed to insert session: {:?}", e);
752 return ApiError::InternalError(None).into_response();
753 }
754 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
755 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
756 warn!("Failed to enqueue welcome notification: {:?}", e);
757 }
758 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
759 let preferred_channel = match row.channel {
760 crate::comms::CommsChannel::Email => "email",
761 crate::comms::CommsChannel::Discord => "discord",
762 crate::comms::CommsChannel::Telegram => "telegram",
763 crate::comms::CommsChannel::Signal => "signal",
764 };
765 Json(ConfirmSignupOutput {
766 access_jwt: access_meta.token,
767 refresh_jwt: refresh_meta.token,
768 handle: row.handle.into(),
769 did: row.did.into(),
770 email: row.email,
771 email_verified,
772 preferred_channel: preferred_channel.to_string(),
773 preferred_channel_verified: true,
774 })
775 .into_response()
776}
777
778#[derive(Deserialize)]
779#[serde(rename_all = "camelCase")]
780pub struct ResendVerificationInput {
781 pub did: Did,
782}
783
784pub async fn resend_verification(
785 State(state): State<AppState>,
786 Json(input): Json<ResendVerificationInput>,
787) -> Response {
788 info!("resend_verification called for DID: {}", input.did);
789 let row = match sqlx::query!(
790 r#"SELECT
791 id, handle, email,
792 preferred_comms_channel as "channel: crate::comms::CommsChannel",
793 discord_id, telegram_username, signal_number,
794 email_verified, discord_verified, telegram_verified, signal_verified
795 FROM users
796 WHERE did = $1"#,
797 input.did.as_str()
798 )
799 .fetch_optional(&state.db)
800 .await
801 {
802 Ok(Some(row)) => row,
803 Ok(None) => {
804 return ApiError::InvalidRequest("User not found".into()).into_response();
805 }
806 Err(e) => {
807 error!("Database error in resend_verification: {:?}", e);
808 return ApiError::InternalError(None).into_response();
809 }
810 };
811 let is_verified =
812 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
813 if is_verified {
814 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
815 }
816
817 let (channel_str, recipient) = match row.channel {
818 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
819 crate::comms::CommsChannel::Discord => {
820 ("discord", row.discord_id.clone().unwrap_or_default())
821 }
822 crate::comms::CommsChannel::Telegram => (
823 "telegram",
824 row.telegram_username.clone().unwrap_or_default(),
825 ),
826 crate::comms::CommsChannel::Signal => {
827 ("signal", row.signal_number.clone().unwrap_or_default())
828 }
829 };
830
831 let verification_token =
832 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
833 let formatted_token =
834 crate::auth::verification_token::format_token_for_display(&verification_token);
835
836 if let Err(e) = crate::comms::enqueue_signup_verification(
837 &state.db,
838 row.id,
839 channel_str,
840 &recipient,
841 &formatted_token,
842 None,
843 )
844 .await
845 {
846 warn!("Failed to enqueue verification notification: {:?}", e);
847 }
848 SuccessResponse::ok().into_response()
849}
850
851#[derive(Serialize)]
852#[serde(rename_all = "camelCase")]
853pub struct SessionInfo {
854 pub id: String,
855 pub session_type: String,
856 pub client_name: Option<String>,
857 pub created_at: String,
858 pub expires_at: String,
859 pub is_current: bool,
860}
861
862#[derive(Serialize)]
863#[serde(rename_all = "camelCase")]
864pub struct ListSessionsOutput {
865 pub sessions: Vec<SessionInfo>,
866}
867
868pub async fn list_sessions(
869 State(state): State<AppState>,
870 headers: HeaderMap,
871 auth: BearerAuth,
872) -> Response {
873 let current_jti = headers
874 .get("authorization")
875 .and_then(|v| v.to_str().ok())
876 .and_then(|v| v.strip_prefix("Bearer "))
877 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
878
879 let mut sessions: Vec<SessionInfo> = Vec::new();
880
881 let jwt_result = sqlx::query_as::<
882 _,
883 (
884 i32,
885 String,
886 chrono::DateTime<chrono::Utc>,
887 chrono::DateTime<chrono::Utc>,
888 ),
889 >(
890 r#"
891 SELECT id, access_jti, created_at, refresh_expires_at
892 FROM session_tokens
893 WHERE did = $1 AND refresh_expires_at > NOW()
894 ORDER BY created_at DESC
895 "#,
896 )
897 .bind(&auth.0.did)
898 .fetch_all(&state.db)
899 .await;
900
901 match jwt_result {
902 Ok(rows) => {
903 for (id, access_jti, created_at, expires_at) in rows {
904 sessions.push(SessionInfo {
905 id: format!("jwt:{}", id),
906 session_type: "legacy".to_string(),
907 client_name: None,
908 created_at: created_at.to_rfc3339(),
909 expires_at: expires_at.to_rfc3339(),
910 is_current: current_jti.as_ref() == Some(&access_jti),
911 });
912 }
913 }
914 Err(e) => {
915 error!("DB error fetching JWT sessions: {:?}", e);
916 return ApiError::InternalError(None).into_response();
917 }
918 }
919
920 let oauth_result = sqlx::query_as::<
921 _,
922 (
923 i32,
924 String,
925 chrono::DateTime<chrono::Utc>,
926 chrono::DateTime<chrono::Utc>,
927 String,
928 ),
929 >(
930 r#"
931 SELECT id, token_id, created_at, expires_at, client_id
932 FROM oauth_token
933 WHERE did = $1 AND expires_at > NOW()
934 ORDER BY created_at DESC
935 "#,
936 )
937 .bind(&auth.0.did)
938 .fetch_all(&state.db)
939 .await;
940
941 match oauth_result {
942 Ok(rows) => {
943 for (id, token_id, created_at, expires_at, client_id) in rows {
944 let client_name = extract_client_name(&client_id);
945 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
946 sessions.push(SessionInfo {
947 id: format!("oauth:{}", id),
948 session_type: "oauth".to_string(),
949 client_name: Some(client_name),
950 created_at: created_at.to_rfc3339(),
951 expires_at: expires_at.to_rfc3339(),
952 is_current: is_current_oauth,
953 });
954 }
955 }
956 Err(e) => {
957 error!("DB error fetching OAuth sessions: {:?}", e);
958 return ApiError::InternalError(None).into_response();
959 }
960 }
961
962 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
963
964 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
965}
966
967fn extract_client_name(client_id: &str) -> String {
968 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
969 "Localhost App".to_string()
970 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
971 parsed.host_str().unwrap_or("Unknown App").to_string()
972 } else {
973 client_id.to_string()
974 }
975}
976
977#[derive(Deserialize)]
978#[serde(rename_all = "camelCase")]
979pub struct RevokeSessionInput {
980 pub session_id: String,
981}
982
983pub async fn revoke_session(
984 State(state): State<AppState>,
985 auth: BearerAuth,
986 Json(input): Json<RevokeSessionInput>,
987) -> Response {
988 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
989 let Ok(session_id) = jwt_id.parse::<i32>() else {
990 return ApiError::InvalidRequest("Invalid session ID".into()).into_response();
991 };
992 let session = sqlx::query_as::<_, (String,)>(
993 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
994 )
995 .bind(session_id)
996 .bind(&auth.0.did)
997 .fetch_optional(&state.db)
998 .await;
999 let access_jti = match session {
1000 Ok(Some((jti,))) => jti,
1001 Ok(None) => {
1002 return ApiError::SessionNotFound.into_response();
1003 }
1004 Err(e) => {
1005 error!("DB error in revoke_session: {:?}", e);
1006 return ApiError::InternalError(None).into_response();
1007 }
1008 };
1009 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
1010 .bind(session_id)
1011 .execute(&state.db)
1012 .await
1013 {
1014 error!("DB error deleting session: {:?}", e);
1015 return ApiError::InternalError(None).into_response();
1016 }
1017 let cache_key = format!("auth:session:{}:{}", &auth.0.did, access_jti);
1018 if let Err(e) = state.cache.delete(&cache_key).await {
1019 warn!("Failed to invalidate session cache: {:?}", e);
1020 }
1021 info!(did = %&auth.0.did, session_id = %session_id, "JWT session revoked");
1022 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
1023 let Ok(session_id) = oauth_id.parse::<i32>() else {
1024 return ApiError::InvalidRequest("Invalid session ID".into()).into_response();
1025 };
1026 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1027 .bind(session_id)
1028 .bind(&auth.0.did)
1029 .execute(&state.db)
1030 .await;
1031 match result {
1032 Ok(r) if r.rows_affected() == 0 => {
1033 return ApiError::SessionNotFound.into_response();
1034 }
1035 Err(e) => {
1036 error!("DB error deleting OAuth session: {:?}", e);
1037 return ApiError::InternalError(None).into_response();
1038 }
1039 _ => {}
1040 }
1041 info!(did = %&auth.0.did, session_id = %session_id, "OAuth session revoked");
1042 } else {
1043 return ApiError::InvalidRequest("Invalid session ID format".into()).into_response();
1044 }
1045 EmptyResponse::ok().into_response()
1046}
1047
1048pub async fn revoke_all_sessions(
1049 State(state): State<AppState>,
1050 headers: HeaderMap,
1051 auth: BearerAuth,
1052) -> Response {
1053 let current_jti = crate::auth::extract_auth_token_from_header(
1054 headers.get("authorization").and_then(|v| v.to_str().ok()),
1055 )
1056 .and_then(|extracted| crate::auth::get_jti_from_token(&extracted.token).ok());
1057
1058 let Some(ref jti) = current_jti else {
1059 return ApiError::InvalidToken(None).into_response();
1060 };
1061
1062 if auth.0.is_oauth {
1063 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1064 .bind(&auth.0.did)
1065 .execute(&state.db)
1066 .await
1067 {
1068 error!("DB error revoking JWT sessions: {:?}", e);
1069 return ApiError::InternalError(None).into_response();
1070 }
1071 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1072 .bind(&auth.0.did)
1073 .bind(jti)
1074 .execute(&state.db)
1075 .await
1076 {
1077 error!("DB error revoking OAuth sessions: {:?}", e);
1078 return ApiError::InternalError(None).into_response();
1079 }
1080 } else {
1081 if let Err(e) =
1082 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1083 .bind(&auth.0.did)
1084 .bind(jti)
1085 .execute(&state.db)
1086 .await
1087 {
1088 error!("DB error revoking JWT sessions: {:?}", e);
1089 return ApiError::InternalError(None).into_response();
1090 }
1091 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1092 .bind(&auth.0.did)
1093 .execute(&state.db)
1094 .await
1095 {
1096 error!("DB error revoking OAuth sessions: {:?}", e);
1097 return ApiError::InternalError(None).into_response();
1098 }
1099 }
1100
1101 info!(did = %&auth.0.did, "All other sessions revoked");
1102 SuccessResponse::ok().into_response()
1103}
1104
1105#[derive(Serialize)]
1106#[serde(rename_all = "camelCase")]
1107pub struct LegacyLoginPreferenceOutput {
1108 pub allow_legacy_login: bool,
1109 pub has_mfa: bool,
1110}
1111
1112pub async fn get_legacy_login_preference(
1113 State(state): State<AppState>,
1114 auth: BearerAuth,
1115) -> Response {
1116 let result = sqlx::query!(
1117 r#"SELECT
1118 u.allow_legacy_login,
1119 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1120 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1121 FROM users u WHERE u.did = $1"#,
1122 &auth.0.did
1123 )
1124 .fetch_optional(&state.db)
1125 .await;
1126
1127 match result {
1128 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1129 allow_legacy_login: row.allow_legacy_login,
1130 has_mfa: row.has_mfa,
1131 })
1132 .into_response(),
1133 Ok(None) => ApiError::AccountNotFound.into_response(),
1134 Err(e) => {
1135 error!("DB error: {:?}", e);
1136 ApiError::InternalError(None).into_response()
1137 }
1138 }
1139}
1140
1141#[derive(Deserialize)]
1142#[serde(rename_all = "camelCase")]
1143pub struct UpdateLegacyLoginInput {
1144 pub allow_legacy_login: bool,
1145}
1146
1147pub async fn update_legacy_login_preference(
1148 State(state): State<AppState>,
1149 auth: BearerAuth,
1150 Json(input): Json<UpdateLegacyLoginInput>,
1151) -> Response {
1152 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1153 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1154 .await;
1155 }
1156
1157 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1158 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1159 }
1160
1161 let result = sqlx::query!(
1162 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1163 input.allow_legacy_login,
1164 &auth.0.did
1165 )
1166 .fetch_optional(&state.db)
1167 .await;
1168
1169 match result {
1170 Ok(Some(_)) => {
1171 info!(
1172 did = %&auth.0.did,
1173 allow_legacy_login = input.allow_legacy_login,
1174 "Legacy login preference updated"
1175 );
1176 Json(json!({
1177 "allowLegacyLogin": input.allow_legacy_login
1178 }))
1179 .into_response()
1180 }
1181 Ok(None) => ApiError::AccountNotFound.into_response(),
1182 Err(e) => {
1183 error!("DB error: {:?}", e);
1184 ApiError::InternalError(None).into_response()
1185 }
1186 }
1187}
1188
1189use crate::comms::locale::VALID_LOCALES;
1190
1191#[derive(Deserialize)]
1192#[serde(rename_all = "camelCase")]
1193pub struct UpdateLocaleInput {
1194 pub preferred_locale: String,
1195}
1196
1197pub async fn update_locale(
1198 State(state): State<AppState>,
1199 auth: BearerAuth,
1200 Json(input): Json<UpdateLocaleInput>,
1201) -> Response {
1202 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1203 return ApiError::InvalidRequest(format!(
1204 "Invalid locale. Valid options: {}",
1205 VALID_LOCALES.join(", ")
1206 ))
1207 .into_response();
1208 }
1209
1210 let result = sqlx::query!(
1211 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1212 input.preferred_locale,
1213 &auth.0.did
1214 )
1215 .fetch_optional(&state.db)
1216 .await;
1217
1218 match result {
1219 Ok(Some(_)) => {
1220 info!(
1221 did = %&auth.0.did,
1222 locale = %input.preferred_locale,
1223 "User locale preference updated"
1224 );
1225 Json(json!({
1226 "preferredLocale": input.preferred_locale
1227 }))
1228 .into_response()
1229 }
1230 Ok(None) => ApiError::AccountNotFound.into_response(),
1231 Err(e) => {
1232 error!("DB error updating locale: {:?}", e);
1233 ApiError::InternalError(None).into_response()
1234 }
1235 }
1236}