this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub access_jwt: Option<String>, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub refresh_jwt: Option<String>, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 let is_potential_migration = input 73 .did 74 .as_ref() 75 .map(|d| d.starts_with("did:plc:")) 76 .unwrap_or(false); 77 if is_potential_migration { 78 info!( 79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 80 input.did, input.handle 81 ); 82 } else { 83 info!("create_account called"); 84 } 85 let client_ip = extract_client_ip(&headers); 86 if !state 87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 88 .await 89 { 90 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 91 return ( 92 StatusCode::TOO_MANY_REQUESTS, 93 Json(json!({ 94 "error": "RateLimitExceeded", 95 "message": "Too many account creation attempts. Please try again later." 96 })), 97 ) 98 .into_response(); 99 } 100 101 let migration_auth = if let Some(token) = 102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 103 { 104 if is_service_token(&token) { 105 let verifier = ServiceTokenVerifier::new(); 106 match verifier 107 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 108 .await 109 { 110 Ok(claims) => { 111 debug!("Service token verified for migration: iss={}", claims.iss); 112 Some(claims.iss) 113 } 114 Err(e) => { 115 error!("Service token verification failed: {:?}", e); 116 return ( 117 StatusCode::UNAUTHORIZED, 118 Json(json!({ 119 "error": "AuthenticationFailed", 120 "message": format!("Service token verification failed: {}", e) 121 })), 122 ) 123 .into_response(); 124 } 125 } 126 } else { 127 None 128 } 129 } else { 130 None 131 }; 132 133 let is_did_web_byod = migration_auth.is_some() 134 && input 135 .did 136 .as_ref() 137 .map(|d| d.starts_with("did:web:")) 138 .unwrap_or(false); 139 140 let is_migration = migration_auth.is_some() 141 && input 142 .did 143 .as_ref() 144 .map(|d| d.starts_with("did:plc:")) 145 .unwrap_or(false); 146 147 if (is_migration || is_did_web_byod) 148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 149 { 150 if provided_did != auth_did { 151 info!( 152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 153 auth_did, provided_did 154 ); 155 return ( 156 StatusCode::FORBIDDEN, 157 Json(json!({ 158 "error": "AuthorizationError", 159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did) 160 })), 161 ) 162 .into_response(); 163 } 164 if is_did_web_byod { 165 info!(did = %provided_did, "Processing did:web BYOD account creation"); 166 } else { 167 info!( 168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 169 provided_did 170 ); 171 } 172 } 173 174 let hostname_for_validation = 175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 176 let pds_suffix = format!(".{}", hostname_for_validation); 177 178 let validated_short_handle = if !input.handle.contains('.') 179 || input.handle.ends_with(&pds_suffix) 180 { 181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 182 input 183 .handle 184 .strip_suffix(&pds_suffix) 185 .unwrap_or(&input.handle) 186 } else { 187 &input.handle 188 }; 189 match crate::api::validation::validate_short_handle(handle_to_validate) { 190 Ok(h) => h, 191 Err(e) => { 192 return ( 193 StatusCode::BAD_REQUEST, 194 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 195 ) 196 .into_response(); 197 } 198 } 199 } else { 200 if input.handle.contains(' ') || input.handle.contains('\t') { 201 return ( 202 StatusCode::BAD_REQUEST, 203 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 204 ) 205 .into_response(); 206 } 207 for c in input.handle.chars() { 208 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 209 return ( 210 StatusCode::BAD_REQUEST, 211 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 212 ) 213 .into_response(); 214 } 215 } 216 let handle_lower = input.handle.to_lowercase(); 217 if crate::moderation::has_explicit_slur(&handle_lower) { 218 return ( 219 StatusCode::BAD_REQUEST, 220 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})), 221 ) 222 .into_response(); 223 } 224 handle_lower 225 }; 226 let email: Option<String> = input 227 .email 228 .as_ref() 229 .map(|e| e.trim().to_string()) 230 .filter(|e| !e.is_empty()); 231 if let Some(ref email) = email 232 && !crate::api::validation::is_valid_email(email) 233 { 234 return ( 235 StatusCode::BAD_REQUEST, 236 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 237 ) 238 .into_response(); 239 } 240 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 241 let valid_channels = ["email", "discord", "telegram", "signal"]; 242 if !valid_channels.contains(&verification_channel) && !is_migration { 243 return ( 244 StatusCode::BAD_REQUEST, 245 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 246 ) 247 .into_response(); 248 } 249 let verification_recipient = if is_migration { 250 None 251 } else { 252 Some(match verification_channel { 253 "email" => match &input.email { 254 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 255 _ => return ( 256 StatusCode::BAD_REQUEST, 257 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 258 ).into_response(), 259 }, 260 "discord" => match &input.discord_id { 261 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 262 _ => return ( 263 StatusCode::BAD_REQUEST, 264 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 265 ).into_response(), 266 }, 267 "telegram" => match &input.telegram_username { 268 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 269 _ => return ( 270 StatusCode::BAD_REQUEST, 271 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 272 ).into_response(), 273 }, 274 "signal" => match &input.signal_number { 275 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 276 _ => return ( 277 StatusCode::BAD_REQUEST, 278 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 279 ).into_response(), 280 }, 281 _ => return ( 282 StatusCode::BAD_REQUEST, 283 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 284 ).into_response(), 285 }) 286 }; 287 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 288 let pds_endpoint = format!("https://{}", hostname); 289 let suffix = format!(".{}", hostname); 290 let handle = if input.handle.ends_with(&suffix) { 291 format!("{}.{}", validated_short_handle, hostname) 292 } else if input.handle.contains('.') { 293 validated_short_handle.clone() 294 } else { 295 format!("{}.{}", validated_short_handle, hostname) 296 }; 297 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 298 if let Some(signing_key_did) = &input.signing_key { 299 let reserved = sqlx::query!( 300 r#" 301 SELECT id, private_key_bytes 302 FROM reserved_signing_keys 303 WHERE public_key_did_key = $1 304 AND used_at IS NULL 305 AND expires_at > NOW() 306 FOR UPDATE 307 "#, 308 signing_key_did 309 ) 310 .fetch_optional(&state.db) 311 .await; 312 match reserved { 313 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 314 Ok(None) => { 315 return ( 316 StatusCode::BAD_REQUEST, 317 Json(json!({ 318 "error": "InvalidSigningKey", 319 "message": "Signing key not found, already used, or expired" 320 })), 321 ) 322 .into_response(); 323 } 324 Err(e) => { 325 error!("Error looking up reserved signing key: {:?}", e); 326 return ( 327 StatusCode::INTERNAL_SERVER_ERROR, 328 Json(json!({"error": "InternalError"})), 329 ) 330 .into_response(); 331 } 332 } 333 } else { 334 let secret_key = SecretKey::random(&mut OsRng); 335 (secret_key.to_bytes().to_vec(), None) 336 }; 337 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 338 Ok(k) => k, 339 Err(e) => { 340 error!("Error creating signing key: {:?}", e); 341 return ( 342 StatusCode::INTERNAL_SERVER_ERROR, 343 Json(json!({"error": "InternalError"})), 344 ) 345 .into_response(); 346 } 347 }; 348 let did_type = input.did_type.as_deref().unwrap_or("plc"); 349 let did = match did_type { 350 "web" => { 351 let subdomain_host = format!("{}.{}", input.handle, hostname); 352 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 353 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 354 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 355 self_hosted_did 356 } 357 "web-external" => { 358 let d = match &input.did { 359 Some(d) if !d.trim().is_empty() => d, 360 _ => { 361 return ( 362 StatusCode::BAD_REQUEST, 363 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 364 ) 365 .into_response(); 366 } 367 }; 368 if !d.starts_with("did:web:") { 369 return ( 370 StatusCode::BAD_REQUEST, 371 Json( 372 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 373 ), 374 ) 375 .into_response(); 376 } 377 if !is_did_web_byod 378 && let Err(e) = 379 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 380 { 381 return ( 382 StatusCode::BAD_REQUEST, 383 Json(json!({"error": "InvalidDid", "message": e})), 384 ) 385 .into_response(); 386 } 387 info!(did = %d, "Creating external did:web account"); 388 d.clone() 389 } 390 _ => { 391 if let Some(d) = &input.did { 392 if d.starts_with("did:plc:") && is_migration { 393 info!(did = %d, "Migration with existing did:plc"); 394 d.clone() 395 } else if d.starts_with("did:web:") { 396 if !is_did_web_byod 397 && let Err(e) = verify_did_web( 398 d, 399 &hostname, 400 &input.handle, 401 input.signing_key.as_deref(), 402 ) 403 .await 404 { 405 return ( 406 StatusCode::BAD_REQUEST, 407 Json(json!({"error": "InvalidDid", "message": e})), 408 ) 409 .into_response(); 410 } 411 d.clone() 412 } else if !d.trim().is_empty() { 413 return ( 414 StatusCode::BAD_REQUEST, 415 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 416 ) 417 .into_response(); 418 } else { 419 let rotation_key = std::env::var("PLC_ROTATION_KEY") 420 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 421 let genesis_result = match create_genesis_operation( 422 &signing_key, 423 &rotation_key, 424 &handle, 425 &pds_endpoint, 426 ) { 427 Ok(r) => r, 428 Err(e) => { 429 error!("Error creating PLC genesis operation: {:?}", e); 430 return ( 431 StatusCode::INTERNAL_SERVER_ERROR, 432 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 433 ) 434 .into_response(); 435 } 436 }; 437 let plc_client = PlcClient::new(None); 438 if let Err(e) = plc_client 439 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 440 .await 441 { 442 error!("Failed to submit PLC genesis operation: {:?}", e); 443 return ( 444 StatusCode::BAD_GATEWAY, 445 Json(json!({ 446 "error": "UpstreamError", 447 "message": format!("Failed to register DID with PLC directory: {}", e) 448 })), 449 ) 450 .into_response(); 451 } 452 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 453 genesis_result.did 454 } 455 } else { 456 let rotation_key = std::env::var("PLC_ROTATION_KEY") 457 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 458 let genesis_result = match create_genesis_operation( 459 &signing_key, 460 &rotation_key, 461 &handle, 462 &pds_endpoint, 463 ) { 464 Ok(r) => r, 465 Err(e) => { 466 error!("Error creating PLC genesis operation: {:?}", e); 467 return ( 468 StatusCode::INTERNAL_SERVER_ERROR, 469 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 470 ) 471 .into_response(); 472 } 473 }; 474 let plc_client = PlcClient::new(None); 475 if let Err(e) = plc_client 476 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 477 .await 478 { 479 error!("Failed to submit PLC genesis operation: {:?}", e); 480 return ( 481 StatusCode::BAD_GATEWAY, 482 Json(json!({ 483 "error": "UpstreamError", 484 "message": format!("Failed to register DID with PLC directory: {}", e) 485 })), 486 ) 487 .into_response(); 488 } 489 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 490 genesis_result.did 491 } 492 } 493 }; 494 let mut tx = match state.db.begin().await { 495 Ok(tx) => tx, 496 Err(e) => { 497 error!("Error starting transaction: {:?}", e); 498 return ( 499 StatusCode::INTERNAL_SERVER_ERROR, 500 Json(json!({"error": "InternalError"})), 501 ) 502 .into_response(); 503 } 504 }; 505 if is_migration { 506 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 507 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 508 .bind(&did) 509 .fetch_optional(&mut *tx) 510 .await 511 .unwrap_or(None); 512 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 513 if deactivated_at.is_some() { 514 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 515 let update_result: Result<_, sqlx::Error> = 516 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 517 .bind(&handle) 518 .bind(account_id) 519 .execute(&mut *tx) 520 .await; 521 if let Err(e) = update_result { 522 if let Some(db_err) = e.as_database_error() 523 && db_err 524 .constraint() 525 .map(|c| c.contains("handle")) 526 .unwrap_or(false) 527 { 528 return ( 529 StatusCode::BAD_REQUEST, 530 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 531 ) 532 .into_response(); 533 } 534 error!("Error reactivating account: {:?}", e); 535 return ( 536 StatusCode::INTERNAL_SERVER_ERROR, 537 Json(json!({"error": "InternalError"})), 538 ) 539 .into_response(); 540 } 541 if let Err(e) = tx.commit().await { 542 error!("Error committing reactivation: {:?}", e); 543 return ( 544 StatusCode::INTERNAL_SERVER_ERROR, 545 Json(json!({"error": "InternalError"})), 546 ) 547 .into_response(); 548 } 549 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 550 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 551 ) 552 .bind(account_id) 553 .fetch_optional(&state.db) 554 .await 555 .unwrap_or(None); 556 let secret_key_bytes = match key_row { 557 Some((key_bytes, encryption_version)) => { 558 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 559 Ok(k) => k, 560 Err(e) => { 561 error!("Error decrypting key for reactivated account: {:?}", e); 562 return ( 563 StatusCode::INTERNAL_SERVER_ERROR, 564 Json(json!({"error": "InternalError"})), 565 ) 566 .into_response(); 567 } 568 } 569 } 570 None => { 571 error!("No signing key found for reactivated account"); 572 return ( 573 StatusCode::INTERNAL_SERVER_ERROR, 574 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 575 ) 576 .into_response(); 577 } 578 }; 579 let access_meta = 580 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 581 Ok(m) => m, 582 Err(e) => { 583 error!("Error creating access token: {:?}", e); 584 return ( 585 StatusCode::INTERNAL_SERVER_ERROR, 586 Json(json!({"error": "InternalError"})), 587 ) 588 .into_response(); 589 } 590 }; 591 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 592 &did, 593 &secret_key_bytes, 594 ) { 595 Ok(m) => m, 596 Err(e) => { 597 error!("Error creating refresh token: {:?}", e); 598 return ( 599 StatusCode::INTERNAL_SERVER_ERROR, 600 Json(json!({"error": "InternalError"})), 601 ) 602 .into_response(); 603 } 604 }; 605 let session_result: Result<_, sqlx::Error> = sqlx::query( 606 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 607 ) 608 .bind(&did) 609 .bind(&access_meta.jti) 610 .bind(&refresh_meta.jti) 611 .bind(access_meta.expires_at) 612 .bind(refresh_meta.expires_at) 613 .execute(&state.db) 614 .await; 615 if let Err(e) = session_result { 616 error!("Error creating session: {:?}", e); 617 return ( 618 StatusCode::INTERNAL_SERVER_ERROR, 619 Json(json!({"error": "InternalError"})), 620 ) 621 .into_response(); 622 } 623 return ( 624 StatusCode::OK, 625 Json(CreateAccountOutput { 626 handle: handle.clone(), 627 did, 628 access_jwt: Some(access_meta.token), 629 refresh_jwt: Some(refresh_meta.token), 630 verification_required: false, 631 verification_channel: "email".to_string(), 632 }), 633 ) 634 .into_response(); 635 } else { 636 return ( 637 StatusCode::BAD_REQUEST, 638 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 639 ) 640 .into_response(); 641 } 642 } 643 } 644 let exists_result: Option<(i32,)> = 645 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 646 .bind(&handle) 647 .fetch_optional(&mut *tx) 648 .await 649 .unwrap_or(None); 650 if exists_result.is_some() { 651 return ( 652 StatusCode::BAD_REQUEST, 653 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 654 ) 655 .into_response(); 656 } 657 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 658 .map(|v| v == "true" || v == "1") 659 .unwrap_or(false); 660 if invite_code_required 661 && input 662 .invite_code 663 .as_ref() 664 .map(|c| c.trim().is_empty()) 665 .unwrap_or(true) 666 { 667 return ( 668 StatusCode::BAD_REQUEST, 669 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 670 ) 671 .into_response(); 672 } 673 if let Some(code) = &input.invite_code 674 && !code.trim().is_empty() 675 { 676 let invite_query = sqlx::query!( 677 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 678 code 679 ) 680 .fetch_optional(&mut *tx) 681 .await; 682 match invite_query { 683 Ok(Some(row)) => { 684 if row.available_uses <= 0 { 685 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 686 } 687 let update_invite = sqlx::query!( 688 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 689 code 690 ) 691 .execute(&mut *tx) 692 .await; 693 if let Err(e) = update_invite { 694 error!("Error updating invite code: {:?}", e); 695 return ( 696 StatusCode::INTERNAL_SERVER_ERROR, 697 Json(json!({"error": "InternalError"})), 698 ) 699 .into_response(); 700 } 701 } 702 Ok(None) => { 703 return ( 704 StatusCode::BAD_REQUEST, 705 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 706 ) 707 .into_response(); 708 } 709 Err(e) => { 710 error!("Error checking invite code: {:?}", e); 711 return ( 712 StatusCode::INTERNAL_SERVER_ERROR, 713 Json(json!({"error": "InternalError"})), 714 ) 715 .into_response(); 716 } 717 } 718 } 719 if let Err(e) = validate_password(&input.password) { 720 return ( 721 StatusCode::BAD_REQUEST, 722 Json(json!({ 723 "error": "InvalidPassword", 724 "message": e.to_string() 725 })), 726 ) 727 .into_response(); 728 } 729 730 let password_hash = match hash(&input.password, DEFAULT_COST) { 731 Ok(h) => h, 732 Err(e) => { 733 error!("Error hashing password: {:?}", e); 734 return ( 735 StatusCode::INTERNAL_SERVER_ERROR, 736 Json(json!({"error": "InternalError"})), 737 ) 738 .into_response(); 739 } 740 }; 741 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 742 .fetch_one(&mut *tx) 743 .await 744 .map(|c| c.unwrap_or(0) == 0) 745 .unwrap_or(false); 746 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 747 Some(chrono::Utc::now()) 748 } else { 749 None 750 }; 751 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 752 r#"INSERT INTO users ( 753 handle, email, did, password_hash, 754 preferred_comms_channel, 755 discord_id, telegram_username, signal_number, 756 is_admin, deactivated_at, email_verified 757 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 758 ) 759 .bind(&handle) 760 .bind(&email) 761 .bind(&did) 762 .bind(&password_hash) 763 .bind(verification_channel) 764 .bind( 765 input 766 .discord_id 767 .as_deref() 768 .map(|s| s.trim()) 769 .filter(|s| !s.is_empty()), 770 ) 771 .bind( 772 input 773 .telegram_username 774 .as_deref() 775 .map(|s| s.trim()) 776 .filter(|s| !s.is_empty()), 777 ) 778 .bind( 779 input 780 .signal_number 781 .as_deref() 782 .map(|s| s.trim()) 783 .filter(|s| !s.is_empty()), 784 ) 785 .bind(is_first_user) 786 .bind(deactivated_at) 787 .bind(false) 788 .fetch_one(&mut *tx) 789 .await; 790 let user_id = match user_insert { 791 Ok((id,)) => id, 792 Err(e) => { 793 if let Some(db_err) = e.as_database_error() 794 && db_err.code().as_deref() == Some("23505") 795 { 796 let constraint = db_err.constraint().unwrap_or(""); 797 if constraint.contains("handle") || constraint.contains("users_handle") { 798 return ( 799 StatusCode::BAD_REQUEST, 800 Json(json!({ 801 "error": "HandleNotAvailable", 802 "message": "Handle already taken" 803 })), 804 ) 805 .into_response(); 806 } else if constraint.contains("email") || constraint.contains("users_email") { 807 return ( 808 StatusCode::BAD_REQUEST, 809 Json(json!({ 810 "error": "InvalidEmail", 811 "message": "Email already registered" 812 })), 813 ) 814 .into_response(); 815 } else if constraint.contains("did") || constraint.contains("users_did") { 816 return ( 817 StatusCode::BAD_REQUEST, 818 Json(json!({ 819 "error": "AccountAlreadyExists", 820 "message": "An account with this DID already exists" 821 })), 822 ) 823 .into_response(); 824 } 825 } 826 error!("Error inserting user: {:?}", e); 827 return ( 828 StatusCode::INTERNAL_SERVER_ERROR, 829 Json(json!({"error": "InternalError"})), 830 ) 831 .into_response(); 832 } 833 }; 834 835 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 836 Ok(enc) => enc, 837 Err(e) => { 838 error!("Error encrypting user key: {:?}", e); 839 return ( 840 StatusCode::INTERNAL_SERVER_ERROR, 841 Json(json!({"error": "InternalError"})), 842 ) 843 .into_response(); 844 } 845 }; 846 let key_insert = sqlx::query!( 847 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 848 user_id, 849 &encrypted_key_bytes[..], 850 crate::config::ENCRYPTION_VERSION 851 ) 852 .execute(&mut *tx) 853 .await; 854 if let Err(e) = key_insert { 855 error!("Error inserting user key: {:?}", e); 856 return ( 857 StatusCode::INTERNAL_SERVER_ERROR, 858 Json(json!({"error": "InternalError"})), 859 ) 860 .into_response(); 861 } 862 if let Some(key_id) = reserved_key_id { 863 let mark_used = sqlx::query!( 864 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 865 key_id 866 ) 867 .execute(&mut *tx) 868 .await; 869 if let Err(e) = mark_used { 870 error!("Error marking reserved key as used: {:?}", e); 871 return ( 872 StatusCode::INTERNAL_SERVER_ERROR, 873 Json(json!({"error": "InternalError"})), 874 ) 875 .into_response(); 876 } 877 } 878 let mst = Mst::new(Arc::new(state.block_store.clone())); 879 let mst_root = match mst.persist().await { 880 Ok(c) => c, 881 Err(e) => { 882 error!("Error persisting MST: {:?}", e); 883 return ( 884 StatusCode::INTERNAL_SERVER_ERROR, 885 Json(json!({"error": "InternalError"})), 886 ) 887 .into_response(); 888 } 889 }; 890 let rev = Tid::now(LimitedU32::MIN); 891 let (commit_bytes, _sig) = 892 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 893 Ok(result) => result, 894 Err(e) => { 895 error!("Error creating genesis commit: {:?}", e); 896 return ( 897 StatusCode::INTERNAL_SERVER_ERROR, 898 Json(json!({"error": "InternalError"})), 899 ) 900 .into_response(); 901 } 902 }; 903 let commit_cid = match state.block_store.put(&commit_bytes).await { 904 Ok(c) => c, 905 Err(e) => { 906 error!("Error saving genesis commit: {:?}", e); 907 return ( 908 StatusCode::INTERNAL_SERVER_ERROR, 909 Json(json!({"error": "InternalError"})), 910 ) 911 .into_response(); 912 } 913 }; 914 let commit_cid_str = commit_cid.to_string(); 915 let repo_insert = sqlx::query!( 916 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 917 user_id, 918 commit_cid_str 919 ) 920 .execute(&mut *tx) 921 .await; 922 if let Err(e) = repo_insert { 923 error!("Error initializing repo: {:?}", e); 924 return ( 925 StatusCode::INTERNAL_SERVER_ERROR, 926 Json(json!({"error": "InternalError"})), 927 ) 928 .into_response(); 929 } 930 if let Some(code) = &input.invite_code 931 && !code.trim().is_empty() 932 { 933 let use_insert = sqlx::query!( 934 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 935 code, 936 user_id 937 ) 938 .execute(&mut *tx) 939 .await; 940 if let Err(e) = use_insert { 941 error!("Error recording invite usage: {:?}", e); 942 return ( 943 StatusCode::INTERNAL_SERVER_ERROR, 944 Json(json!({"error": "InternalError"})), 945 ) 946 .into_response(); 947 } 948 } 949 if let Err(e) = tx.commit().await { 950 error!("Error committing transaction: {:?}", e); 951 return ( 952 StatusCode::INTERNAL_SERVER_ERROR, 953 Json(json!({"error": "InternalError"})), 954 ) 955 .into_response(); 956 } 957 if !is_migration && !is_did_web_byod { 958 if let Err(e) = 959 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 960 { 961 warn!("Failed to sequence identity event for {}: {}", did, e); 962 } 963 if let Err(e) = 964 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 965 { 966 warn!("Failed to sequence account event for {}: {}", did, e); 967 } 968 let profile_record = json!({ 969 "$type": "app.bsky.actor.profile", 970 "displayName": input.handle 971 }); 972 if let Err(e) = crate::api::repo::record::create_record_internal( 973 &state, 974 &did, 975 "app.bsky.actor.profile", 976 "self", 977 &profile_record, 978 ) 979 .await 980 { 981 warn!("Failed to create default profile for {}: {}", did, e); 982 } 983 } 984 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 985 if !is_migration { 986 if let Some(ref recipient) = verification_recipient { 987 let verification_token = crate::auth::verification_token::generate_signup_token( 988 &did, 989 verification_channel, 990 recipient, 991 ); 992 let formatted_token = 993 crate::auth::verification_token::format_token_for_display(&verification_token); 994 if let Err(e) = crate::comms::enqueue_signup_verification( 995 &state.db, 996 user_id, 997 verification_channel, 998 recipient, 999 &formatted_token, 1000 None, 1001 ) 1002 .await 1003 { 1004 warn!( 1005 "Failed to enqueue signup verification notification: {:?}", 1006 e 1007 ); 1008 } 1009 } 1010 } else if let Some(ref user_email) = email { 1011 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 1012 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 1013 if let Err(e) = crate::comms::enqueue_migration_verification( 1014 &state.db, 1015 user_id, 1016 user_email, 1017 &formatted_token, 1018 &hostname, 1019 ) 1020 .await 1021 { 1022 warn!("Failed to enqueue migration verification email: {:?}", e); 1023 } 1024 } 1025 1026 let (access_jwt, refresh_jwt) = if is_migration { 1027 info!( 1028 "[MIGRATION] createAccount: Creating session tokens for migration did={}", 1029 did 1030 ); 1031 let access_meta = match crate::auth::create_access_token_with_metadata( 1032 &did, 1033 &secret_key_bytes, 1034 ) { 1035 Ok(m) => m, 1036 Err(e) => { 1037 error!( 1038 "[MIGRATION] createAccount: Error creating access token for migration: {:?}", 1039 e 1040 ); 1041 return ( 1042 StatusCode::INTERNAL_SERVER_ERROR, 1043 Json(json!({"error": "InternalError"})), 1044 ) 1045 .into_response(); 1046 } 1047 }; 1048 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 1049 &did, 1050 &secret_key_bytes, 1051 ) { 1052 Ok(m) => m, 1053 Err(e) => { 1054 error!( 1055 "[MIGRATION] createAccount: Error creating refresh token for migration: {:?}", 1056 e 1057 ); 1058 return ( 1059 StatusCode::INTERNAL_SERVER_ERROR, 1060 Json(json!({"error": "InternalError"})), 1061 ) 1062 .into_response(); 1063 } 1064 }; 1065 if let Err(e) = sqlx::query!( 1066 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1067 did, 1068 access_meta.jti, 1069 refresh_meta.jti, 1070 access_meta.expires_at, 1071 refresh_meta.expires_at 1072 ) 1073 .execute(&state.db) 1074 .await 1075 { 1076 error!("[MIGRATION] createAccount: Error creating session for migration: {:?}", e); 1077 return ( 1078 StatusCode::INTERNAL_SERVER_ERROR, 1079 Json(json!({"error": "InternalError"})), 1080 ) 1081 .into_response(); 1082 } 1083 info!( 1084 "[MIGRATION] createAccount: Session created successfully for did={}", 1085 did 1086 ); 1087 (Some(access_meta.token), Some(refresh_meta.token)) 1088 } else { 1089 (None, None) 1090 }; 1091 1092 if is_migration { 1093 info!( 1094 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 1095 did, handle 1096 ); 1097 } 1098 1099 ( 1100 StatusCode::OK, 1101 Json(CreateAccountOutput { 1102 handle: handle.clone(), 1103 did, 1104 access_jwt, 1105 refresh_jwt, 1106 verification_required: !is_migration, 1107 verification_channel: verification_channel.to_string(), 1108 }), 1109 ) 1110 .into_response() 1111}