this repo has no description
1use crate::api::ApiError;
2use crate::auth::BearerAuth;
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for") {
18 if let Ok(value) = forwarded.to_str() {
19 if let Some(first_ip) = value.split(',').next() {
20 return first_ip.trim().to_string();
21 }
22 }
23 }
24 if let Some(real_ip) = headers.get("x-real-ip") {
25 if let Ok(value) = real_ip.to_str() {
26 return value.trim().to_string();
27 }
28 }
29 "unknown".to_string()
30}
31
/// Reduces a login identifier to the form used for the account lookup.
///
/// * An identifier containing `@` is treated as an email address and returned
///   unchanged; stripping the hostname from it would corrupt addresses whose
///   domain ends with `.<pds_hostname>` (e.g. `bob@mail.example.com`).
/// * Otherwise a trailing `.<pds_hostname>` is removed, turning a fully
///   qualified handle such as `alice.example.com` into `alice`.
/// * Anything else is returned as-is.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    if identifier.contains('@') {
        return identifier.to_string();
    }
    let suffix = format!(".{}", pds_hostname);
    identifier
        .strip_suffix(suffix.as_str())
        .unwrap_or(identifier)
        .to_string()
}
40
/// JSON request body accepted by `create_session`.
#[derive(Deserialize)]
pub struct CreateSessionInput {
    /// Login identifier; after hostname normalization it is matched against
    /// either the stored handle or the stored email of a user.
    pub identifier: String,
    /// Plain-text secret, checked against the account password hash and then
    /// against the account's app-password hashes.
    pub password: String,
}
46
/// JSON response body produced by `create_session`; field names are
/// serialized in camelCase (`accessJwt`, `refreshJwt`, ...).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// Handle of the authenticated account.
    pub handle: String,
    /// DID of the authenticated account.
    pub did: String,
}
55
56pub async fn create_session(
57 State(state): State<AppState>,
58 headers: HeaderMap,
59 Json(input): Json<CreateSessionInput>,
60) -> Response {
61 info!("create_session called");
62 let client_ip = extract_client_ip(&headers);
63 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await {
64 warn!(ip = %client_ip, "Login rate limit exceeded");
65 return (
66 StatusCode::TOO_MANY_REQUESTS,
67 Json(json!({
68 "error": "RateLimitExceeded",
69 "message": "Too many login attempts. Please try again later."
70 })),
71 )
72 .into_response();
73 }
74 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
75 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
76 let row = match sqlx::query!(
77 r#"SELECT
78 u.id, u.did, u.handle, u.password_hash,
79 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified,
80 k.key_bytes, k.encryption_version
81 FROM users u
82 JOIN user_keys k ON u.id = k.user_id
83 WHERE u.handle = $1 OR u.email = $1"#,
84 normalized_identifier
85 )
86 .fetch_optional(&state.db)
87 .await
88 {
89 Ok(Some(row)) => row,
90 Ok(None) => {
91 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK");
92 warn!("User not found for login attempt");
93 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
94 }
95 Err(e) => {
96 error!("Database error fetching user: {:?}", e);
97 return ApiError::InternalError.into_response();
98 }
99 };
100 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
101 Ok(k) => k,
102 Err(e) => {
103 error!("Failed to decrypt user key: {:?}", e);
104 return ApiError::InternalError.into_response();
105 }
106 };
107 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) {
108 true
109 } else {
110 let app_passwords = sqlx::query!(
111 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
112 row.id
113 )
114 .fetch_all(&state.db)
115 .await
116 .unwrap_or_default();
117 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
118 };
119 if !password_valid {
120 warn!("Password verification failed for login attempt");
121 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
122 }
123 let is_verified = row.email_confirmed
124 || row.discord_verified
125 || row.telegram_verified
126 || row.signal_verified;
127 if !is_verified {
128 warn!("Login attempt for unverified account: {}", row.did);
129 return (
130 StatusCode::FORBIDDEN,
131 Json(json!({
132 "error": "AccountNotVerified",
133 "message": "Please verify your account before logging in",
134 "did": row.did
135 })),
136 ).into_response();
137 }
138 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
139 Ok(m) => m,
140 Err(e) => {
141 error!("Failed to create access token: {:?}", e);
142 return ApiError::InternalError.into_response();
143 }
144 };
145 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
146 Ok(m) => m,
147 Err(e) => {
148 error!("Failed to create refresh token: {:?}", e);
149 return ApiError::InternalError.into_response();
150 }
151 };
152 if let Err(e) = sqlx::query!(
153 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
154 row.did,
155 access_meta.jti,
156 refresh_meta.jti,
157 access_meta.expires_at,
158 refresh_meta.expires_at
159 )
160 .execute(&state.db)
161 .await
162 {
163 error!("Failed to insert session: {:?}", e);
164 return ApiError::InternalError.into_response();
165 }
166 let full_handle = format!("{}.{}", row.handle, pds_hostname);
167 Json(CreateSessionOutput {
168 access_jwt: access_meta.token,
169 refresh_jwt: refresh_meta.token,
170 handle: full_handle,
171 did: row.did,
172 }).into_response()
173}
174
175pub async fn get_session(
176 State(state): State<AppState>,
177 BearerAuth(auth_user): BearerAuth,
178) -> Response {
179 match sqlx::query!(
180 r#"SELECT
181 handle, email, email_confirmed,
182 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel",
183 discord_verified, telegram_verified, signal_verified
184 FROM users WHERE did = $1"#,
185 auth_user.did
186 )
187 .fetch_optional(&state.db)
188 .await
189 {
190 Ok(Some(row)) => {
191 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
192 crate::notifications::NotificationChannel::Email => ("email", row.email_confirmed),
193 crate::notifications::NotificationChannel::Discord => ("discord", row.discord_verified),
194 crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified),
195 crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified),
196 };
197 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
198 let full_handle = format!("{}.{}", row.handle, pds_hostname);
199 Json(json!({
200 "handle": full_handle,
201 "did": auth_user.did,
202 "email": row.email,
203 "emailConfirmed": row.email_confirmed,
204 "preferredChannel": preferred_channel,
205 "preferredChannelVerified": preferred_channel_verified,
206 "active": true,
207 "didDoc": {}
208 })).into_response()
209 }
210 Ok(None) => ApiError::AuthenticationFailed.into_response(),
211 Err(e) => {
212 error!("Database error in get_session: {:?}", e);
213 ApiError::InternalError.into_response()
214 }
215 }
216}
217
218pub async fn delete_session(
219 State(state): State<AppState>,
220 headers: axum::http::HeaderMap,
221) -> Response {
222 let token = match crate::auth::extract_bearer_token_from_header(
223 headers.get("Authorization").and_then(|h| h.to_str().ok())
224 ) {
225 Some(t) => t,
226 None => return ApiError::AuthenticationRequired.into_response(),
227 };
228 let jti = match crate::auth::get_jti_from_token(&token) {
229 Ok(jti) => jti,
230 Err(_) => return ApiError::AuthenticationFailed.into_response(),
231 };
232 let did = crate::auth::get_did_from_token(&token).ok();
233 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
234 .execute(&state.db)
235 .await
236 {
237 Ok(res) if res.rows_affected() > 0 => {
238 if let Some(did) = did {
239 let session_cache_key = format!("auth:session:{}:{}", did, jti);
240 let _ = state.cache.delete(&session_cache_key).await;
241 }
242 Json(json!({})).into_response()
243 }
244 Ok(_) => ApiError::AuthenticationFailed.into_response(),
245 Err(e) => {
246 error!("Database error in delete_session: {:?}", e);
247 ApiError::AuthenticationFailed.into_response()
248 }
249 }
250}
251
252pub async fn refresh_session(
253 State(state): State<AppState>,
254 headers: axum::http::HeaderMap,
255) -> Response {
256 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
257 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await {
258 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
259 return (
260 axum::http::StatusCode::TOO_MANY_REQUESTS,
261 axum::Json(serde_json::json!({
262 "error": "RateLimitExceeded",
263 "message": "Too many requests. Please try again later."
264 })),
265 ).into_response();
266 }
267 let refresh_token = match crate::auth::extract_bearer_token_from_header(
268 headers.get("Authorization").and_then(|h| h.to_str().ok())
269 ) {
270 Some(t) => t,
271 None => return ApiError::AuthenticationRequired.into_response(),
272 };
273 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
274 Ok(jti) => jti,
275 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(),
276 };
277 let mut tx = match state.db.begin().await {
278 Ok(tx) => tx,
279 Err(e) => {
280 error!("Failed to begin transaction: {:?}", e);
281 return ApiError::InternalError.into_response();
282 }
283 };
284 if let Ok(Some(session_id)) = sqlx::query_scalar!(
285 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
286 refresh_jti
287 )
288 .fetch_optional(&mut *tx)
289 .await
290 {
291 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id);
292 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
293 .execute(&mut *tx)
294 .await;
295 let _ = tx.commit().await;
296 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
297 }
298 let session_row = match sqlx::query!(
299 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
300 FROM session_tokens st
301 JOIN users u ON st.did = u.did
302 JOIN user_keys k ON u.id = k.user_id
303 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
304 FOR UPDATE OF st"#,
305 refresh_jti
306 )
307 .fetch_optional(&mut *tx)
308 .await
309 {
310 Ok(Some(row)) => row,
311 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(),
312 Err(e) => {
313 error!("Database error fetching session: {:?}", e);
314 return ApiError::InternalError.into_response();
315 }
316 };
317 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
318 Ok(k) => k,
319 Err(e) => {
320 error!("Failed to decrypt user key: {:?}", e);
321 return ApiError::InternalError.into_response();
322 }
323 };
324 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
325 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
326 }
327 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
328 Ok(m) => m,
329 Err(e) => {
330 error!("Failed to create access token: {:?}", e);
331 return ApiError::InternalError.into_response();
332 }
333 };
334 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
335 Ok(m) => m,
336 Err(e) => {
337 error!("Failed to create refresh token: {:?}", e);
338 return ApiError::InternalError.into_response();
339 }
340 };
341 match sqlx::query!(
342 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
343 refresh_jti,
344 session_row.id
345 )
346 .execute(&mut *tx)
347 .await
348 {
349 Ok(result) if result.rows_affected() == 0 => {
350 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
351 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
352 .execute(&mut *tx)
353 .await;
354 let _ = tx.commit().await;
355 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
356 }
357 Err(e) => {
358 error!("Failed to record used refresh token: {:?}", e);
359 return ApiError::InternalError.into_response();
360 }
361 Ok(_) => {}
362 }
363 if let Err(e) = sqlx::query!(
364 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
365 new_access_meta.jti,
366 new_refresh_meta.jti,
367 new_access_meta.expires_at,
368 new_refresh_meta.expires_at,
369 session_row.id
370 )
371 .execute(&mut *tx)
372 .await
373 {
374 error!("Database error updating session: {:?}", e);
375 return ApiError::InternalError.into_response();
376 }
377 if let Err(e) = tx.commit().await {
378 error!("Failed to commit transaction: {:?}", e);
379 return ApiError::InternalError.into_response();
380 }
381 match sqlx::query!(
382 r#"SELECT
383 handle, email, email_confirmed,
384 preferred_notification_channel as "preferred_channel: crate::notifications::NotificationChannel",
385 discord_verified, telegram_verified, signal_verified
386 FROM users WHERE did = $1"#,
387 session_row.did
388 )
389 .fetch_optional(&state.db)
390 .await
391 {
392 Ok(Some(u)) => {
393 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
394 crate::notifications::NotificationChannel::Email => ("email", u.email_confirmed),
395 crate::notifications::NotificationChannel::Discord => ("discord", u.discord_verified),
396 crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified),
397 crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified),
398 };
399 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
400 let full_handle = format!("{}.{}", u.handle, pds_hostname);
401 Json(json!({
402 "accessJwt": new_access_meta.token,
403 "refreshJwt": new_refresh_meta.token,
404 "handle": full_handle,
405 "did": session_row.did,
406 "email": u.email,
407 "emailConfirmed": u.email_confirmed,
408 "preferredChannel": preferred_channel,
409 "preferredChannelVerified": preferred_channel_verified,
410 "active": true
411 })).into_response()
412 }
413 Ok(None) => {
414 error!("User not found for existing session: {}", session_row.did);
415 ApiError::InternalError.into_response()
416 }
417 Err(e) => {
418 error!("Database error fetching user: {:?}", e);
419 ApiError::InternalError.into_response()
420 }
421 }
422}
423
/// JSON request body accepted by `confirm_signup`; field names are read in
/// camelCase (`did`, `verificationCode`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    /// DID of the account being confirmed.
    pub did: String,
    /// Code the user received; compared against the code stored for the account.
    pub verification_code: String,
}
430
/// JSON response body produced by `confirm_signup`; field names are
/// serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// Handle of the confirmed account.
    pub handle: String,
    /// DID of the confirmed account.
    pub did: String,
    /// Email address stored for the account, if any.
    pub email: Option<String>,
    /// Whether the email address counts as confirmed.
    pub email_confirmed: bool,
    /// Name of the account's preferred notification channel
    /// (`email`, `discord`, `telegram` or `signal`).
    pub preferred_channel: String,
    /// Whether the preferred channel is verified.
    pub preferred_channel_verified: bool,
}
443
444pub async fn confirm_signup(
445 State(state): State<AppState>,
446 Json(input): Json<ConfirmSignupInput>,
447) -> Response {
448 info!("confirm_signup called for DID: {}", input.did);
449 let row = match sqlx::query!(
450 r#"SELECT
451 u.id, u.did, u.handle, u.email,
452 u.email_confirmation_code,
453 u.email_confirmation_code_expires_at,
454 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
455 k.key_bytes, k.encryption_version
456 FROM users u
457 JOIN user_keys k ON u.id = k.user_id
458 WHERE u.did = $1"#,
459 input.did
460 )
461 .fetch_optional(&state.db)
462 .await
463 {
464 Ok(Some(row)) => row,
465 Ok(None) => {
466 warn!("User not found for confirm_signup: {}", input.did);
467 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response();
468 }
469 Err(e) => {
470 error!("Database error in confirm_signup: {:?}", e);
471 return ApiError::InternalError.into_response();
472 }
473 };
474 let stored_code = match &row.email_confirmation_code {
475 Some(code) => code,
476 None => {
477 warn!("No verification code found for user: {}", input.did);
478 return ApiError::InvalidRequest("No pending verification".into()).into_response();
479 }
480 };
481 if stored_code != &input.verification_code {
482 warn!("Invalid verification code for user: {}", input.did);
483 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
484 }
485 if let Some(expires_at) = row.email_confirmation_code_expires_at {
486 if expires_at < Utc::now() {
487 warn!("Verification code expired for user: {}", input.did);
488 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
489 }
490 }
491 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
492 Ok(k) => k,
493 Err(e) => {
494 error!("Failed to decrypt user key: {:?}", e);
495 return ApiError::InternalError.into_response();
496 }
497 };
498 let verified_column = match row.channel {
499 crate::notifications::NotificationChannel::Email => "email_confirmed",
500 crate::notifications::NotificationChannel::Discord => "discord_verified",
501 crate::notifications::NotificationChannel::Telegram => "telegram_verified",
502 crate::notifications::NotificationChannel::Signal => "signal_verified",
503 };
504 let update_query = format!(
505 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1",
506 verified_column
507 );
508 if let Err(e) = sqlx::query(&update_query)
509 .bind(&input.did)
510 .execute(&state.db)
511 .await
512 {
513 error!("Failed to update verification status: {:?}", e);
514 return ApiError::InternalError.into_response();
515 }
516 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
517 Ok(m) => m,
518 Err(e) => {
519 error!("Failed to create access token: {:?}", e);
520 return ApiError::InternalError.into_response();
521 }
522 };
523 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
524 Ok(m) => m,
525 Err(e) => {
526 error!("Failed to create refresh token: {:?}", e);
527 return ApiError::InternalError.into_response();
528 }
529 };
530 if let Err(e) = sqlx::query!(
531 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
532 row.did,
533 access_meta.jti,
534 refresh_meta.jti,
535 access_meta.expires_at,
536 refresh_meta.expires_at
537 )
538 .execute(&state.db)
539 .await
540 {
541 error!("Failed to insert session: {:?}", e);
542 return ApiError::InternalError.into_response();
543 }
544 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
545 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await {
546 warn!("Failed to enqueue welcome notification: {:?}", e);
547 }
548 let email_confirmed = matches!(row.channel, crate::notifications::NotificationChannel::Email);
549 let preferred_channel = match row.channel {
550 crate::notifications::NotificationChannel::Email => "email",
551 crate::notifications::NotificationChannel::Discord => "discord",
552 crate::notifications::NotificationChannel::Telegram => "telegram",
553 crate::notifications::NotificationChannel::Signal => "signal",
554 };
555 Json(ConfirmSignupOutput {
556 access_jwt: access_meta.token,
557 refresh_jwt: refresh_meta.token,
558 handle: row.handle,
559 did: row.did,
560 email: row.email,
561 email_confirmed,
562 preferred_channel: preferred_channel.to_string(),
563 preferred_channel_verified: true,
564 }).into_response()
565}
566
/// JSON request body accepted by `resend_verification`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    /// DID of the account that should receive a new verification code.
    pub did: String,
}
572
573pub async fn resend_verification(
574 State(state): State<AppState>,
575 Json(input): Json<ResendVerificationInput>,
576) -> Response {
577 info!("resend_verification called for DID: {}", input.did);
578 let row = match sqlx::query!(
579 r#"SELECT
580 id, handle, email,
581 preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
582 discord_id, telegram_username, signal_number,
583 email_confirmed, discord_verified, telegram_verified, signal_verified
584 FROM users
585 WHERE did = $1"#,
586 input.did
587 )
588 .fetch_optional(&state.db)
589 .await
590 {
591 Ok(Some(row)) => row,
592 Ok(None) => {
593 return ApiError::InvalidRequest("User not found".into()).into_response();
594 }
595 Err(e) => {
596 error!("Database error in resend_verification: {:?}", e);
597 return ApiError::InternalError.into_response();
598 }
599 };
600 let is_verified = row.email_confirmed
601 || row.discord_verified
602 || row.telegram_verified
603 || row.signal_verified;
604 if is_verified {
605 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
606 }
607 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
608 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
609 if let Err(e) = sqlx::query!(
610 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3",
611 verification_code,
612 code_expires_at,
613 input.did
614 )
615 .execute(&state.db)
616 .await
617 {
618 error!("Failed to update verification code: {:?}", e);
619 return ApiError::InternalError.into_response();
620 }
621 let (channel_str, recipient) = match row.channel {
622 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()),
623 crate::notifications::NotificationChannel::Discord => {
624 ("discord", row.discord_id.unwrap_or_default())
625 }
626 crate::notifications::NotificationChannel::Telegram => {
627 ("telegram", row.telegram_username.unwrap_or_default())
628 }
629 crate::notifications::NotificationChannel::Signal => {
630 ("signal", row.signal_number.unwrap_or_default())
631 }
632 };
633 if let Err(e) = crate::notifications::enqueue_signup_verification(
634 &state.db,
635 row.id,
636 channel_str,
637 &recipient,
638 &verification_code,
639 ).await {
640 warn!("Failed to enqueue verification notification: {:?}", e);
641 }
642 Json(json!({"success": true})).into_response()
643}