// this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub access_jwt: Option<String>, 61 #[serde(skip_serializing_if = "Option::is_none")] 
62 pub refresh_jwt: Option<String>, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 info!("create_account called"); 73 let client_ip = extract_client_ip(&headers); 74 if !state 75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 76 .await 77 { 78 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 79 return ( 80 StatusCode::TOO_MANY_REQUESTS, 81 Json(json!({ 82 "error": "RateLimitExceeded", 83 "message": "Too many account creation attempts. Please try again later." 84 })), 85 ) 86 .into_response(); 87 } 88 89 let migration_auth = if let Some(token) = 90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 91 { 92 if is_service_token(&token) { 93 let verifier = ServiceTokenVerifier::new(); 94 match verifier 95 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 96 .await 97 { 98 Ok(claims) => { 99 debug!("Service token verified for migration: iss={}", claims.iss); 100 Some(claims.iss) 101 } 102 Err(e) => { 103 error!("Service token verification failed: {:?}", e); 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({ 107 "error": "AuthenticationFailed", 108 "message": format!("Service token verification failed: {}", e) 109 })), 110 ) 111 .into_response(); 112 } 113 } 114 } else { 115 None 116 } 117 } else { 118 None 119 }; 120 121 let is_migration = migration_auth.is_some() 122 && input 123 .did 124 .as_ref() 125 .map(|d| d.starts_with("did:plc:") || d.starts_with("did:web:")) 126 .unwrap_or(false); 127 128 let is_did_web_byod = migration_auth.is_some() 129 && input 130 .did 131 .as_ref() 132 .map(|d| d.starts_with("did:web:")) 133 .unwrap_or(false); 134 135 if is_migration { 136 if let (Some(migration_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 137 { 138 if migration_did 
!= auth_did { 139 return ( 140 StatusCode::FORBIDDEN, 141 Json(json!({ 142 "error": "AuthorizationError", 143 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 144 })), 145 ) 146 .into_response(); 147 } 148 if is_did_web_byod { 149 info!(did = %migration_did, "Processing did:web BYOD account creation"); 150 } else { 151 info!(did = %migration_did, "Processing account migration"); 152 } 153 } 154 } 155 156 let hostname_for_validation = 157 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 158 let pds_suffix = format!(".{}", hostname_for_validation); 159 160 let validated_short_handle = if !input.handle.contains('.') 161 || input.handle.ends_with(&pds_suffix) 162 { 163 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 164 input 165 .handle 166 .strip_suffix(&pds_suffix) 167 .unwrap_or(&input.handle) 168 } else { 169 &input.handle 170 }; 171 match crate::api::validation::validate_short_handle(handle_to_validate) { 172 Ok(h) => h, 173 Err(e) => { 174 return ( 175 StatusCode::BAD_REQUEST, 176 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 177 ) 178 .into_response(); 179 } 180 } 181 } else { 182 if input.handle.contains(' ') || input.handle.contains('\t') { 183 return ( 184 StatusCode::BAD_REQUEST, 185 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 186 ) 187 .into_response(); 188 } 189 for c in input.handle.chars() { 190 if !c.is_ascii_alphanumeric() && c != '.' 
&& c != '-' { 191 return ( 192 StatusCode::BAD_REQUEST, 193 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 194 ) 195 .into_response(); 196 } 197 } 198 input.handle.to_lowercase() 199 }; 200 let email: Option<String> = input 201 .email 202 .as_ref() 203 .map(|e| e.trim().to_string()) 204 .filter(|e| !e.is_empty()); 205 if let Some(ref email) = email 206 && !crate::api::validation::is_valid_email(email) 207 { 208 return ( 209 StatusCode::BAD_REQUEST, 210 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 211 ) 212 .into_response(); 213 } 214 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 215 let valid_channels = ["email", "discord", "telegram", "signal"]; 216 if !valid_channels.contains(&verification_channel) && !is_migration { 217 return ( 218 StatusCode::BAD_REQUEST, 219 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 220 ) 221 .into_response(); 222 } 223 let verification_recipient = if is_migration { 224 None 225 } else { 226 Some(match verification_channel { 227 "email" => match &input.email { 228 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 229 _ => return ( 230 StatusCode::BAD_REQUEST, 231 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 232 ).into_response(), 233 }, 234 "discord" => match &input.discord_id { 235 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 236 _ => return ( 237 StatusCode::BAD_REQUEST, 238 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 239 ).into_response(), 240 }, 241 "telegram" => match &input.telegram_username { 242 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 243 _ => return ( 244 StatusCode::BAD_REQUEST, 245 Json(json!({"error": 
"MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 246 ).into_response(), 247 }, 248 "signal" => match &input.signal_number { 249 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 250 _ => return ( 251 StatusCode::BAD_REQUEST, 252 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 253 ).into_response(), 254 }, 255 _ => return ( 256 StatusCode::BAD_REQUEST, 257 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 258 ).into_response(), 259 }) 260 }; 261 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 262 let pds_endpoint = format!("https://{}", hostname); 263 let suffix = format!(".{}", hostname); 264 let handle = if input.handle.ends_with(&suffix) { 265 format!("{}.{}", validated_short_handle, hostname) 266 } else if input.handle.contains('.') { 267 validated_short_handle.clone() 268 } else { 269 format!("{}.{}", validated_short_handle, hostname) 270 }; 271 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 272 if let Some(signing_key_did) = &input.signing_key { 273 let reserved = sqlx::query!( 274 r#" 275 SELECT id, private_key_bytes 276 FROM reserved_signing_keys 277 WHERE public_key_did_key = $1 278 AND used_at IS NULL 279 AND expires_at > NOW() 280 FOR UPDATE 281 "#, 282 signing_key_did 283 ) 284 .fetch_optional(&state.db) 285 .await; 286 match reserved { 287 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 288 Ok(None) => { 289 return ( 290 StatusCode::BAD_REQUEST, 291 Json(json!({ 292 "error": "InvalidSigningKey", 293 "message": "Signing key not found, already used, or expired" 294 })), 295 ) 296 .into_response(); 297 } 298 Err(e) => { 299 error!("Error looking up reserved signing key: {:?}", e); 300 return ( 301 StatusCode::INTERNAL_SERVER_ERROR, 302 Json(json!({"error": "InternalError"})), 303 ) 
304 .into_response(); 305 } 306 } 307 } else { 308 let secret_key = SecretKey::random(&mut OsRng); 309 (secret_key.to_bytes().to_vec(), None) 310 }; 311 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 312 Ok(k) => k, 313 Err(e) => { 314 error!("Error creating signing key: {:?}", e); 315 return ( 316 StatusCode::INTERNAL_SERVER_ERROR, 317 Json(json!({"error": "InternalError"})), 318 ) 319 .into_response(); 320 } 321 }; 322 let did_type = input.did_type.as_deref().unwrap_or("plc"); 323 let did = match did_type { 324 "web" => { 325 let subdomain_host = format!("{}.{}", input.handle, hostname); 326 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 327 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 328 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 329 self_hosted_did 330 } 331 "web-external" => { 332 let d = match &input.did { 333 Some(d) if !d.trim().is_empty() => d, 334 _ => { 335 return ( 336 StatusCode::BAD_REQUEST, 337 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 338 ) 339 .into_response(); 340 } 341 }; 342 if !d.starts_with("did:web:") { 343 return ( 344 StatusCode::BAD_REQUEST, 345 Json( 346 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 347 ), 348 ) 349 .into_response(); 350 } 351 if !is_did_web_byod { 352 if let Err(e) = 353 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 354 { 355 return ( 356 StatusCode::BAD_REQUEST, 357 Json(json!({"error": "InvalidDid", "message": e})), 358 ) 359 .into_response(); 360 } 361 } 362 info!(did = %d, "Creating external did:web account"); 363 d.clone() 364 } 365 _ => { 366 if let Some(d) = &input.did { 367 if d.starts_with("did:plc:") && is_migration { 368 info!(did = %d, "Migration with existing did:plc"); 369 d.clone() 370 } else if d.starts_with("did:web:") { 371 if !is_did_web_byod { 372 if let Err(e) = 373 
verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()) 374 .await 375 { 376 return ( 377 StatusCode::BAD_REQUEST, 378 Json(json!({"error": "InvalidDid", "message": e})), 379 ) 380 .into_response(); 381 } 382 } 383 d.clone() 384 } else if !d.trim().is_empty() { 385 return ( 386 StatusCode::BAD_REQUEST, 387 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 388 ) 389 .into_response(); 390 } else { 391 let rotation_key = std::env::var("PLC_ROTATION_KEY") 392 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 393 let genesis_result = match create_genesis_operation( 394 &signing_key, 395 &rotation_key, 396 &handle, 397 &pds_endpoint, 398 ) { 399 Ok(r) => r, 400 Err(e) => { 401 error!("Error creating PLC genesis operation: {:?}", e); 402 return ( 403 StatusCode::INTERNAL_SERVER_ERROR, 404 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 405 ) 406 .into_response(); 407 } 408 }; 409 let plc_client = PlcClient::new(None); 410 if let Err(e) = plc_client 411 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 412 .await 413 { 414 error!("Failed to submit PLC genesis operation: {:?}", e); 415 return ( 416 StatusCode::BAD_GATEWAY, 417 Json(json!({ 418 "error": "UpstreamError", 419 "message": format!("Failed to register DID with PLC directory: {}", e) 420 })), 421 ) 422 .into_response(); 423 } 424 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 425 genesis_result.did 426 } 427 } else { 428 let rotation_key = std::env::var("PLC_ROTATION_KEY") 429 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 430 let genesis_result = match create_genesis_operation( 431 &signing_key, 432 &rotation_key, 433 &handle, 434 &pds_endpoint, 435 ) { 436 Ok(r) => r, 437 Err(e) => { 438 error!("Error creating PLC genesis operation: {:?}", e); 439 return ( 440 
StatusCode::INTERNAL_SERVER_ERROR, 441 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 442 ) 443 .into_response(); 444 } 445 }; 446 let plc_client = PlcClient::new(None); 447 if let Err(e) = plc_client 448 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 449 .await 450 { 451 error!("Failed to submit PLC genesis operation: {:?}", e); 452 return ( 453 StatusCode::BAD_GATEWAY, 454 Json(json!({ 455 "error": "UpstreamError", 456 "message": format!("Failed to register DID with PLC directory: {}", e) 457 })), 458 ) 459 .into_response(); 460 } 461 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 462 genesis_result.did 463 } 464 } 465 }; 466 let mut tx = match state.db.begin().await { 467 Ok(tx) => tx, 468 Err(e) => { 469 error!("Error starting transaction: {:?}", e); 470 return ( 471 StatusCode::INTERNAL_SERVER_ERROR, 472 Json(json!({"error": "InternalError"})), 473 ) 474 .into_response(); 475 } 476 }; 477 if is_migration { 478 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 479 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 480 .bind(&did) 481 .fetch_optional(&mut *tx) 482 .await 483 .unwrap_or(None); 484 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 485 if deactivated_at.is_some() { 486 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 487 let update_result: Result<_, sqlx::Error> = 488 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 489 .bind(&handle) 490 .bind(account_id) 491 .execute(&mut *tx) 492 .await; 493 if let Err(e) = update_result { 494 if let Some(db_err) = e.as_database_error() 495 && db_err 496 .constraint() 497 .map(|c| c.contains("handle")) 498 .unwrap_or(false) 499 { 500 return ( 501 StatusCode::BAD_REQUEST, 502 Json(json!({"error": "HandleTaken", "message": "Handle 
already taken by another account"})), 503 ) 504 .into_response(); 505 } 506 error!("Error reactivating account: {:?}", e); 507 return ( 508 StatusCode::INTERNAL_SERVER_ERROR, 509 Json(json!({"error": "InternalError"})), 510 ) 511 .into_response(); 512 } 513 if let Err(e) = tx.commit().await { 514 error!("Error committing reactivation: {:?}", e); 515 return ( 516 StatusCode::INTERNAL_SERVER_ERROR, 517 Json(json!({"error": "InternalError"})), 518 ) 519 .into_response(); 520 } 521 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 522 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 523 ) 524 .bind(account_id) 525 .fetch_optional(&state.db) 526 .await 527 .unwrap_or(None); 528 let secret_key_bytes = match key_row { 529 Some((key_bytes, encryption_version)) => { 530 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 531 Ok(k) => k, 532 Err(e) => { 533 error!("Error decrypting key for reactivated account: {:?}", e); 534 return ( 535 StatusCode::INTERNAL_SERVER_ERROR, 536 Json(json!({"error": "InternalError"})), 537 ) 538 .into_response(); 539 } 540 } 541 } 542 None => { 543 error!("No signing key found for reactivated account"); 544 return ( 545 StatusCode::INTERNAL_SERVER_ERROR, 546 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 547 ) 548 .into_response(); 549 } 550 }; 551 let access_meta = 552 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 553 Ok(m) => m, 554 Err(e) => { 555 error!("Error creating access token: {:?}", e); 556 return ( 557 StatusCode::INTERNAL_SERVER_ERROR, 558 Json(json!({"error": "InternalError"})), 559 ) 560 .into_response(); 561 } 562 }; 563 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 564 &did, 565 &secret_key_bytes, 566 ) { 567 Ok(m) => m, 568 Err(e) => { 569 error!("Error creating refresh token: {:?}", e); 570 return ( 571 StatusCode::INTERNAL_SERVER_ERROR, 572 Json(json!({"error": 
"InternalError"})), 573 ) 574 .into_response(); 575 } 576 }; 577 let session_result: Result<_, sqlx::Error> = sqlx::query( 578 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 579 ) 580 .bind(&did) 581 .bind(&access_meta.jti) 582 .bind(&refresh_meta.jti) 583 .bind(access_meta.expires_at) 584 .bind(refresh_meta.expires_at) 585 .execute(&state.db) 586 .await; 587 if let Err(e) = session_result { 588 error!("Error creating session: {:?}", e); 589 return ( 590 StatusCode::INTERNAL_SERVER_ERROR, 591 Json(json!({"error": "InternalError"})), 592 ) 593 .into_response(); 594 } 595 return ( 596 StatusCode::OK, 597 Json(CreateAccountOutput { 598 handle: handle.clone(), 599 did, 600 access_jwt: Some(access_meta.token), 601 refresh_jwt: Some(refresh_meta.token), 602 verification_required: false, 603 verification_channel: "email".to_string(), 604 }), 605 ) 606 .into_response(); 607 } else { 608 return ( 609 StatusCode::BAD_REQUEST, 610 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 611 ) 612 .into_response(); 613 } 614 } 615 } 616 let exists_result: Option<(i32,)> = 617 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 618 .bind(&handle) 619 .fetch_optional(&mut *tx) 620 .await 621 .unwrap_or(None); 622 if exists_result.is_some() { 623 return ( 624 StatusCode::BAD_REQUEST, 625 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 626 ) 627 .into_response(); 628 } 629 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 630 .map(|v| v == "true" || v == "1") 631 .unwrap_or(false); 632 if invite_code_required 633 && input 634 .invite_code 635 .as_ref() 636 .map(|c| c.trim().is_empty()) 637 .unwrap_or(true) 638 { 639 return ( 640 StatusCode::BAD_REQUEST, 641 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 642 ) 643 .into_response(); 644 } 645 if 
let Some(code) = &input.invite_code 646 && !code.trim().is_empty() 647 { 648 let invite_query = sqlx::query!( 649 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 650 code 651 ) 652 .fetch_optional(&mut *tx) 653 .await; 654 match invite_query { 655 Ok(Some(row)) => { 656 if row.available_uses <= 0 { 657 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 658 } 659 let update_invite = sqlx::query!( 660 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 661 code 662 ) 663 .execute(&mut *tx) 664 .await; 665 if let Err(e) = update_invite { 666 error!("Error updating invite code: {:?}", e); 667 return ( 668 StatusCode::INTERNAL_SERVER_ERROR, 669 Json(json!({"error": "InternalError"})), 670 ) 671 .into_response(); 672 } 673 } 674 Ok(None) => { 675 return ( 676 StatusCode::BAD_REQUEST, 677 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 678 ) 679 .into_response(); 680 } 681 Err(e) => { 682 error!("Error checking invite code: {:?}", e); 683 return ( 684 StatusCode::INTERNAL_SERVER_ERROR, 685 Json(json!({"error": "InternalError"})), 686 ) 687 .into_response(); 688 } 689 } 690 } 691 if let Err(e) = validate_password(&input.password) { 692 return ( 693 StatusCode::BAD_REQUEST, 694 Json(json!({ 695 "error": "InvalidPassword", 696 "message": e.to_string() 697 })), 698 ) 699 .into_response(); 700 } 701 702 let password_hash = match hash(&input.password, DEFAULT_COST) { 703 Ok(h) => h, 704 Err(e) => { 705 error!("Error hashing password: {:?}", e); 706 return ( 707 StatusCode::INTERNAL_SERVER_ERROR, 708 Json(json!({"error": "InternalError"})), 709 ) 710 .into_response(); 711 } 712 }; 713 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 714 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 715 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 
716 .fetch_one(&mut *tx) 717 .await 718 .map(|c| c.unwrap_or(0) == 0) 719 .unwrap_or(false); 720 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 721 Some(chrono::Utc::now()) 722 } else { 723 None 724 }; 725 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 726 r#"INSERT INTO users ( 727 handle, email, did, password_hash, 728 preferred_comms_channel, 729 discord_id, telegram_username, signal_number, 730 is_admin, deactivated_at, email_verified 731 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 732 ) 733 .bind(&handle) 734 .bind(&email) 735 .bind(&did) 736 .bind(&password_hash) 737 .bind(verification_channel) 738 .bind( 739 input 740 .discord_id 741 .as_deref() 742 .map(|s| s.trim()) 743 .filter(|s| !s.is_empty()), 744 ) 745 .bind( 746 input 747 .telegram_username 748 .as_deref() 749 .map(|s| s.trim()) 750 .filter(|s| !s.is_empty()), 751 ) 752 .bind( 753 input 754 .signal_number 755 .as_deref() 756 .map(|s| s.trim()) 757 .filter(|s| !s.is_empty()), 758 ) 759 .bind(is_first_user) 760 .bind(deactivated_at) 761 .bind(is_migration) 762 .fetch_one(&mut *tx) 763 .await; 764 let user_id = match user_insert { 765 Ok((id,)) => id, 766 Err(e) => { 767 if let Some(db_err) = e.as_database_error() 768 && db_err.code().as_deref() == Some("23505") 769 { 770 let constraint = db_err.constraint().unwrap_or(""); 771 if constraint.contains("handle") || constraint.contains("users_handle") { 772 return ( 773 StatusCode::BAD_REQUEST, 774 Json(json!({ 775 "error": "HandleNotAvailable", 776 "message": "Handle already taken" 777 })), 778 ) 779 .into_response(); 780 } else if constraint.contains("email") || constraint.contains("users_email") { 781 return ( 782 StatusCode::BAD_REQUEST, 783 Json(json!({ 784 "error": "InvalidEmail", 785 "message": "Email already registered" 786 })), 787 ) 788 .into_response(); 789 } else if constraint.contains("did") || constraint.contains("users_did") { 790 return ( 791 
StatusCode::BAD_REQUEST, 792 Json(json!({ 793 "error": "AccountAlreadyExists", 794 "message": "An account with this DID already exists" 795 })), 796 ) 797 .into_response(); 798 } 799 } 800 error!("Error inserting user: {:?}", e); 801 return ( 802 StatusCode::INTERNAL_SERVER_ERROR, 803 Json(json!({"error": "InternalError"})), 804 ) 805 .into_response(); 806 } 807 }; 808 809 if !is_migration 810 && let Some(ref recipient) = verification_recipient 811 && let Err(e) = sqlx::query!( 812 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)", 813 user_id, 814 verification_channel as _, 815 verification_code, 816 recipient, 817 code_expires_at 818 ) 819 .execute(&mut *tx) 820 .await { 821 error!("Error inserting verification code: {:?}", e); 822 return ( 823 StatusCode::INTERNAL_SERVER_ERROR, 824 Json(json!({"error": "InternalError"})), 825 ) 826 .into_response(); 827 } 828 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 829 Ok(enc) => enc, 830 Err(e) => { 831 error!("Error encrypting user key: {:?}", e); 832 return ( 833 StatusCode::INTERNAL_SERVER_ERROR, 834 Json(json!({"error": "InternalError"})), 835 ) 836 .into_response(); 837 } 838 }; 839 let key_insert = sqlx::query!( 840 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 841 user_id, 842 &encrypted_key_bytes[..], 843 crate::config::ENCRYPTION_VERSION 844 ) 845 .execute(&mut *tx) 846 .await; 847 if let Err(e) = key_insert { 848 error!("Error inserting user key: {:?}", e); 849 return ( 850 StatusCode::INTERNAL_SERVER_ERROR, 851 Json(json!({"error": "InternalError"})), 852 ) 853 .into_response(); 854 } 855 if let Some(key_id) = reserved_key_id { 856 let mark_used = sqlx::query!( 857 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 858 key_id 859 ) 860 .execute(&mut *tx) 861 .await; 862 if let Err(e) = mark_used { 863 error!("Error 
marking reserved key as used: {:?}", e); 864 return ( 865 StatusCode::INTERNAL_SERVER_ERROR, 866 Json(json!({"error": "InternalError"})), 867 ) 868 .into_response(); 869 } 870 } 871 let mst = Mst::new(Arc::new(state.block_store.clone())); 872 let mst_root = match mst.persist().await { 873 Ok(c) => c, 874 Err(e) => { 875 error!("Error persisting MST: {:?}", e); 876 return ( 877 StatusCode::INTERNAL_SERVER_ERROR, 878 Json(json!({"error": "InternalError"})), 879 ) 880 .into_response(); 881 } 882 }; 883 let rev = Tid::now(LimitedU32::MIN); 884 let (commit_bytes, _sig) = match create_signed_commit(&did, mst_root, &rev.to_string(), None, &signing_key) { 885 Ok(result) => result, 886 Err(e) => { 887 error!("Error creating genesis commit: {:?}", e); 888 return ( 889 StatusCode::INTERNAL_SERVER_ERROR, 890 Json(json!({"error": "InternalError"})), 891 ) 892 .into_response(); 893 } 894 }; 895 let commit_cid = match state.block_store.put(&commit_bytes).await { 896 Ok(c) => c, 897 Err(e) => { 898 error!("Error saving genesis commit: {:?}", e); 899 return ( 900 StatusCode::INTERNAL_SERVER_ERROR, 901 Json(json!({"error": "InternalError"})), 902 ) 903 .into_response(); 904 } 905 }; 906 let commit_cid_str = commit_cid.to_string(); 907 let repo_insert = sqlx::query!( 908 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 909 user_id, 910 commit_cid_str 911 ) 912 .execute(&mut *tx) 913 .await; 914 if let Err(e) = repo_insert { 915 error!("Error initializing repo: {:?}", e); 916 return ( 917 StatusCode::INTERNAL_SERVER_ERROR, 918 Json(json!({"error": "InternalError"})), 919 ) 920 .into_response(); 921 } 922 if let Some(code) = &input.invite_code 923 && !code.trim().is_empty() 924 { 925 let use_insert = sqlx::query!( 926 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 927 code, 928 user_id 929 ) 930 .execute(&mut *tx) 931 .await; 932 if let Err(e) = use_insert { 933 error!("Error recording invite usage: {:?}", e); 934 return ( 935 
StatusCode::INTERNAL_SERVER_ERROR, 936 Json(json!({"error": "InternalError"})), 937 ) 938 .into_response(); 939 } 940 } 941 if let Err(e) = tx.commit().await { 942 error!("Error committing transaction: {:?}", e); 943 return ( 944 StatusCode::INTERNAL_SERVER_ERROR, 945 Json(json!({"error": "InternalError"})), 946 ) 947 .into_response(); 948 } 949 if !is_migration { 950 if let Err(e) = 951 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 952 { 953 warn!("Failed to sequence identity event for {}: {}", did, e); 954 } 955 if let Err(e) = 956 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 957 { 958 warn!("Failed to sequence account event for {}: {}", did, e); 959 } 960 let profile_record = json!({ 961 "$type": "app.bsky.actor.profile", 962 "displayName": input.handle 963 }); 964 if let Err(e) = crate::api::repo::record::create_record_internal( 965 &state, 966 &did, 967 "app.bsky.actor.profile", 968 "self", 969 &profile_record, 970 ) 971 .await 972 { 973 warn!("Failed to create default profile for {}: {}", did, e); 974 } 975 if let Some(ref recipient) = verification_recipient 976 && let Err(e) = crate::comms::enqueue_signup_verification( 977 &state.db, 978 user_id, 979 verification_channel, 980 recipient, 981 &verification_code, 982 None, 983 ) 984 .await 985 { 986 warn!( 987 "Failed to enqueue signup verification notification: {:?}", 988 e 989 ); 990 } 991 } 992 993 let (access_jwt, refresh_jwt) = if is_migration { 994 let access_meta = 995 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 996 Ok(m) => m, 997 Err(e) => { 998 error!("Error creating access token for migration: {:?}", e); 999 return ( 1000 StatusCode::INTERNAL_SERVER_ERROR, 1001 Json(json!({"error": "InternalError"})), 1002 ) 1003 .into_response(); 1004 } 1005 }; 1006 let refresh_meta = 1007 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1008 Ok(m) => m, 1009 Err(e) => { 1010 
error!("Error creating refresh token for migration: {:?}", e); 1011 return ( 1012 StatusCode::INTERNAL_SERVER_ERROR, 1013 Json(json!({"error": "InternalError"})), 1014 ) 1015 .into_response(); 1016 } 1017 }; 1018 if let Err(e) = sqlx::query!( 1019 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1020 did, 1021 access_meta.jti, 1022 refresh_meta.jti, 1023 access_meta.expires_at, 1024 refresh_meta.expires_at 1025 ) 1026 .execute(&state.db) 1027 .await 1028 { 1029 error!("Error creating session for migration: {:?}", e); 1030 return ( 1031 StatusCode::INTERNAL_SERVER_ERROR, 1032 Json(json!({"error": "InternalError"})), 1033 ) 1034 .into_response(); 1035 } 1036 (Some(access_meta.token), Some(refresh_meta.token)) 1037 } else { 1038 (None, None) 1039 }; 1040 1041 ( 1042 StatusCode::OK, 1043 Json(CreateAccountOutput { 1044 handle: handle.clone(), 1045 did, 1046 access_jwt, 1047 refresh_jwt, 1048 verification_required: !is_migration, 1049 verification_channel: verification_channel.to_string(), 1050 }), 1051 ) 1052 .into_response() 1053}