this repo has no description
1use super::did::verify_did_web; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::create_signed_commit; 4use crate::auth::{ServiceTokenVerifier, is_service_token}; 5use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 6use crate::state::{AppState, RateLimitKind}; 7use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey}; 8use crate::validation::validate_password; 9use axum::{ 10 Json, 11 extract::State, 12 http::{HeaderMap, StatusCode}, 13 response::{IntoResponse, Response}, 14}; 15use bcrypt::{DEFAULT_COST, hash}; 16use jacquard::types::{integer::LimitedU32, string::Tid}; 17use jacquard_repo::{mst::Mst, storage::BlockStore}; 18use k256::{SecretKey, ecdsa::SigningKey}; 19use rand::rngs::OsRng; 20use serde::{Deserialize, Serialize}; 21use serde_json::json; 22use std::sync::Arc; 23use tracing::{debug, error, info, warn}; 24 25fn extract_client_ip(headers: &HeaderMap) -> String { 26 if let Some(forwarded) = headers.get("x-forwarded-for") 27 && let Ok(value) = forwarded.to_str() 28 && let Some(first_ip) = value.split(',').next() 29 { 30 return first_ip.trim().to_string(); 31 } 32 if let Some(real_ip) = headers.get("x-real-ip") 33 && let Ok(value) = real_ip.to_str() 34 { 35 return value.trim().to_string(); 36 } 37 "unknown".to_string() 38} 39 40#[derive(Deserialize)] 41#[serde(rename_all = "camelCase")] 42pub struct CreateAccountInput { 43 pub handle: String, 44 pub email: Option<String>, 45 pub password: PlainPassword, 46 pub invite_code: Option<String>, 47 pub did: Option<String>, 48 pub did_type: Option<String>, 49 pub signing_key: Option<String>, 50 pub verification_channel: Option<String>, 51 pub discord_id: Option<String>, 52 pub telegram_username: Option<String>, 53 pub signal_number: Option<String>, 54} 55 56#[derive(Serialize)] 57#[serde(rename_all = "camelCase")] 58pub struct CreateAccountOutput { 59 pub handle: Handle, 60 pub did: Did, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub did_doc: Option<serde_json::Value>, 63 pub access_jwt: String, 64 pub refresh_jwt: String, 65 pub verification_required: bool, 66 pub verification_channel: String, 67} 68 69pub async fn create_account( 70 State(state): State<AppState>, 71 headers: HeaderMap, 72 Json(input): Json<CreateAccountInput>, 73) -> Response { 74 let is_potential_migration = input 75 .did 76 .as_ref() 77 .map(|d| d.starts_with("did:plc:")) 78 .unwrap_or(false); 79 if is_potential_migration { 80 info!( 81 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 82 input.did, input.handle 83 ); 84 } else { 85 info!("create_account called"); 86 } 87 let client_ip = extract_client_ip(&headers); 88 if !state 89 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 90 .await 91 { 92 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 93 return ApiError::RateLimitExceeded(Some( 94 "Too many account creation attempts. Please try again later.".into(), 95 )) 96 .into_response(); 97 } 98 99 let migration_auth = if let Some(extracted) = crate::auth::extract_auth_token_from_header( 100 headers.get("Authorization").and_then(|h| h.to_str().ok()), 101 ) { 102 let token = extracted.token; 103 if is_service_token(&token) { 104 let verifier = ServiceTokenVerifier::new(); 105 match verifier 106 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 107 .await 108 { 109 Ok(claims) => { 110 debug!("Service token verified for migration: iss={}", claims.iss); 111 Some(claims.iss) 112 } 113 Err(e) => { 114 error!("Service token verification failed: {:?}", e); 115 return ApiError::AuthenticationFailed(Some(format!( 116 "Service token verification failed: {}", 117 e 118 ))) 119 .into_response(); 120 } 121 } 122 } else { 123 None 124 } 125 } else { 126 None 127 }; 128 129 let is_did_web_byod = migration_auth.is_some() 130 && input 131 .did 132 .as_ref() 133 .map(|d| d.starts_with("did:web:")) 134 .unwrap_or(false); 135 136 let is_migration = migration_auth.is_some() 137 && input 138 .did 139 .as_ref() 140 .map(|d| d.starts_with("did:plc:")) 141 .unwrap_or(false); 142 143 if (is_migration || is_did_web_byod) 144 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 145 { 146 if provided_did != auth_did { 147 info!( 148 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 149 auth_did, provided_did 150 ); 151 return ApiError::AuthorizationError(format!( 152 "Service token issuer {} does not match DID {}", 153 auth_did, provided_did 154 )) 155 .into_response(); 156 } 157 if is_did_web_byod { 158 info!(did = %provided_did, "Processing did:web BYOD account creation"); 159 } else { 160 info!( 161 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 162 provided_did 163 ); 164 } 165 } 166 167 let hostname_for_validation = 168 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 169 let pds_suffix = format!(".{}", hostname_for_validation); 170 171 let validated_short_handle = if !input.handle.contains('.') 172 || input.handle.ends_with(&pds_suffix) 173 { 174 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 175 input 176 .handle 177 .strip_suffix(&pds_suffix) 178 .unwrap_or(&input.handle) 179 } else { 180 &input.handle 181 }; 182 match crate::api::validation::validate_short_handle(handle_to_validate) { 183 Ok(h) => h, 184 Err(e) => { 185 return ApiError::from(e).into_response(); 186 } 187 } 188 } else { 189 if input.handle.contains(' ') || input.handle.contains('\t') { 190 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response(); 191 } 192 for c in input.handle.chars() { 193 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 194 return ApiError::InvalidRequest(format!( 195 "Handle contains invalid character: {}", 196 c 197 )) 198 .into_response(); 199 } 200 } 201 let handle_lower = input.handle.to_lowercase(); 202 if crate::moderation::has_explicit_slur(&handle_lower) { 203 return ApiError::InvalidRequest("Inappropriate language in handle".into()) 204 .into_response(); 205 } 206 handle_lower 207 }; 208 let email: Option<String> = input 209 .email 210 .as_ref() 211 .map(|e| e.trim().to_string()) 212 .filter(|e| !e.is_empty()); 213 if let Some(ref email) = email 214 && !crate::api::validation::is_valid_email(email) 215 { 216 return ApiError::InvalidEmail.into_response(); 217 } 218 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 219 let valid_channels = ["email", "discord", "telegram", "signal"]; 220 if !valid_channels.contains(&verification_channel) && !is_migration { 221 return ApiError::InvalidVerificationChannel.into_response(); 222 } 223 let verification_recipient = if is_migration { 224 None 225 } else { 226 Some(match verification_channel { 227 "email" => match &input.email { 228 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 229 _ => return ApiError::MissingEmail.into_response(), 230 }, 231 "discord" => match &input.discord_id { 232 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 233 _ => return ApiError::MissingDiscordId.into_response(), 234 }, 235 "telegram" => match &input.telegram_username { 236 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 237 _ => return ApiError::MissingTelegramUsername.into_response(), 238 }, 239 "signal" => match &input.signal_number { 240 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 241 _ => return ApiError::MissingSignalNumber.into_response(), 242 }, 243 _ => return ApiError::InvalidVerificationChannel.into_response(), 244 }) 245 }; 246 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 247 let pds_endpoint = format!("https://{}", hostname); 248 let suffix = format!(".{}", hostname); 249 let handle = if input.handle.ends_with(&suffix) { 250 format!("{}.{}", validated_short_handle, hostname) 251 } else if input.handle.contains('.') { 252 validated_short_handle.clone() 253 } else { 254 format!("{}.{}", validated_short_handle, hostname) 255 }; 256 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 257 if let Some(signing_key_did) = &input.signing_key { 258 let reserved = sqlx::query!( 259 r#" 260 SELECT id, private_key_bytes 261 FROM reserved_signing_keys 262 WHERE public_key_did_key = $1 263 AND used_at IS NULL 264 AND expires_at > NOW() 265 FOR UPDATE 266 "#, 267 signing_key_did 268 ) 269 .fetch_optional(&state.db) 270 .await; 271 match reserved { 272 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 273 Ok(None) => { 274 return ApiError::InvalidSigningKey.into_response(); 275 } 276 Err(e) => { 277 error!("Error looking up reserved signing key: {:?}", e); 278 return ApiError::InternalError(None).into_response(); 279 } 280 } 281 } else { 282 let secret_key = SecretKey::random(&mut OsRng); 283 (secret_key.to_bytes().to_vec(), None) 284 }; 285 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 286 Ok(k) => k, 287 Err(e) => { 288 error!("Error creating signing key: {:?}", e); 289 return ApiError::InternalError(None).into_response(); 290 } 291 }; 292 let did_type = input.did_type.as_deref().unwrap_or("plc"); 293 let did = match did_type { 294 "web" => { 295 if !crate::api::server::meta::is_self_hosted_did_web_enabled() { 296 return ApiError::SelfHostedDidWebDisabled.into_response(); 297 } 298 let subdomain_host = format!("{}.{}", input.handle, hostname); 299 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 300 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 301 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 302 self_hosted_did 303 } 304 "web-external" => { 305 let d = match &input.did { 306 Some(d) if !d.trim().is_empty() => d, 307 _ => { 308 return ApiError::InvalidRequest( 309 "External did:web requires the 'did' field to be provided".into(), 310 ) 311 .into_response(); 312 } 313 }; 314 if !d.starts_with("did:web:") { 315 return ApiError::InvalidDid("External DID must be a did:web".into()) 316 .into_response(); 317 } 318 if !is_did_web_byod 319 && let Err(e) = 320 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 321 { 322 return ApiError::InvalidDid(e).into_response(); 323 } 324 info!(did = %d, "Creating external did:web account"); 325 d.clone() 326 } 327 _ => { 328 if let Some(d) = &input.did { 329 if d.starts_with("did:plc:") && is_migration { 330 info!(did = %d, "Migration with existing did:plc"); 331 d.clone() 332 } else if d.starts_with("did:web:") { 333 if !is_did_web_byod 334 && let Err(e) = verify_did_web( 335 d, 336 &hostname, 337 &input.handle, 338 input.signing_key.as_deref(), 339 ) 340 .await 341 { 342 return ApiError::InvalidDid(e).into_response(); 343 } 344 d.clone() 345 } else if !d.trim().is_empty() { 346 return ApiError::InvalidDid( 347 "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth.".into() 348 ) 349 .into_response(); 350 } else { 351 let rotation_key = std::env::var("PLC_ROTATION_KEY") 352 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 353 let genesis_result = match create_genesis_operation( 354 &signing_key, 355 &rotation_key, 356 &handle, 357 &pds_endpoint, 358 ) { 359 Ok(r) => r, 360 Err(e) => { 361 error!("Error creating PLC genesis operation: {:?}", e); 362 return ApiError::InternalError(Some( 363 "Failed to create PLC operation".into(), 364 )) 365 .into_response(); 366 } 367 }; 368 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 369 if let Err(e) = plc_client 370 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 371 .await 372 { 373 error!("Failed to submit PLC genesis operation: {:?}", e); 374 return ApiError::UpstreamErrorMsg(format!( 375 "Failed to register DID with PLC directory: {}", 376 e 377 )) 378 .into_response(); 379 } 380 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 381 genesis_result.did 382 } 383 } else { 384 let rotation_key = std::env::var("PLC_ROTATION_KEY") 385 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 386 let genesis_result = match create_genesis_operation( 387 &signing_key, 388 &rotation_key, 389 &handle, 390 &pds_endpoint, 391 ) { 392 Ok(r) => r, 393 Err(e) => { 394 error!("Error creating PLC genesis operation: {:?}", e); 395 return ApiError::InternalError(Some( 396 "Failed to create PLC operation".into(), 397 )) 398 .into_response(); 399 } 400 }; 401 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 402 if let Err(e) = plc_client 403 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 404 .await 405 { 406 error!("Failed to submit PLC genesis operation: {:?}", e); 407 return ApiError::UpstreamErrorMsg(format!( 408 "Failed to register DID with PLC directory: {}", 409 e 410 )) 411 .into_response(); 412 } 413 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 414 genesis_result.did 415 } 416 } 417 }; 418 let mut tx = match state.db.begin().await { 419 Ok(tx) => tx, 420 Err(e) => { 421 error!("Error starting transaction: {:?}", e); 422 return ApiError::InternalError(None).into_response(); 423 } 424 }; 425 if is_migration { 426 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 427 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 428 .bind(&did) 429 .fetch_optional(&mut *tx) 430 .await 431 .unwrap_or(None); 432 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 433 if deactivated_at.is_some() { 434 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 435 let update_result: Result<_, sqlx::Error> = 436 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 437 .bind(&handle) 438 .bind(account_id) 439 .execute(&mut *tx) 440 .await; 441 if let Err(e) = update_result { 442 if let Some(db_err) = e.as_database_error() 443 && db_err 444 .constraint() 445 .map(|c| c.contains("handle")) 446 .unwrap_or(false) 447 { 448 return ApiError::HandleTaken.into_response(); 449 } 450 error!("Error reactivating account: {:?}", e); 451 return ApiError::InternalError(None).into_response(); 452 } 453 if let Err(e) = tx.commit().await { 454 error!("Error committing reactivation: {:?}", e); 455 return ApiError::InternalError(None).into_response(); 456 } 457 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 458 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 459 ) 460 .bind(account_id) 461 .fetch_optional(&state.db) 462 .await 463 .unwrap_or(None); 464 let secret_key_bytes = match key_row { 465 Some((key_bytes, encryption_version)) => { 466 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 467 Ok(k) => k, 468 Err(e) => { 469 error!("Error decrypting key for reactivated account: {:?}", e); 470 return ApiError::InternalError(None).into_response(); 471 } 472 } 473 } 474 None => { 475 error!("No signing key found for reactivated account"); 476 return ApiError::InternalError(Some( 477 "Account signing key not found".into(), 478 )) 479 .into_response(); 480 } 481 }; 482 let access_meta = 483 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 484 Ok(m) => m, 485 Err(e) => { 486 error!("Error creating access token: {:?}", e); 487 return ApiError::InternalError(None).into_response(); 488 } 489 }; 490 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 491 &did, 492 &secret_key_bytes, 493 ) { 494 Ok(m) => m, 495 Err(e) => { 496 error!("Error creating refresh token: {:?}", e); 497 return ApiError::InternalError(None).into_response(); 498 } 499 }; 500 let session_result: Result<_, sqlx::Error> = sqlx::query( 501 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 502 ) 503 .bind(&did) 504 .bind(&access_meta.jti) 505 .bind(&refresh_meta.jti) 506 .bind(access_meta.expires_at) 507 .bind(refresh_meta.expires_at) 508 .execute(&state.db) 509 .await; 510 if let Err(e) = session_result { 511 error!("Error creating session: {:?}", e); 512 return ApiError::InternalError(None).into_response(); 513 } 514 return ( 515 axum::http::StatusCode::OK, 516 Json(CreateAccountOutput { 517 handle: handle.clone().into(), 518 did: did.clone().into(), 519 did_doc: state.did_resolver.resolve_did_document(&did).await, 520 access_jwt: access_meta.token, 521 refresh_jwt: refresh_meta.token, 522 verification_required: false, 523 verification_channel: "email".to_string(), 524 }), 525 ) 526 .into_response(); 527 } else { 528 return ApiError::AccountAlreadyExists.into_response(); 529 } 530 } 531 } 532 let exists_result: Option<(i32,)> = 533 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 534 .bind(&handle) 535 .fetch_optional(&mut *tx) 536 .await 537 .unwrap_or(None); 538 if exists_result.is_some() { 539 return ApiError::HandleTaken.into_response(); 540 } 541 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 542 .map(|v| v == "true" || v == "1") 543 .unwrap_or(false); 544 if invite_code_required 545 && input 546 .invite_code 547 .as_ref() 548 .map(|c| c.trim().is_empty()) 549 .unwrap_or(true) 550 { 551 return ApiError::InviteCodeRequired.into_response(); 552 } 553 if let Some(code) = &input.invite_code 554 && !code.trim().is_empty() 555 { 556 let invite_query = sqlx::query!( 557 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 558 code 559 ) 560 .fetch_optional(&mut *tx) 561 .await; 562 match invite_query { 563 Ok(Some(row)) => { 564 if row.available_uses <= 0 { 565 return ApiError::InvalidInviteCode.into_response(); 566 } 567 let update_invite = sqlx::query!( 568 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 569 code 570 ) 571 .execute(&mut *tx) 572 .await; 573 if let Err(e) = update_invite { 574 error!("Error updating invite code: {:?}", e); 575 return ApiError::InternalError(None).into_response(); 576 } 577 } 578 Ok(None) => { 579 return ApiError::InvalidInviteCode.into_response(); 580 } 581 Err(e) => { 582 error!("Error checking invite code: {:?}", e); 583 return ApiError::InternalError(None).into_response(); 584 } 585 } 586 } 587 if let Err(e) = validate_password(&input.password) { 588 return ApiError::InvalidRequest(e.to_string()).into_response(); 589 } 590 591 let password_clone = input.password.clone(); 592 let password_hash = 593 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await { 594 Ok(Ok(h)) => h, 595 Ok(Err(e)) => { 596 error!("Error hashing password: {:?}", e); 597 return ApiError::InternalError(None).into_response(); 598 } 599 Err(e) => { 600 error!("Failed to spawn blocking task: {:?}", e); 601 return ApiError::InternalError(None).into_response(); 602 } 603 }; 604 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 605 .fetch_one(&mut *tx) 606 .await 607 .map(|c| c.unwrap_or(0) == 0) 608 .unwrap_or(false); 609 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 610 Some(chrono::Utc::now()) 611 } else { 612 None 613 }; 614 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 615 r#"INSERT INTO users ( 616 handle, email, did, password_hash, 617 preferred_comms_channel, 618 discord_id, telegram_username, signal_number, 619 is_admin, deactivated_at, email_verified 620 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 621 ) 622 .bind(&handle) 623 .bind(&email) 624 .bind(&did) 625 .bind(&password_hash) 626 .bind(verification_channel) 627 .bind( 628 input 629 .discord_id 630 .as_deref() 631 .map(|s| s.trim()) 632 .filter(|s| !s.is_empty()), 633 ) 634 .bind( 635 input 636 .telegram_username 637 .as_deref() 638 .map(|s| s.trim()) 639 .filter(|s| !s.is_empty()), 640 ) 641 .bind( 642 input 643 .signal_number 644 .as_deref() 645 .map(|s| s.trim()) 646 .filter(|s| !s.is_empty()), 647 ) 648 .bind(is_first_user) 649 .bind(deactivated_at) 650 .bind(false) 651 .fetch_one(&mut *tx) 652 .await; 653 let user_id = match user_insert { 654 Ok((id,)) => id, 655 Err(e) => { 656 if let Some(db_err) = e.as_database_error() 657 && db_err.code().as_deref() == Some("23505") 658 { 659 let constraint = db_err.constraint().unwrap_or(""); 660 if constraint.contains("handle") || constraint.contains("users_handle") { 661 return ApiError::HandleNotAvailable(None).into_response(); 662 } else if constraint.contains("email") || constraint.contains("users_email") { 663 return ApiError::EmailTaken.into_response(); 664 } else if constraint.contains("did") || constraint.contains("users_did") { 665 return ApiError::AccountAlreadyExists.into_response(); 666 } 667 } 668 error!("Error inserting user: {:?}", e); 669 return ApiError::InternalError(None).into_response(); 670 } 671 }; 672 673 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 674 Ok(enc) => enc, 675 Err(e) => { 676 error!("Error encrypting user key: {:?}", e); 677 return ApiError::InternalError(None).into_response(); 678 } 679 }; 680 let key_insert = sqlx::query!( 681 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 682 user_id, 683 &encrypted_key_bytes[..], 684 crate::config::ENCRYPTION_VERSION 685 ) 686 .execute(&mut *tx) 687 .await; 688 if let Err(e) = key_insert { 689 error!("Error inserting user key: {:?}", e); 690 return ApiError::InternalError(None).into_response(); 691 } 692 if let Some(key_id) = reserved_key_id { 693 let mark_used = sqlx::query!( 694 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 695 key_id 696 ) 697 .execute(&mut *tx) 698 .await; 699 if let Err(e) = mark_used { 700 error!("Error marking reserved key as used: {:?}", e); 701 return ApiError::InternalError(None).into_response(); 702 } 703 } 704 let mst = Mst::new(Arc::new(state.block_store.clone())); 705 let mst_root = match mst.persist().await { 706 Ok(c) => c, 707 Err(e) => { 708 error!("Error persisting MST: {:?}", e); 709 return ApiError::InternalError(None).into_response(); 710 } 711 }; 712 let rev = Tid::now(LimitedU32::MIN); 713 let did_for_commit = Did::new_unchecked(&did); 714 let (commit_bytes, _sig) = 715 match create_signed_commit(&did_for_commit, mst_root, rev.as_ref(), None, &signing_key) { 716 Ok(result) => result, 717 Err(e) => { 718 error!("Error creating genesis commit: {:?}", e); 719 return ApiError::InternalError(None).into_response(); 720 } 721 }; 722 let commit_cid = match state.block_store.put(&commit_bytes).await { 723 Ok(c) => c, 724 Err(e) => { 725 error!("Error saving genesis commit: {:?}", e); 726 return ApiError::InternalError(None).into_response(); 727 } 728 }; 729 let commit_cid_str = commit_cid.to_string(); 730 let rev_str = rev.as_ref().to_string(); 731 let repo_insert = sqlx::query!( 732 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 733 user_id, 734 commit_cid_str, 735 rev_str 736 ) 737 .execute(&mut *tx) 738 .await; 739 if let Err(e) = repo_insert { 740 error!("Error initializing repo: {:?}", e); 741 return ApiError::InternalError(None).into_response(); 742 } 743 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 744 if let Err(e) = sqlx::query!( 745 r#" 746 INSERT INTO user_blocks (user_id, block_cid) 747 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 748 ON CONFLICT (user_id, block_cid) DO NOTHING 749 "#, 750 user_id, 751 &genesis_block_cids 752 ) 753 .execute(&mut *tx) 754 .await 755 { 756 error!("Error inserting user_blocks: {:?}", e); 757 return ApiError::InternalError(None).into_response(); 758 } 759 if let Some(code) = &input.invite_code 760 && !code.trim().is_empty() 761 { 762 let use_insert = sqlx::query!( 763 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 764 code, 765 user_id 766 ) 767 .execute(&mut *tx) 768 .await; 769 if let Err(e) = use_insert { 770 error!("Error recording invite usage: {:?}", e); 771 return ApiError::InternalError(None).into_response(); 772 } 773 } 774 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 775 let birthdate_pref = json!({ 776 "$type": "app.bsky.actor.defs#personalDetailsPref", 777 "birthDate": "1998-05-06T00:00:00.000Z" 778 }); 779 if let Err(e) = sqlx::query!( 780 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 781 ON CONFLICT (user_id, name) DO NOTHING", 782 user_id, 783 "app.bsky.actor.defs#personalDetailsPref", 784 birthdate_pref 785 ) 786 .execute(&mut *tx) 787 .await 788 { 789 warn!("Failed to set default birthdate preference: {:?}", e); 790 } 791 } 792 if let Err(e) = tx.commit().await { 793 error!("Error committing transaction: {:?}", e); 794 return ApiError::InternalError(None).into_response(); 795 } 796 if !is_migration && !is_did_web_byod { 797 let did_typed = Did::new_unchecked(&did); 798 let handle_typed = Handle::new_unchecked(&handle); 799 if let Err(e) = 800 crate::api::repo::record::sequence_identity_event(&state, &did_typed, Some(&handle_typed)).await 801 { 802 warn!("Failed to sequence identity event for {}: {}", did, e); 803 } 804 if let Err(e) = 805 crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await 806 { 807 warn!("Failed to sequence account event for {}: {}", did, e); 808 } 809 if let Err(e) = crate::api::repo::record::sequence_genesis_commit( 810 &state, 811 &did_typed, 812 &commit_cid, 813 &mst_root, 814 &rev_str, 815 ) 816 .await 817 { 818 warn!("Failed to sequence commit event for {}: {}", did, e); 819 } 820 if let Err(e) = crate::api::repo::record::sequence_sync_event( 821 &state, 822 &did_typed, 823 &commit_cid_str, 824 Some(rev.as_ref()), 825 ) 826 .await 827 { 828 warn!("Failed to sequence sync event for {}: {}", did, e); 829 } 830 let profile_record = json!({ 831 "$type": "app.bsky.actor.profile", 832 "displayName": input.handle 833 }); 834 let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile"); 835 let profile_rkey = Rkey::new_unchecked("self"); 836 if let Err(e) = crate::api::repo::record::create_record_internal( 837 &state, 838 &did_typed, 839 &profile_collection, 840 &profile_rkey, 841 &profile_record, 842 ) 843 .await 844 { 845 warn!("Failed to create default profile for {}: {}", did, e); 846 } 847 } 848 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 849 if !is_migration { 850 if let Some(ref recipient) = verification_recipient { 851 let verification_token = crate::auth::verification_token::generate_signup_token( 852 &did, 853 verification_channel, 854 recipient, 855 ); 856 let formatted_token = 857 crate::auth::verification_token::format_token_for_display(&verification_token); 858 if let Err(e) = crate::comms::enqueue_signup_verification( 859 &state.db, 860 user_id, 861 verification_channel, 862 recipient, 863 &formatted_token, 864 None, 865 ) 866 .await 867 { 868 warn!( 869 "Failed to enqueue signup verification notification: {:?}", 870 e 871 ); 872 } 873 } 874 } else if let Some(ref user_email) = email { 875 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 876 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 877 if let Err(e) = crate::comms::enqueue_migration_verification( 878 &state.db, 879 user_id, 880 user_email, 881 &formatted_token, 882 &hostname, 883 ) 884 .await 885 { 886 warn!("Failed to enqueue migration verification email: {:?}", e); 887 } 888 } 889 890 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 891 { 892 Ok(m) => m, 893 Err(e) => { 894 error!("createAccount: Error creating access token: {:?}", e); 895 return ApiError::InternalError(None).into_response(); 896 } 897 }; 898 let refresh_meta = 899 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 900 Ok(m) => m, 901 Err(e) => { 902 error!("createAccount: Error creating refresh token: {:?}", e); 903 return ApiError::InternalError(None).into_response(); 904 } 905 }; 906 if let Err(e) = sqlx::query!( 907 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 908 did, 909 access_meta.jti, 910 refresh_meta.jti, 911 access_meta.expires_at, 912 refresh_meta.expires_at 913 ) 914 .execute(&state.db) 915 .await 916 { 917 error!("createAccount: Error creating session: {:?}", e); 918 return ApiError::InternalError(None).into_response(); 919 } 920 921 let did_doc = state.did_resolver.resolve_did_document(&did).await; 922 923 if is_migration { 924 info!( 925 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 926 did, handle 927 ); 928 } 929 930 ( 931 StatusCode::OK, 932 Json(CreateAccountOutput { 933 handle: handle.clone().into(), 934 did: did.into(), 935 did_doc, 936 access_jwt: access_meta.token, 937 refresh_jwt: refresh_meta.token, 938 verification_required: !is_migration, 939 verification_channel: verification_channel.to_string(), 940 }), 941 ) 942 .into_response() 943}