this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub access_jwt: Option<String>, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub refresh_jwt: Option<String>, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 info!("create_account called"); 73 let client_ip = extract_client_ip(&headers); 74 if !state 75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 76 .await 77 { 78 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 79 return ( 80 StatusCode::TOO_MANY_REQUESTS, 81 Json(json!({ 82 "error": "RateLimitExceeded", 83 "message": "Too many account creation attempts. Please try again later." 84 })), 85 ) 86 .into_response(); 87 } 88 89 let migration_auth = if let Some(token) = 90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 91 { 92 if is_service_token(&token) { 93 let verifier = ServiceTokenVerifier::new(); 94 match verifier 95 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 96 .await 97 { 98 Ok(claims) => { 99 debug!("Service token verified for migration: iss={}", claims.iss); 100 Some(claims.iss) 101 } 102 Err(e) => { 103 error!("Service token verification failed: {:?}", e); 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({ 107 "error": "AuthenticationFailed", 108 "message": format!("Service token verification failed: {}", e) 109 })), 110 ) 111 .into_response(); 112 } 113 } 114 } else { 115 None 116 } 117 } else { 118 None 119 }; 120 121 let is_did_web_byod = migration_auth.is_some() 122 && input 123 .did 124 .as_ref() 125 .map(|d| d.starts_with("did:web:")) 126 .unwrap_or(false); 127 128 let is_migration = migration_auth.is_some() 129 && input 130 .did 131 .as_ref() 132 .map(|d| d.starts_with("did:plc:")) 133 .unwrap_or(false); 134 135 if (is_migration || is_did_web_byod) 136 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 137 { 138 if provided_did != auth_did { 139 return ( 140 StatusCode::FORBIDDEN, 141 Json(json!({ 142 "error": "AuthorizationError", 143 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did) 144 })), 145 ) 146 .into_response(); 147 } 148 if is_did_web_byod { 149 info!(did = %provided_did, "Processing did:web BYOD account creation"); 150 } else { 151 info!(did = %provided_did, "Processing account migration"); 152 } 153 } 154 155 let hostname_for_validation = 156 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 157 let pds_suffix = format!(".{}", hostname_for_validation); 158 159 let validated_short_handle = if !input.handle.contains('.') 160 || input.handle.ends_with(&pds_suffix) 161 { 162 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 163 input 164 .handle 165 .strip_suffix(&pds_suffix) 166 .unwrap_or(&input.handle) 167 } else { 168 &input.handle 169 }; 170 match crate::api::validation::validate_short_handle(handle_to_validate) { 171 Ok(h) => h, 172 Err(e) => { 173 return ( 174 StatusCode::BAD_REQUEST, 175 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 176 ) 177 .into_response(); 178 } 179 } 180 } else { 181 if input.handle.contains(' ') || input.handle.contains('\t') { 182 return ( 183 StatusCode::BAD_REQUEST, 184 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 185 ) 186 .into_response(); 187 } 188 for c in input.handle.chars() { 189 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 190 return ( 191 StatusCode::BAD_REQUEST, 192 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 193 ) 194 .into_response(); 195 } 196 } 197 input.handle.to_lowercase() 198 }; 199 let email: Option<String> = input 200 .email 201 .as_ref() 202 .map(|e| e.trim().to_string()) 203 .filter(|e| !e.is_empty()); 204 if let Some(ref email) = email 205 && !crate::api::validation::is_valid_email(email) 206 { 207 return ( 208 StatusCode::BAD_REQUEST, 209 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 210 ) 211 .into_response(); 212 } 213 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 214 let valid_channels = ["email", "discord", "telegram", "signal"]; 215 if !valid_channels.contains(&verification_channel) && !is_migration { 216 return ( 217 StatusCode::BAD_REQUEST, 218 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 219 ) 220 .into_response(); 221 } 222 let verification_recipient = if is_migration { 223 None 224 } else { 225 Some(match verification_channel { 226 "email" => match &input.email { 227 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 228 _ => return ( 229 StatusCode::BAD_REQUEST, 230 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 231 ).into_response(), 232 }, 233 "discord" => match &input.discord_id { 234 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 235 _ => return ( 236 StatusCode::BAD_REQUEST, 237 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 238 ).into_response(), 239 }, 240 "telegram" => match &input.telegram_username { 241 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 242 _ => return ( 243 StatusCode::BAD_REQUEST, 244 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 245 ).into_response(), 246 }, 247 "signal" => match &input.signal_number { 248 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 249 _ => return ( 250 StatusCode::BAD_REQUEST, 251 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 252 ).into_response(), 253 }, 254 _ => return ( 255 StatusCode::BAD_REQUEST, 256 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 257 ).into_response(), 258 }) 259 }; 260 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 261 let pds_endpoint = format!("https://{}", hostname); 262 let suffix = format!(".{}", hostname); 263 let handle = if input.handle.ends_with(&suffix) { 264 format!("{}.{}", validated_short_handle, hostname) 265 } else if input.handle.contains('.') { 266 validated_short_handle.clone() 267 } else { 268 format!("{}.{}", validated_short_handle, hostname) 269 }; 270 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 271 if let Some(signing_key_did) = &input.signing_key { 272 let reserved = sqlx::query!( 273 r#" 274 SELECT id, private_key_bytes 275 FROM reserved_signing_keys 276 WHERE public_key_did_key = $1 277 AND used_at IS NULL 278 AND expires_at > NOW() 279 FOR UPDATE 280 "#, 281 signing_key_did 282 ) 283 .fetch_optional(&state.db) 284 .await; 285 match reserved { 286 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 287 Ok(None) => { 288 return ( 289 StatusCode::BAD_REQUEST, 290 Json(json!({ 291 "error": "InvalidSigningKey", 292 "message": "Signing key not found, already used, or expired" 293 })), 294 ) 295 .into_response(); 296 } 297 Err(e) => { 298 error!("Error looking up reserved signing key: {:?}", e); 299 return ( 300 StatusCode::INTERNAL_SERVER_ERROR, 301 Json(json!({"error": "InternalError"})), 302 ) 303 .into_response(); 304 } 305 } 306 } else { 307 let secret_key = SecretKey::random(&mut OsRng); 308 (secret_key.to_bytes().to_vec(), None) 309 }; 310 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 311 Ok(k) => k, 312 Err(e) => { 313 error!("Error creating signing key: {:?}", e); 314 return ( 315 StatusCode::INTERNAL_SERVER_ERROR, 316 Json(json!({"error": "InternalError"})), 317 ) 318 .into_response(); 319 } 320 }; 321 let did_type = input.did_type.as_deref().unwrap_or("plc"); 322 let did = match did_type { 323 "web" => { 324 let subdomain_host = format!("{}.{}", input.handle, hostname); 325 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 326 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 327 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 328 self_hosted_did 329 } 330 "web-external" => { 331 let d = match &input.did { 332 Some(d) if !d.trim().is_empty() => d, 333 _ => { 334 return ( 335 StatusCode::BAD_REQUEST, 336 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 337 ) 338 .into_response(); 339 } 340 }; 341 if !d.starts_with("did:web:") { 342 return ( 343 StatusCode::BAD_REQUEST, 344 Json( 345 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 346 ), 347 ) 348 .into_response(); 349 } 350 if !is_did_web_byod 351 && let Err(e) = 352 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 353 { 354 return ( 355 StatusCode::BAD_REQUEST, 356 Json(json!({"error": "InvalidDid", "message": e})), 357 ) 358 .into_response(); 359 } 360 info!(did = %d, "Creating external did:web account"); 361 d.clone() 362 } 363 _ => { 364 if let Some(d) = &input.did { 365 if d.starts_with("did:plc:") && is_migration { 366 info!(did = %d, "Migration with existing did:plc"); 367 d.clone() 368 } else if d.starts_with("did:web:") { 369 if !is_did_web_byod 370 && let Err(e) = verify_did_web( 371 d, 372 &hostname, 373 &input.handle, 374 input.signing_key.as_deref(), 375 ) 376 .await 377 { 378 return ( 379 StatusCode::BAD_REQUEST, 380 Json(json!({"error": "InvalidDid", "message": e})), 381 ) 382 .into_response(); 383 } 384 d.clone() 385 } else if !d.trim().is_empty() { 386 return ( 387 StatusCode::BAD_REQUEST, 388 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 389 ) 390 .into_response(); 391 } else { 392 let rotation_key = std::env::var("PLC_ROTATION_KEY") 393 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 394 let genesis_result = match create_genesis_operation( 395 &signing_key, 396 &rotation_key, 397 &handle, 398 &pds_endpoint, 399 ) { 400 Ok(r) => r, 401 Err(e) => { 402 error!("Error creating PLC genesis operation: {:?}", e); 403 return ( 404 StatusCode::INTERNAL_SERVER_ERROR, 405 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 406 ) 407 .into_response(); 408 } 409 }; 410 let plc_client = PlcClient::new(None); 411 if let Err(e) = plc_client 412 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 413 .await 414 { 415 error!("Failed to submit PLC genesis operation: {:?}", e); 416 return ( 417 StatusCode::BAD_GATEWAY, 418 Json(json!({ 419 "error": "UpstreamError", 420 "message": format!("Failed to register DID with PLC directory: {}", e) 421 })), 422 ) 423 .into_response(); 424 } 425 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 426 genesis_result.did 427 } 428 } else { 429 let rotation_key = std::env::var("PLC_ROTATION_KEY") 430 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 431 let genesis_result = match create_genesis_operation( 432 &signing_key, 433 &rotation_key, 434 &handle, 435 &pds_endpoint, 436 ) { 437 Ok(r) => r, 438 Err(e) => { 439 error!("Error creating PLC genesis operation: {:?}", e); 440 return ( 441 StatusCode::INTERNAL_SERVER_ERROR, 442 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 443 ) 444 .into_response(); 445 } 446 }; 447 let plc_client = PlcClient::new(None); 448 if let Err(e) = plc_client 449 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 450 .await 451 { 452 error!("Failed to submit PLC genesis operation: {:?}", e); 453 return ( 454 StatusCode::BAD_GATEWAY, 455 Json(json!({ 456 "error": "UpstreamError", 457 "message": format!("Failed to register DID with PLC directory: {}", e) 458 })), 459 ) 460 .into_response(); 461 } 462 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 463 genesis_result.did 464 } 465 } 466 }; 467 let mut tx = match state.db.begin().await { 468 Ok(tx) => tx, 469 Err(e) => { 470 error!("Error starting transaction: {:?}", e); 471 return ( 472 StatusCode::INTERNAL_SERVER_ERROR, 473 Json(json!({"error": "InternalError"})), 474 ) 475 .into_response(); 476 } 477 }; 478 if is_migration { 479 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 480 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 481 .bind(&did) 482 .fetch_optional(&mut *tx) 483 .await 484 .unwrap_or(None); 485 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 486 if deactivated_at.is_some() { 487 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 488 let update_result: Result<_, sqlx::Error> = 489 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 490 .bind(&handle) 491 .bind(account_id) 492 .execute(&mut *tx) 493 .await; 494 if let Err(e) = update_result { 495 if let Some(db_err) = e.as_database_error() 496 && db_err 497 .constraint() 498 .map(|c| c.contains("handle")) 499 .unwrap_or(false) 500 { 501 return ( 502 StatusCode::BAD_REQUEST, 503 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 504 ) 505 .into_response(); 506 } 507 error!("Error reactivating account: {:?}", e); 508 return ( 509 StatusCode::INTERNAL_SERVER_ERROR, 510 Json(json!({"error": "InternalError"})), 511 ) 512 .into_response(); 513 } 514 if let Err(e) = tx.commit().await { 515 error!("Error committing reactivation: {:?}", e); 516 return ( 517 StatusCode::INTERNAL_SERVER_ERROR, 518 Json(json!({"error": "InternalError"})), 519 ) 520 .into_response(); 521 } 522 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 523 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 524 ) 525 .bind(account_id) 526 .fetch_optional(&state.db) 527 .await 528 .unwrap_or(None); 529 let secret_key_bytes = match key_row { 530 Some((key_bytes, encryption_version)) => { 531 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 532 Ok(k) => k, 533 Err(e) => { 534 error!("Error decrypting key for reactivated account: {:?}", e); 535 return ( 536 StatusCode::INTERNAL_SERVER_ERROR, 537 Json(json!({"error": "InternalError"})), 538 ) 539 .into_response(); 540 } 541 } 542 } 543 None => { 544 error!("No signing key found for reactivated account"); 545 return ( 546 StatusCode::INTERNAL_SERVER_ERROR, 547 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 548 ) 549 .into_response(); 550 } 551 }; 552 let access_meta = 553 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 554 Ok(m) => m, 555 Err(e) => { 556 error!("Error creating access token: {:?}", e); 557 return ( 558 StatusCode::INTERNAL_SERVER_ERROR, 559 Json(json!({"error": "InternalError"})), 560 ) 561 .into_response(); 562 } 563 }; 564 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 565 &did, 566 &secret_key_bytes, 567 ) { 568 Ok(m) => m, 569 Err(e) => { 570 error!("Error creating refresh token: {:?}", e); 571 return ( 572 StatusCode::INTERNAL_SERVER_ERROR, 573 Json(json!({"error": "InternalError"})), 574 ) 575 .into_response(); 576 } 577 }; 578 let session_result: Result<_, sqlx::Error> = sqlx::query( 579 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 580 ) 581 .bind(&did) 582 .bind(&access_meta.jti) 583 .bind(&refresh_meta.jti) 584 .bind(access_meta.expires_at) 585 .bind(refresh_meta.expires_at) 586 .execute(&state.db) 587 .await; 588 if let Err(e) = session_result { 589 error!("Error creating session: {:?}", e); 590 return ( 591 StatusCode::INTERNAL_SERVER_ERROR, 592 Json(json!({"error": "InternalError"})), 593 ) 594 .into_response(); 595 } 596 return ( 597 StatusCode::OK, 598 Json(CreateAccountOutput { 599 handle: handle.clone(), 600 did, 601 access_jwt: Some(access_meta.token), 602 refresh_jwt: Some(refresh_meta.token), 603 verification_required: false, 604 verification_channel: "email".to_string(), 605 }), 606 ) 607 .into_response(); 608 } else { 609 return ( 610 StatusCode::BAD_REQUEST, 611 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 612 ) 613 .into_response(); 614 } 615 } 616 } 617 let exists_result: Option<(i32,)> = 618 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 619 .bind(&handle) 620 .fetch_optional(&mut *tx) 621 .await 622 .unwrap_or(None); 623 if exists_result.is_some() { 624 return ( 625 StatusCode::BAD_REQUEST, 626 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 627 ) 628 .into_response(); 629 } 630 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 631 .map(|v| v == "true" || v == "1") 632 .unwrap_or(false); 633 if invite_code_required 634 && input 635 .invite_code 636 .as_ref() 637 .map(|c| c.trim().is_empty()) 638 .unwrap_or(true) 639 { 640 return ( 641 StatusCode::BAD_REQUEST, 642 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 643 ) 644 .into_response(); 645 } 646 if let Some(code) = &input.invite_code 647 && !code.trim().is_empty() 648 { 649 let invite_query = sqlx::query!( 650 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 651 code 652 ) 653 .fetch_optional(&mut *tx) 654 .await; 655 match invite_query { 656 Ok(Some(row)) => { 657 if row.available_uses <= 0 { 658 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 659 } 660 let update_invite = sqlx::query!( 661 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 662 code 663 ) 664 .execute(&mut *tx) 665 .await; 666 if let Err(e) = update_invite { 667 error!("Error updating invite code: {:?}", e); 668 return ( 669 StatusCode::INTERNAL_SERVER_ERROR, 670 Json(json!({"error": "InternalError"})), 671 ) 672 .into_response(); 673 } 674 } 675 Ok(None) => { 676 return ( 677 StatusCode::BAD_REQUEST, 678 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 679 ) 680 .into_response(); 681 } 682 Err(e) => { 683 error!("Error checking invite code: {:?}", e); 684 return ( 685 StatusCode::INTERNAL_SERVER_ERROR, 686 Json(json!({"error": "InternalError"})), 687 ) 688 .into_response(); 689 } 690 } 691 } 692 if let Err(e) = validate_password(&input.password) { 693 return ( 694 StatusCode::BAD_REQUEST, 695 Json(json!({ 696 "error": "InvalidPassword", 697 "message": e.to_string() 698 })), 699 ) 700 .into_response(); 701 } 702 703 let password_hash = match hash(&input.password, DEFAULT_COST) { 704 Ok(h) => h, 705 Err(e) => { 706 error!("Error hashing password: {:?}", e); 707 return ( 708 StatusCode::INTERNAL_SERVER_ERROR, 709 Json(json!({"error": "InternalError"})), 710 ) 711 .into_response(); 712 } 713 }; 714 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 715 .fetch_one(&mut *tx) 716 .await 717 .map(|c| c.unwrap_or(0) == 0) 718 .unwrap_or(false); 719 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 720 Some(chrono::Utc::now()) 721 } else { 722 None 723 }; 724 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 725 r#"INSERT INTO users ( 726 handle, email, did, password_hash, 727 preferred_comms_channel, 728 discord_id, telegram_username, signal_number, 729 is_admin, deactivated_at, email_verified 730 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 731 ) 732 .bind(&handle) 733 .bind(&email) 734 .bind(&did) 735 .bind(&password_hash) 736 .bind(verification_channel) 737 .bind( 738 input 739 .discord_id 740 .as_deref() 741 .map(|s| s.trim()) 742 .filter(|s| !s.is_empty()), 743 ) 744 .bind( 745 input 746 .telegram_username 747 .as_deref() 748 .map(|s| s.trim()) 749 .filter(|s| !s.is_empty()), 750 ) 751 .bind( 752 input 753 .signal_number 754 .as_deref() 755 .map(|s| s.trim()) 756 .filter(|s| !s.is_empty()), 757 ) 758 .bind(is_first_user) 759 .bind(deactivated_at) 760 .bind(false) 761 .fetch_one(&mut *tx) 762 .await; 763 let user_id = match user_insert { 764 Ok((id,)) => id, 765 Err(e) => { 766 if let Some(db_err) = e.as_database_error() 767 && db_err.code().as_deref() == Some("23505") 768 { 769 let constraint = db_err.constraint().unwrap_or(""); 770 if constraint.contains("handle") || constraint.contains("users_handle") { 771 return ( 772 StatusCode::BAD_REQUEST, 773 Json(json!({ 774 "error": "HandleNotAvailable", 775 "message": "Handle already taken" 776 })), 777 ) 778 .into_response(); 779 } else if constraint.contains("email") || constraint.contains("users_email") { 780 return ( 781 StatusCode::BAD_REQUEST, 782 Json(json!({ 783 "error": "InvalidEmail", 784 "message": "Email already registered" 785 })), 786 ) 787 .into_response(); 788 } else if constraint.contains("did") || constraint.contains("users_did") { 789 return ( 790 StatusCode::BAD_REQUEST, 791 Json(json!({ 792 "error": "AccountAlreadyExists", 793 "message": "An account with this DID already exists" 794 })), 795 ) 796 .into_response(); 797 } 798 } 799 error!("Error inserting user: {:?}", e); 800 return ( 801 StatusCode::INTERNAL_SERVER_ERROR, 802 Json(json!({"error": "InternalError"})), 803 ) 804 .into_response(); 805 } 806 }; 807 808 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 809 Ok(enc) => enc, 810 Err(e) => { 811 error!("Error encrypting user key: {:?}", e); 812 return ( 813 StatusCode::INTERNAL_SERVER_ERROR, 814 Json(json!({"error": "InternalError"})), 815 ) 816 .into_response(); 817 } 818 }; 819 let key_insert = sqlx::query!( 820 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 821 user_id, 822 &encrypted_key_bytes[..], 823 crate::config::ENCRYPTION_VERSION 824 ) 825 .execute(&mut *tx) 826 .await; 827 if let Err(e) = key_insert { 828 error!("Error inserting user key: {:?}", e); 829 return ( 830 StatusCode::INTERNAL_SERVER_ERROR, 831 Json(json!({"error": "InternalError"})), 832 ) 833 .into_response(); 834 } 835 if let Some(key_id) = reserved_key_id { 836 let mark_used = sqlx::query!( 837 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 838 key_id 839 ) 840 .execute(&mut *tx) 841 .await; 842 if let Err(e) = mark_used { 843 error!("Error marking reserved key as used: {:?}", e); 844 return ( 845 StatusCode::INTERNAL_SERVER_ERROR, 846 Json(json!({"error": "InternalError"})), 847 ) 848 .into_response(); 849 } 850 } 851 let mst = Mst::new(Arc::new(state.block_store.clone())); 852 let mst_root = match mst.persist().await { 853 Ok(c) => c, 854 Err(e) => { 855 error!("Error persisting MST: {:?}", e); 856 return ( 857 StatusCode::INTERNAL_SERVER_ERROR, 858 Json(json!({"error": "InternalError"})), 859 ) 860 .into_response(); 861 } 862 }; 863 let rev = Tid::now(LimitedU32::MIN); 864 let (commit_bytes, _sig) = 865 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 866 Ok(result) => result, 867 Err(e) => { 868 error!("Error creating genesis commit: {:?}", e); 869 return ( 870 StatusCode::INTERNAL_SERVER_ERROR, 871 Json(json!({"error": "InternalError"})), 872 ) 873 .into_response(); 874 } 875 }; 876 let commit_cid = match state.block_store.put(&commit_bytes).await { 877 Ok(c) => c, 878 Err(e) => { 879 error!("Error saving genesis commit: {:?}", e); 880 return ( 881 StatusCode::INTERNAL_SERVER_ERROR, 882 Json(json!({"error": "InternalError"})), 883 ) 884 .into_response(); 885 } 886 }; 887 let commit_cid_str = commit_cid.to_string(); 888 let repo_insert = sqlx::query!( 889 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 890 user_id, 891 commit_cid_str 892 ) 893 .execute(&mut *tx) 894 .await; 895 if let Err(e) = repo_insert { 896 error!("Error initializing repo: {:?}", e); 897 return ( 898 StatusCode::INTERNAL_SERVER_ERROR, 899 Json(json!({"error": "InternalError"})), 900 ) 901 .into_response(); 902 } 903 if let Some(code) = &input.invite_code 904 && !code.trim().is_empty() 905 { 906 let use_insert = sqlx::query!( 907 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 908 code, 909 user_id 910 ) 911 .execute(&mut *tx) 912 .await; 913 if let Err(e) = use_insert { 914 error!("Error recording invite usage: {:?}", e); 915 return ( 916 StatusCode::INTERNAL_SERVER_ERROR, 917 Json(json!({"error": "InternalError"})), 918 ) 919 .into_response(); 920 } 921 } 922 if let Err(e) = tx.commit().await { 923 error!("Error committing transaction: {:?}", e); 924 return ( 925 StatusCode::INTERNAL_SERVER_ERROR, 926 Json(json!({"error": "InternalError"})), 927 ) 928 .into_response(); 929 } 930 if !is_migration && !is_did_web_byod { 931 if let Err(e) = 932 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 933 { 934 warn!("Failed to sequence identity event for {}: {}", did, e); 935 } 936 if let Err(e) = 937 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 938 { 939 warn!("Failed to sequence account event for {}: {}", did, e); 940 } 941 let profile_record = json!({ 942 "$type": "app.bsky.actor.profile", 943 "displayName": input.handle 944 }); 945 if let Err(e) = crate::api::repo::record::create_record_internal( 946 &state, 947 &did, 948 "app.bsky.actor.profile", 949 "self", 950 &profile_record, 951 ) 952 .await 953 { 954 warn!("Failed to create default profile for {}: {}", did, e); 955 } 956 } 957 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 958 if !is_migration { 959 if let Some(ref recipient) = verification_recipient { 960 let verification_token = crate::auth::verification_token::generate_signup_token( 961 &did, 962 verification_channel, 963 recipient, 964 ); 965 let formatted_token = 966 crate::auth::verification_token::format_token_for_display(&verification_token); 967 if let Err(e) = crate::comms::enqueue_signup_verification( 968 &state.db, 969 user_id, 970 verification_channel, 971 recipient, 972 &formatted_token, 973 None, 974 ) 975 .await 976 { 977 warn!( 978 "Failed to enqueue signup verification notification: {:?}", 979 e 980 ); 981 } 982 } 983 } else if let Some(ref user_email) = email { 984 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 985 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 986 if let Err(e) = crate::comms::enqueue_migration_verification( 987 &state.db, 988 user_id, 989 user_email, 990 &formatted_token, 991 &hostname, 992 ) 993 .await 994 { 995 warn!("Failed to enqueue migration verification email: {:?}", e); 996 } 997 } 998 999 let (access_jwt, refresh_jwt) = if is_migration { 1000 let access_meta = 1001 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 1002 Ok(m) => m, 1003 Err(e) => { 1004 error!("Error creating access token for migration: {:?}", e); 1005 return ( 1006 StatusCode::INTERNAL_SERVER_ERROR, 1007 Json(json!({"error": "InternalError"})), 1008 ) 1009 .into_response(); 1010 } 1011 }; 1012 let refresh_meta = 1013 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1014 Ok(m) => m, 1015 Err(e) => { 1016 error!("Error creating refresh token for migration: {:?}", e); 1017 return ( 1018 StatusCode::INTERNAL_SERVER_ERROR, 1019 Json(json!({"error": "InternalError"})), 1020 ) 1021 .into_response(); 1022 } 1023 }; 1024 if let Err(e) = sqlx::query!( 1025 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1026 did, 1027 access_meta.jti, 1028 refresh_meta.jti, 1029 access_meta.expires_at, 1030 refresh_meta.expires_at 1031 ) 1032 .execute(&state.db) 1033 .await 1034 { 1035 error!("Error creating session for migration: {:?}", e); 1036 return ( 1037 StatusCode::INTERNAL_SERVER_ERROR, 1038 Json(json!({"error": "InternalError"})), 1039 ) 1040 .into_response(); 1041 } 1042 (Some(access_meta.token), Some(refresh_meta.token)) 1043 } else { 1044 (None, None) 1045 }; 1046 1047 ( 1048 StatusCode::OK, 1049 Json(CreateAccountOutput { 1050 handle: handle.clone(), 1051 did, 1052 access_jwt, 1053 refresh_jwt, 1054 verification_required: !is_migration, 1055 verification_channel: verification_channel.to_string(), 1056 }), 1057 ) 1058 .into_response() 1059}