this repo has no description
at main 36 kB view raw
1use super::did::verify_did_web; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::create_signed_commit; 4use crate::auth::{ServiceTokenVerifier, is_service_token}; 5use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 6use crate::state::{AppState, RateLimitKind}; 7use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey}; 8use crate::validation::validate_password; 9use axum::{ 10 Json, 11 extract::State, 12 http::{HeaderMap, StatusCode}, 13 response::{IntoResponse, Response}, 14}; 15use bcrypt::{DEFAULT_COST, hash}; 16use jacquard::types::{integer::LimitedU32, string::Tid}; 17use jacquard_repo::{mst::Mst, storage::BlockStore}; 18use k256::{SecretKey, ecdsa::SigningKey}; 19use rand::rngs::OsRng; 20use serde::{Deserialize, Serialize}; 21use serde_json::json; 22use std::sync::Arc; 23use tracing::{debug, error, info, warn}; 24 25fn extract_client_ip(headers: &HeaderMap) -> String { 26 if let Some(forwarded) = headers.get("x-forwarded-for") 27 && let Ok(value) = forwarded.to_str() 28 && let Some(first_ip) = value.split(',').next() 29 { 30 return first_ip.trim().to_string(); 31 } 32 if let Some(real_ip) = headers.get("x-real-ip") 33 && let Ok(value) = real_ip.to_str() 34 { 35 return value.trim().to_string(); 36 } 37 "unknown".to_string() 38} 39 40#[derive(Deserialize)] 41#[serde(rename_all = "camelCase")] 42pub struct CreateAccountInput { 43 pub handle: String, 44 pub email: Option<String>, 45 pub password: PlainPassword, 46 pub invite_code: Option<String>, 47 pub did: Option<String>, 48 pub did_type: Option<String>, 49 pub signing_key: Option<String>, 50 pub verification_channel: Option<String>, 51 pub discord_id: Option<String>, 52 pub telegram_username: Option<String>, 53 pub signal_number: Option<String>, 54} 55 56#[derive(Serialize)] 57#[serde(rename_all = "camelCase")] 58pub struct CreateAccountOutput { 59 pub handle: Handle, 60 pub did: Did, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub did_doc: Option<serde_json::Value>, 63 pub access_jwt: String, 64 pub refresh_jwt: String, 65 pub verification_required: bool, 66 pub verification_channel: String, 67} 68 69pub async fn create_account( 70 State(state): State<AppState>, 71 headers: HeaderMap, 72 Json(input): Json<CreateAccountInput>, 73) -> Response { 74 let is_potential_migration = input 75 .did 76 .as_ref() 77 .map(|d| d.starts_with("did:plc:")) 78 .unwrap_or(false); 79 if is_potential_migration { 80 info!( 81 "[MIGRATION] createAccount called for potential migration did={:?} handle={}", 82 input.did, input.handle 83 ); 84 } else { 85 info!("create_account called"); 86 } 87 let client_ip = extract_client_ip(&headers); 88 if !state 89 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 90 .await 91 { 92 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 93 return ApiError::RateLimitExceeded(Some( 94 "Too many account creation attempts. Please try again later.".into(), 95 )) 96 .into_response(); 97 } 98 99 let migration_auth = if let Some(extracted) = crate::auth::extract_auth_token_from_header( 100 headers.get("Authorization").and_then(|h| h.to_str().ok()), 101 ) { 102 let token = extracted.token; 103 if is_service_token(&token) { 104 let verifier = ServiceTokenVerifier::new(); 105 match verifier 106 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 107 .await 108 { 109 Ok(claims) => { 110 debug!("Service token verified for migration: iss={}", claims.iss); 111 Some(claims.iss) 112 } 113 Err(e) => { 114 error!("Service token verification failed: {:?}", e); 115 return ApiError::AuthenticationFailed(Some(format!( 116 "Service token verification failed: {}", 117 e 118 ))) 119 .into_response(); 120 } 121 } 122 } else { 123 None 124 } 125 } else { 126 None 127 }; 128 129 let is_did_web_byod = migration_auth.is_some() 130 && input 131 .did 132 .as_ref() 133 .map(|d| d.starts_with("did:web:")) 134 .unwrap_or(false); 135 136 let is_migration = migration_auth.is_some() 137 && input 138 .did 139 .as_ref() 140 .map(|d| d.starts_with("did:plc:")) 141 .unwrap_or(false); 142 143 if (is_migration || is_did_web_byod) 144 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 145 { 146 if provided_did != auth_did { 147 info!( 148 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}", 149 auth_did, provided_did 150 ); 151 return ApiError::AuthorizationError(format!( 152 "Service token issuer {} does not match DID {}", 153 auth_did, provided_did 154 )) 155 .into_response(); 156 } 157 if is_did_web_byod { 158 info!(did = %provided_did, "Processing did:web BYOD account creation"); 159 } else { 160 info!( 161 "[MIGRATION] createAccount: Service token verified, processing migration for did={}", 162 provided_did 163 ); 164 } 165 } 166 167 let hostname_for_validation = 168 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 169 let pds_suffix = format!(".{}", hostname_for_validation); 170 171 let validated_short_handle = if !input.handle.contains('.') 172 || input.handle.ends_with(&pds_suffix) 173 { 174 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 175 input 176 .handle 177 .strip_suffix(&pds_suffix) 178 .unwrap_or(&input.handle) 179 } else { 180 &input.handle 181 }; 182 match crate::api::validation::validate_short_handle(handle_to_validate) { 183 Ok(h) => h, 184 Err(e) => { 185 return ApiError::from(e).into_response(); 186 } 187 } 188 } else { 189 if input.handle.contains(' ') || input.handle.contains('\t') { 190 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response(); 191 } 192 if let Some(c) = input 193 .handle 194 .chars() 195 .find(|c| !c.is_ascii_alphanumeric() && *c != '.' && *c != '-') 196 { 197 return ApiError::InvalidRequest(format!("Handle contains invalid character: {}", c)) 198 .into_response(); 199 } 200 let handle_lower = input.handle.to_lowercase(); 201 if crate::moderation::has_explicit_slur(&handle_lower) { 202 return ApiError::InvalidRequest("Inappropriate language in handle".into()) 203 .into_response(); 204 } 205 handle_lower 206 }; 207 let email: Option<String> = input 208 .email 209 .as_ref() 210 .map(|e| e.trim().to_string()) 211 .filter(|e| !e.is_empty()); 212 if let Some(ref email) = email 213 && !crate::api::validation::is_valid_email(email) 214 { 215 return ApiError::InvalidEmail.into_response(); 216 } 217 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 218 let valid_channels = ["email", "discord", "telegram", "signal"]; 219 if !valid_channels.contains(&verification_channel) && !is_migration { 220 return ApiError::InvalidVerificationChannel.into_response(); 221 } 222 let verification_recipient = if is_migration { 223 None 224 } else { 225 Some(match verification_channel { 226 "email" => match &input.email { 227 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 228 _ => return ApiError::MissingEmail.into_response(), 229 }, 230 "discord" => match &input.discord_id { 231 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 232 _ => return ApiError::MissingDiscordId.into_response(), 233 }, 234 "telegram" => match &input.telegram_username { 235 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 236 _ => return ApiError::MissingTelegramUsername.into_response(), 237 }, 238 "signal" => match &input.signal_number { 239 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 240 _ => return ApiError::MissingSignalNumber.into_response(), 241 }, 242 _ => return ApiError::InvalidVerificationChannel.into_response(), 243 }) 244 }; 245 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 246 let pds_endpoint = format!("https://{}", hostname); 247 let suffix = format!(".{}", hostname); 248 let handle = if input.handle.ends_with(&suffix) { 249 format!("{}.{}", validated_short_handle, hostname) 250 } else if input.handle.contains('.') { 251 validated_short_handle.clone() 252 } else { 253 format!("{}.{}", validated_short_handle, hostname) 254 }; 255 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 256 if let Some(signing_key_did) = &input.signing_key { 257 let reserved = sqlx::query!( 258 r#" 259 SELECT id, private_key_bytes 260 FROM reserved_signing_keys 261 WHERE public_key_did_key = $1 262 AND used_at IS NULL 263 AND expires_at > NOW() 264 FOR UPDATE 265 "#, 266 signing_key_did 267 ) 268 .fetch_optional(&state.db) 269 .await; 270 match reserved { 271 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 272 Ok(None) => { 273 return ApiError::InvalidSigningKey.into_response(); 274 } 275 Err(e) => { 276 error!("Error looking up reserved signing key: {:?}", e); 277 return ApiError::InternalError(None).into_response(); 278 } 279 } 280 } else { 281 let secret_key = SecretKey::random(&mut OsRng); 282 (secret_key.to_bytes().to_vec(), None) 283 }; 284 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 285 Ok(k) => k, 286 Err(e) => { 287 error!("Error creating signing key: {:?}", e); 288 return ApiError::InternalError(None).into_response(); 289 } 290 }; 291 let did_type = input.did_type.as_deref().unwrap_or("plc"); 292 let did = match did_type { 293 "web" => { 294 if !crate::api::server::meta::is_self_hosted_did_web_enabled() { 295 return ApiError::SelfHostedDidWebDisabled.into_response(); 296 } 297 let subdomain_host = format!("{}.{}", input.handle, hostname); 298 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 299 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 300 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 301 self_hosted_did 302 } 303 "web-external" => { 304 let d = match &input.did { 305 Some(d) if !d.trim().is_empty() => d, 306 _ => { 307 return ApiError::InvalidRequest( 308 "External did:web requires the 'did' field to be provided".into(), 309 ) 310 .into_response(); 311 } 312 }; 313 if !d.starts_with("did:web:") { 314 return ApiError::InvalidDid("External DID must be a did:web".into()) 315 .into_response(); 316 } 317 if !is_did_web_byod 318 && let Err(e) = 319 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 320 { 321 return ApiError::InvalidDid(e).into_response(); 322 } 323 info!(did = %d, "Creating external did:web account"); 324 d.clone() 325 } 326 _ => { 327 if let Some(d) = &input.did { 328 if d.starts_with("did:plc:") && is_migration { 329 info!(did = %d, "Migration with existing did:plc"); 330 d.clone() 331 } else if d.starts_with("did:web:") { 332 if !is_did_web_byod 333 && let Err(e) = verify_did_web( 334 d, 335 &hostname, 336 &input.handle, 337 input.signing_key.as_deref(), 338 ) 339 .await 340 { 341 return ApiError::InvalidDid(e).into_response(); 342 } 343 d.clone() 344 } else if !d.trim().is_empty() { 345 return ApiError::InvalidDid( 346 "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth.".into() 347 ) 348 .into_response(); 349 } else { 350 let rotation_key = std::env::var("PLC_ROTATION_KEY") 351 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 352 let genesis_result = match create_genesis_operation( 353 &signing_key, 354 &rotation_key, 355 &handle, 356 &pds_endpoint, 357 ) { 358 Ok(r) => r, 359 Err(e) => { 360 error!("Error creating PLC genesis operation: {:?}", e); 361 return ApiError::InternalError(Some( 362 "Failed to create PLC operation".into(), 363 )) 364 .into_response(); 365 } 366 }; 367 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 368 if let Err(e) = plc_client 369 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 370 .await 371 { 372 error!("Failed to submit PLC genesis operation: {:?}", e); 373 return ApiError::UpstreamErrorMsg(format!( 374 "Failed to register DID with PLC directory: {}", 375 e 376 )) 377 .into_response(); 378 } 379 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 380 genesis_result.did 381 } 382 } else { 383 let rotation_key = std::env::var("PLC_ROTATION_KEY") 384 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 385 let genesis_result = match create_genesis_operation( 386 &signing_key, 387 &rotation_key, 388 &handle, 389 &pds_endpoint, 390 ) { 391 Ok(r) => r, 392 Err(e) => { 393 error!("Error creating PLC genesis operation: {:?}", e); 394 return ApiError::InternalError(Some( 395 "Failed to create PLC operation".into(), 396 )) 397 .into_response(); 398 } 399 }; 400 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 401 if let Err(e) = plc_client 402 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 403 .await 404 { 405 error!("Failed to submit PLC genesis operation: {:?}", e); 406 return ApiError::UpstreamErrorMsg(format!( 407 "Failed to register DID with PLC directory: {}", 408 e 409 )) 410 .into_response(); 411 } 412 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 413 genesis_result.did 414 } 415 } 416 }; 417 let mut tx = match state.db.begin().await { 418 Ok(tx) => tx, 419 Err(e) => { 420 error!("Error starting transaction: {:?}", e); 421 return ApiError::InternalError(None).into_response(); 422 } 423 }; 424 if is_migration { 425 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 426 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 427 .bind(&did) 428 .fetch_optional(&mut *tx) 429 .await 430 .unwrap_or(None); 431 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 432 if deactivated_at.is_some() { 433 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 434 let update_result: Result<_, sqlx::Error> = 435 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 436 .bind(&handle) 437 .bind(account_id) 438 .execute(&mut *tx) 439 .await; 440 if let Err(e) = update_result { 441 if let Some(db_err) = e.as_database_error() 442 && db_err 443 .constraint() 444 .map(|c| c.contains("handle")) 445 .unwrap_or(false) 446 { 447 return ApiError::HandleTaken.into_response(); 448 } 449 error!("Error reactivating account: {:?}", e); 450 return ApiError::InternalError(None).into_response(); 451 } 452 if let Err(e) = tx.commit().await { 453 error!("Error committing reactivation: {:?}", e); 454 return ApiError::InternalError(None).into_response(); 455 } 456 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 457 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 458 ) 459 .bind(account_id) 460 .fetch_optional(&state.db) 461 .await 462 .unwrap_or(None); 463 let secret_key_bytes = match key_row { 464 Some((key_bytes, encryption_version)) => { 465 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 466 Ok(k) => k, 467 Err(e) => { 468 error!("Error decrypting key for reactivated account: {:?}", e); 469 return ApiError::InternalError(None).into_response(); 470 } 471 } 472 } 473 None => { 474 error!("No signing key found for reactivated account"); 475 return ApiError::InternalError(Some( 476 "Account signing key not found".into(), 477 )) 478 .into_response(); 479 } 480 }; 481 let access_meta = 482 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 483 Ok(m) => m, 484 Err(e) => { 485 error!("Error creating access token: {:?}", e); 486 return ApiError::InternalError(None).into_response(); 487 } 488 }; 489 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 490 &did, 491 &secret_key_bytes, 492 ) { 493 Ok(m) => m, 494 Err(e) => { 495 error!("Error creating refresh token: {:?}", e); 496 return ApiError::InternalError(None).into_response(); 497 } 498 }; 499 let session_result: Result<_, sqlx::Error> = sqlx::query( 500 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 501 ) 502 .bind(&did) 503 .bind(&access_meta.jti) 504 .bind(&refresh_meta.jti) 505 .bind(access_meta.expires_at) 506 .bind(refresh_meta.expires_at) 507 .execute(&state.db) 508 .await; 509 if let Err(e) = session_result { 510 error!("Error creating session: {:?}", e); 511 return ApiError::InternalError(None).into_response(); 512 } 513 return ( 514 axum::http::StatusCode::OK, 515 Json(CreateAccountOutput { 516 handle: handle.clone().into(), 517 did: did.clone().into(), 518 did_doc: state.did_resolver.resolve_did_document(&did).await, 519 access_jwt: access_meta.token, 520 refresh_jwt: refresh_meta.token, 521 verification_required: false, 522 verification_channel: "email".to_string(), 523 }), 524 ) 525 .into_response(); 526 } else { 527 return ApiError::AccountAlreadyExists.into_response(); 528 } 529 } 530 } 531 let exists_result: Option<(i32,)> = 532 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 533 .bind(&handle) 534 .fetch_optional(&mut *tx) 535 .await 536 .unwrap_or(None); 537 if exists_result.is_some() { 538 return ApiError::HandleTaken.into_response(); 539 } 540 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 541 .map(|v| v == "true" || v == "1") 542 .unwrap_or(false); 543 if invite_code_required 544 && input 545 .invite_code 546 .as_ref() 547 .map(|c| c.trim().is_empty()) 548 .unwrap_or(true) 549 { 550 return ApiError::InviteCodeRequired.into_response(); 551 } 552 if let Some(code) = &input.invite_code 553 && !code.trim().is_empty() 554 { 555 let invite_query = sqlx::query!( 556 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 557 code 558 ) 559 .fetch_optional(&mut *tx) 560 .await; 561 match invite_query { 562 Ok(Some(row)) => { 563 if row.available_uses <= 0 { 564 return ApiError::InvalidInviteCode.into_response(); 565 } 566 let update_invite = sqlx::query!( 567 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 568 code 569 ) 570 .execute(&mut *tx) 571 .await; 572 if let Err(e) = update_invite { 573 error!("Error updating invite code: {:?}", e); 574 return ApiError::InternalError(None).into_response(); 575 } 576 } 577 Ok(None) => { 578 return ApiError::InvalidInviteCode.into_response(); 579 } 580 Err(e) => { 581 error!("Error checking invite code: {:?}", e); 582 return ApiError::InternalError(None).into_response(); 583 } 584 } 585 } 586 if let Err(e) = validate_password(&input.password) { 587 return ApiError::InvalidRequest(e.to_string()).into_response(); 588 } 589 590 let password_clone = input.password.clone(); 591 let password_hash = 592 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await { 593 Ok(Ok(h)) => h, 594 Ok(Err(e)) => { 595 error!("Error hashing password: {:?}", e); 596 return ApiError::InternalError(None).into_response(); 597 } 598 Err(e) => { 599 error!("Failed to spawn blocking task: {:?}", e); 600 return ApiError::InternalError(None).into_response(); 601 } 602 }; 603 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 604 .fetch_one(&mut *tx) 605 .await 606 .map(|c| c.unwrap_or(0) == 0) 607 .unwrap_or(false); 608 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod { 609 Some(chrono::Utc::now()) 610 } else { 611 None 612 }; 613 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 614 r#"INSERT INTO users ( 615 handle, email, did, password_hash, 616 preferred_comms_channel, 617 discord_id, telegram_username, signal_number, 618 is_admin, deactivated_at, email_verified 619 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 620 ) 621 .bind(&handle) 622 .bind(&email) 623 .bind(&did) 624 .bind(&password_hash) 625 .bind(verification_channel) 626 .bind( 627 input 628 .discord_id 629 .as_deref() 630 .map(|s| s.trim()) 631 .filter(|s| !s.is_empty()), 632 ) 633 .bind( 634 input 635 .telegram_username 636 .as_deref() 637 .map(|s| s.trim()) 638 .filter(|s| !s.is_empty()), 639 ) 640 .bind( 641 input 642 .signal_number 643 .as_deref() 644 .map(|s| s.trim()) 645 .filter(|s| !s.is_empty()), 646 ) 647 .bind(is_first_user) 648 .bind(deactivated_at) 649 .bind(false) 650 .fetch_one(&mut *tx) 651 .await; 652 let user_id = match user_insert { 653 Ok((id,)) => id, 654 Err(e) => { 655 if let Some(db_err) = e.as_database_error() 656 && db_err.code().as_deref() == Some("23505") 657 { 658 let constraint = db_err.constraint().unwrap_or(""); 659 if constraint.contains("handle") || constraint.contains("users_handle") { 660 return ApiError::HandleNotAvailable(None).into_response(); 661 } else if constraint.contains("email") || constraint.contains("users_email") { 662 return ApiError::EmailTaken.into_response(); 663 } else if constraint.contains("did") || constraint.contains("users_did") { 664 return ApiError::AccountAlreadyExists.into_response(); 665 } 666 } 667 error!("Error inserting user: {:?}", e); 668 return ApiError::InternalError(None).into_response(); 669 } 670 }; 671 672 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 673 Ok(enc) => enc, 674 Err(e) => { 675 error!("Error encrypting user key: {:?}", e); 676 return ApiError::InternalError(None).into_response(); 677 } 678 }; 679 let key_insert = sqlx::query!( 680 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 681 user_id, 682 &encrypted_key_bytes[..], 683 crate::config::ENCRYPTION_VERSION 684 ) 685 .execute(&mut *tx) 686 .await; 687 if let Err(e) = key_insert { 688 error!("Error inserting user key: {:?}", e); 689 return ApiError::InternalError(None).into_response(); 690 } 691 if let Some(key_id) = reserved_key_id { 692 let mark_used = sqlx::query!( 693 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 694 key_id 695 ) 696 .execute(&mut *tx) 697 .await; 698 if let Err(e) = mark_used { 699 error!("Error marking reserved key as used: {:?}", e); 700 return ApiError::InternalError(None).into_response(); 701 } 702 } 703 let mst = Mst::new(Arc::new(state.block_store.clone())); 704 let mst_root = match mst.persist().await { 705 Ok(c) => c, 706 Err(e) => { 707 error!("Error persisting MST: {:?}", e); 708 return ApiError::InternalError(None).into_response(); 709 } 710 }; 711 let rev = Tid::now(LimitedU32::MIN); 712 let did_for_commit = Did::new_unchecked(&did); 713 let (commit_bytes, _sig) = 714 match create_signed_commit(&did_for_commit, mst_root, rev.as_ref(), None, &signing_key) { 715 Ok(result) => result, 716 Err(e) => { 717 error!("Error creating genesis commit: {:?}", e); 718 return ApiError::InternalError(None).into_response(); 719 } 720 }; 721 let commit_cid = match state.block_store.put(&commit_bytes).await { 722 Ok(c) => c, 723 Err(e) => { 724 error!("Error saving genesis commit: {:?}", e); 725 return ApiError::InternalError(None).into_response(); 726 } 727 }; 728 let commit_cid_str = commit_cid.to_string(); 729 let rev_str = rev.as_ref().to_string(); 730 let repo_insert = sqlx::query!( 731 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 732 user_id, 733 commit_cid_str, 734 rev_str 735 ) 736 .execute(&mut *tx) 737 .await; 738 if let Err(e) = repo_insert { 739 error!("Error initializing repo: {:?}", e); 740 return ApiError::InternalError(None).into_response(); 741 } 742 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 743 if let Err(e) = sqlx::query!( 744 r#" 745 INSERT INTO user_blocks (user_id, block_cid) 746 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 747 ON CONFLICT (user_id, block_cid) DO NOTHING 748 "#, 749 user_id, 750 &genesis_block_cids 751 ) 752 .execute(&mut *tx) 753 .await 754 { 755 error!("Error inserting user_blocks: {:?}", e); 756 return ApiError::InternalError(None).into_response(); 757 } 758 if let Some(code) = &input.invite_code 759 && !code.trim().is_empty() 760 { 761 let use_insert = sqlx::query!( 762 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 763 code, 764 user_id 765 ) 766 .execute(&mut *tx) 767 .await; 768 if let Err(e) = use_insert { 769 error!("Error recording invite usage: {:?}", e); 770 return ApiError::InternalError(None).into_response(); 771 } 772 } 773 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 774 let birthdate_pref = json!({ 775 "$type": "app.bsky.actor.defs#personalDetailsPref", 776 "birthDate": "1998-05-06T00:00:00.000Z" 777 }); 778 if let Err(e) = sqlx::query!( 779 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 780 ON CONFLICT (user_id, name) DO NOTHING", 781 user_id, 782 "app.bsky.actor.defs#personalDetailsPref", 783 birthdate_pref 784 ) 785 .execute(&mut *tx) 786 .await 787 { 788 warn!("Failed to set default birthdate preference: {:?}", e); 789 } 790 } 791 if let Err(e) = tx.commit().await { 792 error!("Error committing transaction: {:?}", e); 793 return ApiError::InternalError(None).into_response(); 794 } 795 if !is_migration && !is_did_web_byod { 796 let did_typed = Did::new_unchecked(&did); 797 let handle_typed = Handle::new_unchecked(&handle); 798 if let Err(e) = crate::api::repo::record::sequence_identity_event( 799 &state, 800 &did_typed, 801 Some(&handle_typed), 802 ) 803 .await 804 { 805 warn!("Failed to sequence identity event for {}: {}", did, e); 806 } 807 if let Err(e) = 808 crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await 809 { 810 warn!("Failed to sequence account event for {}: {}", did, e); 811 } 812 if let Err(e) = crate::api::repo::record::sequence_genesis_commit( 813 &state, 814 &did_typed, 815 &commit_cid, 816 &mst_root, 817 &rev_str, 818 ) 819 .await 820 { 821 warn!("Failed to sequence commit event for {}: {}", did, e); 822 } 823 if let Err(e) = crate::api::repo::record::sequence_sync_event( 824 &state, 825 &did_typed, 826 &commit_cid_str, 827 Some(rev.as_ref()), 828 ) 829 .await 830 { 831 warn!("Failed to sequence sync event for {}: {}", did, e); 832 } 833 let profile_record = json!({ 834 "$type": "app.bsky.actor.profile", 835 "displayName": input.handle 836 }); 837 let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile"); 838 let profile_rkey = Rkey::new_unchecked("self"); 839 if let Err(e) = crate::api::repo::record::create_record_internal( 840 &state, 841 &did_typed, 842 &profile_collection, 843 &profile_rkey, 844 &profile_record, 845 ) 846 .await 847 { 848 warn!("Failed to create default profile for {}: {}", did, e); 849 } 850 } 851 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 852 if !is_migration { 853 if let Some(ref recipient) = verification_recipient { 854 let verification_token = crate::auth::verification_token::generate_signup_token( 855 &did, 856 verification_channel, 857 recipient, 858 ); 859 let formatted_token = 860 crate::auth::verification_token::format_token_for_display(&verification_token); 861 if let Err(e) = crate::comms::enqueue_signup_verification( 862 &state.db, 863 user_id, 864 verification_channel, 865 recipient, 866 &formatted_token, 867 None, 868 ) 869 .await 870 { 871 warn!( 872 "Failed to enqueue signup verification notification: {:?}", 873 e 874 ); 875 } 876 } 877 } else if let Some(ref user_email) = email { 878 let token = crate::auth::verification_token::generate_migration_token(&did, user_email); 879 let formatted_token = crate::auth::verification_token::format_token_for_display(&token); 880 if let Err(e) = crate::comms::enqueue_migration_verification( 881 &state.db, 882 user_id, 883 user_email, 884 &formatted_token, 885 &hostname, 886 ) 887 .await 888 { 889 warn!("Failed to enqueue migration verification email: {:?}", e); 890 } 891 } 892 893 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 894 { 895 Ok(m) => m, 896 Err(e) => { 897 error!("createAccount: Error creating access token: {:?}", e); 898 return ApiError::InternalError(None).into_response(); 899 } 900 }; 901 let refresh_meta = 902 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 903 Ok(m) => m, 904 Err(e) => { 905 error!("createAccount: Error creating refresh token: {:?}", e); 906 return ApiError::InternalError(None).into_response(); 907 } 908 }; 909 if let Err(e) = sqlx::query!( 910 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 911 did, 912 access_meta.jti, 913 refresh_meta.jti, 914 access_meta.expires_at, 915 refresh_meta.expires_at 916 ) 917 .execute(&state.db) 918 .await 919 { 920 error!("createAccount: Error creating session: {:?}", e); 921 return ApiError::InternalError(None).into_response(); 922 } 923 924 let did_doc = state.did_resolver.resolve_did_document(&did).await; 925 926 if is_migration { 927 info!( 928 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}", 929 did, handle 930 ); 931 } 932 933 ( 934 StatusCode::OK, 935 Json(CreateAccountOutput { 936 handle: handle.clone().into(), 937 did: did.into(), 938 did_doc, 939 access_jwt: access_meta.token, 940 refresh_jwt: refresh_meta.token, 941 verification_required: !is_migration, 942 verification_channel: verification_channel.to_string(), 943 }), 944 ) 945 .into_response() 946}