this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::{DEFAULT_COST, hash}; 12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use k256::{SecretKey, ecdsa::SigningKey}; 15use rand::rngs::OsRng; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::sync::Arc; 19use tracing::{debug, error, info, warn}; 20 21fn extract_client_ip(headers: &HeaderMap) -> String { 22 if let Some(forwarded) = headers.get("x-forwarded-for") 23 && let Ok(value) = forwarded.to_str() 24 && let Some(first_ip) = value.split(',').next() 25 { 26 return first_ip.trim().to_string(); 27 } 28 if let Some(real_ip) = headers.get("x-real-ip") 29 && let Ok(value) = real_ip.to_str() 30 { 31 return value.trim().to_string(); 32 } 33 "unknown".to_string() 34} 35 36#[derive(Deserialize)] 37#[serde(rename_all = "camelCase")] 38pub struct CreateAccountInput { 39 pub handle: String, 40 pub email: Option<String>, 41 pub password: String, 42 pub invite_code: Option<String>, 43 pub did: Option<String>, 44 pub did_type: Option<String>, 45 pub signing_key: Option<String>, 46 pub verification_channel: Option<String>, 47 pub discord_id: Option<String>, 48 pub telegram_username: Option<String>, 49 pub signal_number: Option<String>, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateAccountOutput { 55 pub handle: String, 56 pub did: String, 57 #[serde(skip_serializing_if = "Option::is_none")] 58 pub access_jwt: Option<String>, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub refresh_jwt: Option<String>, 61 pub verification_required: bool, 62 pub verification_channel: String, 63} 64 65pub async fn create_account( 66 State(state): State<AppState>, 67 headers: HeaderMap, 68 Json(input): Json<CreateAccountInput>, 69) -> Response { 70 info!("create_account called"); 71 let client_ip = extract_client_ip(&headers); 72 if !state 73 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 74 .await 75 { 76 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 77 return ( 78 StatusCode::TOO_MANY_REQUESTS, 79 Json(json!({ 80 "error": "RateLimitExceeded", 81 "message": "Too many account creation attempts. Please try again later." 82 })), 83 ) 84 .into_response(); 85 } 86 87 let migration_auth = if let Some(token) = 88 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 89 { 90 if is_service_token(&token) { 91 let verifier = ServiceTokenVerifier::new(); 92 match verifier 93 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 94 .await 95 { 96 Ok(claims) => { 97 debug!("Service token verified for migration: iss={}", claims.iss); 98 Some(claims.iss) 99 } 100 Err(e) => { 101 error!("Service token verification failed: {:?}", e); 102 return ( 103 StatusCode::UNAUTHORIZED, 104 Json(json!({ 105 "error": "AuthenticationFailed", 106 "message": format!("Service token verification failed: {}", e) 107 })), 108 ) 109 .into_response(); 110 } 111 } 112 } else { 113 None 114 } 115 } else { 116 None 117 }; 118 119 let is_migration = migration_auth.is_some() 120 && input 121 .did 122 .as_ref() 123 .map(|d| d.starts_with("did:plc:")) 124 .unwrap_or(false); 125 126 if is_migration { 127 let migration_did = input.did.as_ref().unwrap(); 128 let auth_did = migration_auth.as_ref().unwrap(); 129 if migration_did != auth_did { 130 return ( 131 StatusCode::FORBIDDEN, 132 Json(json!({ 133 "error": "AuthorizationError", 134 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 135 })), 136 ) 137 .into_response(); 138 } 139 info!(did = %migration_did, "Processing account migration"); 140 } 141 142 if input.handle.contains('!') || input.handle.contains('@') { 143 return ( 144 StatusCode::BAD_REQUEST, 145 Json( 146 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}), 147 ), 148 ) 149 .into_response(); 150 } 151 let email: Option<String> = input 152 .email 153 .as_ref() 154 .map(|e| e.trim().to_string()) 155 .filter(|e| !e.is_empty()); 156 if let Some(ref email) = email 157 && !crate::api::validation::is_valid_email(email) 158 { 159 return ( 160 StatusCode::BAD_REQUEST, 161 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 162 ) 163 .into_response(); 164 } 165 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 166 let valid_channels = ["email", "discord", "telegram", "signal"]; 167 if !valid_channels.contains(&verification_channel) && !is_migration { 168 return ( 169 StatusCode::BAD_REQUEST, 170 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 171 ) 172 .into_response(); 173 } 174 let verification_recipient = if is_migration { 175 None 176 } else { 177 Some(match verification_channel { 178 "email" => match &input.email { 179 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 180 _ => return ( 181 StatusCode::BAD_REQUEST, 182 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 183 ).into_response(), 184 }, 185 "discord" => match &input.discord_id { 186 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 187 _ => return ( 188 StatusCode::BAD_REQUEST, 189 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 190 ).into_response(), 191 }, 192 "telegram" => match &input.telegram_username { 193 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 194 _ => return ( 195 StatusCode::BAD_REQUEST, 196 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 197 ).into_response(), 198 }, 199 "signal" => match &input.signal_number { 200 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 201 _ => return ( 202 StatusCode::BAD_REQUEST, 203 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 204 ).into_response(), 205 }, 206 _ => return ( 207 StatusCode::BAD_REQUEST, 208 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 209 ).into_response(), 210 }) 211 }; 212 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 213 let pds_endpoint = format!("https://{}", hostname); 214 let suffix = format!(".{}", hostname); 215 let short_handle = if input.handle.ends_with(&suffix) { 216 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle) 217 } else { 218 &input.handle 219 }; 220 let full_handle = format!("{}.{}", short_handle, hostname); 221 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 222 if let Some(signing_key_did) = &input.signing_key { 223 let reserved = sqlx::query!( 224 r#" 225 SELECT id, private_key_bytes 226 FROM reserved_signing_keys 227 WHERE public_key_did_key = $1 228 AND used_at IS NULL 229 AND expires_at > NOW() 230 FOR UPDATE 231 "#, 232 signing_key_did 233 ) 234 .fetch_optional(&state.db) 235 .await; 236 match reserved { 237 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 238 Ok(None) => { 239 return ( 240 StatusCode::BAD_REQUEST, 241 Json(json!({ 242 "error": "InvalidSigningKey", 243 "message": "Signing key not found, already used, or expired" 244 })), 245 ) 246 .into_response(); 247 } 248 Err(e) => { 249 error!("Error looking up reserved signing key: {:?}", e); 250 return ( 251 StatusCode::INTERNAL_SERVER_ERROR, 252 Json(json!({"error": "InternalError"})), 253 ) 254 .into_response(); 255 } 256 } 257 } else { 258 let secret_key = SecretKey::random(&mut OsRng); 259 (secret_key.to_bytes().to_vec(), None) 260 }; 261 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 262 Ok(k) => k, 263 Err(e) => { 264 error!("Error creating signing key: {:?}", e); 265 return ( 266 StatusCode::INTERNAL_SERVER_ERROR, 267 Json(json!({"error": "InternalError"})), 268 ) 269 .into_response(); 270 } 271 }; 272 let did_type = input.did_type.as_deref().unwrap_or("plc"); 273 let did = match did_type { 274 "web" => { 275 let subdomain_host = format!("{}.{}", input.handle, hostname); 276 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 277 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 278 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 279 self_hosted_did 280 } 281 "web-external" => { 282 let d = match &input.did { 283 Some(d) if !d.trim().is_empty() => d, 284 _ => { 285 return ( 286 StatusCode::BAD_REQUEST, 287 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 288 ) 289 .into_response(); 290 } 291 }; 292 if !d.starts_with("did:web:") { 293 return ( 294 StatusCode::BAD_REQUEST, 295 Json( 296 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 297 ), 298 ) 299 .into_response(); 300 } 301 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await { 302 return ( 303 StatusCode::BAD_REQUEST, 304 Json(json!({"error": "InvalidDid", "message": e})), 305 ) 306 .into_response(); 307 } 308 info!(did = %d, "Creating external did:web account"); 309 d.clone() 310 } 311 _ => { 312 if let Some(d) = &input.did { 313 if d.starts_with("did:plc:") && is_migration { 314 info!(did = %d, "Migration with existing did:plc"); 315 d.clone() 316 } else if d.starts_with("did:web:") { 317 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await { 318 return ( 319 StatusCode::BAD_REQUEST, 320 Json(json!({"error": "InvalidDid", "message": e})), 321 ) 322 .into_response(); 323 } 324 d.clone() 325 } else if !d.trim().is_empty() { 326 return ( 327 StatusCode::BAD_REQUEST, 328 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 329 ) 330 .into_response(); 331 } else { 332 let rotation_key = std::env::var("PLC_ROTATION_KEY") 333 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 334 let genesis_result = match create_genesis_operation( 335 &signing_key, 336 &rotation_key, 337 &full_handle, 338 &pds_endpoint, 339 ) { 340 Ok(r) => r, 341 Err(e) => { 342 error!("Error creating PLC genesis operation: {:?}", e); 343 return ( 344 StatusCode::INTERNAL_SERVER_ERROR, 345 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 346 ) 347 .into_response(); 348 } 349 }; 350 let plc_client = PlcClient::new(None); 351 if let Err(e) = plc_client 352 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 353 .await 354 { 355 error!("Failed to submit PLC genesis operation: {:?}", e); 356 return ( 357 StatusCode::BAD_GATEWAY, 358 Json(json!({ 359 "error": "UpstreamError", 360 "message": format!("Failed to register DID with PLC directory: {}", e) 361 })), 362 ) 363 .into_response(); 364 } 365 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 366 genesis_result.did 367 } 368 } else { 369 let rotation_key = std::env::var("PLC_ROTATION_KEY") 370 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 371 let genesis_result = match create_genesis_operation( 372 &signing_key, 373 &rotation_key, 374 &full_handle, 375 &pds_endpoint, 376 ) { 377 Ok(r) => r, 378 Err(e) => { 379 error!("Error creating PLC genesis operation: {:?}", e); 380 return ( 381 StatusCode::INTERNAL_SERVER_ERROR, 382 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 383 ) 384 .into_response(); 385 } 386 }; 387 let plc_client = PlcClient::new(None); 388 if let Err(e) = plc_client 389 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 390 .await 391 { 392 error!("Failed to submit PLC genesis operation: {:?}", e); 393 return ( 394 StatusCode::BAD_GATEWAY, 395 Json(json!({ 396 "error": "UpstreamError", 397 "message": format!("Failed to register DID with PLC directory: {}", e) 398 })), 399 ) 400 .into_response(); 401 } 402 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 403 genesis_result.did 404 } 405 } 406 }; 407 let mut tx = match state.db.begin().await { 408 Ok(tx) => tx, 409 Err(e) => { 410 error!("Error starting transaction: {:?}", e); 411 return ( 412 StatusCode::INTERNAL_SERVER_ERROR, 413 Json(json!({"error": "InternalError"})), 414 ) 415 .into_response(); 416 } 417 }; 418 if is_migration { 419 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 420 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 421 .bind(&did) 422 .fetch_optional(&mut *tx) 423 .await 424 .unwrap_or(None); 425 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 426 if deactivated_at.is_some() { 427 info!(did = %did, old_handle = %old_handle, new_handle = %short_handle, "Preparing existing account for inbound migration"); 428 let update_result: Result<_, sqlx::Error> = 429 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 430 .bind(short_handle) 431 .bind(account_id) 432 .execute(&mut *tx) 433 .await; 434 if let Err(e) = update_result { 435 if let Some(db_err) = e.as_database_error() 436 && db_err 437 .constraint() 438 .map(|c| c.contains("handle")) 439 .unwrap_or(false) 440 { 441 return ( 442 StatusCode::BAD_REQUEST, 443 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 444 ) 445 .into_response(); 446 } 447 error!("Error reactivating account: {:?}", e); 448 return ( 449 StatusCode::INTERNAL_SERVER_ERROR, 450 Json(json!({"error": "InternalError"})), 451 ) 452 .into_response(); 453 } 454 if let Err(e) = tx.commit().await { 455 error!("Error committing reactivation: {:?}", e); 456 return ( 457 StatusCode::INTERNAL_SERVER_ERROR, 458 Json(json!({"error": "InternalError"})), 459 ) 460 .into_response(); 461 } 462 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 463 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 464 ) 465 .bind(account_id) 466 .fetch_optional(&state.db) 467 .await 468 .unwrap_or(None); 469 let secret_key_bytes = match key_row { 470 Some((key_bytes, encryption_version)) => { 471 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 472 Ok(k) => k, 473 Err(e) => { 474 error!("Error decrypting key for reactivated account: {:?}", e); 475 return ( 476 StatusCode::INTERNAL_SERVER_ERROR, 477 Json(json!({"error": "InternalError"})), 478 ) 479 .into_response(); 480 } 481 } 482 } 483 None => { 484 error!("No signing key found for reactivated account"); 485 return ( 486 StatusCode::INTERNAL_SERVER_ERROR, 487 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 488 ) 489 .into_response(); 490 } 491 }; 492 let access_meta = 493 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 494 Ok(m) => m, 495 Err(e) => { 496 error!("Error creating access token: {:?}", e); 497 return ( 498 StatusCode::INTERNAL_SERVER_ERROR, 499 Json(json!({"error": "InternalError"})), 500 ) 501 .into_response(); 502 } 503 }; 504 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 505 &did, 506 &secret_key_bytes, 507 ) { 508 Ok(m) => m, 509 Err(e) => { 510 error!("Error creating refresh token: {:?}", e); 511 return ( 512 StatusCode::INTERNAL_SERVER_ERROR, 513 Json(json!({"error": "InternalError"})), 514 ) 515 .into_response(); 516 } 517 }; 518 let session_result: Result<_, sqlx::Error> = sqlx::query( 519 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 520 ) 521 .bind(&did) 522 .bind(&access_meta.jti) 523 .bind(&refresh_meta.jti) 524 .bind(access_meta.expires_at) 525 .bind(refresh_meta.expires_at) 526 .execute(&state.db) 527 .await; 528 if let Err(e) = session_result { 529 error!("Error creating session: {:?}", e); 530 return ( 531 StatusCode::INTERNAL_SERVER_ERROR, 532 Json(json!({"error": "InternalError"})), 533 ) 534 .into_response(); 535 } 536 return ( 537 StatusCode::OK, 538 Json(CreateAccountOutput { 539 handle: full_handle.clone(), 540 did, 541 access_jwt: Some(access_meta.token), 542 refresh_jwt: Some(refresh_meta.token), 543 verification_required: false, 544 verification_channel: "email".to_string(), 545 }), 546 ) 547 .into_response(); 548 } else { 549 return ( 550 StatusCode::BAD_REQUEST, 551 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 552 ) 553 .into_response(); 554 } 555 } 556 } 557 let exists_result: Option<(i32,)> = 558 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 559 .bind(short_handle) 560 .fetch_optional(&mut *tx) 561 .await 562 .unwrap_or(None); 563 if exists_result.is_some() { 564 return ( 565 StatusCode::BAD_REQUEST, 566 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 567 ) 568 .into_response(); 569 } 570 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 571 .map(|v| v == "true" || v == "1") 572 .unwrap_or(false); 573 if invite_code_required 574 && input 575 .invite_code 576 .as_ref() 577 .map(|c| c.trim().is_empty()) 578 .unwrap_or(true) 579 { 580 return ( 581 StatusCode::BAD_REQUEST, 582 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 583 ) 584 .into_response(); 585 } 586 if let Some(code) = &input.invite_code 587 && !code.trim().is_empty() 588 { 589 let invite_query = sqlx::query!( 590 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 591 code 592 ) 593 .fetch_optional(&mut *tx) 594 .await; 595 match invite_query { 596 Ok(Some(row)) => { 597 if row.available_uses <= 0 { 598 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 599 } 600 let update_invite = sqlx::query!( 601 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 602 code 603 ) 604 .execute(&mut *tx) 605 .await; 606 if let Err(e) = update_invite { 607 error!("Error updating invite code: {:?}", e); 608 return ( 609 StatusCode::INTERNAL_SERVER_ERROR, 610 Json(json!({"error": "InternalError"})), 611 ) 612 .into_response(); 613 } 614 } 615 Ok(None) => { 616 return ( 617 StatusCode::BAD_REQUEST, 618 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 619 ) 620 .into_response(); 621 } 622 Err(e) => { 623 error!("Error checking invite code: {:?}", e); 624 return ( 625 StatusCode::INTERNAL_SERVER_ERROR, 626 Json(json!({"error": "InternalError"})), 627 ) 628 .into_response(); 629 } 630 } 631 } 632 let password_hash = match hash(&input.password, DEFAULT_COST) { 633 Ok(h) => h, 634 Err(e) => { 635 error!("Error hashing password: {:?}", e); 636 return ( 637 StatusCode::INTERNAL_SERVER_ERROR, 638 Json(json!({"error": "InternalError"})), 639 ) 640 .into_response(); 641 } 642 }; 643 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 644 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 645 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 646 .fetch_one(&mut *tx) 647 .await 648 .map(|c| c.unwrap_or(0) == 0) 649 .unwrap_or(false); 650 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 651 Some(chrono::Utc::now()) 652 } else { 653 None 654 }; 655 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 656 r#"INSERT INTO users ( 657 handle, email, did, password_hash, 658 preferred_comms_channel, 659 discord_id, telegram_username, signal_number, 660 is_admin, deactivated_at, email_verified 661 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 662 ) 663 .bind(short_handle) 664 .bind(&email) 665 .bind(&did) 666 .bind(&password_hash) 667 .bind(verification_channel) 668 .bind( 669 input 670 .discord_id 671 .as_deref() 672 .map(|s| s.trim()) 673 .filter(|s| !s.is_empty()), 674 ) 675 .bind( 676 input 677 .telegram_username 678 .as_deref() 679 .map(|s| s.trim()) 680 .filter(|s| !s.is_empty()), 681 ) 682 .bind( 683 input 684 .signal_number 685 .as_deref() 686 .map(|s| s.trim()) 687 .filter(|s| !s.is_empty()), 688 ) 689 .bind(is_first_user) 690 .bind(deactivated_at) 691 .bind(is_migration) 692 .fetch_one(&mut *tx) 693 .await; 694 let user_id = match user_insert { 695 Ok((id,)) => id, 696 Err(e) => { 697 if let Some(db_err) = e.as_database_error() 698 && db_err.code().as_deref() == Some("23505") 699 { 700 let constraint = db_err.constraint().unwrap_or(""); 701 if constraint.contains("handle") || constraint.contains("users_handle") { 702 return ( 703 StatusCode::BAD_REQUEST, 704 Json(json!({ 705 "error": "HandleNotAvailable", 706 "message": "Handle already taken" 707 })), 708 ) 709 .into_response(); 710 } else if constraint.contains("email") || constraint.contains("users_email") { 711 return ( 712 StatusCode::BAD_REQUEST, 713 Json(json!({ 714 "error": "InvalidEmail", 715 "message": "Email already registered" 716 })), 717 ) 718 .into_response(); 719 } else if constraint.contains("did") || constraint.contains("users_did") { 720 return ( 721 StatusCode::BAD_REQUEST, 722 Json(json!({ 723 "error": "AccountAlreadyExists", 724 "message": "An account with this DID already exists" 725 })), 726 ) 727 .into_response(); 728 } 729 } 730 error!("Error inserting user: {:?}", e); 731 return ( 732 StatusCode::INTERNAL_SERVER_ERROR, 733 Json(json!({"error": "InternalError"})), 734 ) 735 .into_response(); 736 } 737 }; 738 739 if !is_migration 740 && let Err(e) = sqlx::query!( 741 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)", 742 user_id, 743 verification_code, 744 email, 745 code_expires_at 746 ) 747 .execute(&mut *tx) 748 .await { 749 error!("Error inserting verification code: {:?}", e); 750 return ( 751 StatusCode::INTERNAL_SERVER_ERROR, 752 Json(json!({"error": "InternalError"})), 753 ) 754 .into_response(); 755 } 756 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 757 Ok(enc) => enc, 758 Err(e) => { 759 error!("Error encrypting user key: {:?}", e); 760 return ( 761 StatusCode::INTERNAL_SERVER_ERROR, 762 Json(json!({"error": "InternalError"})), 763 ) 764 .into_response(); 765 } 766 }; 767 let key_insert = sqlx::query!( 768 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 769 user_id, 770 &encrypted_key_bytes[..], 771 crate::config::ENCRYPTION_VERSION 772 ) 773 .execute(&mut *tx) 774 .await; 775 if let Err(e) = key_insert { 776 error!("Error inserting user key: {:?}", e); 777 return ( 778 StatusCode::INTERNAL_SERVER_ERROR, 779 Json(json!({"error": "InternalError"})), 780 ) 781 .into_response(); 782 } 783 if let Some(key_id) = reserved_key_id { 784 let mark_used = sqlx::query!( 785 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 786 key_id 787 ) 788 .execute(&mut *tx) 789 .await; 790 if let Err(e) = mark_used { 791 error!("Error marking reserved key as used: {:?}", e); 792 return ( 793 StatusCode::INTERNAL_SERVER_ERROR, 794 Json(json!({"error": "InternalError"})), 795 ) 796 .into_response(); 797 } 798 } 799 let mst = Mst::new(Arc::new(state.block_store.clone())); 800 let mst_root = match mst.persist().await { 801 Ok(c) => c, 802 Err(e) => { 803 error!("Error persisting MST: {:?}", e); 804 return ( 805 StatusCode::INTERNAL_SERVER_ERROR, 806 Json(json!({"error": "InternalError"})), 807 ) 808 .into_response(); 809 } 810 }; 811 let did_obj = match Did::new(&did) { 812 Ok(d) => d, 813 Err(_) => { 814 return ( 815 StatusCode::INTERNAL_SERVER_ERROR, 816 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 817 ) 818 .into_response(); 819 } 820 }; 821 let rev = Tid::now(LimitedU32::MIN); 822 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 823 let signed_commit = match unsigned_commit.sign(&signing_key) { 824 Ok(c) => c, 825 Err(e) => { 826 error!("Error signing genesis commit: {:?}", e); 827 return ( 828 StatusCode::INTERNAL_SERVER_ERROR, 829 Json(json!({"error": "InternalError"})), 830 ) 831 .into_response(); 832 } 833 }; 834 let commit_bytes = match signed_commit.to_cbor() { 835 Ok(b) => b, 836 Err(e) => { 837 error!("Error serializing genesis commit: {:?}", e); 838 return ( 839 StatusCode::INTERNAL_SERVER_ERROR, 840 Json(json!({"error": "InternalError"})), 841 ) 842 .into_response(); 843 } 844 }; 845 let commit_cid = match state.block_store.put(&commit_bytes).await { 846 Ok(c) => c, 847 Err(e) => { 848 error!("Error saving genesis commit: {:?}", e); 849 return ( 850 StatusCode::INTERNAL_SERVER_ERROR, 851 Json(json!({"error": "InternalError"})), 852 ) 853 .into_response(); 854 } 855 }; 856 let commit_cid_str = commit_cid.to_string(); 857 let repo_insert = sqlx::query!( 858 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 859 user_id, 860 commit_cid_str 861 ) 862 .execute(&mut *tx) 863 .await; 864 if let Err(e) = repo_insert { 865 error!("Error initializing repo: {:?}", e); 866 return ( 867 StatusCode::INTERNAL_SERVER_ERROR, 868 Json(json!({"error": "InternalError"})), 869 ) 870 .into_response(); 871 } 872 if let Some(code) = &input.invite_code 873 && !code.trim().is_empty() 874 { 875 let use_insert = sqlx::query!( 876 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 877 code, 878 user_id 879 ) 880 .execute(&mut *tx) 881 .await; 882 if let Err(e) = use_insert { 883 error!("Error recording invite usage: {:?}", e); 884 return ( 885 StatusCode::INTERNAL_SERVER_ERROR, 886 Json(json!({"error": "InternalError"})), 887 ) 888 .into_response(); 889 } 890 } 891 if let Err(e) = tx.commit().await { 892 error!("Error committing transaction: {:?}", e); 893 return ( 894 StatusCode::INTERNAL_SERVER_ERROR, 895 Json(json!({"error": "InternalError"})), 896 ) 897 .into_response(); 898 } 899 if !is_migration { 900 if let Err(e) = 901 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle)) 902 .await 903 { 904 warn!("Failed to sequence identity event for {}: {}", did, e); 905 } 906 if let Err(e) = 907 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 908 { 909 warn!("Failed to sequence account event for {}: {}", did, e); 910 } 911 let profile_record = json!({ 912 "$type": "app.bsky.actor.profile", 913 "displayName": input.handle 914 }); 915 if let Err(e) = crate::api::repo::record::create_record_internal( 916 &state, 917 &did, 918 "app.bsky.actor.profile", 919 "self", 920 &profile_record, 921 ) 922 .await 923 { 924 warn!("Failed to create default profile for {}: {}", did, e); 925 } 926 if let Some(ref recipient) = verification_recipient 927 && let Err(e) = crate::comms::enqueue_signup_verification( 928 &state.db, 929 user_id, 930 verification_channel, 931 recipient, 932 &verification_code, 933 ) 934 .await 935 { 936 warn!( 937 "Failed to enqueue signup verification notification: {:?}", 938 e 939 ); 940 } 941 } 942 943 let (access_jwt, refresh_jwt) = if is_migration { 944 let access_meta = 945 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 946 Ok(m) => m, 947 Err(e) => { 948 error!("Error creating access token for migration: {:?}", e); 949 return ( 950 StatusCode::INTERNAL_SERVER_ERROR, 951 Json(json!({"error": "InternalError"})), 952 ) 953 .into_response(); 954 } 955 }; 956 let refresh_meta = 957 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 958 Ok(m) => m, 959 Err(e) => { 960 error!("Error creating refresh token for migration: {:?}", e); 961 return ( 962 StatusCode::INTERNAL_SERVER_ERROR, 963 Json(json!({"error": "InternalError"})), 964 ) 965 .into_response(); 966 } 967 }; 968 if let Err(e) = sqlx::query!( 969 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 970 did, 971 access_meta.jti, 972 refresh_meta.jti, 973 access_meta.expires_at, 974 refresh_meta.expires_at 975 ) 976 .execute(&state.db) 977 .await 978 { 979 error!("Error creating session for migration: {:?}", e); 980 return ( 981 StatusCode::INTERNAL_SERVER_ERROR, 982 Json(json!({"error": "InternalError"})), 983 ) 984 .into_response(); 985 } 986 (Some(access_meta.token), Some(refresh_meta.token)) 987 } else { 988 (None, None) 989 }; 990 991 ( 992 StatusCode::OK, 993 Json(CreateAccountOutput { 994 handle: full_handle.clone(), 995 did, 996 access_jwt, 997 refresh_jwt, 998 verification_required: !is_migration, 999 verification_channel: verification_channel.to_string(), 1000 }), 1001 ) 1002 .into_response() 1003}