this repo has no description
1use super::did::verify_did_web; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::create_signed_commit; 4use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 5use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 6use crate::state::{AppState, RateLimitKind}; 7use crate::types::{Did, Handle, PlainPassword}; 8use crate::validation::validate_password; 9use axum::{ 10 Json, 11 extract::State, 12 http::{HeaderMap, StatusCode}, 13 response::{IntoResponse, Response}, 14}; 15use serde_json::json; 16use bcrypt::{DEFAULT_COST, hash}; 17use jacquard::types::{integer::LimitedU32, string::Tid}; 18use jacquard_repo::{mst::Mst, storage::BlockStore}; 19use k256::{SecretKey, ecdsa::SigningKey}; 20use rand::rngs::OsRng; 21use serde::{Deserialize, Serialize}; 22use std::sync::Arc; 23use tracing::{debug, error, info, warn}; 24 25fn extract_client_ip(headers: &HeaderMap) -> String { 26 if let Some(forwarded) = headers.get("x-forwarded-for") 27 && let Ok(value) = forwarded.to_str() 28 && let Some(first_ip) = value.split(',').next() 29 { 30 return first_ip.trim().to_string(); 31 } 32 if let Some(real_ip) = headers.get("x-real-ip") 33 && let Ok(value) = real_ip.to_str() 34 { 35 return value.trim().to_string(); 36 } 37 "unknown".to_string() 38} 39 40#[derive(Deserialize)] 41#[serde(rename_all = "camelCase")] 42pub struct CreateAccountInput { 43 pub handle: String, 44 pub email: Option<String>, 45 pub password: PlainPassword, 46 pub invite_code: Option<String>, 47 pub did: Option<String>, 48 pub did_type: Option<String>, 49 pub signing_key: Option<String>, 50 pub verification_channel: Option<String>, 51 pub discord_id: Option<String>, 52 pub telegram_username: Option<String>, 53 pub signal_number: Option<String>, 54} 55 56#[derive(Serialize)] 57#[serde(rename_all = "camelCase")] 58pub struct CreateAccountOutput { 59 pub handle: Handle, 60 pub did: Did, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub did_doc: Option<serde_json::Value>, 63 pub access_jwt: String, 64 pub refresh_jwt: String, 65 pub verification_required: bool, 66 pub verification_channel: String, 67} 68 69pub async fn create_account( 70 State(state): State<AppState>, 71 headers: HeaderMap, 72 Json(input): Json<CreateAccountInput>, 73) -> Response { 74 let is_potential_migration = input 75 .did 76 .as_ref() 77 .map(|d| d.starts_with("did:plc:")) 78 .unwrap_or(false); 79 if is_potential_migration { 80 info!( 81 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 82 input.did, input.handle 83 ); 84 } else { 85 info!("create_account called"); 86 } 87 let client_ip = extract_client_ip(&headers); 88 if !state 89 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 90 .await 91 { 92 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 93 return ApiError::RateLimitExceeded(Some("Too many account creation attempts. Please try again later.".into(),)) 94 .into_response(); 95 } 96 97 let migration_auth = if let Some(token) = 98 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 99 { 100 if is_service_token(&token) { 101 let verifier = ServiceTokenVerifier::new(); 102 match verifier 103 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 104 .await 105 { 106 Ok(claims) => { 107 debug!("Service token verified for migration: iss={}", claims.iss); 108 Some(claims.iss) 109 } 110 Err(e) => { 111 error!("Service token verification failed: {:?}", e); 112 return ApiError::AuthenticationFailed(Some(format!( 113 "Service token verification failed: {}", 114 e 115 ))) 116 .into_response(); 117 } 118 } 119 } else { 120 None 121 } 122 } else { 123 None 124 }; 125 126 let is_did_web_byod = migration_auth.is_some() 127 && input 128 .did 129 .as_ref() 130 .map(|d| d.starts_with("did:web:")) 131 .unwrap_or(false); 132 133 let is_migration = migration_auth.is_some() 134 && input 135 .did 136 .as_ref() 137 .map(|d| d.starts_with("did:plc:")) 138 .unwrap_or(false); 139 140 if (is_migration || is_did_web_byod) 141 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 142 { 143 if provided_did != auth_did { 144 info!( 145 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 146 auth_did, provided_did 147 ); 148 return ApiError::AuthorizationError(format!( 149 "Service token issuer {} does not match DID {}", 150 auth_did, provided_did 151 )) 152 .into_response(); 153 } 154 if is_did_web_byod { 155 info!(did = %provided_did, "Processing did:web BYOD account creation"); 156 } else { 157 info!( 158 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 159 provided_did 160 ); 161 } 162 } 163 164 let hostname_for_validation = 165 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 166 let pds_suffix = format!(".{}", hostname_for_validation); 167 168 let validated_short_handle = if !input.handle.contains('.') 169 || input.handle.ends_with(&pds_suffix) 170 { 171 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 172 input 173 .handle 174 .strip_suffix(&pds_suffix) 175 .unwrap_or(&input.handle) 176 } else { 177 &input.handle 178 }; 179 match crate::api::validation::validate_short_handle(handle_to_validate) { 180 Ok(h) => h, 181 Err(e) => { 182 return ApiError::from(e).into_response(); 183 } 184 } 185 } else { 186 if input.handle.contains(' ') || input.handle.contains('\t') { 187 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response(); 188 } 189 for c in input.handle.chars() { 190 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 191 return ApiError::InvalidRequest(format!( 192 "Handle contains invalid character: {}", 193 c 194 )) 195 .into_response(); 196 } 197 } 198 let handle_lower = input.handle.to_lowercase(); 199 if crate::moderation::has_explicit_slur(&handle_lower) { 200 return ApiError::InvalidRequest("Inappropriate language in handle".into()) 201 .into_response(); 202 } 203 handle_lower 204 }; 205 let email: Option<String> = input 206 .email 207 .as_ref() 208 .map(|e| e.trim().to_string()) 209 .filter(|e| !e.is_empty()); 210 if let Some(ref email) = email 211 && !crate::api::validation::is_valid_email(email) 212 { 213 return ApiError::InvalidEmail.into_response(); 214 } 215 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 216 let valid_channels = ["email", "discord", "telegram", "signal"]; 217 if !valid_channels.contains(&verification_channel) && !is_migration { 218 return ApiError::InvalidVerificationChannel.into_response(); 219 } 220 let verification_recipient = if is_migration { 221 None 222 } else { 223 Some(match verification_channel { 224 "email" => match &input.email { 225 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 226 _ => return ApiError::MissingEmail.into_response(), 227 }, 228 "discord" => match &input.discord_id { 229 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 230 _ => return ApiError::MissingDiscordId.into_response(), 231 }, 232 "telegram" => match &input.telegram_username { 233 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 234 _ => return ApiError::MissingTelegramUsername.into_response(), 235 }, 236 "signal" => match &input.signal_number { 237 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 238 _ => return ApiError::MissingSignalNumber.into_response(), 239 }, 240 _ => return ApiError::InvalidVerificationChannel.into_response(), 241 }) 242 }; 243 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 244 let pds_endpoint = format!("https://{}", hostname); 245 let suffix = format!(".{}", hostname); 246 let handle = if input.handle.ends_with(&suffix) { 247 format!("{}.{}", validated_short_handle, hostname) 248 } else if input.handle.contains('.') { 249 validated_short_handle.clone() 250 } else { 251 format!("{}.{}", validated_short_handle, hostname) 252 }; 253 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 254 if let Some(signing_key_did) = &input.signing_key { 255 let reserved = sqlx::query!( 256 r#" 257 SELECT id, private_key_bytes 258 FROM reserved_signing_keys 259 WHERE public_key_did_key = $1 260 AND used_at IS NULL 261 AND expires_at > NOW() 262 FOR UPDATE 263 "#, 264 signing_key_did 265 ) 266 .fetch_optional(&state.db) 267 .await; 268 match reserved { 269 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 270 Ok(None) => { 271 return ApiError::InvalidSigningKey.into_response(); 272 } 273 Err(e) => { 274 error!("Error looking up reserved signing key: {:?}", e); 275 return ApiError::InternalError(None).into_response(); 276 } 277 } 278 } else { 279 let secret_key = SecretKey::random(&mut OsRng); 280 (secret_key.to_bytes().to_vec(), None) 281 }; 282 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 283 Ok(k) => k, 284 Err(e) => { 285 error!("Error creating signing key: {:?}", e); 286 return ApiError::InternalError(None).into_response(); 287 } 288 }; 289 let did_type = input.did_type.as_deref().unwrap_or("plc"); 290 let did = match did_type { 291 "web" => { 292 if !crate::api::server::meta::is_self_hosted_did_web_enabled() { 293 return ApiError::SelfHostedDidWebDisabled.into_response(); 294 } 295 let subdomain_host = format!("{}.{}", input.handle, hostname); 296 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 297 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 298 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 299 self_hosted_did 300 } 301 "web-external" => { 302 let d = match &input.did { 303 Some(d) if !d.trim().is_empty() => d, 304 _ => { 305 return ApiError::InvalidRequest( 306 "External did:web requires the 'did' field to be provided".into(), 307 ) 308 .into_response(); 309 } 310 }; 311 if !d.starts_with("did:web:") { 312 return ApiError::InvalidDid("External DID must be a did:web".into()) 313 .into_response(); 314 } 315 if !is_did_web_byod 316 && let Err(e) = 317 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 318 { 319 return ApiError::InvalidDid(e).into_response(); 320 } 321 info!(did = %d, "Creating external did:web account"); 322 d.clone() 323 } 324 _ => { 325 if let Some(d) = &input.did { 326 if d.starts_with("did:plc:") && is_migration { 327 info!(did = %d, "Migration with existing did:plc"); 328 d.clone() 329 } else if d.starts_with("did:web:") { 330 if !is_did_web_byod 331 && let Err(e) = verify_did_web( 332 d, 333 &hostname, 334 &input.handle, 335 input.signing_key.as_deref(), 336 ) 337 .await 338 { 339 return ApiError::InvalidDid(e).into_response(); 340 } 341 d.clone() 342 } else if !d.trim().is_empty() { 343 return ApiError::InvalidDid( 344 "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth.".into() 345 ) 346 .into_response(); 347 } else { 348 let rotation_key = std::env::var("PLC_ROTATION_KEY") 349 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 350 let genesis_result = match create_genesis_operation( 351 &signing_key, 352 &rotation_key, 353 &handle, 354 &pds_endpoint, 355 ) { 356 Ok(r) => r, 357 Err(e) => { 358 error!("Error creating PLC genesis operation: {:?}", e); 359 return ApiError::InternalError(Some( 360 "Failed to create PLC operation".into(), 361 )) 362 .into_response(); 363 } 364 }; 365 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 366 if let Err(e) = plc_client 367 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 368 .await 369 { 370 error!("Failed to submit PLC genesis operation: {:?}", e); 371 return ApiError::UpstreamErrorMsg(format!( 372 "Failed to register DID with PLC directory: {}", 373 e 374 )) 375 .into_response(); 376 } 377 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 378 genesis_result.did 379 } 380 } else { 381 let rotation_key = std::env::var("PLC_ROTATION_KEY") 382 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 383 let genesis_result = match create_genesis_operation( 384 &signing_key, 385 &rotation_key, 386 &handle, 387 &pds_endpoint, 388 ) { 389 Ok(r) => r, 390 Err(e) => { 391 error!("Error creating PLC genesis operation: {:?}", e); 392 return ApiError::InternalError(Some( 393 "Failed to create PLC operation".into(), 394 )) 395 .into_response(); 396 } 397 }; 398 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 399 if let Err(e) = plc_client 400 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 401 .await 402 { 403 error!("Failed to submit PLC genesis operation: {:?}", e); 404 return ApiError::UpstreamErrorMsg(format!( 405 "Failed to register DID with PLC directory: {}", 406 e 407 )) 408 .into_response(); 409 } 410 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 411 genesis_result.did 412 } 413 } 414 }; 415 let mut tx = match state.db.begin().await { 416 Ok(tx) => tx, 417 Err(e) => { 418 error!("Error starting transaction: {:?}", e); 419 return ApiError::InternalError(None).into_response(); 420 } 421 }; 422 if is_migration { 423 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 424 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 425 .bind(&did) 426 .fetch_optional(&mut *tx) 427 .await 428 .unwrap_or(None); 429 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 430 if deactivated_at.is_some() { 431 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 432 let update_result: Result<_, sqlx::Error> = 433 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 434 .bind(&handle) 435 .bind(account_id) 436 .execute(&mut *tx) 437 .await; 438 if let Err(e) = update_result { 439 if let Some(db_err) = e.as_database_error() 440 && db_err 441 .constraint() 442 .map(|c| c.contains("handle")) 443 .unwrap_or(false) 444 { 445 return ApiError::HandleTaken.into_response(); 446 } 447 error!("Error reactivating account: {:?}", e); 448 return ApiError::InternalError(None).into_response(); 449 } 450 if let Err(e) = tx.commit().await { 451 error!("Error committing reactivation: {:?}", e); 452 return ApiError::InternalError(None).into_response(); 453 } 454 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 455 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 456 ) 457 .bind(account_id) 458 .fetch_optional(&state.db) 459 .await 460 .unwrap_or(None); 461 let secret_key_bytes = match key_row { 462 Some((key_bytes, encryption_version)) => { 463 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 464 Ok(k) => k, 465 Err(e) => { 466 error!("Error decrypting key for reactivated account: {:?}", e); 467 return ApiError::InternalError(None).into_response(); 468 } 469 } 470 } 471 None => { 472 error!("No signing key found for reactivated account"); 473 return ApiError::InternalError(Some( 474 "Account signing key not found".into(), 475 )) 476 .into_response(); 477 } 478 }; 479 let access_meta = 480 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 481 Ok(m) => m, 482 Err(e) => { 483 error!("Error creating access token: {:?}", e); 484 return ApiError::InternalError(None).into_response(); 485 } 486 }; 487 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 488 &did, 489 &secret_key_bytes, 490 ) { 491 Ok(m) => m, 492 Err(e) => { 493 error!("Error creating refresh token: {:?}", e); 494 return ApiError::InternalError(None).into_response(); 495 } 496 }; 497 let session_result: Result<_, sqlx::Error> = sqlx::query( 498 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 499 ) 500 .bind(&did) 501 .bind(&access_meta.jti) 502 .bind(&refresh_meta.jti) 503 .bind(access_meta.expires_at) 504 .bind(refresh_meta.expires_at) 505 .execute(&state.db) 506 .await; 507 if let Err(e) = session_result { 508 error!("Error creating session: {:?}", e); 509 return ApiError::InternalError(None).into_response(); 510 } 511 return ( 512 axum::http::StatusCode::OK, 513 Json(CreateAccountOutput { 514 handle: handle.clone().into(), 515 did: did.clone().into(), 516 did_doc: state.did_resolver.resolve_did_document(&did).await, 517 access_jwt: access_meta.token, 518 refresh_jwt: refresh_meta.token, 519 verification_required: false, 520 verification_channel: "email".to_string(), 521 }), 522 ) 523 .into_response(); 524 } else { 525 return ApiError::AccountAlreadyExists.into_response(); 526 } 527 } 528 } 529 let exists_result: Option<(i32,)> = 530 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 531 .bind(&handle) 532 .fetch_optional(&mut *tx) 533 .await 534 .unwrap_or(None); 535 if exists_result.is_some() { 536 return ApiError::HandleTaken.into_response(); 537 } 538 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 539 .map(|v| v == "true" || v == "1") 540 .unwrap_or(false); 541 if invite_code_required 542 && input 543 .invite_code 544 .as_ref() 545 .map(|c| c.trim().is_empty()) 546 .unwrap_or(true) 547 { 548 return ApiError::InviteCodeRequired.into_response(); 549 } 550 if let Some(code) = &input.invite_code 551 && !code.trim().is_empty() 552 { 553 let invite_query = sqlx::query!( 554 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 555 code 556 ) 557 .fetch_optional(&mut *tx) 558 .await; 559 match invite_query { 560 Ok(Some(row)) => { 561 if row.available_uses <= 0 { 562 return ApiError::InvalidInviteCode.into_response(); 563 } 564 let update_invite = sqlx::query!( 565 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 566 code 567 ) 568 .execute(&mut *tx) 569 .await; 570 if let Err(e) = update_invite { 571 error!("Error updating invite code: {:?}", e); 572 return ApiError::InternalError(None).into_response(); 573 } 574 } 575 Ok(None) => { 576 return ApiError::InvalidInviteCode.into_response(); 577 } 578 Err(e) => { 579 error!("Error checking invite code: {:?}", e); 580 return ApiError::InternalError(None).into_response(); 581 } 582 } 583 } 584 if let Err(e) = validate_password(&input.password) { 585 return ApiError::InvalidRequest(e.to_string()).into_response(); 586 } 587 588 let password_clone = input.password.clone(); 589 let password_hash = 590 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await { 591 Ok(Ok(h)) => h, 592 Ok(Err(e)) => { 593 error!("Error hashing password: {:?}", e); 594 return ApiError::InternalError(None).into_response(); 595 } 596 Err(e) => { 597 error!("Failed to spawn blocking task: {:?}", e); 598 return ApiError::InternalError(None).into_response(); 599 } 600 }; 601 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 602 .fetch_one(&mut *tx) 603 .await 604 .map(|c| c.unwrap_or(0) == 0) 605 .unwrap_or(false); 606 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 607 Some(chrono::Utc::now()) 608 } else { 609 None 610 }; 611 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 612 r#"INSERT INTO users ( 613 handle, email, did, password_hash, 614 preferred_comms_channel, 615 discord_id, telegram_username, signal_number, 616 is_admin, deactivated_at, email_verified 617 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 618 ) 619 .bind(&handle) 620 .bind(&email) 621 .bind(&did) 622 .bind(&password_hash) 623 .bind(verification_channel) 624 .bind( 625 input 626 .discord_id 627 .as_deref() 628 .map(|s| s.trim()) 629 .filter(|s| !s.is_empty()), 630 ) 631 .bind( 632 input 633 .telegram_username 634 .as_deref() 635 .map(|s| s.trim()) 636 .filter(|s| !s.is_empty()), 637 ) 638 .bind( 639 input 640 .signal_number 641 .as_deref() 642 .map(|s| s.trim()) 643 .filter(|s| !s.is_empty()), 644 ) 645 .bind(is_first_user) 646 .bind(deactivated_at) 647 .bind(false) 648 .fetch_one(&mut *tx) 649 .await; 650 let user_id = match user_insert { 651 Ok((id,)) => id, 652 Err(e) => { 653 if let Some(db_err) = e.as_database_error() 654 && db_err.code().as_deref() == Some("23505") 655 { 656 let constraint = db_err.constraint().unwrap_or(""); 657 if constraint.contains("handle") || constraint.contains("users_handle") { 658 return ApiError::HandleNotAvailable(None).into_response(); 659 } else if constraint.contains("email") || constraint.contains("users_email") { 660 return ApiError::EmailTaken.into_response(); 661 } else if constraint.contains("did") || constraint.contains("users_did") { 662 return ApiError::AccountAlreadyExists.into_response(); 663 } 664 } 665 error!("Error inserting user: {:?}", e); 666 return ApiError::InternalError(None).into_response(); 667 } 668 }; 669 670 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 671 Ok(enc) => enc, 672 Err(e) => { 673 error!("Error encrypting user key: {:?}", e); 674 return ApiError::InternalError(None).into_response(); 675 } 676 }; 677 let key_insert = sqlx::query!( 678 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 679 user_id, 680 &encrypted_key_bytes[..], 681 crate::config::ENCRYPTION_VERSION 682 ) 683 .execute(&mut *tx) 684 .await; 685 if let Err(e) = key_insert { 686 error!("Error inserting user key: {:?}", e); 687 return ApiError::InternalError(None).into_response(); 688 } 689 if let Some(key_id) = reserved_key_id { 690 let mark_used = sqlx::query!( 691 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 692 key_id 693 ) 694 .execute(&mut *tx) 695 .await; 696 if let Err(e) = mark_used { 697 error!("Error marking reserved key as used: {:?}", e); 698 return ApiError::InternalError(None).into_response(); 699 } 700 } 701 let mst = Mst::new(Arc::new(state.block_store.clone())); 702 let mst_root = match mst.persist().await { 703 Ok(c) => c, 704 Err(e) => { 705 error!("Error persisting MST: {:?}", e); 706 return ApiError::InternalError(None).into_response(); 707 } 708 }; 709 let rev = Tid::now(LimitedU32::MIN); 710 let (commit_bytes, _sig) = 711 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 712 Ok(result) => result, 713 Err(e) => { 714 error!("Error creating genesis commit: {:?}", e); 715 return ApiError::InternalError(None).into_response(); 716 } 717 }; 718 let commit_cid = match state.block_store.put(&commit_bytes).await { 719 Ok(c) => c, 720 Err(e) => { 721 error!("Error saving genesis commit: {:?}", e); 722 return ApiError::InternalError(None).into_response(); 723 } 724 }; 725 let commit_cid_str = commit_cid.to_string(); 726 let rev_str = rev.as_ref().to_string(); 727 let repo_insert = sqlx::query!( 728 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 729 user_id, 730 commit_cid_str, 731 rev_str 732 ) 733 .execute(&mut *tx) 734 .await; 735 if let Err(e) = repo_insert { 736 error!("Error initializing repo: {:?}", e); 737 return ApiError::InternalError(None).into_response(); 738 } 739 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 740 if let Err(e) = sqlx::query!( 741 r#" 742 INSERT INTO user_blocks (user_id, block_cid) 743 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 744 ON CONFLICT (user_id, block_cid) DO NOTHING 745 "#, 746 user_id, 747 &genesis_block_cids 748 ) 749 .execute(&mut *tx) 750 .await 751 { 752 error!("Error inserting user_blocks: {:?}", e); 753 return ApiError::InternalError(None).into_response(); 754 } 755 if let Some(code) = &input.invite_code 756 && !code.trim().is_empty() 757 { 758 let use_insert = sqlx::query!( 759 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 760 code, 761 user_id 762 ) 763 .execute(&mut *tx) 764 .await; 765 if let Err(e) = use_insert { 766 error!("Error recording invite usage: {:?}", e); 767 return ApiError::InternalError(None).into_response(); 768 } 769 } 770 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 771 let birthdate_pref = json!({ 772 "$type": "app.bsky.actor.defs#personalDetailsPref", 773 "birthDate": "1998-05-06T00:00:00.000Z" 774 }); 775 if let Err(e) = sqlx::query!( 776 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 777 ON CONFLICT (user_id, name) DO NOTHING", 778 user_id, 779 "app.bsky.actor.defs#personalDetailsPref", 780 birthdate_pref 781 ) 782 .execute(&mut *tx) 783 .await 784 { 785 warn!("Failed to set default birthdate preference: {:?}", e); 786 } 787 } 788 if let Err(e) = tx.commit().await { 789 error!("Error committing transaction: {:?}", e); 790 return ApiError::InternalError(None).into_response(); 791 } 792 if !is_migration && !is_did_web_byod { 793 if let Err(e) = 794 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 795 { 796 warn!("Failed to sequence identity event for {}: {}", did, e); 797 } 798 if let Err(e) = 799 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 800 { 801 warn!("Failed to sequence account event for {}: {}", did, e); 802 } 803 if let Err(e) = crate::api::repo::record::sequence_genesis_commit( 804 &state, 805 &did, 806 &commit_cid, 807 &mst_root, 808 &rev_str, 809 ) 810 .await 811 { 812 warn!("Failed to sequence commit event for {}: {}", did, e); 813 } 814 if let Err(e) = crate::api::repo::record::sequence_sync_event( 815 &state, 816 &did, 817 &commit_cid_str, 818 Some(rev.as_ref()), 819 ) 820 .await 821 { 822 warn!("Failed to sequence sync event for {}: {}", did, e); 823 } 824 let profile_record = json!({ 825 "$type": "app.bsky.actor.profile", 826 "displayName": input.handle 827 }); 828 if let Err(e) = crate::api::repo::record::create_record_internal( 829 &state, 830 &did, 831 "app.bsky.actor.profile", 832 "self", 833 &profile_record, 834 ) 835 .await 836 { 837 warn!("Failed to create default profile for {}: {}", did, e); 838 } 839 } 840 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 841 if !is_migration { 842 if let Some(ref recipient) = verification_recipient { 843 let verification_token = crate::auth::verification_token::generate_signup_token( 844 &did, 845 verification_channel, 846 recipient, 847 ); 848 let formatted_token = 849 crate::auth::verification_token::format_token_for_display(&verification_token); 850 if let Err(e) = crate::comms::enqueue_signup_verification( 851 &state.db, 852 user_id, 853 verification_channel, 854 recipient, 855 &formatted_token, 856 None, 857 ) 858 .await 859 { 860 warn!( 861 "Failed to enqueue signup verification notification: {:?}", 862 e 863 ); 864 } 865 } 866 } else if let Some(ref user_email) = email { 867 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 868 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 869 if let Err(e) = crate::comms::enqueue_migration_verification( 870 &state.db, 871 user_id, 872 user_email, 873 &formatted_token, 874 &hostname, 875 ) 876 .await 877 { 878 warn!("Failed to enqueue migration verification email: {:?}", e); 879 } 880 } 881 882 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 883 { 884 Ok(m) => m, 885 Err(e) => { 886 error!("createAccount: Error creating access token: {:?}", e); 887 return ApiError::InternalError(None).into_response(); 888 } 889 }; 890 let refresh_meta = 891 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 892 Ok(m) => m, 893 Err(e) => { 894 error!("createAccount: Error creating refresh token: {:?}", e); 895 return ApiError::InternalError(None).into_response(); 896 } 897 }; 898 if let Err(e) = sqlx::query!( 899 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 900 did, 901 access_meta.jti, 902 refresh_meta.jti, 903 access_meta.expires_at, 904 refresh_meta.expires_at 905 ) 906 .execute(&state.db) 907 .await 908 { 909 error!("createAccount: Error creating session: {:?}", e); 910 return ApiError::InternalError(None).into_response(); 911 } 912 913 let did_doc = state.did_resolver.resolve_did_document(&did).await; 914 915 if is_migration { 916 info!( 917 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 918 did, handle 919 ); 920 } 921 922 ( 923 StatusCode::OK, 924 Json(CreateAccountOutput { 925 handle: handle.clone().into(), 926 did: did.into(), 927 did_doc, 928 access_jwt: access_meta.token, 929 refresh_jwt: refresh_meta.token, 930 verification_required: !is_migration, 931 verification_channel: verification_channel.to_string(), 932 }), 933 ) 934 .into_response() 935}