this repo has no description
1use crate::api::ApiError;
2use crate::auth::BearerAuth;
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for") {
18 if let Ok(value) = forwarded.to_str() {
19 if let Some(first_ip) = value.split(',').next() {
20 return first_ip.trim().to_string();
21 }
22 }
23 }
24 if let Some(real_ip) = headers.get("x-real-ip") {
25 if let Ok(value) = real_ip.to_str() {
26 return value.trim().to_string();
27 }
28 }
29 "unknown".to_string()
30}
31
32#[derive(Deserialize)]
33pub struct CreateSessionInput {
34 pub identifier: String,
35 pub password: String,
36}
37
38#[derive(Serialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateSessionOutput {
41 pub access_jwt: String,
42 pub refresh_jwt: String,
43 pub handle: String,
44 pub did: String,
45}
46
47pub async fn create_session(
48 State(state): State<AppState>,
49 headers: HeaderMap,
50 Json(input): Json<CreateSessionInput>,
51) -> Response {
52 info!("create_session called");
53 let client_ip = extract_client_ip(&headers);
54 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await {
55 warn!(ip = %client_ip, "Login rate limit exceeded");
56 return (
57 StatusCode::TOO_MANY_REQUESTS,
58 Json(json!({
59 "error": "RateLimitExceeded",
60 "message": "Too many login attempts. Please try again later."
61 })),
62 )
63 .into_response();
64 }
65 let row = match sqlx::query!(
66 r#"SELECT
67 u.id, u.did, u.handle, u.password_hash,
68 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified,
69 k.key_bytes, k.encryption_version
70 FROM users u
71 JOIN user_keys k ON u.id = k.user_id
72 WHERE u.handle = $1 OR u.email = $1"#,
73 input.identifier
74 )
75 .fetch_optional(&state.db)
76 .await
77 {
78 Ok(Some(row)) => row,
79 Ok(None) => {
80 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK");
81 warn!("User not found for login attempt");
82 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
83 }
84 Err(e) => {
85 error!("Database error fetching user: {:?}", e);
86 return ApiError::InternalError.into_response();
87 }
88 };
89 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
90 Ok(k) => k,
91 Err(e) => {
92 error!("Failed to decrypt user key: {:?}", e);
93 return ApiError::InternalError.into_response();
94 }
95 };
96 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) {
97 true
98 } else {
99 let app_passwords = sqlx::query!(
100 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
101 row.id
102 )
103 .fetch_all(&state.db)
104 .await
105 .unwrap_or_default();
106 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
107 };
108 if !password_valid {
109 warn!("Password verification failed for login attempt");
110 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
111 }
112 let is_verified = row.email_confirmed
113 || row.discord_verified
114 || row.telegram_verified
115 || row.signal_verified;
116 if !is_verified {
117 warn!("Login attempt for unverified account: {}", row.did);
118 return (
119 StatusCode::FORBIDDEN,
120 Json(json!({
121 "error": "AccountNotVerified",
122 "message": "Please verify your account before logging in",
123 "did": row.did
124 })),
125 ).into_response();
126 }
127 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
128 Ok(m) => m,
129 Err(e) => {
130 error!("Failed to create access token: {:?}", e);
131 return ApiError::InternalError.into_response();
132 }
133 };
134 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
135 Ok(m) => m,
136 Err(e) => {
137 error!("Failed to create refresh token: {:?}", e);
138 return ApiError::InternalError.into_response();
139 }
140 };
141 if let Err(e) = sqlx::query!(
142 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
143 row.did,
144 access_meta.jti,
145 refresh_meta.jti,
146 access_meta.expires_at,
147 refresh_meta.expires_at
148 )
149 .execute(&state.db)
150 .await
151 {
152 error!("Failed to insert session: {:?}", e);
153 return ApiError::InternalError.into_response();
154 }
155 Json(CreateSessionOutput {
156 access_jwt: access_meta.token,
157 refresh_jwt: refresh_meta.token,
158 handle: row.handle,
159 did: row.did,
160 }).into_response()
161}
162
163pub async fn get_session(
164 State(state): State<AppState>,
165 BearerAuth(auth_user): BearerAuth,
166) -> Response {
167 match sqlx::query!(
168 r#"SELECT
169 handle, email, email_confirmed,
170 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel",
171 discord_verified, telegram_verified, signal_verified
172 FROM users WHERE did = $1"#,
173 auth_user.did
174 )
175 .fetch_optional(&state.db)
176 .await
177 {
178 Ok(Some(row)) => {
179 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
180 crate::notifications::NotificationChannel::Email => ("email", row.email_confirmed),
181 crate::notifications::NotificationChannel::Discord => ("discord", row.discord_verified),
182 crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified),
183 crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified),
184 };
185 Json(json!({
186 "handle": row.handle,
187 "did": auth_user.did,
188 "email": row.email,
189 "emailConfirmed": row.email_confirmed,
190 "preferredChannel": preferred_channel,
191 "preferredChannelVerified": preferred_channel_verified,
192 "didDoc": {}
193 })).into_response()
194 }
195 Ok(None) => ApiError::AuthenticationFailed.into_response(),
196 Err(e) => {
197 error!("Database error in get_session: {:?}", e);
198 ApiError::InternalError.into_response()
199 }
200 }
201}
202
203pub async fn delete_session(
204 State(state): State<AppState>,
205 headers: axum::http::HeaderMap,
206) -> Response {
207 let token = match crate::auth::extract_bearer_token_from_header(
208 headers.get("Authorization").and_then(|h| h.to_str().ok())
209 ) {
210 Some(t) => t,
211 None => return ApiError::AuthenticationRequired.into_response(),
212 };
213 let jti = match crate::auth::get_jti_from_token(&token) {
214 Ok(jti) => jti,
215 Err(_) => return ApiError::AuthenticationFailed.into_response(),
216 };
217 let did = crate::auth::get_did_from_token(&token).ok();
218 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
219 .execute(&state.db)
220 .await
221 {
222 Ok(res) if res.rows_affected() > 0 => {
223 if let Some(did) = did {
224 let session_cache_key = format!("auth:session:{}:{}", did, jti);
225 let _ = state.cache.delete(&session_cache_key).await;
226 }
227 Json(json!({})).into_response()
228 }
229 Ok(_) => ApiError::AuthenticationFailed.into_response(),
230 Err(e) => {
231 error!("Database error in delete_session: {:?}", e);
232 ApiError::AuthenticationFailed.into_response()
233 }
234 }
235}
236
237pub async fn refresh_session(
238 State(state): State<AppState>,
239 headers: axum::http::HeaderMap,
240) -> Response {
241 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
242 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await {
243 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
244 return (
245 axum::http::StatusCode::TOO_MANY_REQUESTS,
246 axum::Json(serde_json::json!({
247 "error": "RateLimitExceeded",
248 "message": "Too many requests. Please try again later."
249 })),
250 ).into_response();
251 }
252 let refresh_token = match crate::auth::extract_bearer_token_from_header(
253 headers.get("Authorization").and_then(|h| h.to_str().ok())
254 ) {
255 Some(t) => t,
256 None => return ApiError::AuthenticationRequired.into_response(),
257 };
258 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
259 Ok(jti) => jti,
260 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(),
261 };
262 let mut tx = match state.db.begin().await {
263 Ok(tx) => tx,
264 Err(e) => {
265 error!("Failed to begin transaction: {:?}", e);
266 return ApiError::InternalError.into_response();
267 }
268 };
269 if let Ok(Some(session_id)) = sqlx::query_scalar!(
270 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
271 refresh_jti
272 )
273 .fetch_optional(&mut *tx)
274 .await
275 {
276 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id);
277 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
278 .execute(&mut *tx)
279 .await;
280 let _ = tx.commit().await;
281 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
282 }
283 let session_row = match sqlx::query!(
284 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
285 FROM session_tokens st
286 JOIN users u ON st.did = u.did
287 JOIN user_keys k ON u.id = k.user_id
288 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
289 FOR UPDATE OF st"#,
290 refresh_jti
291 )
292 .fetch_optional(&mut *tx)
293 .await
294 {
295 Ok(Some(row)) => row,
296 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(),
297 Err(e) => {
298 error!("Database error fetching session: {:?}", e);
299 return ApiError::InternalError.into_response();
300 }
301 };
302 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
303 Ok(k) => k,
304 Err(e) => {
305 error!("Failed to decrypt user key: {:?}", e);
306 return ApiError::InternalError.into_response();
307 }
308 };
309 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
310 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
311 }
312 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
313 Ok(m) => m,
314 Err(e) => {
315 error!("Failed to create access token: {:?}", e);
316 return ApiError::InternalError.into_response();
317 }
318 };
319 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
320 Ok(m) => m,
321 Err(e) => {
322 error!("Failed to create refresh token: {:?}", e);
323 return ApiError::InternalError.into_response();
324 }
325 };
326 match sqlx::query!(
327 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
328 refresh_jti,
329 session_row.id
330 )
331 .execute(&mut *tx)
332 .await
333 {
334 Ok(result) if result.rows_affected() == 0 => {
335 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
336 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
337 .execute(&mut *tx)
338 .await;
339 let _ = tx.commit().await;
340 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
341 }
342 Err(e) => {
343 error!("Failed to record used refresh token: {:?}", e);
344 return ApiError::InternalError.into_response();
345 }
346 Ok(_) => {}
347 }
348 if let Err(e) = sqlx::query!(
349 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
350 new_access_meta.jti,
351 new_refresh_meta.jti,
352 new_access_meta.expires_at,
353 new_refresh_meta.expires_at,
354 session_row.id
355 )
356 .execute(&mut *tx)
357 .await
358 {
359 error!("Database error updating session: {:?}", e);
360 return ApiError::InternalError.into_response();
361 }
362 if let Err(e) = tx.commit().await {
363 error!("Failed to commit transaction: {:?}", e);
364 return ApiError::InternalError.into_response();
365 }
366 match sqlx::query!(
367 r#"SELECT
368 handle, email, email_confirmed,
369 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel",
370 discord_verified, telegram_verified, signal_verified
371 FROM users WHERE did = $1"#,
372 session_row.did
373 )
374 .fetch_optional(&state.db)
375 .await
376 {
377 Ok(Some(u)) => {
378 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
379 crate::notifications::NotificationChannel::Email => ("email", u.email_confirmed),
380 crate::notifications::NotificationChannel::Discord => ("discord", u.discord_verified),
381 crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified),
382 crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified),
383 };
384 Json(json!({
385 "accessJwt": new_access_meta.token,
386 "refreshJwt": new_refresh_meta.token,
387 "handle": u.handle,
388 "did": session_row.did,
389 "email": u.email,
390 "emailConfirmed": u.email_confirmed,
391 "preferredChannel": preferred_channel,
392 "preferredChannelVerified": preferred_channel_verified
393 })).into_response()
394 }
395 Ok(None) => {
396 error!("User not found for existing session: {}", session_row.did);
397 ApiError::InternalError.into_response()
398 }
399 Err(e) => {
400 error!("Database error fetching user: {:?}", e);
401 ApiError::InternalError.into_response()
402 }
403 }
404}
405
406#[derive(Deserialize)]
407#[serde(rename_all = "camelCase")]
408pub struct ConfirmSignupInput {
409 pub did: String,
410 pub verification_code: String,
411}
412
413#[derive(Serialize)]
414#[serde(rename_all = "camelCase")]
415pub struct ConfirmSignupOutput {
416 pub access_jwt: String,
417 pub refresh_jwt: String,
418 pub handle: String,
419 pub did: String,
420 pub email: Option<String>,
421 pub email_confirmed: bool,
422 pub preferred_channel: String,
423 pub preferred_channel_verified: bool,
424}
425
426pub async fn confirm_signup(
427 State(state): State<AppState>,
428 Json(input): Json<ConfirmSignupInput>,
429) -> Response {
430 info!("confirm_signup called for DID: {}", input.did);
431 let row = match sqlx::query!(
432 r#"SELECT
433 u.id, u.did, u.handle, u.email,
434 u.email_confirmation_code,
435 u.email_confirmation_code_expires_at,
436 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
437 k.key_bytes, k.encryption_version
438 FROM users u
439 JOIN user_keys k ON u.id = k.user_id
440 WHERE u.did = $1"#,
441 input.did
442 )
443 .fetch_optional(&state.db)
444 .await
445 {
446 Ok(Some(row)) => row,
447 Ok(None) => {
448 warn!("User not found for confirm_signup: {}", input.did);
449 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response();
450 }
451 Err(e) => {
452 error!("Database error in confirm_signup: {:?}", e);
453 return ApiError::InternalError.into_response();
454 }
455 };
456 let stored_code = match &row.email_confirmation_code {
457 Some(code) => code,
458 None => {
459 warn!("No verification code found for user: {}", input.did);
460 return ApiError::InvalidRequest("No pending verification".into()).into_response();
461 }
462 };
463 if stored_code != &input.verification_code {
464 warn!("Invalid verification code for user: {}", input.did);
465 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
466 }
467 if let Some(expires_at) = row.email_confirmation_code_expires_at {
468 if expires_at < Utc::now() {
469 warn!("Verification code expired for user: {}", input.did);
470 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
471 }
472 }
473 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
474 Ok(k) => k,
475 Err(e) => {
476 error!("Failed to decrypt user key: {:?}", e);
477 return ApiError::InternalError.into_response();
478 }
479 };
480 let verified_column = match row.channel {
481 crate::notifications::NotificationChannel::Email => "email_confirmed",
482 crate::notifications::NotificationChannel::Discord => "discord_verified",
483 crate::notifications::NotificationChannel::Telegram => "telegram_verified",
484 crate::notifications::NotificationChannel::Signal => "signal_verified",
485 };
486 let update_query = format!(
487 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1",
488 verified_column
489 );
490 if let Err(e) = sqlx::query(&update_query)
491 .bind(&input.did)
492 .execute(&state.db)
493 .await
494 {
495 error!("Failed to update verification status: {:?}", e);
496 return ApiError::InternalError.into_response();
497 }
498 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
499 Ok(m) => m,
500 Err(e) => {
501 error!("Failed to create access token: {:?}", e);
502 return ApiError::InternalError.into_response();
503 }
504 };
505 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
506 Ok(m) => m,
507 Err(e) => {
508 error!("Failed to create refresh token: {:?}", e);
509 return ApiError::InternalError.into_response();
510 }
511 };
512 if let Err(e) = sqlx::query!(
513 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
514 row.did,
515 access_meta.jti,
516 refresh_meta.jti,
517 access_meta.expires_at,
518 refresh_meta.expires_at
519 )
520 .execute(&state.db)
521 .await
522 {
523 error!("Failed to insert session: {:?}", e);
524 return ApiError::InternalError.into_response();
525 }
526 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
527 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await {
528 warn!("Failed to enqueue welcome notification: {:?}", e);
529 }
530 let email_confirmed = matches!(row.channel, crate::notifications::NotificationChannel::Email);
531 let preferred_channel = match row.channel {
532 crate::notifications::NotificationChannel::Email => "email",
533 crate::notifications::NotificationChannel::Discord => "discord",
534 crate::notifications::NotificationChannel::Telegram => "telegram",
535 crate::notifications::NotificationChannel::Signal => "signal",
536 };
537 Json(ConfirmSignupOutput {
538 access_jwt: access_meta.token,
539 refresh_jwt: refresh_meta.token,
540 handle: row.handle,
541 did: row.did,
542 email: row.email,
543 email_confirmed,
544 preferred_channel: preferred_channel.to_string(),
545 preferred_channel_verified: true,
546 }).into_response()
547}
548
549#[derive(Deserialize)]
550#[serde(rename_all = "camelCase")]
551pub struct ResendVerificationInput {
552 pub did: String,
553}
554
555pub async fn resend_verification(
556 State(state): State<AppState>,
557 Json(input): Json<ResendVerificationInput>,
558) -> Response {
559 info!("resend_verification called for DID: {}", input.did);
560 let row = match sqlx::query!(
561 r#"SELECT
562 id, handle, email,
563 preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
564 discord_id, telegram_username, signal_number,
565 email_confirmed, discord_verified, telegram_verified, signal_verified
566 FROM users
567 WHERE did = $1"#,
568 input.did
569 )
570 .fetch_optional(&state.db)
571 .await
572 {
573 Ok(Some(row)) => row,
574 Ok(None) => {
575 return ApiError::InvalidRequest("User not found".into()).into_response();
576 }
577 Err(e) => {
578 error!("Database error in resend_verification: {:?}", e);
579 return ApiError::InternalError.into_response();
580 }
581 };
582 let is_verified = row.email_confirmed
583 || row.discord_verified
584 || row.telegram_verified
585 || row.signal_verified;
586 if is_verified {
587 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
588 }
589 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
590 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
591 if let Err(e) = sqlx::query!(
592 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3",
593 verification_code,
594 code_expires_at,
595 input.did
596 )
597 .execute(&state.db)
598 .await
599 {
600 error!("Failed to update verification code: {:?}", e);
601 return ApiError::InternalError.into_response();
602 }
603 let (channel_str, recipient) = match row.channel {
604 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()),
605 crate::notifications::NotificationChannel::Discord => {
606 ("discord", row.discord_id.unwrap_or_default())
607 }
608 crate::notifications::NotificationChannel::Telegram => {
609 ("telegram", row.telegram_username.unwrap_or_default())
610 }
611 crate::notifications::NotificationChannel::Signal => {
612 ("signal", row.signal_number.unwrap_or_default())
613 }
614 };
615 if let Err(e) = crate::notifications::enqueue_signup_verification(
616 &state.db,
617 row.id,
618 channel_str,
619 &recipient,
620 &verification_code,
621 ).await {
622 warn!("Failed to enqueue verification notification: {:?}", e);
623 }
624 Json(json!({"success": true})).into_response()
625}