use crate::AppState; use crate::helpers::{ AuthResult, ProxiedResult, TokenCheckError, VerifyServiceAuthError, json_error_response, preauth_check, proxy_get_json, verify_gate_token, verify_service_auth, }; use crate::middleware::Did; use axum::body::{Body, to_bytes}; use axum::extract::State; use axum::http::{HeaderMap, StatusCode, header}; use axum::response::{IntoResponse, Response}; use axum::{Extension, Json, debug_handler, extract, extract::Request}; use chrono::{Duration, Utc}; use jacquard_common::types::did::Did as JacquardDid; use serde::{Deserialize, Serialize}; use serde_json; use tracing::log; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] enum AccountStatus { Takendown, Suspended, Deactivated, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] struct GetSessionResponse { handle: String, did: String, #[serde(skip_serializing_if = "Option::is_none")] email: Option, #[serde(skip_serializing_if = "Option::is_none")] email_confirmed: Option, #[serde(skip_serializing_if = "Option::is_none")] email_auth_factor: Option, #[serde(skip_serializing_if = "Option::is_none")] did_doc: Option, #[serde(skip_serializing_if = "Option::is_none")] active: Option, #[serde(skip_serializing_if = "Option::is_none")] status: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct UpdateEmailResponse { email: String, #[serde(skip_serializing_if = "Option::is_none")] email_auth_factor: Option, #[serde(skip_serializing_if = "Option::is_none")] token: Option, } #[allow(dead_code)] #[derive(Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct CreateSessionRequest { identifier: String, password: String, #[serde(skip_serializing_if = "Option::is_none")] auth_factor_token: Option, #[serde(skip_serializing_if = "Option::is_none")] allow_takendown: Option, } #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct CreateAccountRequest { handle: String, #[serde(skip_serializing_if = "Option::is_none")] email: Option, #[serde(skip_serializing_if = "Option::is_none")] password: Option, #[serde(skip_serializing_if = "Option::is_none")] did: Option, #[serde(skip_serializing_if = "Option::is_none")] invite_code: Option, #[serde(skip_serializing_if = "Option::is_none")] verification_code: Option, #[serde(skip_serializing_if = "Option::is_none")] plc_op: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct DescribeServerContact { #[serde(skip_serializing_if = "Option::is_none")] email: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct DescribeServerLinks { #[serde(skip_serializing_if = "Option::is_none")] privacy_policy: Option, #[serde(skip_serializing_if = "Option::is_none")] terms_of_service: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct DescribeServerResponse { #[serde(skip_serializing_if = "Option::is_none")] invite_code_required: Option, #[serde(skip_serializing_if = "Option::is_none")] phone_verification_required: Option, #[serde(skip_serializing_if = "Option::is_none")] available_user_domains: Option>, #[serde(skip_serializing_if = "Option::is_none")] links: Option, #[serde(skip_serializing_if = "Option::is_none")] contact: Option, #[serde(skip_serializing_if = "Option::is_none")] did: Option, } pub async fn create_session( State(state): State, headers: HeaderMap, Json(payload): extract::Json, ) -> Result, StatusCode> { let identifier = payload.identifier.clone(); let password = payload.password.clone(); let auth_factor_token = payload.auth_factor_token.clone(); // Run the shared pre-auth logic to validate and check 2FA requirement match preauth_check(&state, &identifier, &password, auth_factor_token, false).await { Ok(result) => match result { AuthResult::WrongIdentityOrPassword => json_error_response( StatusCode::UNAUTHORIZED, "AuthenticationRequired", "Invalid identifier or password", ), AuthResult::TwoFactorRequired(_) => { // Email sending step can be handled here if needed in the future. json_error_response( StatusCode::UNAUTHORIZED, "AuthFactorTokenRequired", "A sign in code has been sent to your email address", ) } AuthResult::ProxyThrough => { //No 2FA or already passed let uri = format!( "{}{}", state.app_config.pds_base_url, "/xrpc/com.atproto.server.createSession" ); let mut req = axum::http::Request::post(uri); if let Some(req_headers) = req.headers_mut() { req_headers.extend(headers.clone()); } let payload_bytes = serde_json::to_vec(&payload).map_err(|_| StatusCode::BAD_REQUEST)?; let req = req .body(Body::from(payload_bytes)) .map_err(|_| StatusCode::BAD_REQUEST)?; let proxied = state .reverse_proxy_client .request(req) .await .map_err(|_| StatusCode::BAD_REQUEST)? .into_response(); Ok(proxied) } AuthResult::TokenCheckFailed(err) => match err { TokenCheckError::InvalidToken => { json_error_response(StatusCode::BAD_REQUEST, "InvalidToken", "Token is invalid") } TokenCheckError::ExpiredToken => { json_error_response(StatusCode::BAD_REQUEST, "ExpiredToken", "Token is expired") } }, }, Err(err) => { log::error!( "Error during pre-auth check. This happens on the create_session endpoint when trying to decide if the user has access:\n {err}" ); json_error_response( StatusCode::INTERNAL_SERVER_ERROR, "InternalServerError", "This error was not generated by the PDS, but PDS Gatekeeper. Please contact your PDS administrator for help and for them to review the server logs.", ) } } } #[debug_handler] pub async fn update_email( State(state): State, Extension(did): Extension, headers: HeaderMap, Json(payload): extract::Json, ) -> Result, StatusCode> { //If email auth is not set at all it is a update email address let email_auth_not_set = payload.email_auth_factor.is_none(); //If email auth is set it is to either turn on or off 2fa let email_auth_update = payload.email_auth_factor.unwrap_or(false); //This means the middleware successfully extracted a did from the request, if not it just needs to be forward to the PDS //This is also empty if it is an oauth request, which is not supported by gatekeeper turning on 2fa since the dpop stuff needs to be implemented let did_is_not_empty = did.0.is_some(); if did_is_not_empty { // Email update asked for if email_auth_update { let email = payload.email.clone(); let email_confirmed = match sqlx::query_as::<_, (String,)>( "SELECT did FROM account WHERE emailConfirmedAt IS NOT NULL AND email = ?", ) .bind(&email) .fetch_optional(&state.account_pool) .await { Ok(row) => row, Err(err) => { log::error!("Error checking if email is confirmed: {err}"); return Err(StatusCode::BAD_REQUEST); } }; //Since the email is already confirmed we can enable 2fa return match email_confirmed { None => Err(StatusCode::BAD_REQUEST), Some(did_row) => { let _ = sqlx::query( "INSERT INTO two_factor_accounts (did, required) VALUES (?, 1) ON CONFLICT(did) DO UPDATE SET required = 1", ) .bind(&did_row.0) .execute(&state.pds_gatekeeper_pool) .await .map_err(|_| StatusCode::BAD_REQUEST)?; Ok(StatusCode::OK.into_response()) } }; } // User wants auth turned off if !email_auth_update && !email_auth_not_set { //User wants auth turned off and has a token if let Some(token) = &payload.token { let token_found = match sqlx::query_as::<_, (String,)>( "SELECT token FROM email_token WHERE token = ? AND did = ? AND purpose = 'update_email'", ) .bind(token) .bind(&did.0) .fetch_optional(&state.account_pool) .await{ Ok(token) => token, Err(err) => { log::error!("Error checking if token is valid: {err}"); return Err(StatusCode::BAD_REQUEST); } }; return if token_found.is_some() { //TODO I think there may be a bug here and need to do some retry logic // First try was erroring, seconds was allowing match sqlx::query( "INSERT INTO two_factor_accounts (did, required) VALUES (?, 0) ON CONFLICT(did) DO UPDATE SET required = 0", ) .bind(&did.0) .execute(&state.pds_gatekeeper_pool) .await { Ok(_) => {} Err(err) => { log::error!("Error updating email auth: {err}"); return Err(StatusCode::BAD_REQUEST); } } Ok(StatusCode::OK.into_response()) } else { Err(StatusCode::BAD_REQUEST) }; } } } // Updating the actual email address by sending it on to the PDS let uri = format!( "{}{}", state.app_config.pds_base_url, "/xrpc/com.atproto.server.updateEmail" ); let mut req = axum::http::Request::post(uri); if let Some(req_headers) = req.headers_mut() { req_headers.extend(headers.clone()); } let payload_bytes = serde_json::to_vec(&payload).map_err(|_| StatusCode::BAD_REQUEST)?; let req = req .body(Body::from(payload_bytes)) .map_err(|_| StatusCode::BAD_REQUEST)?; let proxied = state .reverse_proxy_client .request(req) .await .map_err(|_| StatusCode::BAD_REQUEST)? .into_response(); Ok(proxied) } pub async fn get_session( State(state): State, req: Request, ) -> Result, StatusCode> { match proxy_get_json::(&state, req, "/xrpc/com.atproto.server.getSession") .await? { ProxiedResult::Parsed { value: mut session, .. } => { let did = session.did.clone(); let required_opt = sqlx::query_as::<_, (u8,)>( "SELECT required FROM two_factor_accounts WHERE did = ? LIMIT 1", ) .bind(&did) .fetch_optional(&state.pds_gatekeeper_pool) .await .map_err(|_| StatusCode::BAD_REQUEST)?; let email_auth_factor = match required_opt { Some(row) => row.0 != 0, None => false, }; session.email_auth_factor = Some(email_auth_factor); Ok(Json(session).into_response()) } ProxiedResult::Passthrough(resp) => Ok(resp), } } pub async fn describe_server( State(state): State, req: Request, ) -> Result, StatusCode> { match proxy_get_json::( &state, req, "/xrpc/com.atproto.server.describeServer", ) .await? { ProxiedResult::Parsed { value: mut server_info, .. } => { //This signifies the server is configured for captcha verification server_info.phone_verification_required = Some(state.app_config.use_captcha); Ok(Json(server_info).into_response()) } ProxiedResult::Passthrough(resp) => Ok(resp), } } /// Verify a gate code matches the handle and is not expired async fn verify_gate_code( state: &AppState, code: &str, handle: &str, ) -> Result { // First, decrypt and verify the JWE token let payload = match verify_gate_token(code, &state.app_config.gate_jwe_key) { Ok(p) => p, Err(e) => { log::warn!("Failed to decrypt gate token: {}", e); return Ok(false); } }; // Verify the handle matches if payload.handle != handle { log::warn!( "Gate code handle mismatch: expected {}, got {}", handle, payload.handle ); return Ok(false); } let created_at = chrono::DateTime::parse_from_rfc3339(&payload.created_at) .map_err(|e| anyhow::anyhow!("Failed to parse created_at from token: {}", e))? .with_timezone(&Utc); let now = Utc::now(); let age = now - created_at; // Check if the token is expired (5 minutes) if age > Duration::minutes(5) { log::warn!("Gate code expired for handle {}", handle); return Ok(false); } // Verify the token exists in the database (to prevent reuse) let row: Option<(String,)> = sqlx::query_as("SELECT code FROM gate_codes WHERE code = ? and handle = ? LIMIT 1") .bind(code) .bind(handle) .fetch_optional(&state.pds_gatekeeper_pool) .await?; if row.is_none() { log::warn!("Gate code not found in database or already used"); return Ok(false); } // Token is valid, delete it so it can't be reused //TODO probably also delete expired codes? Will need to do that at some point probably altho the where is on code and handle sqlx::query("DELETE FROM gate_codes WHERE code = ?") .bind(code) .execute(&state.pds_gatekeeper_pool) .await?; Ok(true) } pub async fn create_account( State(state): State, req: Request, ) -> Result, StatusCode> { let headers = req.headers().clone(); let body_bytes = to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::BAD_REQUEST)?; // Parse the body to check for verification code let account_request: CreateAccountRequest = serde_json::from_slice(&body_bytes).map_err(|e| { log::error!("Failed to parse create account request: {}", e); StatusCode::BAD_REQUEST })?; // Check for service auth (migrations) if configured if state.app_config.allow_only_migrations { // Expect Authorization: Bearer let auth_header = headers .get(header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .map(str::to_string); let Some(value) = auth_header else { log::error!("No Authorization header found in the request"); return json_error_response( StatusCode::UNAUTHORIZED, "InvalidAuth", "This PDS is configured to only allow accounts created by migrations via this endpoint.", ); }; // Ensure Bearer prefix let token = value.strip_prefix("Bearer ").unwrap_or("").trim(); if token.is_empty() { log::error!("No Service Auth token found in the Authorization header"); return json_error_response( StatusCode::UNAUTHORIZED, "InvalidAuth", "This PDS is configured to only allow accounts created by migrations via this endpoint.", ); } // Ensure a non-empty DID was provided when migrations are enabled let requested_did_str = match account_request.did.as_deref() { Some(s) if !s.trim().is_empty() => s, _ => { return json_error_response( StatusCode::BAD_REQUEST, "InvalidRequest", "The 'did' field is required when migrations are enforced.", ); } }; // Parse the DID into the expected type for verification let requested_did: JacquardDid<'static> = match requested_did_str.parse() { Ok(d) => d, Err(e) => { log::error!( "Invalid DID format provided in createAccount: {} | error: {}", requested_did_str, e ); return json_error_response( StatusCode::BAD_REQUEST, "InvalidRequest", "The 'did' field is not a valid DID.", ); } }; let nsid = "com.atproto.server.createAccount".parse().unwrap(); match verify_service_auth( token, &nsid, state.resolver.clone(), &state.app_config.pds_service_did, &requested_did, ) .await { //Just do nothing if it passes so it continues. Ok(_) => {} Err(err) => match err { VerifyServiceAuthError::AuthFailed => { return json_error_response( StatusCode::UNAUTHORIZED, "InvalidAuth", "This PDS is configured to only allow accounts created by migrations via this endpoint.", ); } VerifyServiceAuthError::Error(err) => { log::error!("Error verifying service auth token: {err}"); return json_error_response( StatusCode::BAD_REQUEST, "InvalidRequest", "There has been an error, please contact your PDS administrator for help and for them to review the server logs.", ); } }, } } // Check for captcha verification if configured if state.app_config.use_captcha { if let Some(ref verification_code) = account_request.verification_code { match verify_gate_code(&state, verification_code, &account_request.handle).await { //TODO has a few errors to support //expired token // { // "error": "ExpiredToken", // "message": "Token has expired" // } //TODO ALSO add rate limits on the /gate endpoints so they can't be abused Ok(true) => { log::info!("Gate code verified for handle: {}", account_request.handle); } Ok(false) => { log::warn!( "Invalid or expired gate code for handle: {}", account_request.handle ); return json_error_response( StatusCode::BAD_REQUEST, "InvalidToken", "Token could not be verified", ); } Err(e) => { log::error!("Error verifying gate code: {}", e); return json_error_response( StatusCode::INTERNAL_SERVER_ERROR, "InvalidToken", "Token could not be verified", ); } } } else { // No verification code provided but captcha is required log::warn!( "No verification code provided for account creation: {}", account_request.handle ); return json_error_response( StatusCode::BAD_REQUEST, "InvalidRequest", "Verification is now required on this server.", ); } } // Rebuild the request with the same body and headers let uri = format!( "{}{}", state.app_config.pds_base_url, "/xrpc/com.atproto.server.createAccount" ); let mut new_req = axum::http::Request::post(&uri); if let Some(req_headers) = new_req.headers_mut() { *req_headers = headers; } let new_req = new_req .body(Body::from(body_bytes)) .map_err(|_| StatusCode::BAD_REQUEST)?; let proxied = state .reverse_proxy_client .request(new_req) .await .map_err(|_| StatusCode::BAD_REQUEST)? .into_response(); Ok(proxied) }