this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub access_jwt: Option<String>, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub refresh_jwt: Option<String>, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 info!("create_account called"); 73 let client_ip = extract_client_ip(&headers); 74 if !state 75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 76 .await 77 { 78 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 79 return ( 80 StatusCode::TOO_MANY_REQUESTS, 81 Json(json!({ 82 "error": "RateLimitExceeded", 83 "message": "Too many account creation attempts. Please try again later." 84 })), 85 ) 86 .into_response(); 87 } 88 89 let migration_auth = if let Some(token) = 90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 91 { 92 if is_service_token(&token) { 93 let verifier = ServiceTokenVerifier::new(); 94 match verifier 95 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 96 .await 97 { 98 Ok(claims) => { 99 debug!("Service token verified for migration: iss={}", claims.iss); 100 Some(claims.iss) 101 } 102 Err(e) => { 103 error!("Service token verification failed: {:?}", e); 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({ 107 "error": "AuthenticationFailed", 108 "message": format!("Service token verification failed: {}", e) 109 })), 110 ) 111 .into_response(); 112 } 113 } 114 } else { 115 None 116 } 117 } else { 118 None 119 }; 120 121 let is_did_web_byod = migration_auth.is_some() 122 && input 123 .did 124 .as_ref() 125 .map(|d| d.starts_with("did:web:")) 126 .unwrap_or(false); 127 128 let is_migration = migration_auth.is_some() 129 && input 130 .did 131 .as_ref() 132 .map(|d| d.starts_with("did:plc:")) 133 .unwrap_or(false); 134 135 if (is_migration || is_did_web_byod) 136 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 137 { 138 if provided_did != auth_did { 139 return ( 140 StatusCode::FORBIDDEN, 141 Json(json!({ 142 "error": "AuthorizationError", 143 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did) 144 })), 145 ) 146 .into_response(); 147 } 148 if is_did_web_byod { 149 info!(did = %provided_did, "Processing did:web BYOD account creation"); 150 } else { 151 info!(did = %provided_did, "Processing account migration"); 152 } 153 } 154 155 let hostname_for_validation = 156 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 157 let pds_suffix = format!(".{}", hostname_for_validation); 158 159 let validated_short_handle = if !input.handle.contains('.') 160 || input.handle.ends_with(&pds_suffix) 161 { 162 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 163 input 164 .handle 165 .strip_suffix(&pds_suffix) 166 .unwrap_or(&input.handle) 167 } else { 168 &input.handle 169 }; 170 match crate::api::validation::validate_short_handle(handle_to_validate) { 171 Ok(h) => h, 172 Err(e) => { 173 return ( 174 StatusCode::BAD_REQUEST, 175 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 176 ) 177 .into_response(); 178 } 179 } 180 } else { 181 if input.handle.contains(' ') || input.handle.contains('\t') { 182 return ( 183 StatusCode::BAD_REQUEST, 184 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 185 ) 186 .into_response(); 187 } 188 for c in input.handle.chars() { 189 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 190 return ( 191 StatusCode::BAD_REQUEST, 192 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 193 ) 194 .into_response(); 195 } 196 } 197 let handle_lower = input.handle.to_lowercase(); 198 if crate::moderation::has_explicit_slur(&handle_lower) { 199 return ( 200 StatusCode::BAD_REQUEST, 201 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})), 202 ) 203 .into_response(); 204 } 205 handle_lower 206 }; 207 let email: Option<String> = input 208 .email 209 .as_ref() 210 .map(|e| e.trim().to_string()) 211 .filter(|e| !e.is_empty()); 212 if let Some(ref email) = email 213 && !crate::api::validation::is_valid_email(email) 214 { 215 return ( 216 StatusCode::BAD_REQUEST, 217 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 218 ) 219 .into_response(); 220 } 221 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 222 let valid_channels = ["email", "discord", "telegram", "signal"]; 223 if !valid_channels.contains(&verification_channel) && !is_migration { 224 return ( 225 StatusCode::BAD_REQUEST, 226 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 227 ) 228 .into_response(); 229 } 230 let verification_recipient = if is_migration { 231 None 232 } else { 233 Some(match verification_channel { 234 "email" => match &input.email { 235 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 236 _ => return ( 237 StatusCode::BAD_REQUEST, 238 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 239 ).into_response(), 240 }, 241 "discord" => match &input.discord_id { 242 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 243 _ => return ( 244 StatusCode::BAD_REQUEST, 245 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 246 ).into_response(), 247 }, 248 "telegram" => match &input.telegram_username { 249 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 250 _ => return ( 251 StatusCode::BAD_REQUEST, 252 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 253 ).into_response(), 254 }, 255 "signal" => match &input.signal_number { 256 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 257 _ => return ( 258 StatusCode::BAD_REQUEST, 259 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 260 ).into_response(), 261 }, 262 _ => return ( 263 StatusCode::BAD_REQUEST, 264 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 265 ).into_response(), 266 }) 267 }; 268 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 269 let pds_endpoint = format!("https://{}", hostname); 270 let suffix = format!(".{}", hostname); 271 let handle = if input.handle.ends_with(&suffix) { 272 format!("{}.{}", validated_short_handle, hostname) 273 } else if input.handle.contains('.') { 274 validated_short_handle.clone() 275 } else { 276 format!("{}.{}", validated_short_handle, hostname) 277 }; 278 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 279 if let Some(signing_key_did) = &input.signing_key { 280 let reserved = sqlx::query!( 281 r#" 282 SELECT id, private_key_bytes 283 FROM reserved_signing_keys 284 WHERE public_key_did_key = $1 285 AND used_at IS NULL 286 AND expires_at > NOW() 287 FOR UPDATE 288 "#, 289 signing_key_did 290 ) 291 .fetch_optional(&state.db) 292 .await; 293 match reserved { 294 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 295 Ok(None) => { 296 return ( 297 StatusCode::BAD_REQUEST, 298 Json(json!({ 299 "error": "InvalidSigningKey", 300 "message": "Signing key not found, already used, or expired" 301 })), 302 ) 303 .into_response(); 304 } 305 Err(e) => { 306 error!("Error looking up reserved signing key: {:?}", e); 307 return ( 308 StatusCode::INTERNAL_SERVER_ERROR, 309 Json(json!({"error": "InternalError"})), 310 ) 311 .into_response(); 312 } 313 } 314 } else { 315 let secret_key = SecretKey::random(&mut OsRng); 316 (secret_key.to_bytes().to_vec(), None) 317 }; 318 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 319 Ok(k) => k, 320 Err(e) => { 321 error!("Error creating signing key: {:?}", e); 322 return ( 323 StatusCode::INTERNAL_SERVER_ERROR, 324 Json(json!({"error": "InternalError"})), 325 ) 326 .into_response(); 327 } 328 }; 329 let did_type = input.did_type.as_deref().unwrap_or("plc"); 330 let did = match did_type { 331 "web" => { 332 let subdomain_host = format!("{}.{}", input.handle, hostname); 333 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 334 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 335 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 336 self_hosted_did 337 } 338 "web-external" => { 339 let d = match &input.did { 340 Some(d) if !d.trim().is_empty() => d, 341 _ => { 342 return ( 343 StatusCode::BAD_REQUEST, 344 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 345 ) 346 .into_response(); 347 } 348 }; 349 if !d.starts_with("did:web:") { 350 return ( 351 StatusCode::BAD_REQUEST, 352 Json( 353 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 354 ), 355 ) 356 .into_response(); 357 } 358 if !is_did_web_byod 359 && let Err(e) = 360 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 361 { 362 return ( 363 StatusCode::BAD_REQUEST, 364 Json(json!({"error": "InvalidDid", "message": e})), 365 ) 366 .into_response(); 367 } 368 info!(did = %d, "Creating external did:web account"); 369 d.clone() 370 } 371 _ => { 372 if let Some(d) = &input.did { 373 if d.starts_with("did:plc:") && is_migration { 374 info!(did = %d, "Migration with existing did:plc"); 375 d.clone() 376 } else if d.starts_with("did:web:") { 377 if !is_did_web_byod 378 && let Err(e) = verify_did_web( 379 d, 380 &hostname, 381 &input.handle, 382 input.signing_key.as_deref(), 383 ) 384 .await 385 { 386 return ( 387 StatusCode::BAD_REQUEST, 388 Json(json!({"error": "InvalidDid", "message": e})), 389 ) 390 .into_response(); 391 } 392 d.clone() 393 } else if !d.trim().is_empty() { 394 return ( 395 StatusCode::BAD_REQUEST, 396 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 397 ) 398 .into_response(); 399 } else { 400 let rotation_key = std::env::var("PLC_ROTATION_KEY") 401 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 402 let genesis_result = match create_genesis_operation( 403 &signing_key, 404 &rotation_key, 405 &handle, 406 &pds_endpoint, 407 ) { 408 Ok(r) => r, 409 Err(e) => { 410 error!("Error creating PLC genesis operation: {:?}", e); 411 return ( 412 StatusCode::INTERNAL_SERVER_ERROR, 413 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 414 ) 415 .into_response(); 416 } 417 }; 418 let plc_client = PlcClient::new(None); 419 if let Err(e) = plc_client 420 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 421 .await 422 { 423 error!("Failed to submit PLC genesis operation: {:?}", e); 424 return ( 425 StatusCode::BAD_GATEWAY, 426 Json(json!({ 427 "error": "UpstreamError", 428 "message": format!("Failed to register DID with PLC directory: {}", e) 429 })), 430 ) 431 .into_response(); 432 } 433 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 434 genesis_result.did 435 } 436 } else { 437 let rotation_key = std::env::var("PLC_ROTATION_KEY") 438 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 439 let genesis_result = match create_genesis_operation( 440 &signing_key, 441 &rotation_key, 442 &handle, 443 &pds_endpoint, 444 ) { 445 Ok(r) => r, 446 Err(e) => { 447 error!("Error creating PLC genesis operation: {:?}", e); 448 return ( 449 StatusCode::INTERNAL_SERVER_ERROR, 450 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 451 ) 452 .into_response(); 453 } 454 }; 455 let plc_client = PlcClient::new(None); 456 if let Err(e) = plc_client 457 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 458 .await 459 { 460 error!("Failed to submit PLC genesis operation: {:?}", e); 461 return ( 462 StatusCode::BAD_GATEWAY, 463 Json(json!({ 464 "error": "UpstreamError", 465 "message": format!("Failed to register DID with PLC directory: {}", e) 466 })), 467 ) 468 .into_response(); 469 } 470 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 471 genesis_result.did 472 } 473 } 474 }; 475 let mut tx = match state.db.begin().await { 476 Ok(tx) => tx, 477 Err(e) => { 478 error!("Error starting transaction: {:?}", e); 479 return ( 480 StatusCode::INTERNAL_SERVER_ERROR, 481 Json(json!({"error": "InternalError"})), 482 ) 483 .into_response(); 484 } 485 }; 486 if is_migration { 487 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 488 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 489 .bind(&did) 490 .fetch_optional(&mut *tx) 491 .await 492 .unwrap_or(None); 493 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 494 if deactivated_at.is_some() { 495 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 496 let update_result: Result<_, sqlx::Error> = 497 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 498 .bind(&handle) 499 .bind(account_id) 500 .execute(&mut *tx) 501 .await; 502 if let Err(e) = update_result { 503 if let Some(db_err) = e.as_database_error() 504 && db_err 505 .constraint() 506 .map(|c| c.contains("handle")) 507 .unwrap_or(false) 508 { 509 return ( 510 StatusCode::BAD_REQUEST, 511 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 512 ) 513 .into_response(); 514 } 515 error!("Error reactivating account: {:?}", e); 516 return ( 517 StatusCode::INTERNAL_SERVER_ERROR, 518 Json(json!({"error": "InternalError"})), 519 ) 520 .into_response(); 521 } 522 if let Err(e) = tx.commit().await { 523 error!("Error committing reactivation: {:?}", e); 524 return ( 525 StatusCode::INTERNAL_SERVER_ERROR, 526 Json(json!({"error": "InternalError"})), 527 ) 528 .into_response(); 529 } 530 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 531 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 532 ) 533 .bind(account_id) 534 .fetch_optional(&state.db) 535 .await 536 .unwrap_or(None); 537 let secret_key_bytes = match key_row { 538 Some((key_bytes, encryption_version)) => { 539 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 540 Ok(k) => k, 541 Err(e) => { 542 error!("Error decrypting key for reactivated account: {:?}", e); 543 return ( 544 StatusCode::INTERNAL_SERVER_ERROR, 545 Json(json!({"error": "InternalError"})), 546 ) 547 .into_response(); 548 } 549 } 550 } 551 None => { 552 error!("No signing key found for reactivated account"); 553 return ( 554 StatusCode::INTERNAL_SERVER_ERROR, 555 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 556 ) 557 .into_response(); 558 } 559 }; 560 let access_meta = 561 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 562 Ok(m) => m, 563 Err(e) => { 564 error!("Error creating access token: {:?}", e); 565 return ( 566 StatusCode::INTERNAL_SERVER_ERROR, 567 Json(json!({"error": "InternalError"})), 568 ) 569 .into_response(); 570 } 571 }; 572 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 573 &did, 574 &secret_key_bytes, 575 ) { 576 Ok(m) => m, 577 Err(e) => { 578 error!("Error creating refresh token: {:?}", e); 579 return ( 580 StatusCode::INTERNAL_SERVER_ERROR, 581 Json(json!({"error": "InternalError"})), 582 ) 583 .into_response(); 584 } 585 }; 586 let session_result: Result<_, sqlx::Error> = sqlx::query( 587 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 588 ) 589 .bind(&did) 590 .bind(&access_meta.jti) 591 .bind(&refresh_meta.jti) 592 .bind(access_meta.expires_at) 593 .bind(refresh_meta.expires_at) 594 .execute(&state.db) 595 .await; 596 if let Err(e) = session_result { 597 error!("Error creating session: {:?}", e); 598 return ( 599 StatusCode::INTERNAL_SERVER_ERROR, 600 Json(json!({"error": "InternalError"})), 601 ) 602 .into_response(); 603 } 604 return ( 605 StatusCode::OK, 606 Json(CreateAccountOutput { 607 handle: handle.clone(), 608 did, 609 access_jwt: Some(access_meta.token), 610 refresh_jwt: Some(refresh_meta.token), 611 verification_required: false, 612 verification_channel: "email".to_string(), 613 }), 614 ) 615 .into_response(); 616 } else { 617 return ( 618 StatusCode::BAD_REQUEST, 619 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 620 ) 621 .into_response(); 622 } 623 } 624 } 625 let exists_result: Option<(i32,)> = 626 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 627 .bind(&handle) 628 .fetch_optional(&mut *tx) 629 .await 630 .unwrap_or(None); 631 if exists_result.is_some() { 632 return ( 633 StatusCode::BAD_REQUEST, 634 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 635 ) 636 .into_response(); 637 } 638 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 639 .map(|v| v == "true" || v == "1") 640 .unwrap_or(false); 641 if invite_code_required 642 && input 643 .invite_code 644 .as_ref() 645 .map(|c| c.trim().is_empty()) 646 .unwrap_or(true) 647 { 648 return ( 649 StatusCode::BAD_REQUEST, 650 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 651 ) 652 .into_response(); 653 } 654 if let Some(code) = &input.invite_code 655 && !code.trim().is_empty() 656 { 657 let invite_query = sqlx::query!( 658 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 659 code 660 ) 661 .fetch_optional(&mut *tx) 662 .await; 663 match invite_query { 664 Ok(Some(row)) => { 665 if row.available_uses <= 0 { 666 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 667 } 668 let update_invite = sqlx::query!( 669 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 670 code 671 ) 672 .execute(&mut *tx) 673 .await; 674 if let Err(e) = update_invite { 675 error!("Error updating invite code: {:?}", e); 676 return ( 677 StatusCode::INTERNAL_SERVER_ERROR, 678 Json(json!({"error": "InternalError"})), 679 ) 680 .into_response(); 681 } 682 } 683 Ok(None) => { 684 return ( 685 StatusCode::BAD_REQUEST, 686 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 687 ) 688 .into_response(); 689 } 690 Err(e) => { 691 error!("Error checking invite code: {:?}", e); 692 return ( 693 StatusCode::INTERNAL_SERVER_ERROR, 694 Json(json!({"error": "InternalError"})), 695 ) 696 .into_response(); 697 } 698 } 699 } 700 if let Err(e) = validate_password(&input.password) { 701 return ( 702 StatusCode::BAD_REQUEST, 703 Json(json!({ 704 "error": "InvalidPassword", 705 "message": e.to_string() 706 })), 707 ) 708 .into_response(); 709 } 710 711 let password_hash = match hash(&input.password, DEFAULT_COST) { 712 Ok(h) => h, 713 Err(e) => { 714 error!("Error hashing password: {:?}", e); 715 return ( 716 StatusCode::INTERNAL_SERVER_ERROR, 717 Json(json!({"error": "InternalError"})), 718 ) 719 .into_response(); 720 } 721 }; 722 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 723 .fetch_one(&mut *tx) 724 .await 725 .map(|c| c.unwrap_or(0) == 0) 726 .unwrap_or(false); 727 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 728 Some(chrono::Utc::now()) 729 } else { 730 None 731 }; 732 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 733 r#"INSERT INTO users ( 734 handle, email, did, password_hash, 735 preferred_comms_channel, 736 discord_id, telegram_username, signal_number, 737 is_admin, deactivated_at, email_verified 738 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 739 ) 740 .bind(&handle) 741 .bind(&email) 742 .bind(&did) 743 .bind(&password_hash) 744 .bind(verification_channel) 745 .bind( 746 input 747 .discord_id 748 .as_deref() 749 .map(|s| s.trim()) 750 .filter(|s| !s.is_empty()), 751 ) 752 .bind( 753 input 754 .telegram_username 755 .as_deref() 756 .map(|s| s.trim()) 757 .filter(|s| !s.is_empty()), 758 ) 759 .bind( 760 input 761 .signal_number 762 .as_deref() 763 .map(|s| s.trim()) 764 .filter(|s| !s.is_empty()), 765 ) 766 .bind(is_first_user) 767 .bind(deactivated_at) 768 .bind(false) 769 .fetch_one(&mut *tx) 770 .await; 771 let user_id = match user_insert { 772 Ok((id,)) => id, 773 Err(e) => { 774 if let Some(db_err) = e.as_database_error() 775 && db_err.code().as_deref() == Some("23505") 776 { 777 let constraint = db_err.constraint().unwrap_or(""); 778 if constraint.contains("handle") || constraint.contains("users_handle") { 779 return ( 780 StatusCode::BAD_REQUEST, 781 Json(json!({ 782 "error": "HandleNotAvailable", 783 "message": "Handle already taken" 784 })), 785 ) 786 .into_response(); 787 } else if constraint.contains("email") || constraint.contains("users_email") { 788 return ( 789 StatusCode::BAD_REQUEST, 790 Json(json!({ 791 "error": "InvalidEmail", 792 "message": "Email already registered" 793 })), 794 ) 795 .into_response(); 796 } else if constraint.contains("did") || constraint.contains("users_did") { 797 return ( 798 StatusCode::BAD_REQUEST, 799 Json(json!({ 800 "error": "AccountAlreadyExists", 801 "message": "An account with this DID already exists" 802 })), 803 ) 804 .into_response(); 805 } 806 } 807 error!("Error inserting user: {:?}", e); 808 return ( 809 StatusCode::INTERNAL_SERVER_ERROR, 810 Json(json!({"error": "InternalError"})), 811 ) 812 .into_response(); 813 } 814 }; 815 816 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 817 Ok(enc) => enc, 818 Err(e) => { 819 error!("Error encrypting user key: {:?}", e); 820 return ( 821 StatusCode::INTERNAL_SERVER_ERROR, 822 Json(json!({"error": "InternalError"})), 823 ) 824 .into_response(); 825 } 826 }; 827 let key_insert = sqlx::query!( 828 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 829 user_id, 830 &encrypted_key_bytes[..], 831 crate::config::ENCRYPTION_VERSION 832 ) 833 .execute(&mut *tx) 834 .await; 835 if let Err(e) = key_insert { 836 error!("Error inserting user key: {:?}", e); 837 return ( 838 StatusCode::INTERNAL_SERVER_ERROR, 839 Json(json!({"error": "InternalError"})), 840 ) 841 .into_response(); 842 } 843 if let Some(key_id) = reserved_key_id { 844 let mark_used = sqlx::query!( 845 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 846 key_id 847 ) 848 .execute(&mut *tx) 849 .await; 850 if let Err(e) = mark_used { 851 error!("Error marking reserved key as used: {:?}", e); 852 return ( 853 StatusCode::INTERNAL_SERVER_ERROR, 854 Json(json!({"error": "InternalError"})), 855 ) 856 .into_response(); 857 } 858 } 859 let mst = Mst::new(Arc::new(state.block_store.clone())); 860 let mst_root = match mst.persist().await { 861 Ok(c) => c, 862 Err(e) => { 863 error!("Error persisting MST: {:?}", e); 864 return ( 865 StatusCode::INTERNAL_SERVER_ERROR, 866 Json(json!({"error": "InternalError"})), 867 ) 868 .into_response(); 869 } 870 }; 871 let rev = Tid::now(LimitedU32::MIN); 872 let (commit_bytes, _sig) = 873 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 874 Ok(result) => result, 875 Err(e) => { 876 error!("Error creating genesis commit: {:?}", e); 877 return ( 878 StatusCode::INTERNAL_SERVER_ERROR, 879 Json(json!({"error": "InternalError"})), 880 ) 881 .into_response(); 882 } 883 }; 884 let commit_cid = match state.block_store.put(&commit_bytes).await { 885 Ok(c) => c, 886 Err(e) => { 887 error!("Error saving genesis commit: {:?}", e); 888 return ( 889 StatusCode::INTERNAL_SERVER_ERROR, 890 Json(json!({"error": "InternalError"})), 891 ) 892 .into_response(); 893 } 894 }; 895 let commit_cid_str = commit_cid.to_string(); 896 let repo_insert = sqlx::query!( 897 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 898 user_id, 899 commit_cid_str 900 ) 901 .execute(&mut *tx) 902 .await; 903 if let Err(e) = repo_insert { 904 error!("Error initializing repo: {:?}", e); 905 return ( 906 StatusCode::INTERNAL_SERVER_ERROR, 907 Json(json!({"error": "InternalError"})), 908 ) 909 .into_response(); 910 } 911 if let Some(code) = &input.invite_code 912 && !code.trim().is_empty() 913 { 914 let use_insert = sqlx::query!( 915 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 916 code, 917 user_id 918 ) 919 .execute(&mut *tx) 920 .await; 921 if let Err(e) = use_insert { 922 error!("Error recording invite usage: {:?}", e); 923 return ( 924 StatusCode::INTERNAL_SERVER_ERROR, 925 Json(json!({"error": "InternalError"})), 926 ) 927 .into_response(); 928 } 929 } 930 if let Err(e) = tx.commit().await { 931 error!("Error committing transaction: {:?}", e); 932 return ( 933 StatusCode::INTERNAL_SERVER_ERROR, 934 Json(json!({"error": "InternalError"})), 935 ) 936 .into_response(); 937 } 938 if !is_migration && !is_did_web_byod { 939 if let Err(e) = 940 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 941 { 942 warn!("Failed to sequence identity event for {}: {}", did, e); 943 } 944 if let Err(e) = 945 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 946 { 947 warn!("Failed to sequence account event for {}: {}", did, e); 948 } 949 let profile_record = json!({ 950 "$type": "app.bsky.actor.profile", 951 "displayName": input.handle 952 }); 953 if let Err(e) = crate::api::repo::record::create_record_internal( 954 &state, 955 &did, 956 "app.bsky.actor.profile", 957 "self", 958 &profile_record, 959 ) 960 .await 961 { 962 warn!("Failed to create default profile for {}: {}", did, e); 963 } 964 } 965 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 966 if !is_migration { 967 if let Some(ref recipient) = verification_recipient { 968 let verification_token = crate::auth::verification_token::generate_signup_token( 969 &did, 970 verification_channel, 971 recipient, 972 ); 973 let formatted_token = 974 crate::auth::verification_token::format_token_for_display(&verification_token); 975 if let Err(e) = crate::comms::enqueue_signup_verification( 976 &state.db, 977 user_id, 978 verification_channel, 979 recipient, 980 &formatted_token, 981 None, 982 ) 983 .await 984 { 985 warn!( 986 "Failed to enqueue signup verification notification: {:?}", 987 e 988 ); 989 } 990 } 991 } else if let Some(ref user_email) = email { 992 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 993 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 994 if let Err(e) = crate::comms::enqueue_migration_verification( 995 &state.db, 996 user_id, 997 user_email, 998 &formatted_token, 999 &hostname, 1000 ) 1001 .await 1002 { 1003 warn!("Failed to enqueue migration verification email: {:?}", e); 1004 } 1005 } 1006 1007 let (access_jwt, refresh_jwt) = if is_migration { 1008 let access_meta = 1009 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 1010 Ok(m) => m, 1011 Err(e) => { 1012 error!("Error creating access token for migration: {:?}", e); 1013 return ( 1014 StatusCode::INTERNAL_SERVER_ERROR, 1015 Json(json!({"error": "InternalError"})), 1016 ) 1017 .into_response(); 1018 } 1019 }; 1020 let refresh_meta = 1021 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1022 Ok(m) => m, 1023 Err(e) => { 1024 error!("Error creating refresh token for migration: {:?}", e); 1025 return ( 1026 StatusCode::INTERNAL_SERVER_ERROR, 1027 Json(json!({"error": "InternalError"})), 1028 ) 1029 .into_response(); 1030 } 1031 }; 1032 if let Err(e) = sqlx::query!( 1033 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1034 did, 1035 access_meta.jti, 1036 refresh_meta.jti, 1037 access_meta.expires_at, 1038 refresh_meta.expires_at 1039 ) 1040 .execute(&state.db) 1041 .await 1042 { 1043 error!("Error creating session for migration: {:?}", e); 1044 return ( 1045 StatusCode::INTERNAL_SERVER_ERROR, 1046 Json(json!({"error": "InternalError"})), 1047 ) 1048 .into_response(); 1049 } 1050 (Some(access_meta.token), Some(refresh_meta.token)) 1051 } else { 1052 (None, None) 1053 }; 1054 1055 ( 1056 StatusCode::OK, 1057 Json(CreateAccountOutput { 1058 handle: handle.clone(), 1059 did, 1060 access_jwt, 1061 refresh_jwt, 1062 verification_required: !is_migration, 1063 verification_channel: verification_channel.to_string(), 1064 }), 1065 ) 1066 .into_response() 1067}