this repo has no description
1use super::did::verify_did_web; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 5use crate::state::{AppState, RateLimitKind}; 6use crate::validation::validate_password; 7use axum::{ 8 Json, 9 extract::State, 10 http::{HeaderMap, StatusCode}, 11 response::{IntoResponse, Response}, 12}; 13use bcrypt::{DEFAULT_COST, hash}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::{mst::Mst, storage::BlockStore}; 16use k256::{SecretKey, ecdsa::SigningKey}; 17use rand::rngs::OsRng; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use std::sync::Arc; 21use tracing::{debug, error, info, warn}; 22 23fn extract_client_ip(headers: &HeaderMap) -> String { 24 if let Some(forwarded) = headers.get("x-forwarded-for") 25 && let Ok(value) = forwarded.to_str() 26 && let Some(first_ip) = value.split(',').next() 27 { 28 return first_ip.trim().to_string(); 29 } 30 if let Some(real_ip) = headers.get("x-real-ip") 31 && let Ok(value) = real_ip.to_str() 32 { 33 return value.trim().to_string(); 34 } 35 "unknown".to_string() 36} 37 38#[derive(Deserialize)] 39#[serde(rename_all = "camelCase")] 40pub struct CreateAccountInput { 41 pub handle: String, 42 pub email: Option<String>, 43 pub password: String, 44 pub invite_code: Option<String>, 45 pub did: Option<String>, 46 pub did_type: Option<String>, 47 pub signing_key: Option<String>, 48 pub verification_channel: Option<String>, 49 pub discord_id: Option<String>, 50 pub telegram_username: Option<String>, 51 pub signal_number: Option<String>, 52} 53 54#[derive(Serialize)] 55#[serde(rename_all = "camelCase")] 56pub struct CreateAccountOutput { 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub access_jwt: Option<String>, 61 #[serde(skip_serializing_if = "Option::is_none")] 62 pub refresh_jwt: Option<String>, 63 pub verification_required: bool, 64 pub verification_channel: String, 65} 66 67pub async fn create_account( 68 State(state): State<AppState>, 69 headers: HeaderMap, 70 Json(input): Json<CreateAccountInput>, 71) -> Response { 72 info!("create_account called"); 73 let client_ip = extract_client_ip(&headers); 74 if !state 75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 76 .await 77 { 78 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 79 return ( 80 StatusCode::TOO_MANY_REQUESTS, 81 Json(json!({ 82 "error": "RateLimitExceeded", 83 "message": "Too many account creation attempts. Please try again later." 84 })), 85 ) 86 .into_response(); 87 } 88 89 let migration_auth = if let Some(token) = 90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 91 { 92 if is_service_token(&token) { 93 let verifier = ServiceTokenVerifier::new(); 94 match verifier 95 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 96 .await 97 { 98 Ok(claims) => { 99 debug!("Service token verified for migration: iss={}", claims.iss); 100 Some(claims.iss) 101 } 102 Err(e) => { 103 error!("Service token verification failed: {:?}", e); 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({ 107 "error": "AuthenticationFailed", 108 "message": format!("Service token verification failed: {}", e) 109 })), 110 ) 111 .into_response(); 112 } 113 } 114 } else { 115 None 116 } 117 } else { 118 None 119 }; 120 121 let is_migration = migration_auth.is_some() 122 && input 123 .did 124 .as_ref() 125 .map(|d| d.starts_with("did:plc:")) 126 .unwrap_or(false); 127 128 if is_migration { 129 if let (Some(migration_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 130 { 131 if migration_did != auth_did { 132 return ( 133 StatusCode::FORBIDDEN, 134 Json(json!({ 135 "error": "AuthorizationError", 136 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 137 })), 138 ) 139 .into_response(); 140 } 141 info!(did = %migration_did, "Processing account migration"); 142 } 143 } 144 145 let hostname_for_validation = 146 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 147 let pds_suffix = format!(".{}", hostname_for_validation); 148 149 let validated_short_handle = if !input.handle.contains('.') 150 || input.handle.ends_with(&pds_suffix) 151 { 152 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 153 input 154 .handle 155 .strip_suffix(&pds_suffix) 156 .unwrap_or(&input.handle) 157 } else { 158 &input.handle 159 }; 160 match crate::api::validation::validate_short_handle(handle_to_validate) { 161 Ok(h) => h, 162 Err(e) => { 163 return ( 164 StatusCode::BAD_REQUEST, 165 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 166 ) 167 .into_response(); 168 } 169 } 170 } else { 171 if input.handle.contains(' ') || input.handle.contains('\t') { 172 return ( 173 StatusCode::BAD_REQUEST, 174 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 175 ) 176 .into_response(); 177 } 178 for c in input.handle.chars() { 179 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 180 return ( 181 StatusCode::BAD_REQUEST, 182 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 183 ) 184 .into_response(); 185 } 186 } 187 input.handle.to_lowercase() 188 }; 189 let email: Option<String> = input 190 .email 191 .as_ref() 192 .map(|e| e.trim().to_string()) 193 .filter(|e| !e.is_empty()); 194 if let Some(ref email) = email 195 && !crate::api::validation::is_valid_email(email) 196 { 197 return ( 198 StatusCode::BAD_REQUEST, 199 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 200 ) 201 .into_response(); 202 } 203 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 204 let valid_channels = ["email", "discord", "telegram", "signal"]; 205 if !valid_channels.contains(&verification_channel) && !is_migration { 206 return ( 207 StatusCode::BAD_REQUEST, 208 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 209 ) 210 .into_response(); 211 } 212 let verification_recipient = if is_migration { 213 None 214 } else { 215 Some(match verification_channel { 216 "email" => match &input.email { 217 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 218 _ => return ( 219 StatusCode::BAD_REQUEST, 220 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 221 ).into_response(), 222 }, 223 "discord" => match &input.discord_id { 224 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 225 _ => return ( 226 StatusCode::BAD_REQUEST, 227 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 228 ).into_response(), 229 }, 230 "telegram" => match &input.telegram_username { 231 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 232 _ => return ( 233 StatusCode::BAD_REQUEST, 234 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 235 ).into_response(), 236 }, 237 "signal" => match &input.signal_number { 238 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 239 _ => return ( 240 StatusCode::BAD_REQUEST, 241 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 242 ).into_response(), 243 }, 244 _ => return ( 245 StatusCode::BAD_REQUEST, 246 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 247 ).into_response(), 248 }) 249 }; 250 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 251 let pds_endpoint = format!("https://{}", hostname); 252 let suffix = format!(".{}", hostname); 253 let handle = if input.handle.ends_with(&suffix) { 254 format!("{}.{}", validated_short_handle, hostname) 255 } else if input.handle.contains('.') { 256 validated_short_handle.clone() 257 } else { 258 format!("{}.{}", validated_short_handle, hostname) 259 }; 260 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 261 if let Some(signing_key_did) = &input.signing_key { 262 let reserved = sqlx::query!( 263 r#" 264 SELECT id, private_key_bytes 265 FROM reserved_signing_keys 266 WHERE public_key_did_key = $1 267 AND used_at IS NULL 268 AND expires_at > NOW() 269 FOR UPDATE 270 "#, 271 signing_key_did 272 ) 273 .fetch_optional(&state.db) 274 .await; 275 match reserved { 276 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 277 Ok(None) => { 278 return ( 279 StatusCode::BAD_REQUEST, 280 Json(json!({ 281 "error": "InvalidSigningKey", 282 "message": "Signing key not found, already used, or expired" 283 })), 284 ) 285 .into_response(); 286 } 287 Err(e) => { 288 error!("Error looking up reserved signing key: {:?}", e); 289 return ( 290 StatusCode::INTERNAL_SERVER_ERROR, 291 Json(json!({"error": "InternalError"})), 292 ) 293 .into_response(); 294 } 295 } 296 } else { 297 let secret_key = SecretKey::random(&mut OsRng); 298 (secret_key.to_bytes().to_vec(), None) 299 }; 300 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 301 Ok(k) => k, 302 Err(e) => { 303 error!("Error creating signing key: {:?}", e); 304 return ( 305 StatusCode::INTERNAL_SERVER_ERROR, 306 Json(json!({"error": "InternalError"})), 307 ) 308 .into_response(); 309 } 310 }; 311 let did_type = input.did_type.as_deref().unwrap_or("plc"); 312 let did = match did_type { 313 "web" => { 314 let subdomain_host = format!("{}.{}", input.handle, hostname); 315 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 316 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 317 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 318 self_hosted_did 319 } 320 "web-external" => { 321 let d = match &input.did { 322 Some(d) if !d.trim().is_empty() => d, 323 _ => { 324 return ( 325 StatusCode::BAD_REQUEST, 326 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 327 ) 328 .into_response(); 329 } 330 }; 331 if !d.starts_with("did:web:") { 332 return ( 333 StatusCode::BAD_REQUEST, 334 Json( 335 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 336 ), 337 ) 338 .into_response(); 339 } 340 if let Err(e) = 341 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 342 { 343 return ( 344 StatusCode::BAD_REQUEST, 345 Json(json!({"error": "InvalidDid", "message": e})), 346 ) 347 .into_response(); 348 } 349 info!(did = %d, "Creating external did:web account"); 350 d.clone() 351 } 352 _ => { 353 if let Some(d) = &input.did { 354 if d.starts_with("did:plc:") && is_migration { 355 info!(did = %d, "Migration with existing did:plc"); 356 d.clone() 357 } else if d.starts_with("did:web:") { 358 if let Err(e) = 359 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()) 360 .await 361 { 362 return ( 363 StatusCode::BAD_REQUEST, 364 Json(json!({"error": "InvalidDid", "message": e})), 365 ) 366 .into_response(); 367 } 368 d.clone() 369 } else if !d.trim().is_empty() { 370 return ( 371 StatusCode::BAD_REQUEST, 372 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 373 ) 374 .into_response(); 375 } else { 376 let rotation_key = std::env::var("PLC_ROTATION_KEY") 377 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 378 let genesis_result = match create_genesis_operation( 379 &signing_key, 380 &rotation_key, 381 &handle, 382 &pds_endpoint, 383 ) { 384 Ok(r) => r, 385 Err(e) => { 386 error!("Error creating PLC genesis operation: {:?}", e); 387 return ( 388 StatusCode::INTERNAL_SERVER_ERROR, 389 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 390 ) 391 .into_response(); 392 } 393 }; 394 let plc_client = PlcClient::new(None); 395 if let Err(e) = plc_client 396 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 397 .await 398 { 399 error!("Failed to submit PLC genesis operation: {:?}", e); 400 return ( 401 StatusCode::BAD_GATEWAY, 402 Json(json!({ 403 "error": "UpstreamError", 404 "message": format!("Failed to register DID with PLC directory: {}", e) 405 })), 406 ) 407 .into_response(); 408 } 409 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 410 genesis_result.did 411 } 412 } else { 413 let rotation_key = std::env::var("PLC_ROTATION_KEY") 414 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 415 let genesis_result = match create_genesis_operation( 416 &signing_key, 417 &rotation_key, 418 &handle, 419 &pds_endpoint, 420 ) { 421 Ok(r) => r, 422 Err(e) => { 423 error!("Error creating PLC genesis operation: {:?}", e); 424 return ( 425 StatusCode::INTERNAL_SERVER_ERROR, 426 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 427 ) 428 .into_response(); 429 } 430 }; 431 let plc_client = PlcClient::new(None); 432 if let Err(e) = plc_client 433 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 434 .await 435 { 436 error!("Failed to submit PLC genesis operation: {:?}", e); 437 return ( 438 StatusCode::BAD_GATEWAY, 439 Json(json!({ 440 "error": "UpstreamError", 441 "message": format!("Failed to register DID with PLC directory: {}", e) 442 })), 443 ) 444 .into_response(); 445 } 446 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 447 genesis_result.did 448 } 449 } 450 }; 451 let mut tx = match state.db.begin().await { 452 Ok(tx) => tx, 453 Err(e) => { 454 error!("Error starting transaction: {:?}", e); 455 return ( 456 StatusCode::INTERNAL_SERVER_ERROR, 457 Json(json!({"error": "InternalError"})), 458 ) 459 .into_response(); 460 } 461 }; 462 if is_migration { 463 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 464 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 465 .bind(&did) 466 .fetch_optional(&mut *tx) 467 .await 468 .unwrap_or(None); 469 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 470 if deactivated_at.is_some() { 471 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 472 let update_result: Result<_, sqlx::Error> = 473 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 474 .bind(&handle) 475 .bind(account_id) 476 .execute(&mut *tx) 477 .await; 478 if let Err(e) = update_result { 479 if let Some(db_err) = e.as_database_error() 480 && db_err 481 .constraint() 482 .map(|c| c.contains("handle")) 483 .unwrap_or(false) 484 { 485 return ( 486 StatusCode::BAD_REQUEST, 487 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 488 ) 489 .into_response(); 490 } 491 error!("Error reactivating account: {:?}", e); 492 return ( 493 StatusCode::INTERNAL_SERVER_ERROR, 494 Json(json!({"error": "InternalError"})), 495 ) 496 .into_response(); 497 } 498 if let Err(e) = tx.commit().await { 499 error!("Error committing reactivation: {:?}", e); 500 return ( 501 StatusCode::INTERNAL_SERVER_ERROR, 502 Json(json!({"error": "InternalError"})), 503 ) 504 .into_response(); 505 } 506 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 507 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 508 ) 509 .bind(account_id) 510 .fetch_optional(&state.db) 511 .await 512 .unwrap_or(None); 513 let secret_key_bytes = match key_row { 514 Some((key_bytes, encryption_version)) => { 515 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 516 Ok(k) => k, 517 Err(e) => { 518 error!("Error decrypting key for reactivated account: {:?}", e); 519 return ( 520 StatusCode::INTERNAL_SERVER_ERROR, 521 Json(json!({"error": "InternalError"})), 522 ) 523 .into_response(); 524 } 525 } 526 } 527 None => { 528 error!("No signing key found for reactivated account"); 529 return ( 530 StatusCode::INTERNAL_SERVER_ERROR, 531 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 532 ) 533 .into_response(); 534 } 535 }; 536 let access_meta = 537 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 538 Ok(m) => m, 539 Err(e) => { 540 error!("Error creating access token: {:?}", e); 541 return ( 542 StatusCode::INTERNAL_SERVER_ERROR, 543 Json(json!({"error": "InternalError"})), 544 ) 545 .into_response(); 546 } 547 }; 548 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 549 &did, 550 &secret_key_bytes, 551 ) { 552 Ok(m) => m, 553 Err(e) => { 554 error!("Error creating refresh token: {:?}", e); 555 return ( 556 StatusCode::INTERNAL_SERVER_ERROR, 557 Json(json!({"error": "InternalError"})), 558 ) 559 .into_response(); 560 } 561 }; 562 let session_result: Result<_, sqlx::Error> = sqlx::query( 563 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 564 ) 565 .bind(&did) 566 .bind(&access_meta.jti) 567 .bind(&refresh_meta.jti) 568 .bind(access_meta.expires_at) 569 .bind(refresh_meta.expires_at) 570 .execute(&state.db) 571 .await; 572 if let Err(e) = session_result { 573 error!("Error creating session: {:?}", e); 574 return ( 575 StatusCode::INTERNAL_SERVER_ERROR, 576 Json(json!({"error": "InternalError"})), 577 ) 578 .into_response(); 579 } 580 return ( 581 StatusCode::OK, 582 Json(CreateAccountOutput { 583 handle: handle.clone(), 584 did, 585 access_jwt: Some(access_meta.token), 586 refresh_jwt: Some(refresh_meta.token), 587 verification_required: false, 588 verification_channel: "email".to_string(), 589 }), 590 ) 591 .into_response(); 592 } else { 593 return ( 594 StatusCode::BAD_REQUEST, 595 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 596 ) 597 .into_response(); 598 } 599 } 600 } 601 let exists_result: Option<(i32,)> = 602 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 603 .bind(&handle) 604 .fetch_optional(&mut *tx) 605 .await 606 .unwrap_or(None); 607 if exists_result.is_some() { 608 return ( 609 StatusCode::BAD_REQUEST, 610 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 611 ) 612 .into_response(); 613 } 614 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 615 .map(|v| v == "true" || v == "1") 616 .unwrap_or(false); 617 if invite_code_required 618 && input 619 .invite_code 620 .as_ref() 621 .map(|c| c.trim().is_empty()) 622 .unwrap_or(true) 623 { 624 return ( 625 StatusCode::BAD_REQUEST, 626 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 627 ) 628 .into_response(); 629 } 630 if let Some(code) = &input.invite_code 631 && !code.trim().is_empty() 632 { 633 let invite_query = sqlx::query!( 634 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 635 code 636 ) 637 .fetch_optional(&mut *tx) 638 .await; 639 match invite_query { 640 Ok(Some(row)) => { 641 if row.available_uses <= 0 { 642 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 643 } 644 let update_invite = sqlx::query!( 645 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 646 code 647 ) 648 .execute(&mut *tx) 649 .await; 650 if let Err(e) = update_invite { 651 error!("Error updating invite code: {:?}", e); 652 return ( 653 StatusCode::INTERNAL_SERVER_ERROR, 654 Json(json!({"error": "InternalError"})), 655 ) 656 .into_response(); 657 } 658 } 659 Ok(None) => { 660 return ( 661 StatusCode::BAD_REQUEST, 662 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 663 ) 664 .into_response(); 665 } 666 Err(e) => { 667 error!("Error checking invite code: {:?}", e); 668 return ( 669 StatusCode::INTERNAL_SERVER_ERROR, 670 Json(json!({"error": "InternalError"})), 671 ) 672 .into_response(); 673 } 674 } 675 } 676 if let Err(e) = validate_password(&input.password) { 677 return ( 678 StatusCode::BAD_REQUEST, 679 Json(json!({ 680 "error": "InvalidPassword", 681 "message": e.to_string() 682 })), 683 ) 684 .into_response(); 685 } 686 687 let password_hash = match hash(&input.password, DEFAULT_COST) { 688 Ok(h) => h, 689 Err(e) => { 690 error!("Error hashing password: {:?}", e); 691 return ( 692 StatusCode::INTERNAL_SERVER_ERROR, 693 Json(json!({"error": "InternalError"})), 694 ) 695 .into_response(); 696 } 697 }; 698 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 699 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 700 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 701 .fetch_one(&mut *tx) 702 .await 703 .map(|c| c.unwrap_or(0) == 0) 704 .unwrap_or(false); 705 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 706 Some(chrono::Utc::now()) 707 } else { 708 None 709 }; 710 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 711 r#"INSERT INTO users ( 712 handle, email, did, password_hash, 713 preferred_comms_channel, 714 discord_id, telegram_username, signal_number, 715 is_admin, deactivated_at, email_verified 716 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 717 ) 718 .bind(&handle) 719 .bind(&email) 720 .bind(&did) 721 .bind(&password_hash) 722 .bind(verification_channel) 723 .bind( 724 input 725 .discord_id 726 .as_deref() 727 .map(|s| s.trim()) 728 .filter(|s| !s.is_empty()), 729 ) 730 .bind( 731 input 732 .telegram_username 733 .as_deref() 734 .map(|s| s.trim()) 735 .filter(|s| !s.is_empty()), 736 ) 737 .bind( 738 input 739 .signal_number 740 .as_deref() 741 .map(|s| s.trim()) 742 .filter(|s| !s.is_empty()), 743 ) 744 .bind(is_first_user) 745 .bind(deactivated_at) 746 .bind(is_migration) 747 .fetch_one(&mut *tx) 748 .await; 749 let user_id = match user_insert { 750 Ok((id,)) => id, 751 Err(e) => { 752 if let Some(db_err) = e.as_database_error() 753 && db_err.code().as_deref() == Some("23505") 754 { 755 let constraint = db_err.constraint().unwrap_or(""); 756 if constraint.contains("handle") || constraint.contains("users_handle") { 757 return ( 758 StatusCode::BAD_REQUEST, 759 Json(json!({ 760 "error": "HandleNotAvailable", 761 "message": "Handle already taken" 762 })), 763 ) 764 .into_response(); 765 } else if constraint.contains("email") || constraint.contains("users_email") { 766 return ( 767 StatusCode::BAD_REQUEST, 768 Json(json!({ 769 "error": "InvalidEmail", 770 "message": "Email already registered" 771 })), 772 ) 773 .into_response(); 774 } else if constraint.contains("did") || constraint.contains("users_did") { 775 return ( 776 StatusCode::BAD_REQUEST, 777 Json(json!({ 778 "error": "AccountAlreadyExists", 779 "message": "An account with this DID already exists" 780 })), 781 ) 782 .into_response(); 783 } 784 } 785 error!("Error inserting user: {:?}", e); 786 return ( 787 StatusCode::INTERNAL_SERVER_ERROR, 788 Json(json!({"error": "InternalError"})), 789 ) 790 .into_response(); 791 } 792 }; 793 794 if !is_migration 795 && let Some(ref recipient) = verification_recipient 796 && let Err(e) = sqlx::query!( 797 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)", 798 user_id, 799 verification_channel as _, 800 verification_code, 801 recipient, 802 code_expires_at 803 ) 804 .execute(&mut *tx) 805 .await { 806 error!("Error inserting verification code: {:?}", e); 807 return ( 808 StatusCode::INTERNAL_SERVER_ERROR, 809 Json(json!({"error": "InternalError"})), 810 ) 811 .into_response(); 812 } 813 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 814 Ok(enc) => enc, 815 Err(e) => { 816 error!("Error encrypting user key: {:?}", e); 817 return ( 818 StatusCode::INTERNAL_SERVER_ERROR, 819 Json(json!({"error": "InternalError"})), 820 ) 821 .into_response(); 822 } 823 }; 824 let key_insert = sqlx::query!( 825 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 826 user_id, 827 &encrypted_key_bytes[..], 828 crate::config::ENCRYPTION_VERSION 829 ) 830 .execute(&mut *tx) 831 .await; 832 if let Err(e) = key_insert { 833 error!("Error inserting user key: {:?}", e); 834 return ( 835 StatusCode::INTERNAL_SERVER_ERROR, 836 Json(json!({"error": "InternalError"})), 837 ) 838 .into_response(); 839 } 840 if let Some(key_id) = reserved_key_id { 841 let mark_used = sqlx::query!( 842 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 843 key_id 844 ) 845 .execute(&mut *tx) 846 .await; 847 if let Err(e) = mark_used { 848 error!("Error marking reserved key as used: {:?}", e); 849 return ( 850 StatusCode::INTERNAL_SERVER_ERROR, 851 Json(json!({"error": "InternalError"})), 852 ) 853 .into_response(); 854 } 855 } 856 let mst = Mst::new(Arc::new(state.block_store.clone())); 857 let mst_root = match mst.persist().await { 858 Ok(c) => c, 859 Err(e) => { 860 error!("Error persisting MST: {:?}", e); 861 return ( 862 StatusCode::INTERNAL_SERVER_ERROR, 863 Json(json!({"error": "InternalError"})), 864 ) 865 .into_response(); 866 } 867 }; 868 let rev = Tid::now(LimitedU32::MIN); 869 let (commit_bytes, _sig) = match create_signed_commit(&did, mst_root, &rev.to_string(), None, &signing_key) { 870 Ok(result) => result, 871 Err(e) => { 872 error!("Error creating genesis commit: {:?}", e); 873 return ( 874 StatusCode::INTERNAL_SERVER_ERROR, 875 Json(json!({"error": "InternalError"})), 876 ) 877 .into_response(); 878 } 879 }; 880 let commit_cid = match state.block_store.put(&commit_bytes).await { 881 Ok(c) => c, 882 Err(e) => { 883 error!("Error saving genesis commit: {:?}", e); 884 return ( 885 StatusCode::INTERNAL_SERVER_ERROR, 886 Json(json!({"error": "InternalError"})), 887 ) 888 .into_response(); 889 } 890 }; 891 let commit_cid_str = commit_cid.to_string(); 892 let repo_insert = sqlx::query!( 893 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 894 user_id, 895 commit_cid_str 896 ) 897 .execute(&mut *tx) 898 .await; 899 if let Err(e) = repo_insert { 900 error!("Error initializing repo: {:?}", e); 901 return ( 902 StatusCode::INTERNAL_SERVER_ERROR, 903 Json(json!({"error": "InternalError"})), 904 ) 905 .into_response(); 906 } 907 if let Some(code) = &input.invite_code 908 && !code.trim().is_empty() 909 { 910 let use_insert = sqlx::query!( 911 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 912 code, 913 user_id 914 ) 915 .execute(&mut *tx) 916 .await; 917 if let Err(e) = use_insert { 918 error!("Error recording invite usage: {:?}", e); 919 return ( 920 StatusCode::INTERNAL_SERVER_ERROR, 921 Json(json!({"error": "InternalError"})), 922 ) 923 .into_response(); 924 } 925 } 926 if let Err(e) = tx.commit().await { 927 error!("Error committing transaction: {:?}", e); 928 return ( 929 StatusCode::INTERNAL_SERVER_ERROR, 930 Json(json!({"error": "InternalError"})), 931 ) 932 .into_response(); 933 } 934 if !is_migration { 935 if let Err(e) = 936 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 937 { 938 warn!("Failed to sequence identity event for {}: {}", did, e); 939 } 940 if let Err(e) = 941 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 942 { 943 warn!("Failed to sequence account event for {}: {}", did, e); 944 } 945 let profile_record = json!({ 946 "$type": "app.bsky.actor.profile", 947 "displayName": input.handle 948 }); 949 if let Err(e) = crate::api::repo::record::create_record_internal( 950 &state, 951 &did, 952 "app.bsky.actor.profile", 953 "self", 954 &profile_record, 955 ) 956 .await 957 { 958 warn!("Failed to create default profile for {}: {}", did, e); 959 } 960 if let Some(ref recipient) = verification_recipient 961 && let Err(e) = crate::comms::enqueue_signup_verification( 962 &state.db, 963 user_id, 964 verification_channel, 965 recipient, 966 &verification_code, 967 None, 968 ) 969 .await 970 { 971 warn!( 972 "Failed to enqueue signup verification notification: {:?}", 973 e 974 ); 975 } 976 } 977 978 let (access_jwt, refresh_jwt) = if is_migration { 979 let access_meta = 980 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 981 Ok(m) => m, 982 Err(e) => { 983 error!("Error creating access token for migration: {:?}", e); 984 return ( 985 StatusCode::INTERNAL_SERVER_ERROR, 986 Json(json!({"error": "InternalError"})), 987 ) 988 .into_response(); 989 } 990 }; 991 let refresh_meta = 992 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 993 Ok(m) => m, 994 Err(e) => { 995 error!("Error creating refresh token for migration: {:?}", e); 996 return ( 997 StatusCode::INTERNAL_SERVER_ERROR, 998 Json(json!({"error": "InternalError"})), 999 ) 1000 .into_response(); 1001 } 1002 }; 1003 if let Err(e) = sqlx::query!( 1004 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1005 did, 1006 access_meta.jti, 1007 refresh_meta.jti, 1008 access_meta.expires_at, 1009 refresh_meta.expires_at 1010 ) 1011 .execute(&state.db) 1012 .await 1013 { 1014 error!("Error creating session for migration: {:?}", e); 1015 return ( 1016 StatusCode::INTERNAL_SERVER_ERROR, 1017 Json(json!({"error": "InternalError"})), 1018 ) 1019 .into_response(); 1020 } 1021 (Some(access_meta.token), Some(refresh_meta.token)) 1022 } else { 1023 (None, None) 1024 }; 1025 1026 ( 1027 StatusCode::OK, 1028 Json(CreateAccountOutput { 1029 handle: handle.clone(), 1030 did, 1031 access_jwt, 1032 refresh_jwt, 1033 verification_required: !is_migration, 1034 verification_channel: verification_channel.to_string(), 1035 }), 1036 ) 1037 .into_response() 1038}