this repo has no description
1use crate::api::ApiError;
2use crate::auth::BearerAuth;
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for") {
17 if let Ok(value) = forwarded.to_str() {
18 if let Some(first_ip) = value.split(',').next() {
19 return first_ip.trim().to_string();
20 }
21 }
22 }
23 if let Some(real_ip) = headers.get("x-real-ip") {
24 if let Ok(value) = real_ip.to_str() {
25 return value.trim().to_string();
26 }
27 }
28 "unknown".to_string()
29}
30#[derive(Deserialize)]
31pub struct CreateSessionInput {
32 pub identifier: String,
33 pub password: String,
34}
35#[derive(Serialize)]
36#[serde(rename_all = "camelCase")]
37pub struct CreateSessionOutput {
38 pub access_jwt: String,
39 pub refresh_jwt: String,
40 pub handle: String,
41 pub did: String,
42}
43pub async fn create_session(
44 State(state): State<AppState>,
45 headers: HeaderMap,
46 Json(input): Json<CreateSessionInput>,
47) -> Response {
48 info!("create_session called");
49 let client_ip = extract_client_ip(&headers);
50 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await {
51 warn!(ip = %client_ip, "Login rate limit exceeded");
52 return (
53 StatusCode::TOO_MANY_REQUESTS,
54 Json(json!({
55 "error": "RateLimitExceeded",
56 "message": "Too many login attempts. Please try again later."
57 })),
58 )
59 .into_response();
60 }
61 let row = match sqlx::query!(
62 r#"SELECT
63 u.id, u.did, u.handle, u.password_hash,
64 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified,
65 k.key_bytes, k.encryption_version
66 FROM users u
67 JOIN user_keys k ON u.id = k.user_id
68 WHERE u.handle = $1 OR u.email = $1"#,
69 input.identifier
70 )
71 .fetch_optional(&state.db)
72 .await
73 {
74 Ok(Some(row)) => row,
75 Ok(None) => {
76 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK");
77 warn!("User not found for login attempt");
78 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
79 }
80 Err(e) => {
81 error!("Database error fetching user: {:?}", e);
82 return ApiError::InternalError.into_response();
83 }
84 };
85 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
86 Ok(k) => k,
87 Err(e) => {
88 error!("Failed to decrypt user key: {:?}", e);
89 return ApiError::InternalError.into_response();
90 }
91 };
92 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) {
93 true
94 } else {
95 let app_passwords = sqlx::query!(
96 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
97 row.id
98 )
99 .fetch_all(&state.db)
100 .await
101 .unwrap_or_default();
102 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
103 };
104 if !password_valid {
105 warn!("Password verification failed for login attempt");
106 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
107 }
108 let is_verified = row.email_confirmed
109 || row.discord_verified
110 || row.telegram_verified
111 || row.signal_verified;
112 if !is_verified {
113 warn!("Login attempt for unverified account: {}", row.did);
114 return (
115 StatusCode::FORBIDDEN,
116 Json(json!({
117 "error": "AccountNotVerified",
118 "message": "Please verify your account before logging in",
119 "did": row.did
120 })),
121 ).into_response();
122 }
123 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
124 Ok(m) => m,
125 Err(e) => {
126 error!("Failed to create access token: {:?}", e);
127 return ApiError::InternalError.into_response();
128 }
129 };
130 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
131 Ok(m) => m,
132 Err(e) => {
133 error!("Failed to create refresh token: {:?}", e);
134 return ApiError::InternalError.into_response();
135 }
136 };
137 if let Err(e) = sqlx::query!(
138 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
139 row.did,
140 access_meta.jti,
141 refresh_meta.jti,
142 access_meta.expires_at,
143 refresh_meta.expires_at
144 )
145 .execute(&state.db)
146 .await
147 {
148 error!("Failed to insert session: {:?}", e);
149 return ApiError::InternalError.into_response();
150 }
151 Json(CreateSessionOutput {
152 access_jwt: access_meta.token,
153 refresh_jwt: refresh_meta.token,
154 handle: row.handle,
155 did: row.did,
156 }).into_response()
157}
158pub async fn get_session(
159 State(state): State<AppState>,
160 BearerAuth(auth_user): BearerAuth,
161) -> Response {
162 match sqlx::query!(
163 r#"SELECT
164 handle, email, email_confirmed,
165 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel",
166 discord_verified, telegram_verified, signal_verified
167 FROM users WHERE did = $1"#,
168 auth_user.did
169 )
170 .fetch_optional(&state.db)
171 .await
172 {
173 Ok(Some(row)) => {
174 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
175 crate::notifications::NotificationChannel::Email => ("email", row.email_confirmed),
176 crate::notifications::NotificationChannel::Discord => ("discord", row.discord_verified),
177 crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified),
178 crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified),
179 };
180 Json(json!({
181 "handle": row.handle,
182 "did": auth_user.did,
183 "email": row.email,
184 "emailConfirmed": row.email_confirmed,
185 "preferredChannel": preferred_channel,
186 "preferredChannelVerified": preferred_channel_verified,
187 "didDoc": {}
188 })).into_response()
189 }
190 Ok(None) => ApiError::AuthenticationFailed.into_response(),
191 Err(e) => {
192 error!("Database error in get_session: {:?}", e);
193 ApiError::InternalError.into_response()
194 }
195 }
196}
197pub async fn delete_session(
198 State(state): State<AppState>,
199 headers: axum::http::HeaderMap,
200) -> Response {
201 let token = match crate::auth::extract_bearer_token_from_header(
202 headers.get("Authorization").and_then(|h| h.to_str().ok())
203 ) {
204 Some(t) => t,
205 None => return ApiError::AuthenticationRequired.into_response(),
206 };
207 let jti = match crate::auth::get_jti_from_token(&token) {
208 Ok(jti) => jti,
209 Err(_) => return ApiError::AuthenticationFailed.into_response(),
210 };
211 let did = crate::auth::get_did_from_token(&token).ok();
212 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
213 .execute(&state.db)
214 .await
215 {
216 Ok(res) if res.rows_affected() > 0 => {
217 if let Some(did) = did {
218 let session_cache_key = format!("auth:session:{}:{}", did, jti);
219 let _ = state.cache.delete(&session_cache_key).await;
220 }
221 Json(json!({})).into_response()
222 }
223 Ok(_) => ApiError::AuthenticationFailed.into_response(),
224 Err(e) => {
225 error!("Database error in delete_session: {:?}", e);
226 ApiError::AuthenticationFailed.into_response()
227 }
228 }
229}
230pub async fn refresh_session(
231 State(state): State<AppState>,
232 headers: axum::http::HeaderMap,
233) -> Response {
234 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
235 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await {
236 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
237 return (
238 axum::http::StatusCode::TOO_MANY_REQUESTS,
239 axum::Json(serde_json::json!({
240 "error": "RateLimitExceeded",
241 "message": "Too many requests. Please try again later."
242 })),
243 ).into_response();
244 }
245 let refresh_token = match crate::auth::extract_bearer_token_from_header(
246 headers.get("Authorization").and_then(|h| h.to_str().ok())
247 ) {
248 Some(t) => t,
249 None => return ApiError::AuthenticationRequired.into_response(),
250 };
251 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
252 Ok(jti) => jti,
253 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(),
254 };
255 let mut tx = match state.db.begin().await {
256 Ok(tx) => tx,
257 Err(e) => {
258 error!("Failed to begin transaction: {:?}", e);
259 return ApiError::InternalError.into_response();
260 }
261 };
262 if let Ok(Some(session_id)) = sqlx::query_scalar!(
263 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
264 refresh_jti
265 )
266 .fetch_optional(&mut *tx)
267 .await
268 {
269 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id);
270 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
271 .execute(&mut *tx)
272 .await;
273 let _ = tx.commit().await;
274 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
275 }
276 let session_row = match sqlx::query!(
277 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
278 FROM session_tokens st
279 JOIN users u ON st.did = u.did
280 JOIN user_keys k ON u.id = k.user_id
281 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
282 FOR UPDATE OF st"#,
283 refresh_jti
284 )
285 .fetch_optional(&mut *tx)
286 .await
287 {
288 Ok(Some(row)) => row,
289 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(),
290 Err(e) => {
291 error!("Database error fetching session: {:?}", e);
292 return ApiError::InternalError.into_response();
293 }
294 };
295 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
296 Ok(k) => k,
297 Err(e) => {
298 error!("Failed to decrypt user key: {:?}", e);
299 return ApiError::InternalError.into_response();
300 }
301 };
302 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
303 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
304 }
305 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
306 Ok(m) => m,
307 Err(e) => {
308 error!("Failed to create access token: {:?}", e);
309 return ApiError::InternalError.into_response();
310 }
311 };
312 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
313 Ok(m) => m,
314 Err(e) => {
315 error!("Failed to create refresh token: {:?}", e);
316 return ApiError::InternalError.into_response();
317 }
318 };
319 match sqlx::query!(
320 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
321 refresh_jti,
322 session_row.id
323 )
324 .execute(&mut *tx)
325 .await
326 {
327 Ok(result) if result.rows_affected() == 0 => {
328 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
329 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
330 .execute(&mut *tx)
331 .await;
332 let _ = tx.commit().await;
333 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
334 }
335 Err(e) => {
336 error!("Failed to record used refresh token: {:?}", e);
337 return ApiError::InternalError.into_response();
338 }
339 Ok(_) => {}
340 }
341 if let Err(e) = sqlx::query!(
342 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
343 new_access_meta.jti,
344 new_refresh_meta.jti,
345 new_access_meta.expires_at,
346 new_refresh_meta.expires_at,
347 session_row.id
348 )
349 .execute(&mut *tx)
350 .await
351 {
352 error!("Database error updating session: {:?}", e);
353 return ApiError::InternalError.into_response();
354 }
355 if let Err(e) = tx.commit().await {
356 error!("Failed to commit transaction: {:?}", e);
357 return ApiError::InternalError.into_response();
358 }
359 match sqlx::query!(
360 r#"SELECT
361 handle, email, email_confirmed,
362 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel",
363 discord_verified, telegram_verified, signal_verified
364 FROM users WHERE did = $1"#,
365 session_row.did
366 )
367 .fetch_optional(&state.db)
368 .await
369 {
370 Ok(Some(u)) => {
371 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
372 crate::notifications::NotificationChannel::Email => ("email", u.email_confirmed),
373 crate::notifications::NotificationChannel::Discord => ("discord", u.discord_verified),
374 crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified),
375 crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified),
376 };
377 Json(json!({
378 "accessJwt": new_access_meta.token,
379 "refreshJwt": new_refresh_meta.token,
380 "handle": u.handle,
381 "did": session_row.did,
382 "email": u.email,
383 "emailConfirmed": u.email_confirmed,
384 "preferredChannel": preferred_channel,
385 "preferredChannelVerified": preferred_channel_verified
386 })).into_response()
387 }
388 Ok(None) => {
389 error!("User not found for existing session: {}", session_row.did);
390 ApiError::InternalError.into_response()
391 }
392 Err(e) => {
393 error!("Database error fetching user: {:?}", e);
394 ApiError::InternalError.into_response()
395 }
396 }
397}
398#[derive(Deserialize)]
399#[serde(rename_all = "camelCase")]
400pub struct ConfirmSignupInput {
401 pub did: String,
402 pub verification_code: String,
403}
404#[derive(Serialize)]
405#[serde(rename_all = "camelCase")]
406pub struct ConfirmSignupOutput {
407 pub access_jwt: String,
408 pub refresh_jwt: String,
409 pub handle: String,
410 pub did: String,
411 pub email: Option<String>,
412 pub email_confirmed: bool,
413 pub preferred_channel: String,
414 pub preferred_channel_verified: bool,
415}
416pub async fn confirm_signup(
417 State(state): State<AppState>,
418 Json(input): Json<ConfirmSignupInput>,
419) -> Response {
420 info!("confirm_signup called for DID: {}", input.did);
421 let row = match sqlx::query!(
422 r#"SELECT
423 u.id, u.did, u.handle, u.email,
424 u.email_confirmation_code,
425 u.email_confirmation_code_expires_at,
426 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
427 k.key_bytes, k.encryption_version
428 FROM users u
429 JOIN user_keys k ON u.id = k.user_id
430 WHERE u.did = $1"#,
431 input.did
432 )
433 .fetch_optional(&state.db)
434 .await
435 {
436 Ok(Some(row)) => row,
437 Ok(None) => {
438 warn!("User not found for confirm_signup: {}", input.did);
439 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response();
440 }
441 Err(e) => {
442 error!("Database error in confirm_signup: {:?}", e);
443 return ApiError::InternalError.into_response();
444 }
445 };
446 let stored_code = match &row.email_confirmation_code {
447 Some(code) => code,
448 None => {
449 warn!("No verification code found for user: {}", input.did);
450 return ApiError::InvalidRequest("No pending verification".into()).into_response();
451 }
452 };
453 if stored_code != &input.verification_code {
454 warn!("Invalid verification code for user: {}", input.did);
455 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
456 }
457 if let Some(expires_at) = row.email_confirmation_code_expires_at {
458 if expires_at < Utc::now() {
459 warn!("Verification code expired for user: {}", input.did);
460 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
461 }
462 }
463 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
464 Ok(k) => k,
465 Err(e) => {
466 error!("Failed to decrypt user key: {:?}", e);
467 return ApiError::InternalError.into_response();
468 }
469 };
470 let verified_column = match row.channel {
471 crate::notifications::NotificationChannel::Email => "email_confirmed",
472 crate::notifications::NotificationChannel::Discord => "discord_verified",
473 crate::notifications::NotificationChannel::Telegram => "telegram_verified",
474 crate::notifications::NotificationChannel::Signal => "signal_verified",
475 };
476 let update_query = format!(
477 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1",
478 verified_column
479 );
480 if let Err(e) = sqlx::query(&update_query)
481 .bind(&input.did)
482 .execute(&state.db)
483 .await
484 {
485 error!("Failed to update verification status: {:?}", e);
486 return ApiError::InternalError.into_response();
487 }
488 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
489 Ok(m) => m,
490 Err(e) => {
491 error!("Failed to create access token: {:?}", e);
492 return ApiError::InternalError.into_response();
493 }
494 };
495 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
496 Ok(m) => m,
497 Err(e) => {
498 error!("Failed to create refresh token: {:?}", e);
499 return ApiError::InternalError.into_response();
500 }
501 };
502 if let Err(e) = sqlx::query!(
503 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
504 row.did,
505 access_meta.jti,
506 refresh_meta.jti,
507 access_meta.expires_at,
508 refresh_meta.expires_at
509 )
510 .execute(&state.db)
511 .await
512 {
513 error!("Failed to insert session: {:?}", e);
514 return ApiError::InternalError.into_response();
515 }
516 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
517 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await {
518 warn!("Failed to enqueue welcome notification: {:?}", e);
519 }
520 let email_confirmed = matches!(row.channel, crate::notifications::NotificationChannel::Email);
521 let preferred_channel = match row.channel {
522 crate::notifications::NotificationChannel::Email => "email",
523 crate::notifications::NotificationChannel::Discord => "discord",
524 crate::notifications::NotificationChannel::Telegram => "telegram",
525 crate::notifications::NotificationChannel::Signal => "signal",
526 };
527 Json(ConfirmSignupOutput {
528 access_jwt: access_meta.token,
529 refresh_jwt: refresh_meta.token,
530 handle: row.handle,
531 did: row.did,
532 email: row.email,
533 email_confirmed,
534 preferred_channel: preferred_channel.to_string(),
535 preferred_channel_verified: true,
536 }).into_response()
537}
538#[derive(Deserialize)]
539#[serde(rename_all = "camelCase")]
540pub struct ResendVerificationInput {
541 pub did: String,
542}
543pub async fn resend_verification(
544 State(state): State<AppState>,
545 Json(input): Json<ResendVerificationInput>,
546) -> Response {
547 info!("resend_verification called for DID: {}", input.did);
548 let row = match sqlx::query!(
549 r#"SELECT
550 id, handle, email,
551 preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
552 discord_id, telegram_username, signal_number,
553 email_confirmed, discord_verified, telegram_verified, signal_verified
554 FROM users
555 WHERE did = $1"#,
556 input.did
557 )
558 .fetch_optional(&state.db)
559 .await
560 {
561 Ok(Some(row)) => row,
562 Ok(None) => {
563 return ApiError::InvalidRequest("User not found".into()).into_response();
564 }
565 Err(e) => {
566 error!("Database error in resend_verification: {:?}", e);
567 return ApiError::InternalError.into_response();
568 }
569 };
570 let is_verified = row.email_confirmed
571 || row.discord_verified
572 || row.telegram_verified
573 || row.signal_verified;
574 if is_verified {
575 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
576 }
577 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
578 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
579 if let Err(e) = sqlx::query!(
580 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3",
581 verification_code,
582 code_expires_at,
583 input.did
584 )
585 .execute(&state.db)
586 .await
587 {
588 error!("Failed to update verification code: {:?}", e);
589 return ApiError::InternalError.into_response();
590 }
591 let (channel_str, recipient) = match row.channel {
592 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()),
593 crate::notifications::NotificationChannel::Discord => {
594 ("discord", row.discord_id.unwrap_or_default())
595 }
596 crate::notifications::NotificationChannel::Telegram => {
597 ("telegram", row.telegram_username.unwrap_or_default())
598 }
599 crate::notifications::NotificationChannel::Signal => {
600 ("signal", row.signal_number.unwrap_or_default())
601 }
602 };
603 if let Err(e) = crate::notifications::enqueue_signup_verification(
604 &state.db,
605 row.id,
606 channel_str,
607 &recipient,
608 &verification_code,
609 ).await {
610 warn!("Failed to enqueue verification notification: {:?}", e);
611 }
612 Json(json!({"success": true})).into_response()
613}