this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub did_doc: Option<serde_json::Value>, 61 pub access_jwt: String, 62 pub refresh_jwt: String, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 let is_potential_migration = input 73 .did 74 .as_ref() 75 .map(|d| d.starts_with("did:plc:")) 76 .unwrap_or(false); 77 if is_potential_migration { 78 info!( 79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 80 input.did, input.handle 81 ); 82 } else { 83 info!("create_account called"); 84 } 85 let client_ip = extract_client_ip(&headers); 86 if !state 87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 88 .await 89 { 90 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 91 return ( 92 StatusCode::TOO_MANY_REQUESTS, 93 Json(json!({ 94 "error": "RateLimitExceeded", 95 "message": "Too many account creation attempts. Please try again later." 96 })), 97 ) 98 .into_response(); 99 } 100 101 let migration_auth = if let Some(token) = 102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 103 { 104 if is_service_token(&token) { 105 let verifier = ServiceTokenVerifier::new(); 106 match verifier 107 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 108 .await 109 { 110 Ok(claims) => { 111 debug!("Service token verified for migration: iss={}", claims.iss); 112 Some(claims.iss) 113 } 114 Err(e) => { 115 error!("Service token verification failed: {:?}", e); 116 return ( 117 StatusCode::UNAUTHORIZED, 118 Json(json!({ 119 "error": "AuthenticationFailed", 120 "message": format!("Service token verification failed: {}", e) 121 })), 122 ) 123 .into_response(); 124 } 125 } 126 } else { 127 None 128 } 129 } else { 130 None 131 }; 132 133 let is_did_web_byod = migration_auth.is_some() 134 && input 135 .did 136 .as_ref() 137 .map(|d| d.starts_with("did:web:")) 138 .unwrap_or(false); 139 140 let is_migration = migration_auth.is_some() 141 && input 142 .did 143 .as_ref() 144 .map(|d| d.starts_with("did:plc:")) 145 .unwrap_or(false); 146 147 if (is_migration || is_did_web_byod) 148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 149 { 150 if provided_did != auth_did { 151 info!( 152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 153 auth_did, provided_did 154 ); 155 return ( 156 StatusCode::FORBIDDEN, 157 Json(json!({ 158 "error": "AuthorizationError", 159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did) 160 })), 161 ) 162 .into_response(); 163 } 164 if is_did_web_byod { 165 info!(did = %provided_did, "Processing did:web BYOD account creation"); 166 } else { 167 info!( 168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 169 provided_did 170 ); 171 } 172 } 173 174 let hostname_for_validation = 175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 176 let pds_suffix = format!(".{}", hostname_for_validation); 177 178 let validated_short_handle = if !input.handle.contains('.') 179 || input.handle.ends_with(&pds_suffix) 180 { 181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 182 input 183 .handle 184 .strip_suffix(&pds_suffix) 185 .unwrap_or(&input.handle) 186 } else { 187 &input.handle 188 }; 189 match crate::api::validation::validate_short_handle(handle_to_validate) { 190 Ok(h) => h, 191 Err(crate::api::validation::HandleValidationError::Reserved) => { 192 return ( 193 StatusCode::BAD_REQUEST, 194 Json(json!({"error": "HandleNotAvailable", "message": "Reserved handle"})), 195 ) 196 .into_response(); 197 } 198 Err(e) => { 199 return ( 200 StatusCode::BAD_REQUEST, 201 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 202 ) 203 .into_response(); 204 } 205 } 206 } else { 207 if input.handle.contains(' ') || input.handle.contains('\t') { 208 return ( 209 StatusCode::BAD_REQUEST, 210 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 211 ) 212 .into_response(); 213 } 214 for c in input.handle.chars() { 215 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 216 return ( 217 StatusCode::BAD_REQUEST, 218 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 219 ) 220 .into_response(); 221 } 222 } 223 let handle_lower = input.handle.to_lowercase(); 224 if crate::moderation::has_explicit_slur(&handle_lower) { 225 return ( 226 StatusCode::BAD_REQUEST, 227 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})), 228 ) 229 .into_response(); 230 } 231 handle_lower 232 }; 233 let email: Option<String> = input 234 .email 235 .as_ref() 236 .map(|e| e.trim().to_string()) 237 .filter(|e| !e.is_empty()); 238 if let Some(ref email) = email 239 && !crate::api::validation::is_valid_email(email) 240 { 241 return ( 242 StatusCode::BAD_REQUEST, 243 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 244 ) 245 .into_response(); 246 } 247 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 248 let valid_channels = ["email", "discord", "telegram", "signal"]; 249 if !valid_channels.contains(&verification_channel) && !is_migration { 250 return ( 251 StatusCode::BAD_REQUEST, 252 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 253 ) 254 .into_response(); 255 } 256 let verification_recipient = if is_migration { 257 None 258 } else { 259 Some(match verification_channel { 260 "email" => match &input.email { 261 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 262 _ => return ( 263 StatusCode::BAD_REQUEST, 264 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 265 ).into_response(), 266 }, 267 "discord" => match &input.discord_id { 268 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 269 _ => return ( 270 StatusCode::BAD_REQUEST, 271 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 272 ).into_response(), 273 }, 274 "telegram" => match &input.telegram_username { 275 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 276 _ => return ( 277 StatusCode::BAD_REQUEST, 278 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 279 ).into_response(), 280 }, 281 "signal" => match &input.signal_number { 282 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 283 _ => return ( 284 StatusCode::BAD_REQUEST, 285 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 286 ).into_response(), 287 }, 288 _ => return ( 289 StatusCode::BAD_REQUEST, 290 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 291 ).into_response(), 292 }) 293 }; 294 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 295 let pds_endpoint = format!("https://{}", hostname); 296 let suffix = format!(".{}", hostname); 297 let handle = if input.handle.ends_with(&suffix) { 298 format!("{}.{}", validated_short_handle, hostname) 299 } else if input.handle.contains('.') { 300 validated_short_handle.clone() 301 } else { 302 format!("{}.{}", validated_short_handle, hostname) 303 }; 304 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 305 if let Some(signing_key_did) = &input.signing_key { 306 let reserved = sqlx::query!( 307 r#" 308 SELECT id, private_key_bytes 309 FROM reserved_signing_keys 310 WHERE public_key_did_key = $1 311 AND used_at IS NULL 312 AND expires_at > NOW() 313 FOR UPDATE 314 "#, 315 signing_key_did 316 ) 317 .fetch_optional(&state.db) 318 .await; 319 match reserved { 320 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 321 Ok(None) => { 322 return ( 323 StatusCode::BAD_REQUEST, 324 Json(json!({ 325 "error": "InvalidSigningKey", 326 "message": "Signing key not found, already used, or expired" 327 })), 328 ) 329 .into_response(); 330 } 331 Err(e) => { 332 error!("Error looking up reserved signing key: {:?}", e); 333 return ( 334 StatusCode::INTERNAL_SERVER_ERROR, 335 Json(json!({"error": "InternalError"})), 336 ) 337 .into_response(); 338 } 339 } 340 } else { 341 let secret_key = SecretKey::random(&mut OsRng); 342 (secret_key.to_bytes().to_vec(), None) 343 }; 344 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 345 Ok(k) => k, 346 Err(e) => { 347 error!("Error creating signing key: {:?}", e); 348 return ( 349 StatusCode::INTERNAL_SERVER_ERROR, 350 Json(json!({"error": "InternalError"})), 351 ) 352 .into_response(); 353 } 354 }; 355 let did_type = input.did_type.as_deref().unwrap_or("plc"); 356 let did = match did_type { 357 "web" => { 358 if !crate::api::server::meta::is_self_hosted_did_web_enabled() { 359 return ( 360 StatusCode::BAD_REQUEST, 361 Json(json!({ 362 "error": "SelfHostedDidWebDisabled", 363 "message": "This PDS does not offer self-hosted did:web identities. Please use did:plc or bring your own did:web." 364 })), 365 ) 366 .into_response(); 367 } 368 let subdomain_host = format!("{}.{}", input.handle, hostname); 369 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 370 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 371 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 372 self_hosted_did 373 } 374 "web-external" => { 375 let d = match &input.did { 376 Some(d) if !d.trim().is_empty() => d, 377 _ => { 378 return ( 379 StatusCode::BAD_REQUEST, 380 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 381 ) 382 .into_response(); 383 } 384 }; 385 if !d.starts_with("did:web:") { 386 return ( 387 StatusCode::BAD_REQUEST, 388 Json( 389 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 390 ), 391 ) 392 .into_response(); 393 } 394 if !is_did_web_byod 395 && let Err(e) = 396 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 397 { 398 return ( 399 StatusCode::BAD_REQUEST, 400 Json(json!({"error": "InvalidDid", "message": e})), 401 ) 402 .into_response(); 403 } 404 info!(did = %d, "Creating external did:web account"); 405 d.clone() 406 } 407 _ => { 408 if let Some(d) = &input.did { 409 if d.starts_with("did:plc:") && is_migration { 410 info!(did = %d, "Migration with existing did:plc"); 411 d.clone() 412 } else if d.starts_with("did:web:") { 413 if !is_did_web_byod 414 && let Err(e) = verify_did_web( 415 d, 416 &hostname, 417 &input.handle, 418 input.signing_key.as_deref(), 419 ) 420 .await 421 { 422 return ( 423 StatusCode::BAD_REQUEST, 424 Json(json!({"error": "InvalidDid", "message": e})), 425 ) 426 .into_response(); 427 } 428 d.clone() 429 } else if !d.trim().is_empty() { 430 return ( 431 StatusCode::BAD_REQUEST, 432 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 433 ) 434 .into_response(); 435 } else { 436 let rotation_key = std::env::var("PLC_ROTATION_KEY") 437 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 438 let genesis_result = match create_genesis_operation( 439 &signing_key, 440 &rotation_key, 441 &handle, 442 &pds_endpoint, 443 ) { 444 Ok(r) => r, 445 Err(e) => { 446 error!("Error creating PLC genesis operation: {:?}", e); 447 return ( 448 StatusCode::INTERNAL_SERVER_ERROR, 449 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 450 ) 451 .into_response(); 452 } 453 }; 454 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 455 if let Err(e) = plc_client 456 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 457 .await 458 { 459 error!("Failed to submit PLC genesis operation: {:?}", e); 460 return ( 461 StatusCode::BAD_GATEWAY, 462 Json(json!({ 463 "error": "UpstreamError", 464 "message": format!("Failed to register DID with PLC directory: {}", e) 465 })), 466 ) 467 .into_response(); 468 } 469 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 470 genesis_result.did 471 } 472 } else { 473 let rotation_key = std::env::var("PLC_ROTATION_KEY") 474 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 475 let genesis_result = match create_genesis_operation( 476 &signing_key, 477 &rotation_key, 478 &handle, 479 &pds_endpoint, 480 ) { 481 Ok(r) => r, 482 Err(e) => { 483 error!("Error creating PLC genesis operation: {:?}", e); 484 return ( 485 StatusCode::INTERNAL_SERVER_ERROR, 486 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 487 ) 488 .into_response(); 489 } 490 }; 491 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 492 if let Err(e) = plc_client 493 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 494 .await 495 { 496 error!("Failed to submit PLC genesis operation: {:?}", e); 497 return ( 498 StatusCode::BAD_GATEWAY, 499 Json(json!({ 500 "error": "UpstreamError", 501 "message": format!("Failed to register DID with PLC directory: {}", e) 502 })), 503 ) 504 .into_response(); 505 } 506 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 507 genesis_result.did 508 } 509 } 510 }; 511 let mut tx = match state.db.begin().await { 512 Ok(tx) => tx, 513 Err(e) => { 514 error!("Error starting transaction: {:?}", e); 515 return ( 516 StatusCode::INTERNAL_SERVER_ERROR, 517 Json(json!({"error": "InternalError"})), 518 ) 519 .into_response(); 520 } 521 }; 522 if is_migration { 523 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 524 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 525 .bind(&did) 526 .fetch_optional(&mut *tx) 527 .await 528 .unwrap_or(None); 529 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 530 if deactivated_at.is_some() { 531 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 532 let update_result: Result<_, sqlx::Error> = 533 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 534 .bind(&handle) 535 .bind(account_id) 536 .execute(&mut *tx) 537 .await; 538 if let Err(e) = update_result { 539 if let Some(db_err) = e.as_database_error() 540 && db_err 541 .constraint() 542 .map(|c| c.contains("handle")) 543 .unwrap_or(false) 544 { 545 return ( 546 StatusCode::BAD_REQUEST, 547 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 548 ) 549 .into_response(); 550 } 551 error!("Error reactivating account: {:?}", e); 552 return ( 553 StatusCode::INTERNAL_SERVER_ERROR, 554 Json(json!({"error": "InternalError"})), 555 ) 556 .into_response(); 557 } 558 if let Err(e) = tx.commit().await { 559 error!("Error committing reactivation: {:?}", e); 560 return ( 561 StatusCode::INTERNAL_SERVER_ERROR, 562 Json(json!({"error": "InternalError"})), 563 ) 564 .into_response(); 565 } 566 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 567 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 568 ) 569 .bind(account_id) 570 .fetch_optional(&state.db) 571 .await 572 .unwrap_or(None); 573 let secret_key_bytes = match key_row { 574 Some((key_bytes, encryption_version)) => { 575 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 576 Ok(k) => k, 577 Err(e) => { 578 error!("Error decrypting key for reactivated account: {:?}", e); 579 return ( 580 StatusCode::INTERNAL_SERVER_ERROR, 581 Json(json!({"error": "InternalError"})), 582 ) 583 .into_response(); 584 } 585 } 586 } 587 None => { 588 error!("No signing key found for reactivated account"); 589 return ( 590 StatusCode::INTERNAL_SERVER_ERROR, 591 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 592 ) 593 .into_response(); 594 } 595 }; 596 let access_meta = 597 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 598 Ok(m) => m, 599 Err(e) => { 600 error!("Error creating access token: {:?}", e); 601 return ( 602 StatusCode::INTERNAL_SERVER_ERROR, 603 Json(json!({"error": "InternalError"})), 604 ) 605 .into_response(); 606 } 607 }; 608 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 609 &did, 610 &secret_key_bytes, 611 ) { 612 Ok(m) => m, 613 Err(e) => { 614 error!("Error creating refresh token: {:?}", e); 615 return ( 616 StatusCode::INTERNAL_SERVER_ERROR, 617 Json(json!({"error": "InternalError"})), 618 ) 619 .into_response(); 620 } 621 }; 622 let session_result: Result<_, sqlx::Error> = sqlx::query( 623 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 624 ) 625 .bind(&did) 626 .bind(&access_meta.jti) 627 .bind(&refresh_meta.jti) 628 .bind(access_meta.expires_at) 629 .bind(refresh_meta.expires_at) 630 .execute(&state.db) 631 .await; 632 if let Err(e) = session_result { 633 error!("Error creating session: {:?}", e); 634 return ( 635 StatusCode::INTERNAL_SERVER_ERROR, 636 Json(json!({"error": "InternalError"})), 637 ) 638 .into_response(); 639 } 640 return ( 641 StatusCode::OK, 642 Json(CreateAccountOutput { 643 handle: handle.clone(), 644 did: did.clone(), 645 did_doc: state.did_resolver.resolve_did_document(&did).await, 646 access_jwt: access_meta.token, 647 refresh_jwt: refresh_meta.token, 648 verification_required: false, 649 verification_channel: "email".to_string(), 650 }), 651 ) 652 .into_response(); 653 } else { 654 return ( 655 StatusCode::BAD_REQUEST, 656 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 657 ) 658 .into_response(); 659 } 660 } 661 } 662 let exists_result: Option<(i32,)> = 663 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 664 .bind(&handle) 665 .fetch_optional(&mut *tx) 666 .await 667 .unwrap_or(None); 668 if exists_result.is_some() { 669 return ( 670 StatusCode::BAD_REQUEST, 671 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 672 ) 673 .into_response(); 674 } 675 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 676 .map(|v| v == "true" || v == "1") 677 .unwrap_or(false); 678 if invite_code_required 679 && input 680 .invite_code 681 .as_ref() 682 .map(|c| c.trim().is_empty()) 683 .unwrap_or(true) 684 { 685 return ( 686 StatusCode::BAD_REQUEST, 687 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 688 ) 689 .into_response(); 690 } 691 if let Some(code) = &input.invite_code 692 && !code.trim().is_empty() 693 { 694 let invite_query = sqlx::query!( 695 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 696 code 697 ) 698 .fetch_optional(&mut *tx) 699 .await; 700 match invite_query { 701 Ok(Some(row)) => { 702 if row.available_uses <= 0 { 703 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 704 } 705 let update_invite = sqlx::query!( 706 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 707 code 708 ) 709 .execute(&mut *tx) 710 .await; 711 if let Err(e) = update_invite { 712 error!("Error updating invite code: {:?}", e); 713 return ( 714 StatusCode::INTERNAL_SERVER_ERROR, 715 Json(json!({"error": "InternalError"})), 716 ) 717 .into_response(); 718 } 719 } 720 Ok(None) => { 721 return ( 722 StatusCode::BAD_REQUEST, 723 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 724 ) 725 .into_response(); 726 } 727 Err(e) => { 728 error!("Error checking invite code: {:?}", e); 729 return ( 730 StatusCode::INTERNAL_SERVER_ERROR, 731 Json(json!({"error": "InternalError"})), 732 ) 733 .into_response(); 734 } 735 } 736 } 737 if let Err(e) = validate_password(&input.password) { 738 return ( 739 StatusCode::BAD_REQUEST, 740 Json(json!({ 741 "error": "InvalidPassword", 742 "message": e.to_string() 743 })), 744 ) 745 .into_response(); 746 } 747 748 let password_clone = input.password.clone(); 749 let password_hash = 750 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await { 751 Ok(Ok(h)) => h, 752 Ok(Err(e)) => { 753 error!("Error hashing password: {:?}", e); 754 return ( 755 StatusCode::INTERNAL_SERVER_ERROR, 756 Json(json!({"error": "InternalError"})), 757 ) 758 .into_response(); 759 } 760 Err(e) => { 761 error!("Failed to spawn blocking task: {:?}", e); 762 return ( 763 StatusCode::INTERNAL_SERVER_ERROR, 764 Json(json!({"error": "InternalError"})), 765 ) 766 .into_response(); 767 } 768 }; 769 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 770 .fetch_one(&mut *tx) 771 .await 772 .map(|c| c.unwrap_or(0) == 0) 773 .unwrap_or(false); 774 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 775 Some(chrono::Utc::now()) 776 } else { 777 None 778 }; 779 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 780 r#"INSERT INTO users ( 781 handle, email, did, password_hash, 782 preferred_comms_channel, 783 discord_id, telegram_username, signal_number, 784 is_admin, deactivated_at, email_verified 785 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 786 ) 787 .bind(&handle) 788 .bind(&email) 789 .bind(&did) 790 .bind(&password_hash) 791 .bind(verification_channel) 792 .bind( 793 input 794 .discord_id 795 .as_deref() 796 .map(|s| s.trim()) 797 .filter(|s| !s.is_empty()), 798 ) 799 .bind( 800 input 801 .telegram_username 802 .as_deref() 803 .map(|s| s.trim()) 804 .filter(|s| !s.is_empty()), 805 ) 806 .bind( 807 input 808 .signal_number 809 .as_deref() 810 .map(|s| s.trim()) 811 .filter(|s| !s.is_empty()), 812 ) 813 .bind(is_first_user) 814 .bind(deactivated_at) 815 .bind(false) 816 .fetch_one(&mut *tx) 817 .await; 818 let user_id = match user_insert { 819 Ok((id,)) => id, 820 Err(e) => { 821 if let Some(db_err) = e.as_database_error() 822 && db_err.code().as_deref() == Some("23505") 823 { 824 let constraint = db_err.constraint().unwrap_or(""); 825 if constraint.contains("handle") || constraint.contains("users_handle") { 826 return ( 827 StatusCode::BAD_REQUEST, 828 Json(json!({ 829 "error": "HandleNotAvailable", 830 "message": "Handle already taken" 831 })), 832 ) 833 .into_response(); 834 } else if constraint.contains("email") || constraint.contains("users_email") { 835 return ( 836 StatusCode::BAD_REQUEST, 837 Json(json!({ 838 "error": "InvalidEmail", 839 "message": "Email already registered" 840 })), 841 ) 842 .into_response(); 843 } else if constraint.contains("did") || constraint.contains("users_did") { 844 return ( 845 StatusCode::BAD_REQUEST, 846 Json(json!({ 847 "error": "AccountAlreadyExists", 848 "message": "An account with this DID already exists" 849 })), 850 ) 851 .into_response(); 852 } 853 } 854 error!("Error inserting user: {:?}", e); 855 return ( 856 StatusCode::INTERNAL_SERVER_ERROR, 857 Json(json!({"error": "InternalError"})), 858 ) 859 .into_response(); 860 } 861 }; 862 863 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 864 Ok(enc) => enc, 865 Err(e) => { 866 error!("Error encrypting user key: {:?}", e); 867 return ( 868 StatusCode::INTERNAL_SERVER_ERROR, 869 Json(json!({"error": "InternalError"})), 870 ) 871 .into_response(); 872 } 873 }; 874 let key_insert = sqlx::query!( 875 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 876 user_id, 877 &encrypted_key_bytes[..], 878 crate::config::ENCRYPTION_VERSION 879 ) 880 .execute(&mut *tx) 881 .await; 882 if let Err(e) = key_insert { 883 error!("Error inserting user key: {:?}", e); 884 return ( 885 StatusCode::INTERNAL_SERVER_ERROR, 886 Json(json!({"error": "InternalError"})), 887 ) 888 .into_response(); 889 } 890 if let Some(key_id) = reserved_key_id { 891 let mark_used = sqlx::query!( 892 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 893 key_id 894 ) 895 .execute(&mut *tx) 896 .await; 897 if let Err(e) = mark_used { 898 error!("Error marking reserved key as used: {:?}", e); 899 return ( 900 StatusCode::INTERNAL_SERVER_ERROR, 901 Json(json!({"error": "InternalError"})), 902 ) 903 .into_response(); 904 } 905 } 906 let mst = Mst::new(Arc::new(state.block_store.clone())); 907 let mst_root = match mst.persist().await { 908 Ok(c) => c, 909 Err(e) => { 910 error!("Error persisting MST: {:?}", e); 911 return ( 912 StatusCode::INTERNAL_SERVER_ERROR, 913 Json(json!({"error": "InternalError"})), 914 ) 915 .into_response(); 916 } 917 }; 918 let rev = Tid::now(LimitedU32::MIN); 919 let (commit_bytes, _sig) = 920 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 921 Ok(result) => result, 922 Err(e) => { 923 error!("Error creating genesis commit: {:?}", e); 924 return ( 925 StatusCode::INTERNAL_SERVER_ERROR, 926 Json(json!({"error": "InternalError"})), 927 ) 928 .into_response(); 929 } 930 }; 931 let commit_cid = match state.block_store.put(&commit_bytes).await { 932 Ok(c) => c, 933 Err(e) => { 934 error!("Error saving genesis commit: {:?}", e); 935 return ( 936 StatusCode::INTERNAL_SERVER_ERROR, 937 Json(json!({"error": "InternalError"})), 938 ) 939 .into_response(); 940 } 941 }; 942 let commit_cid_str = commit_cid.to_string(); 943 let rev_str = rev.as_ref().to_string(); 944 let repo_insert = sqlx::query!( 945 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 946 user_id, 947 commit_cid_str, 948 rev_str 949 ) 950 .execute(&mut *tx) 951 .await; 952 if let Err(e) = repo_insert { 953 error!("Error initializing repo: {:?}", e); 954 return ( 955 StatusCode::INTERNAL_SERVER_ERROR, 956 Json(json!({"error": "InternalError"})), 957 ) 958 .into_response(); 959 } 960 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 961 if let Err(e) = sqlx::query!( 962 r#" 963 INSERT INTO user_blocks (user_id, block_cid) 964 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 965 ON CONFLICT (user_id, block_cid) DO NOTHING 966 "#, 967 user_id, 968 &genesis_block_cids 969 ) 970 .execute(&mut *tx) 971 .await 972 { 973 error!("Error inserting user_blocks: {:?}", e); 974 return ( 975 StatusCode::INTERNAL_SERVER_ERROR, 976 Json(json!({"error": "InternalError"})), 977 ) 978 .into_response(); 979 } 980 if let Some(code) = &input.invite_code 981 && !code.trim().is_empty() 982 { 983 let use_insert = sqlx::query!( 984 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 985 code, 986 user_id 987 ) 988 .execute(&mut *tx) 989 .await; 990 if let Err(e) = use_insert { 991 error!("Error recording invite usage: {:?}", e); 992 return ( 993 StatusCode::INTERNAL_SERVER_ERROR, 994 Json(json!({"error": "InternalError"})), 995 ) 996 .into_response(); 997 } 998 } 999 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 1000 let birthdate_pref = json!({ 1001 "$type": "app.bsky.actor.defs#personalDetailsPref", 1002 "birthDate": "1998-05-06T00:00:00.000Z" 1003 }); 1004 if let Err(e) = sqlx::query!( 1005 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 1006 ON CONFLICT (user_id, name) DO NOTHING", 1007 user_id, 1008 "app.bsky.actor.defs#personalDetailsPref", 1009 birthdate_pref 1010 ) 1011 .execute(&mut *tx) 1012 .await 1013 { 1014 warn!("Failed to set default birthdate preference: {:?}", e); 1015 } 1016 } 1017 if let Err(e) = tx.commit().await { 1018 error!("Error committing transaction: {:?}", e); 1019 return ( 1020 StatusCode::INTERNAL_SERVER_ERROR, 1021 Json(json!({"error": "InternalError"})), 1022 ) 1023 .into_response(); 1024 } 1025 if !is_migration && !is_did_web_byod { 1026 if let Err(e) = 1027 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 1028 { 1029 warn!("Failed to sequence identity event for {}: {}", did, e); 1030 } 1031 if let Err(e) = 1032 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 1033 { 1034 warn!("Failed to sequence account event for {}: {}", did, e); 1035 } 1036 if let Err(e) = crate::api::repo::record::sequence_genesis_commit( 1037 &state, 1038 &did, 1039 &commit_cid, 1040 &mst_root, 1041 &rev_str, 1042 ) 1043 .await 1044 { 1045 warn!("Failed to sequence commit event for {}: {}", did, e); 1046 } 1047 if let Err(e) = crate::api::repo::record::sequence_sync_event( 1048 &state, 1049 &did, 1050 &commit_cid_str, 1051 Some(rev.as_ref()), 1052 ) 1053 .await 1054 { 1055 warn!("Failed to sequence sync event for {}: {}", did, e); 1056 } 1057 let profile_record = json!({ 1058 "$type": "app.bsky.actor.profile", 1059 "displayName": input.handle 1060 }); 1061 if let Err(e) = crate::api::repo::record::create_record_internal( 1062 &state, 1063 &did, 1064 "app.bsky.actor.profile", 1065 "self", 1066 &profile_record, 1067 ) 1068 .await 1069 { 1070 warn!("Failed to create default profile for {}: {}", did, e); 1071 } 1072 } 1073 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 1074 if !is_migration { 1075 if let Some(ref recipient) = verification_recipient { 1076 let verification_token = crate::auth::verification_token::generate_signup_token( 1077 &did, 1078 verification_channel, 1079 recipient, 1080 ); 1081 let formatted_token = 1082 crate::auth::verification_token::format_token_for_display(&verification_token); 1083 if let Err(e) = crate::comms::enqueue_signup_verification( 1084 &state.db, 1085 user_id, 1086 verification_channel, 1087 recipient, 1088 &formatted_token, 1089 None, 1090 ) 1091 .await 1092 { 1093 warn!( 1094 "Failed to enqueue signup verification notification: {:?}", 1095 e 1096 ); 1097 } 1098 } 1099 } else if let Some(ref user_email) = email { 1100 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 1101 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 1102 if let Err(e) = crate::comms::enqueue_migration_verification( 1103 &state.db, 1104 user_id, 1105 user_email, 1106 &formatted_token, 1107 &hostname, 1108 ) 1109 .await 1110 { 1111 warn!("Failed to enqueue migration verification email: {:?}", e); 1112 } 1113 } 1114 1115 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 1116 { 1117 Ok(m) => m, 1118 Err(e) => { 1119 error!("createAccount: Error creating access token: {:?}", e); 1120 return ( 1121 StatusCode::INTERNAL_SERVER_ERROR, 1122 Json(json!({"error": "InternalError"})), 1123 ) 1124 .into_response(); 1125 } 1126 }; 1127 let refresh_meta = 1128 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1129 Ok(m) => m, 1130 Err(e) => { 1131 error!("createAccount: Error creating refresh token: {:?}", e); 1132 return ( 1133 StatusCode::INTERNAL_SERVER_ERROR, 1134 Json(json!({"error": "InternalError"})), 1135 ) 1136 .into_response(); 1137 } 1138 }; 1139 if let Err(e) = sqlx::query!( 1140 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1141 did, 1142 access_meta.jti, 1143 refresh_meta.jti, 1144 access_meta.expires_at, 1145 refresh_meta.expires_at 1146 ) 1147 .execute(&state.db) 1148 .await 1149 { 1150 error!("createAccount: Error creating session: {:?}", e); 1151 return ( 1152 StatusCode::INTERNAL_SERVER_ERROR, 1153 Json(json!({"error": "InternalError"})), 1154 ) 1155 .into_response(); 1156 } 1157 1158 let did_doc = state.did_resolver.resolve_did_document(&did).await; 1159 1160 if is_migration { 1161 info!( 1162 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 1163 did, handle 1164 ); 1165 } 1166 1167 ( 1168 StatusCode::OK, 1169 Json(CreateAccountOutput { 1170 handle: handle.clone(), 1171 did, 1172 did_doc, 1173 access_jwt: access_meta.token, 1174 refresh_jwt: refresh_meta.token, 1175 verification_required: !is_migration, 1176 verification_channel: verification_channel.to_string(), 1177 }), 1178 ) 1179 .into_response() 1180}