this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub did_doc: Option<serde_json::Value>, 61 pub access_jwt: String, 62 pub refresh_jwt: String, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 let is_potential_migration = input 73 .did 74 .as_ref() 75 .map(|d| d.starts_with("did:plc:")) 76 .unwrap_or(false); 77 if is_potential_migration { 78 info!( 79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 80 input.did, input.handle 81 ); 82 } else { 83 info!("create_account called"); 84 } 85 let client_ip = extract_client_ip(&headers); 86 if !state 87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 88 .await 89 { 90 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 91 return ( 92 StatusCode::TOO_MANY_REQUESTS, 93 Json(json!({ 94 "error": "RateLimitExceeded", 95 "message": "Too many account creation attempts. Please try again later." 96 })), 97 ) 98 .into_response(); 99 } 100 101 let migration_auth = if let Some(token) = 102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 103 { 104 if is_service_token(&token) { 105 let verifier = ServiceTokenVerifier::new(); 106 match verifier 107 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 108 .await 109 { 110 Ok(claims) => { 111 debug!("Service token verified for migration: iss={}", claims.iss); 112 Some(claims.iss) 113 } 114 Err(e) => { 115 error!("Service token verification failed: {:?}", e); 116 return ( 117 StatusCode::UNAUTHORIZED, 118 Json(json!({ 119 "error": "AuthenticationFailed", 120 "message": format!("Service token verification failed: {}", e) 121 })), 122 ) 123 .into_response(); 124 } 125 } 126 } else { 127 None 128 } 129 } else { 130 None 131 }; 132 133 let is_did_web_byod = migration_auth.is_some() 134 && input 135 .did 136 .as_ref() 137 .map(|d| d.starts_with("did:web:")) 138 .unwrap_or(false); 139 140 let is_migration = migration_auth.is_some() 141 && input 142 .did 143 .as_ref() 144 .map(|d| d.starts_with("did:plc:")) 145 .unwrap_or(false); 146 147 if (is_migration || is_did_web_byod) 148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 149 { 150 if provided_did != auth_did { 151 info!( 152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 153 auth_did, provided_did 154 ); 155 return ( 156 StatusCode::FORBIDDEN, 157 Json(json!({ 158 "error": "AuthorizationError", 159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did) 160 })), 161 ) 162 .into_response(); 163 } 164 if is_did_web_byod { 165 info!(did = %provided_did, "Processing did:web BYOD account creation"); 166 } else { 167 info!( 168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 169 provided_did 170 ); 171 } 172 } 173 174 let hostname_for_validation = 175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 176 let pds_suffix = format!(".{}", hostname_for_validation); 177 178 let validated_short_handle = if !input.handle.contains('.') 179 || input.handle.ends_with(&pds_suffix) 180 { 181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 182 input 183 .handle 184 .strip_suffix(&pds_suffix) 185 .unwrap_or(&input.handle) 186 } else { 187 &input.handle 188 }; 189 match crate::api::validation::validate_short_handle(handle_to_validate) { 190 Ok(h) => h, 191 Err(e) => { 192 return ( 193 StatusCode::BAD_REQUEST, 194 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 195 ) 196 .into_response(); 197 } 198 } 199 } else { 200 if input.handle.contains(' ') || input.handle.contains('\t') { 201 return ( 202 StatusCode::BAD_REQUEST, 203 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 204 ) 205 .into_response(); 206 } 207 for c in input.handle.chars() { 208 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 209 return ( 210 StatusCode::BAD_REQUEST, 211 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 212 ) 213 .into_response(); 214 } 215 } 216 let handle_lower = input.handle.to_lowercase(); 217 if crate::moderation::has_explicit_slur(&handle_lower) { 218 return ( 219 StatusCode::BAD_REQUEST, 220 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})), 221 ) 222 .into_response(); 223 } 224 handle_lower 225 }; 226 let email: Option<String> = input 227 .email 228 .as_ref() 229 .map(|e| e.trim().to_string()) 230 .filter(|e| !e.is_empty()); 231 if let Some(ref email) = email 232 && !crate::api::validation::is_valid_email(email) 233 { 234 return ( 235 StatusCode::BAD_REQUEST, 236 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 237 ) 238 .into_response(); 239 } 240 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 241 let valid_channels = ["email", "discord", "telegram", "signal"]; 242 if !valid_channels.contains(&verification_channel) && !is_migration { 243 return ( 244 StatusCode::BAD_REQUEST, 245 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 246 ) 247 .into_response(); 248 } 249 let verification_recipient = if is_migration { 250 None 251 } else { 252 Some(match verification_channel { 253 "email" => match &input.email { 254 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 255 _ => return ( 256 StatusCode::BAD_REQUEST, 257 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 258 ).into_response(), 259 }, 260 "discord" => match &input.discord_id { 261 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 262 _ => return ( 263 StatusCode::BAD_REQUEST, 264 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 265 ).into_response(), 266 }, 267 "telegram" => match &input.telegram_username { 268 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 269 _ => return ( 270 StatusCode::BAD_REQUEST, 271 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 272 ).into_response(), 273 }, 274 "signal" => match &input.signal_number { 275 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 276 _ => return ( 277 StatusCode::BAD_REQUEST, 278 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 279 ).into_response(), 280 }, 281 _ => return ( 282 StatusCode::BAD_REQUEST, 283 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 284 ).into_response(), 285 }) 286 }; 287 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 288 let pds_endpoint = format!("https://{}", hostname); 289 let suffix = format!(".{}", hostname); 290 let handle = if input.handle.ends_with(&suffix) { 291 format!("{}.{}", validated_short_handle, hostname) 292 } else if input.handle.contains('.') { 293 validated_short_handle.clone() 294 } else { 295 format!("{}.{}", validated_short_handle, hostname) 296 }; 297 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 298 if let Some(signing_key_did) = &input.signing_key { 299 let reserved = sqlx::query!( 300 r#" 301 SELECT id, private_key_bytes 302 FROM reserved_signing_keys 303 WHERE public_key_did_key = $1 304 AND used_at IS NULL 305 AND expires_at > NOW() 306 FOR UPDATE 307 "#, 308 signing_key_did 309 ) 310 .fetch_optional(&state.db) 311 .await; 312 match reserved { 313 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 314 Ok(None) => { 315 return ( 316 StatusCode::BAD_REQUEST, 317 Json(json!({ 318 "error": "InvalidSigningKey", 319 "message": "Signing key not found, already used, or expired" 320 })), 321 ) 322 .into_response(); 323 } 324 Err(e) => { 325 error!("Error looking up reserved signing key: {:?}", e); 326 return ( 327 StatusCode::INTERNAL_SERVER_ERROR, 328 Json(json!({"error": "InternalError"})), 329 ) 330 .into_response(); 331 } 332 } 333 } else { 334 let secret_key = SecretKey::random(&mut OsRng); 335 (secret_key.to_bytes().to_vec(), None) 336 }; 337 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 338 Ok(k) => k, 339 Err(e) => { 340 error!("Error creating signing key: {:?}", e); 341 return ( 342 StatusCode::INTERNAL_SERVER_ERROR, 343 Json(json!({"error": "InternalError"})), 344 ) 345 .into_response(); 346 } 347 }; 348 let did_type = input.did_type.as_deref().unwrap_or("plc"); 349 let did = match did_type { 350 "web" => { 351 let subdomain_host = format!("{}.{}", input.handle, hostname); 352 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 353 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 354 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 355 self_hosted_did 356 } 357 "web-external" => { 358 let d = match &input.did { 359 Some(d) if !d.trim().is_empty() => d, 360 _ => { 361 return ( 362 StatusCode::BAD_REQUEST, 363 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 364 ) 365 .into_response(); 366 } 367 }; 368 if !d.starts_with("did:web:") { 369 return ( 370 StatusCode::BAD_REQUEST, 371 Json( 372 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 373 ), 374 ) 375 .into_response(); 376 } 377 if !is_did_web_byod 378 && let Err(e) = 379 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 380 { 381 return ( 382 StatusCode::BAD_REQUEST, 383 Json(json!({"error": "InvalidDid", "message": e})), 384 ) 385 .into_response(); 386 } 387 info!(did = %d, "Creating external did:web account"); 388 d.clone() 389 } 390 _ => { 391 if let Some(d) = &input.did { 392 if d.starts_with("did:plc:") && is_migration { 393 info!(did = %d, "Migration with existing did:plc"); 394 d.clone() 395 } else if d.starts_with("did:web:") { 396 if !is_did_web_byod 397 && let Err(e) = verify_did_web( 398 d, 399 &hostname, 400 &input.handle, 401 input.signing_key.as_deref(), 402 ) 403 .await 404 { 405 return ( 406 StatusCode::BAD_REQUEST, 407 Json(json!({"error": "InvalidDid", "message": e})), 408 ) 409 .into_response(); 410 } 411 d.clone() 412 } else if !d.trim().is_empty() { 413 return ( 414 StatusCode::BAD_REQUEST, 415 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 416 ) 417 .into_response(); 418 } else { 419 let rotation_key = std::env::var("PLC_ROTATION_KEY") 420 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 421 let genesis_result = match create_genesis_operation( 422 &signing_key, 423 &rotation_key, 424 &handle, 425 &pds_endpoint, 426 ) { 427 Ok(r) => r, 428 Err(e) => { 429 error!("Error creating PLC genesis operation: {:?}", e); 430 return ( 431 StatusCode::INTERNAL_SERVER_ERROR, 432 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 433 ) 434 .into_response(); 435 } 436 }; 437 let plc_client = PlcClient::new(None); 438 if let Err(e) = plc_client 439 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 440 .await 441 { 442 error!("Failed to submit PLC genesis operation: {:?}", e); 443 return ( 444 StatusCode::BAD_GATEWAY, 445 Json(json!({ 446 "error": "UpstreamError", 447 "message": format!("Failed to register DID with PLC directory: {}", e) 448 })), 449 ) 450 .into_response(); 451 } 452 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 453 genesis_result.did 454 } 455 } else { 456 let rotation_key = std::env::var("PLC_ROTATION_KEY") 457 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 458 let genesis_result = match create_genesis_operation( 459 &signing_key, 460 &rotation_key, 461 &handle, 462 &pds_endpoint, 463 ) { 464 Ok(r) => r, 465 Err(e) => { 466 error!("Error creating PLC genesis operation: {:?}", e); 467 return ( 468 StatusCode::INTERNAL_SERVER_ERROR, 469 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 470 ) 471 .into_response(); 472 } 473 }; 474 let plc_client = PlcClient::new(None); 475 if let Err(e) = plc_client 476 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 477 .await 478 { 479 error!("Failed to submit PLC genesis operation: {:?}", e); 480 return ( 481 StatusCode::BAD_GATEWAY, 482 Json(json!({ 483 "error": "UpstreamError", 484 "message": format!("Failed to register DID with PLC directory: {}", e) 485 })), 486 ) 487 .into_response(); 488 } 489 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 490 genesis_result.did 491 } 492 } 493 }; 494 let mut tx = match state.db.begin().await { 495 Ok(tx) => tx, 496 Err(e) => { 497 error!("Error starting transaction: {:?}", e); 498 return ( 499 StatusCode::INTERNAL_SERVER_ERROR, 500 Json(json!({"error": "InternalError"})), 501 ) 502 .into_response(); 503 } 504 }; 505 if is_migration { 506 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 507 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 508 .bind(&did) 509 .fetch_optional(&mut *tx) 510 .await 511 .unwrap_or(None); 512 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 513 if deactivated_at.is_some() { 514 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 515 let update_result: Result<_, sqlx::Error> = 516 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 517 .bind(&handle) 518 .bind(account_id) 519 .execute(&mut *tx) 520 .await; 521 if let Err(e) = update_result { 522 if let Some(db_err) = e.as_database_error() 523 && db_err 524 .constraint() 525 .map(|c| c.contains("handle")) 526 .unwrap_or(false) 527 { 528 return ( 529 StatusCode::BAD_REQUEST, 530 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 531 ) 532 .into_response(); 533 } 534 error!("Error reactivating account: {:?}", e); 535 return ( 536 StatusCode::INTERNAL_SERVER_ERROR, 537 Json(json!({"error": "InternalError"})), 538 ) 539 .into_response(); 540 } 541 if let Err(e) = tx.commit().await { 542 error!("Error committing reactivation: {:?}", e); 543 return ( 544 StatusCode::INTERNAL_SERVER_ERROR, 545 Json(json!({"error": "InternalError"})), 546 ) 547 .into_response(); 548 } 549 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 550 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 551 ) 552 .bind(account_id) 553 .fetch_optional(&state.db) 554 .await 555 .unwrap_or(None); 556 let secret_key_bytes = match key_row { 557 Some((key_bytes, encryption_version)) => { 558 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 559 Ok(k) => k, 560 Err(e) => { 561 error!("Error decrypting key for reactivated account: {:?}", e); 562 return ( 563 StatusCode::INTERNAL_SERVER_ERROR, 564 Json(json!({"error": "InternalError"})), 565 ) 566 .into_response(); 567 } 568 } 569 } 570 None => { 571 error!("No signing key found for reactivated account"); 572 return ( 573 StatusCode::INTERNAL_SERVER_ERROR, 574 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 575 ) 576 .into_response(); 577 } 578 }; 579 let access_meta = 580 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 581 Ok(m) => m, 582 Err(e) => { 583 error!("Error creating access token: {:?}", e); 584 return ( 585 StatusCode::INTERNAL_SERVER_ERROR, 586 Json(json!({"error": "InternalError"})), 587 ) 588 .into_response(); 589 } 590 }; 591 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 592 &did, 593 &secret_key_bytes, 594 ) { 595 Ok(m) => m, 596 Err(e) => { 597 error!("Error creating refresh token: {:?}", e); 598 return ( 599 StatusCode::INTERNAL_SERVER_ERROR, 600 Json(json!({"error": "InternalError"})), 601 ) 602 .into_response(); 603 } 604 }; 605 let session_result: Result<_, sqlx::Error> = sqlx::query( 606 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 607 ) 608 .bind(&did) 609 .bind(&access_meta.jti) 610 .bind(&refresh_meta.jti) 611 .bind(access_meta.expires_at) 612 .bind(refresh_meta.expires_at) 613 .execute(&state.db) 614 .await; 615 if let Err(e) = session_result { 616 error!("Error creating session: {:?}", e); 617 return ( 618 StatusCode::INTERNAL_SERVER_ERROR, 619 Json(json!({"error": "InternalError"})), 620 ) 621 .into_response(); 622 } 623 return ( 624 StatusCode::OK, 625 Json(CreateAccountOutput { 626 handle: handle.clone(), 627 did: did.clone(), 628 did_doc: state.did_resolver.resolve_did_document(&did).await, 629 access_jwt: access_meta.token, 630 refresh_jwt: refresh_meta.token, 631 verification_required: false, 632 verification_channel: "email".to_string(), 633 }), 634 ) 635 .into_response(); 636 } else { 637 return ( 638 StatusCode::BAD_REQUEST, 639 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 640 ) 641 .into_response(); 642 } 643 } 644 } 645 let exists_result: Option<(i32,)> = 646 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 647 .bind(&handle) 648 .fetch_optional(&mut *tx) 649 .await 650 .unwrap_or(None); 651 if exists_result.is_some() { 652 return ( 653 StatusCode::BAD_REQUEST, 654 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 655 ) 656 .into_response(); 657 } 658 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 659 .map(|v| v == "true" || v == "1") 660 .unwrap_or(false); 661 if invite_code_required 662 && input 663 .invite_code 664 .as_ref() 665 .map(|c| c.trim().is_empty()) 666 .unwrap_or(true) 667 { 668 return ( 669 StatusCode::BAD_REQUEST, 670 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 671 ) 672 .into_response(); 673 } 674 if let Some(code) = &input.invite_code 675 && !code.trim().is_empty() 676 { 677 let invite_query = sqlx::query!( 678 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 679 code 680 ) 681 .fetch_optional(&mut *tx) 682 .await; 683 match invite_query { 684 Ok(Some(row)) => { 685 if row.available_uses <= 0 { 686 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 687 } 688 let update_invite = sqlx::query!( 689 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 690 code 691 ) 692 .execute(&mut *tx) 693 .await; 694 if let Err(e) = update_invite { 695 error!("Error updating invite code: {:?}", e); 696 return ( 697 StatusCode::INTERNAL_SERVER_ERROR, 698 Json(json!({"error": "InternalError"})), 699 ) 700 .into_response(); 701 } 702 } 703 Ok(None) => { 704 return ( 705 StatusCode::BAD_REQUEST, 706 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 707 ) 708 .into_response(); 709 } 710 Err(e) => { 711 error!("Error checking invite code: {:?}", e); 712 return ( 713 StatusCode::INTERNAL_SERVER_ERROR, 714 Json(json!({"error": "InternalError"})), 715 ) 716 .into_response(); 717 } 718 } 719 } 720 if let Err(e) = validate_password(&input.password) { 721 return ( 722 StatusCode::BAD_REQUEST, 723 Json(json!({ 724 "error": "InvalidPassword", 725 "message": e.to_string() 726 })), 727 ) 728 .into_response(); 729 } 730 731 let password_hash = match hash(&input.password, DEFAULT_COST) { 732 Ok(h) => h, 733 Err(e) => { 734 error!("Error hashing password: {:?}", e); 735 return ( 736 StatusCode::INTERNAL_SERVER_ERROR, 737 Json(json!({"error": "InternalError"})), 738 ) 739 .into_response(); 740 } 741 }; 742 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 743 .fetch_one(&mut *tx) 744 .await 745 .map(|c| c.unwrap_or(0) == 0) 746 .unwrap_or(false); 747 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 748 Some(chrono::Utc::now()) 749 } else { 750 None 751 }; 752 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 753 r#"INSERT INTO users ( 754 handle, email, did, password_hash, 755 preferred_comms_channel, 756 discord_id, telegram_username, signal_number, 757 is_admin, deactivated_at, email_verified 758 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 759 ) 760 .bind(&handle) 761 .bind(&email) 762 .bind(&did) 763 .bind(&password_hash) 764 .bind(verification_channel) 765 .bind( 766 input 767 .discord_id 768 .as_deref() 769 .map(|s| s.trim()) 770 .filter(|s| !s.is_empty()), 771 ) 772 .bind( 773 input 774 .telegram_username 775 .as_deref() 776 .map(|s| s.trim()) 777 .filter(|s| !s.is_empty()), 778 ) 779 .bind( 780 input 781 .signal_number 782 .as_deref() 783 .map(|s| s.trim()) 784 .filter(|s| !s.is_empty()), 785 ) 786 .bind(is_first_user) 787 .bind(deactivated_at) 788 .bind(false) 789 .fetch_one(&mut *tx) 790 .await; 791 let user_id = match user_insert { 792 Ok((id,)) => id, 793 Err(e) => { 794 if let Some(db_err) = e.as_database_error() 795 && db_err.code().as_deref() == Some("23505") 796 { 797 let constraint = db_err.constraint().unwrap_or(""); 798 if constraint.contains("handle") || constraint.contains("users_handle") { 799 return ( 800 StatusCode::BAD_REQUEST, 801 Json(json!({ 802 "error": "HandleNotAvailable", 803 "message": "Handle already taken" 804 })), 805 ) 806 .into_response(); 807 } else if constraint.contains("email") || constraint.contains("users_email") { 808 return ( 809 StatusCode::BAD_REQUEST, 810 Json(json!({ 811 "error": "InvalidEmail", 812 "message": "Email already registered" 813 })), 814 ) 815 .into_response(); 816 } else if constraint.contains("did") || constraint.contains("users_did") { 817 return ( 818 StatusCode::BAD_REQUEST, 819 Json(json!({ 820 "error": "AccountAlreadyExists", 821 "message": "An account with this DID already exists" 822 })), 823 ) 824 .into_response(); 825 } 826 } 827 error!("Error inserting user: {:?}", e); 828 return ( 829 StatusCode::INTERNAL_SERVER_ERROR, 830 Json(json!({"error": "InternalError"})), 831 ) 832 .into_response(); 833 } 834 }; 835 836 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 837 Ok(enc) => enc, 838 Err(e) => { 839 error!("Error encrypting user key: {:?}", e); 840 return ( 841 StatusCode::INTERNAL_SERVER_ERROR, 842 Json(json!({"error": "InternalError"})), 843 ) 844 .into_response(); 845 } 846 }; 847 let key_insert = sqlx::query!( 848 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 849 user_id, 850 &encrypted_key_bytes[..], 851 crate::config::ENCRYPTION_VERSION 852 ) 853 .execute(&mut *tx) 854 .await; 855 if let Err(e) = key_insert { 856 error!("Error inserting user key: {:?}", e); 857 return ( 858 StatusCode::INTERNAL_SERVER_ERROR, 859 Json(json!({"error": "InternalError"})), 860 ) 861 .into_response(); 862 } 863 if let Some(key_id) = reserved_key_id { 864 let mark_used = sqlx::query!( 865 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 866 key_id 867 ) 868 .execute(&mut *tx) 869 .await; 870 if let Err(e) = mark_used { 871 error!("Error marking reserved key as used: {:?}", e); 872 return ( 873 StatusCode::INTERNAL_SERVER_ERROR, 874 Json(json!({"error": "InternalError"})), 875 ) 876 .into_response(); 877 } 878 } 879 let mst = Mst::new(Arc::new(state.block_store.clone())); 880 let mst_root = match mst.persist().await { 881 Ok(c) => c, 882 Err(e) => { 883 error!("Error persisting MST: {:?}", e); 884 return ( 885 StatusCode::INTERNAL_SERVER_ERROR, 886 Json(json!({"error": "InternalError"})), 887 ) 888 .into_response(); 889 } 890 }; 891 let rev = Tid::now(LimitedU32::MIN); 892 let (commit_bytes, _sig) = 893 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 894 Ok(result) => result, 895 Err(e) => { 896 error!("Error creating genesis commit: {:?}", e); 897 return ( 898 StatusCode::INTERNAL_SERVER_ERROR, 899 Json(json!({"error": "InternalError"})), 900 ) 901 .into_response(); 902 } 903 }; 904 let commit_cid = match state.block_store.put(&commit_bytes).await { 905 Ok(c) => c, 906 Err(e) => { 907 error!("Error saving genesis commit: {:?}", e); 908 return ( 909 StatusCode::INTERNAL_SERVER_ERROR, 910 Json(json!({"error": "InternalError"})), 911 ) 912 .into_response(); 913 } 914 }; 915 let commit_cid_str = commit_cid.to_string(); 916 let rev_str = rev.as_ref().to_string(); 917 let repo_insert = sqlx::query!( 918 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 919 user_id, 920 commit_cid_str, 921 rev_str 922 ) 923 .execute(&mut *tx) 924 .await; 925 if let Err(e) = repo_insert { 926 error!("Error initializing repo: {:?}", e); 927 return ( 928 StatusCode::INTERNAL_SERVER_ERROR, 929 Json(json!({"error": "InternalError"})), 930 ) 931 .into_response(); 932 } 933 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 934 if let Err(e) = sqlx::query!( 935 r#" 936 INSERT INTO user_blocks (user_id, block_cid) 937 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 938 ON CONFLICT (user_id, block_cid) DO NOTHING 939 "#, 940 user_id, 941 &genesis_block_cids 942 ) 943 .execute(&mut *tx) 944 .await 945 { 946 error!("Error inserting user_blocks: {:?}", e); 947 return ( 948 StatusCode::INTERNAL_SERVER_ERROR, 949 Json(json!({"error": "InternalError"})), 950 ) 951 .into_response(); 952 } 953 if let Some(code) = &input.invite_code 954 && !code.trim().is_empty() 955 { 956 let use_insert = sqlx::query!( 957 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 958 code, 959 user_id 960 ) 961 .execute(&mut *tx) 962 .await; 963 if let Err(e) = use_insert { 964 error!("Error recording invite usage: {:?}", e); 965 return ( 966 StatusCode::INTERNAL_SERVER_ERROR, 967 Json(json!({"error": "InternalError"})), 968 ) 969 .into_response(); 970 } 971 } 972 if let Err(e) = tx.commit().await { 973 error!("Error committing transaction: {:?}", e); 974 return ( 975 StatusCode::INTERNAL_SERVER_ERROR, 976 Json(json!({"error": "InternalError"})), 977 ) 978 .into_response(); 979 } 980 if !is_migration && !is_did_web_byod { 981 if let Err(e) = 982 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 983 { 984 warn!("Failed to sequence identity event for {}: {}", did, e); 985 } 986 if let Err(e) = 987 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 988 { 989 warn!("Failed to sequence account event for {}: {}", did, e); 990 } 991 if let Err(e) = 992 crate::api::repo::record::sequence_empty_commit_event(&state, &did).await 993 { 994 warn!("Failed to sequence commit event for {}: {}", did, e); 995 } 996 if let Err(e) = crate::api::repo::record::sequence_sync_event( 997 &state, 998 &did, 999 &commit_cid_str, 1000 Some(rev.as_ref()), 1001 ) 1002 .await 1003 { 1004 warn!("Failed to sequence sync event for {}: {}", did, e); 1005 } 1006 let profile_record = json!({ 1007 "$type": "app.bsky.actor.profile", 1008 "displayName": input.handle 1009 }); 1010 if let Err(e) = crate::api::repo::record::create_record_internal( 1011 &state, 1012 &did, 1013 "app.bsky.actor.profile", 1014 "self", 1015 &profile_record, 1016 ) 1017 .await 1018 { 1019 warn!("Failed to create default profile for {}: {}", did, e); 1020 } 1021 } 1022 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 1023 if !is_migration { 1024 if let Some(ref recipient) = verification_recipient { 1025 let verification_token = crate::auth::verification_token::generate_signup_token( 1026 &did, 1027 verification_channel, 1028 recipient, 1029 ); 1030 let formatted_token = 1031 crate::auth::verification_token::format_token_for_display(&verification_token); 1032 if let Err(e) = crate::comms::enqueue_signup_verification( 1033 &state.db, 1034 user_id, 1035 verification_channel, 1036 recipient, 1037 &formatted_token, 1038 None, 1039 ) 1040 .await 1041 { 1042 warn!( 1043 "Failed to enqueue signup verification notification: {:?}", 1044 e 1045 ); 1046 } 1047 } 1048 } else if let Some(ref user_email) = email { 1049 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 1050 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 1051 if let Err(e) = crate::comms::enqueue_migration_verification( 1052 &state.db, 1053 user_id, 1054 user_email, 1055 &formatted_token, 1056 &hostname, 1057 ) 1058 .await 1059 { 1060 warn!("Failed to enqueue migration verification email: {:?}", e); 1061 } 1062 } 1063 1064 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 1065 { 1066 Ok(m) => m, 1067 Err(e) => { 1068 error!("createAccount: Error creating access token: {:?}", e); 1069 return ( 1070 StatusCode::INTERNAL_SERVER_ERROR, 1071 Json(json!({"error": "InternalError"})), 1072 ) 1073 .into_response(); 1074 } 1075 }; 1076 let refresh_meta = 1077 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1078 Ok(m) => m, 1079 Err(e) => { 1080 error!("createAccount: Error creating refresh token: {:?}", e); 1081 return ( 1082 StatusCode::INTERNAL_SERVER_ERROR, 1083 Json(json!({"error": "InternalError"})), 1084 ) 1085 .into_response(); 1086 } 1087 }; 1088 if let Err(e) = sqlx::query!( 1089 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1090 did, 1091 access_meta.jti, 1092 refresh_meta.jti, 1093 access_meta.expires_at, 1094 refresh_meta.expires_at 1095 ) 1096 .execute(&state.db) 1097 .await 1098 { 1099 error!("createAccount: Error creating session: {:?}", e); 1100 return ( 1101 StatusCode::INTERNAL_SERVER_ERROR, 1102 Json(json!({"error": "InternalError"})), 1103 ) 1104 .into_response(); 1105 } 1106 1107 let did_doc = state.did_resolver.resolve_did_document(&did).await; 1108 1109 if is_migration { 1110 info!( 1111 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 1112 did, handle 1113 ); 1114 } 1115 1116 ( 1117 StatusCode::OK, 1118 Json(CreateAccountOutput { 1119 handle: handle.clone(), 1120 did, 1121 did_doc, 1122 access_jwt: access_meta.token, 1123 refresh_jwt: refresh_meta.token, 1124 verification_required: !is_migration, 1125 verification_channel: verification_channel.to_string(), 1126 }), 1127 ) 1128 .into_response() 1129} 1130