this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub did_doc: Option<serde_json::Value>, 61 pub access_jwt: String, 62 pub refresh_jwt: String, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 let is_potential_migration = input 73 .did 74 .as_ref() 75 .map(|d| d.starts_with("did:plc:")) 76 .unwrap_or(false); 77 if is_potential_migration { 78 info!( 79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 80 input.did, input.handle 81 ); 82 } else { 83 info!("create_account called"); 84 } 85 let client_ip = extract_client_ip(&headers); 86 if !state 87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 88 .await 89 { 90 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 91 return ( 92 StatusCode::TOO_MANY_REQUESTS, 93 Json(json!({ 94 "error": "RateLimitExceeded", 95 "message": "Too many account creation attempts. Please try again later." 96 })), 97 ) 98 .into_response(); 99 } 100 101 let migration_auth = if let Some(token) = 102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 103 { 104 if is_service_token(&token) { 105 let verifier = ServiceTokenVerifier::new(); 106 match verifier 107 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 108 .await 109 { 110 Ok(claims) => { 111 debug!("Service token verified for migration: iss={}", claims.iss); 112 Some(claims.iss) 113 } 114 Err(e) => { 115 error!("Service token verification failed: {:?}", e); 116 return ( 117 StatusCode::UNAUTHORIZED, 118 Json(json!({ 119 "error": "AuthenticationFailed", 120 "message": format!("Service token verification failed: {}", e) 121 })), 122 ) 123 .into_response(); 124 } 125 } 126 } else { 127 None 128 } 129 } else { 130 None 131 }; 132 133 let is_did_web_byod = migration_auth.is_some() 134 && input 135 .did 136 .as_ref() 137 .map(|d| d.starts_with("did:web:")) 138 .unwrap_or(false); 139 140 let is_migration = migration_auth.is_some() 141 && input 142 .did 143 .as_ref() 144 .map(|d| d.starts_with("did:plc:")) 145 .unwrap_or(false); 146 147 if (is_migration || is_did_web_byod) 148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 149 { 150 if provided_did != auth_did { 151 info!( 152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 153 auth_did, provided_did 154 ); 155 return ( 156 StatusCode::FORBIDDEN, 157 Json(json!({ 158 "error": "AuthorizationError", 159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did) 160 })), 161 ) 162 .into_response(); 163 } 164 if is_did_web_byod { 165 info!(did = %provided_did, "Processing did:web BYOD account creation"); 166 } else { 167 info!( 168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 169 provided_did 170 ); 171 } 172 } 173 174 let hostname_for_validation = 175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 176 let pds_suffix = format!(".{}", hostname_for_validation); 177 178 let validated_short_handle = if !input.handle.contains('.') 179 || input.handle.ends_with(&pds_suffix) 180 { 181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 182 input 183 .handle 184 .strip_suffix(&pds_suffix) 185 .unwrap_or(&input.handle) 186 } else { 187 &input.handle 188 }; 189 match crate::api::validation::validate_short_handle(handle_to_validate) { 190 Ok(h) => h, 191 Err(crate::api::validation::HandleValidationError::Reserved) => { 192 return ( 193 StatusCode::BAD_REQUEST, 194 Json(json!({"error": "HandleNotAvailable", "message": "Reserved handle"})), 195 ) 196 .into_response(); 197 } 198 Err(e) => { 199 return ( 200 StatusCode::BAD_REQUEST, 201 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 202 ) 203 .into_response(); 204 } 205 } 206 } else { 207 if input.handle.contains(' ') || input.handle.contains('\t') { 208 return ( 209 StatusCode::BAD_REQUEST, 210 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 211 ) 212 .into_response(); 213 } 214 for c in input.handle.chars() { 215 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 216 return ( 217 StatusCode::BAD_REQUEST, 218 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 219 ) 220 .into_response(); 221 } 222 } 223 let handle_lower = input.handle.to_lowercase(); 224 if crate::moderation::has_explicit_slur(&handle_lower) { 225 return ( 226 StatusCode::BAD_REQUEST, 227 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})), 228 ) 229 .into_response(); 230 } 231 handle_lower 232 }; 233 let email: Option<String> = input 234 .email 235 .as_ref() 236 .map(|e| e.trim().to_string()) 237 .filter(|e| !e.is_empty()); 238 if let Some(ref email) = email 239 && !crate::api::validation::is_valid_email(email) 240 { 241 return ( 242 StatusCode::BAD_REQUEST, 243 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 244 ) 245 .into_response(); 246 } 247 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 248 let valid_channels = ["email", "discord", "telegram", "signal"]; 249 if !valid_channels.contains(&verification_channel) && !is_migration { 250 return ( 251 StatusCode::BAD_REQUEST, 252 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 253 ) 254 .into_response(); 255 } 256 let verification_recipient = if is_migration { 257 None 258 } else { 259 Some(match verification_channel { 260 "email" => match &input.email { 261 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 262 _ => return ( 263 StatusCode::BAD_REQUEST, 264 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 265 ).into_response(), 266 }, 267 "discord" => match &input.discord_id { 268 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 269 _ => return ( 270 StatusCode::BAD_REQUEST, 271 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 272 ).into_response(), 273 }, 274 "telegram" => match &input.telegram_username { 275 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 276 _ => return ( 277 StatusCode::BAD_REQUEST, 278 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 279 ).into_response(), 280 }, 281 "signal" => match &input.signal_number { 282 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 283 _ => return ( 284 StatusCode::BAD_REQUEST, 285 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 286 ).into_response(), 287 }, 288 _ => return ( 289 StatusCode::BAD_REQUEST, 290 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 291 ).into_response(), 292 }) 293 }; 294 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 295 let pds_endpoint = format!("https://{}", hostname); 296 let suffix = format!(".{}", hostname); 297 let handle = if input.handle.ends_with(&suffix) { 298 format!("{}.{}", validated_short_handle, hostname) 299 } else if input.handle.contains('.') { 300 validated_short_handle.clone() 301 } else { 302 format!("{}.{}", validated_short_handle, hostname) 303 }; 304 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 305 if let Some(signing_key_did) = &input.signing_key { 306 let reserved = sqlx::query!( 307 r#" 308 SELECT id, private_key_bytes 309 FROM reserved_signing_keys 310 WHERE public_key_did_key = $1 311 AND used_at IS NULL 312 AND expires_at > NOW() 313 FOR UPDATE 314 "#, 315 signing_key_did 316 ) 317 .fetch_optional(&state.db) 318 .await; 319 match reserved { 320 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 321 Ok(None) => { 322 return ( 323 StatusCode::BAD_REQUEST, 324 Json(json!({ 325 "error": "InvalidSigningKey", 326 "message": "Signing key not found, already used, or expired" 327 })), 328 ) 329 .into_response(); 330 } 331 Err(e) => { 332 error!("Error looking up reserved signing key: {:?}", e); 333 return ( 334 StatusCode::INTERNAL_SERVER_ERROR, 335 Json(json!({"error": "InternalError"})), 336 ) 337 .into_response(); 338 } 339 } 340 } else { 341 let secret_key = SecretKey::random(&mut OsRng); 342 (secret_key.to_bytes().to_vec(), None) 343 }; 344 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 345 Ok(k) => k, 346 Err(e) => { 347 error!("Error creating signing key: {:?}", e); 348 return ( 349 StatusCode::INTERNAL_SERVER_ERROR, 350 Json(json!({"error": "InternalError"})), 351 ) 352 .into_response(); 353 } 354 }; 355 let did_type = input.did_type.as_deref().unwrap_or("plc"); 356 let did = match did_type { 357 "web" => { 358 let subdomain_host = format!("{}.{}", input.handle, hostname); 359 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 360 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 361 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 362 self_hosted_did 363 } 364 "web-external" => { 365 let d = match &input.did { 366 Some(d) if !d.trim().is_empty() => d, 367 _ => { 368 return ( 369 StatusCode::BAD_REQUEST, 370 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 371 ) 372 .into_response(); 373 } 374 }; 375 if !d.starts_with("did:web:") { 376 return ( 377 StatusCode::BAD_REQUEST, 378 Json( 379 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 380 ), 381 ) 382 .into_response(); 383 } 384 if !is_did_web_byod 385 && let Err(e) = 386 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 387 { 388 return ( 389 StatusCode::BAD_REQUEST, 390 Json(json!({"error": "InvalidDid", "message": e})), 391 ) 392 .into_response(); 393 } 394 info!(did = %d, "Creating external did:web account"); 395 d.clone() 396 } 397 _ => { 398 if let Some(d) = &input.did { 399 if d.starts_with("did:plc:") && is_migration { 400 info!(did = %d, "Migration with existing did:plc"); 401 d.clone() 402 } else if d.starts_with("did:web:") { 403 if !is_did_web_byod 404 && let Err(e) = verify_did_web( 405 d, 406 &hostname, 407 &input.handle, 408 input.signing_key.as_deref(), 409 ) 410 .await 411 { 412 return ( 413 StatusCode::BAD_REQUEST, 414 Json(json!({"error": "InvalidDid", "message": e})), 415 ) 416 .into_response(); 417 } 418 d.clone() 419 } else if !d.trim().is_empty() { 420 return ( 421 StatusCode::BAD_REQUEST, 422 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 423 ) 424 .into_response(); 425 } else { 426 let rotation_key = std::env::var("PLC_ROTATION_KEY") 427 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 428 let genesis_result = match create_genesis_operation( 429 &signing_key, 430 &rotation_key, 431 &handle, 432 &pds_endpoint, 433 ) { 434 Ok(r) => r, 435 Err(e) => { 436 error!("Error creating PLC genesis operation: {:?}", e); 437 return ( 438 StatusCode::INTERNAL_SERVER_ERROR, 439 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 440 ) 441 .into_response(); 442 } 443 }; 444 let plc_client = PlcClient::new(None); 445 if let Err(e) = plc_client 446 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 447 .await 448 { 449 error!("Failed to submit PLC genesis operation: {:?}", e); 450 return ( 451 StatusCode::BAD_GATEWAY, 452 Json(json!({ 453 "error": "UpstreamError", 454 "message": format!("Failed to register DID with PLC directory: {}", e) 455 })), 456 ) 457 .into_response(); 458 } 459 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 460 genesis_result.did 461 } 462 } else { 463 let rotation_key = std::env::var("PLC_ROTATION_KEY") 464 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 465 let genesis_result = match create_genesis_operation( 466 &signing_key, 467 &rotation_key, 468 &handle, 469 &pds_endpoint, 470 ) { 471 Ok(r) => r, 472 Err(e) => { 473 error!("Error creating PLC genesis operation: {:?}", e); 474 return ( 475 StatusCode::INTERNAL_SERVER_ERROR, 476 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 477 ) 478 .into_response(); 479 } 480 }; 481 let plc_client = PlcClient::new(None); 482 if let Err(e) = plc_client 483 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 484 .await 485 { 486 error!("Failed to submit PLC genesis operation: {:?}", e); 487 return ( 488 StatusCode::BAD_GATEWAY, 489 Json(json!({ 490 "error": "UpstreamError", 491 "message": format!("Failed to register DID with PLC directory: {}", e) 492 })), 493 ) 494 .into_response(); 495 } 496 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 497 genesis_result.did 498 } 499 } 500 }; 501 let mut tx = match state.db.begin().await { 502 Ok(tx) => tx, 503 Err(e) => { 504 error!("Error starting transaction: {:?}", e); 505 return ( 506 StatusCode::INTERNAL_SERVER_ERROR, 507 Json(json!({"error": "InternalError"})), 508 ) 509 .into_response(); 510 } 511 }; 512 if is_migration { 513 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 514 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 515 .bind(&did) 516 .fetch_optional(&mut *tx) 517 .await 518 .unwrap_or(None); 519 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 520 if deactivated_at.is_some() { 521 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 522 let update_result: Result<_, sqlx::Error> = 523 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 524 .bind(&handle) 525 .bind(account_id) 526 .execute(&mut *tx) 527 .await; 528 if let Err(e) = update_result { 529 if let Some(db_err) = e.as_database_error() 530 && db_err 531 .constraint() 532 .map(|c| c.contains("handle")) 533 .unwrap_or(false) 534 { 535 return ( 536 StatusCode::BAD_REQUEST, 537 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 538 ) 539 .into_response(); 540 } 541 error!("Error reactivating account: {:?}", e); 542 return ( 543 StatusCode::INTERNAL_SERVER_ERROR, 544 Json(json!({"error": "InternalError"})), 545 ) 546 .into_response(); 547 } 548 if let Err(e) = tx.commit().await { 549 error!("Error committing reactivation: {:?}", e); 550 return ( 551 StatusCode::INTERNAL_SERVER_ERROR, 552 Json(json!({"error": "InternalError"})), 553 ) 554 .into_response(); 555 } 556 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 557 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 558 ) 559 .bind(account_id) 560 .fetch_optional(&state.db) 561 .await 562 .unwrap_or(None); 563 let secret_key_bytes = match key_row { 564 Some((key_bytes, encryption_version)) => { 565 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 566 Ok(k) => k, 567 Err(e) => { 568 error!("Error decrypting key for reactivated account: {:?}", e); 569 return ( 570 StatusCode::INTERNAL_SERVER_ERROR, 571 Json(json!({"error": "InternalError"})), 572 ) 573 .into_response(); 574 } 575 } 576 } 577 None => { 578 error!("No signing key found for reactivated account"); 579 return ( 580 StatusCode::INTERNAL_SERVER_ERROR, 581 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 582 ) 583 .into_response(); 584 } 585 }; 586 let access_meta = 587 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 588 Ok(m) => m, 589 Err(e) => { 590 error!("Error creating access token: {:?}", e); 591 return ( 592 StatusCode::INTERNAL_SERVER_ERROR, 593 Json(json!({"error": "InternalError"})), 594 ) 595 .into_response(); 596 } 597 }; 598 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 599 &did, 600 &secret_key_bytes, 601 ) { 602 Ok(m) => m, 603 Err(e) => { 604 error!("Error creating refresh token: {:?}", e); 605 return ( 606 StatusCode::INTERNAL_SERVER_ERROR, 607 Json(json!({"error": "InternalError"})), 608 ) 609 .into_response(); 610 } 611 }; 612 let session_result: Result<_, sqlx::Error> = sqlx::query( 613 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 614 ) 615 .bind(&did) 616 .bind(&access_meta.jti) 617 .bind(&refresh_meta.jti) 618 .bind(access_meta.expires_at) 619 .bind(refresh_meta.expires_at) 620 .execute(&state.db) 621 .await; 622 if let Err(e) = session_result { 623 error!("Error creating session: {:?}", e); 624 return ( 625 StatusCode::INTERNAL_SERVER_ERROR, 626 Json(json!({"error": "InternalError"})), 627 ) 628 .into_response(); 629 } 630 return ( 631 StatusCode::OK, 632 Json(CreateAccountOutput { 633 handle: handle.clone(), 634 did: did.clone(), 635 did_doc: state.did_resolver.resolve_did_document(&did).await, 636 access_jwt: access_meta.token, 637 refresh_jwt: refresh_meta.token, 638 verification_required: false, 639 verification_channel: "email".to_string(), 640 }), 641 ) 642 .into_response(); 643 } else { 644 return ( 645 StatusCode::BAD_REQUEST, 646 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 647 ) 648 .into_response(); 649 } 650 } 651 } 652 let exists_result: Option<(i32,)> = 653 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 654 .bind(&handle) 655 .fetch_optional(&mut *tx) 656 .await 657 .unwrap_or(None); 658 if exists_result.is_some() { 659 return ( 660 StatusCode::BAD_REQUEST, 661 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 662 ) 663 .into_response(); 664 } 665 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 666 .map(|v| v == "true" || v == "1") 667 .unwrap_or(false); 668 if invite_code_required 669 && input 670 .invite_code 671 .as_ref() 672 .map(|c| c.trim().is_empty()) 673 .unwrap_or(true) 674 { 675 return ( 676 StatusCode::BAD_REQUEST, 677 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 678 ) 679 .into_response(); 680 } 681 if let Some(code) = &input.invite_code 682 && !code.trim().is_empty() 683 { 684 let invite_query = sqlx::query!( 685 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 686 code 687 ) 688 .fetch_optional(&mut *tx) 689 .await; 690 match invite_query { 691 Ok(Some(row)) => { 692 if row.available_uses <= 0 { 693 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 694 } 695 let update_invite = sqlx::query!( 696 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 697 code 698 ) 699 .execute(&mut *tx) 700 .await; 701 if let Err(e) = update_invite { 702 error!("Error updating invite code: {:?}", e); 703 return ( 704 StatusCode::INTERNAL_SERVER_ERROR, 705 Json(json!({"error": "InternalError"})), 706 ) 707 .into_response(); 708 } 709 } 710 Ok(None) => { 711 return ( 712 StatusCode::BAD_REQUEST, 713 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 714 ) 715 .into_response(); 716 } 717 Err(e) => { 718 error!("Error checking invite code: {:?}", e); 719 return ( 720 StatusCode::INTERNAL_SERVER_ERROR, 721 Json(json!({"error": "InternalError"})), 722 ) 723 .into_response(); 724 } 725 } 726 } 727 if let Err(e) = validate_password(&input.password) { 728 return ( 729 StatusCode::BAD_REQUEST, 730 Json(json!({ 731 "error": "InvalidPassword", 732 "message": e.to_string() 733 })), 734 ) 735 .into_response(); 736 } 737 738 let password_hash = match hash(&input.password, DEFAULT_COST) { 739 Ok(h) => h, 740 Err(e) => { 741 error!("Error hashing password: {:?}", e); 742 return ( 743 StatusCode::INTERNAL_SERVER_ERROR, 744 Json(json!({"error": "InternalError"})), 745 ) 746 .into_response(); 747 } 748 }; 749 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 750 .fetch_one(&mut *tx) 751 .await 752 .map(|c| c.unwrap_or(0) == 0) 753 .unwrap_or(false); 754 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 755 Some(chrono::Utc::now()) 756 } else { 757 None 758 }; 759 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 760 r#"INSERT INTO users ( 761 handle, email, did, password_hash, 762 preferred_comms_channel, 763 discord_id, telegram_username, signal_number, 764 is_admin, deactivated_at, email_verified 765 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 766 ) 767 .bind(&handle) 768 .bind(&email) 769 .bind(&did) 770 .bind(&password_hash) 771 .bind(verification_channel) 772 .bind( 773 input 774 .discord_id 775 .as_deref() 776 .map(|s| s.trim()) 777 .filter(|s| !s.is_empty()), 778 ) 779 .bind( 780 input 781 .telegram_username 782 .as_deref() 783 .map(|s| s.trim()) 784 .filter(|s| !s.is_empty()), 785 ) 786 .bind( 787 input 788 .signal_number 789 .as_deref() 790 .map(|s| s.trim()) 791 .filter(|s| !s.is_empty()), 792 ) 793 .bind(is_first_user) 794 .bind(deactivated_at) 795 .bind(false) 796 .fetch_one(&mut *tx) 797 .await; 798 let user_id = match user_insert { 799 Ok((id,)) => id, 800 Err(e) => { 801 if let Some(db_err) = e.as_database_error() 802 && db_err.code().as_deref() == Some("23505") 803 { 804 let constraint = db_err.constraint().unwrap_or(""); 805 if constraint.contains("handle") || constraint.contains("users_handle") { 806 return ( 807 StatusCode::BAD_REQUEST, 808 Json(json!({ 809 "error": "HandleNotAvailable", 810 "message": "Handle already taken" 811 })), 812 ) 813 .into_response(); 814 } else if constraint.contains("email") || constraint.contains("users_email") { 815 return ( 816 StatusCode::BAD_REQUEST, 817 Json(json!({ 818 "error": "InvalidEmail", 819 "message": "Email already registered" 820 })), 821 ) 822 .into_response(); 823 } else if constraint.contains("did") || constraint.contains("users_did") { 824 return ( 825 StatusCode::BAD_REQUEST, 826 Json(json!({ 827 "error": "AccountAlreadyExists", 828 "message": "An account with this DID already exists" 829 })), 830 ) 831 .into_response(); 832 } 833 } 834 error!("Error inserting user: {:?}", e); 835 return ( 836 StatusCode::INTERNAL_SERVER_ERROR, 837 Json(json!({"error": "InternalError"})), 838 ) 839 .into_response(); 840 } 841 }; 842 843 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 844 Ok(enc) => enc, 845 Err(e) => { 846 error!("Error encrypting user key: {:?}", e); 847 return ( 848 StatusCode::INTERNAL_SERVER_ERROR, 849 Json(json!({"error": "InternalError"})), 850 ) 851 .into_response(); 852 } 853 }; 854 let key_insert = sqlx::query!( 855 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 856 user_id, 857 &encrypted_key_bytes[..], 858 crate::config::ENCRYPTION_VERSION 859 ) 860 .execute(&mut *tx) 861 .await; 862 if let Err(e) = key_insert { 863 error!("Error inserting user key: {:?}", e); 864 return ( 865 StatusCode::INTERNAL_SERVER_ERROR, 866 Json(json!({"error": "InternalError"})), 867 ) 868 .into_response(); 869 } 870 if let Some(key_id) = reserved_key_id { 871 let mark_used = sqlx::query!( 872 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 873 key_id 874 ) 875 .execute(&mut *tx) 876 .await; 877 if let Err(e) = mark_used { 878 error!("Error marking reserved key as used: {:?}", e); 879 return ( 880 StatusCode::INTERNAL_SERVER_ERROR, 881 Json(json!({"error": "InternalError"})), 882 ) 883 .into_response(); 884 } 885 } 886 let mst = Mst::new(Arc::new(state.block_store.clone())); 887 let mst_root = match mst.persist().await { 888 Ok(c) => c, 889 Err(e) => { 890 error!("Error persisting MST: {:?}", e); 891 return ( 892 StatusCode::INTERNAL_SERVER_ERROR, 893 Json(json!({"error": "InternalError"})), 894 ) 895 .into_response(); 896 } 897 }; 898 let rev = Tid::now(LimitedU32::MIN); 899 let (commit_bytes, _sig) = 900 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 901 Ok(result) => result, 902 Err(e) => { 903 error!("Error creating genesis commit: {:?}", e); 904 return ( 905 StatusCode::INTERNAL_SERVER_ERROR, 906 Json(json!({"error": "InternalError"})), 907 ) 908 .into_response(); 909 } 910 }; 911 let commit_cid = match state.block_store.put(&commit_bytes).await { 912 Ok(c) => c, 913 Err(e) => { 914 error!("Error saving genesis commit: {:?}", e); 915 return ( 916 StatusCode::INTERNAL_SERVER_ERROR, 917 Json(json!({"error": "InternalError"})), 918 ) 919 .into_response(); 920 } 921 }; 922 let commit_cid_str = commit_cid.to_string(); 923 let rev_str = rev.as_ref().to_string(); 924 let repo_insert = sqlx::query!( 925 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 926 user_id, 927 commit_cid_str, 928 rev_str 929 ) 930 .execute(&mut *tx) 931 .await; 932 if let Err(e) = repo_insert { 933 error!("Error initializing repo: {:?}", e); 934 return ( 935 StatusCode::INTERNAL_SERVER_ERROR, 936 Json(json!({"error": "InternalError"})), 937 ) 938 .into_response(); 939 } 940 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 941 if let Err(e) = sqlx::query!( 942 r#" 943 INSERT INTO user_blocks (user_id, block_cid) 944 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 945 ON CONFLICT (user_id, block_cid) DO NOTHING 946 "#, 947 user_id, 948 &genesis_block_cids 949 ) 950 .execute(&mut *tx) 951 .await 952 { 953 error!("Error inserting user_blocks: {:?}", e); 954 return ( 955 StatusCode::INTERNAL_SERVER_ERROR, 956 Json(json!({"error": "InternalError"})), 957 ) 958 .into_response(); 959 } 960 if let Some(code) = &input.invite_code 961 && !code.trim().is_empty() 962 { 963 let use_insert = sqlx::query!( 964 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 965 code, 966 user_id 967 ) 968 .execute(&mut *tx) 969 .await; 970 if let Err(e) = use_insert { 971 error!("Error recording invite usage: {:?}", e); 972 return ( 973 StatusCode::INTERNAL_SERVER_ERROR, 974 Json(json!({"error": "InternalError"})), 975 ) 976 .into_response(); 977 } 978 } 979 if let Err(e) = tx.commit().await { 980 error!("Error committing transaction: {:?}", e); 981 return ( 982 StatusCode::INTERNAL_SERVER_ERROR, 983 Json(json!({"error": "InternalError"})), 984 ) 985 .into_response(); 986 } 987 if !is_migration && !is_did_web_byod { 988 if let Err(e) = 989 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 990 { 991 warn!("Failed to sequence identity event for {}: {}", did, e); 992 } 993 if let Err(e) = 994 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 995 { 996 warn!("Failed to sequence account event for {}: {}", did, e); 997 } 998 if let Err(e) = 999 crate::api::repo::record::sequence_genesis_commit(&state, &did, &commit_cid, &mst_root, &rev_str).await 1000 { 1001 warn!("Failed to sequence commit event for {}: {}", did, e); 1002 } 1003 if let Err(e) = crate::api::repo::record::sequence_sync_event( 1004 &state, 1005 &did, 1006 &commit_cid_str, 1007 Some(rev.as_ref()), 1008 ) 1009 .await 1010 { 1011 warn!("Failed to sequence sync event for {}: {}", did, e); 1012 } 1013 let profile_record = json!({ 1014 "$type": "app.bsky.actor.profile", 1015 "displayName": input.handle 1016 }); 1017 if let Err(e) = crate::api::repo::record::create_record_internal( 1018 &state, 1019 &did, 1020 "app.bsky.actor.profile", 1021 "self", 1022 &profile_record, 1023 ) 1024 .await 1025 { 1026 warn!("Failed to create default profile for {}: {}", did, e); 1027 } 1028 } 1029 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 1030 if !is_migration { 1031 if let Some(ref recipient) = verification_recipient { 1032 let verification_token = crate::auth::verification_token::generate_signup_token( 1033 &did, 1034 verification_channel, 1035 recipient, 1036 ); 1037 let formatted_token = 1038 crate::auth::verification_token::format_token_for_display(&verification_token); 1039 if let Err(e) = crate::comms::enqueue_signup_verification( 1040 &state.db, 1041 user_id, 1042 verification_channel, 1043 recipient, 1044 &formatted_token, 1045 None, 1046 ) 1047 .await 1048 { 1049 warn!( 1050 "Failed to enqueue signup verification notification: {:?}", 1051 e 1052 ); 1053 } 1054 } 1055 } else if let Some(ref user_email) = email { 1056 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 1057 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 1058 if let Err(e) = crate::comms::enqueue_migration_verification( 1059 &state.db, 1060 user_id, 1061 user_email, 1062 &formatted_token, 1063 &hostname, 1064 ) 1065 .await 1066 { 1067 warn!("Failed to enqueue migration verification email: {:?}", e); 1068 } 1069 } 1070 1071 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 1072 { 1073 Ok(m) => m, 1074 Err(e) => { 1075 error!("createAccount: Error creating access token: {:?}", e); 1076 return ( 1077 StatusCode::INTERNAL_SERVER_ERROR, 1078 Json(json!({"error": "InternalError"})), 1079 ) 1080 .into_response(); 1081 } 1082 }; 1083 let refresh_meta = 1084 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1085 Ok(m) => m, 1086 Err(e) => { 1087 error!("createAccount: Error creating refresh token: {:?}", e); 1088 return ( 1089 StatusCode::INTERNAL_SERVER_ERROR, 1090 Json(json!({"error": "InternalError"})), 1091 ) 1092 .into_response(); 1093 } 1094 }; 1095 if let Err(e) = sqlx::query!( 1096 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1097 did, 1098 access_meta.jti, 1099 refresh_meta.jti, 1100 access_meta.expires_at, 1101 refresh_meta.expires_at 1102 ) 1103 .execute(&state.db) 1104 .await 1105 { 1106 error!("createAccount: Error creating session: {:?}", e); 1107 return ( 1108 StatusCode::INTERNAL_SERVER_ERROR, 1109 Json(json!({"error": "InternalError"})), 1110 ) 1111 .into_response(); 1112 } 1113 1114 let did_doc = state.did_resolver.resolve_did_document(&did).await; 1115 1116 if is_migration { 1117 info!( 1118 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 1119 did, handle 1120 ); 1121 } 1122 1123 ( 1124 StatusCode::OK, 1125 Json(CreateAccountOutput { 1126 handle: handle.clone(), 1127 did, 1128 did_doc, 1129 access_jwt: access_meta.token, 1130 refresh_jwt: refresh_meta.token, 1131 verification_required: !is_migration, 1132 verification_channel: verification_channel.to_string(), 1133 }), 1134 ) 1135 .into_response() 1136} 1137