this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::{DEFAULT_COST, hash}; 12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use k256::{SecretKey, ecdsa::SigningKey}; 15use rand::rngs::OsRng; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::sync::Arc; 19use tracing::{debug, error, info, warn}; 20 21fn extract_client_ip(headers: &HeaderMap) -> String { 22 if let Some(forwarded) = headers.get("x-forwarded-for") 23 && let Ok(value) = forwarded.to_str() 24 && let Some(first_ip) = value.split(',').next() 25 { 26 return first_ip.trim().to_string(); 27 } 28 if let Some(real_ip) = headers.get("x-real-ip") 29 && let Ok(value) = real_ip.to_str() 30 { 31 return value.trim().to_string(); 32 } 33 "unknown".to_string() 34} 35 36#[derive(Deserialize)] 37#[serde(rename_all = "camelCase")] 38pub struct CreateAccountInput { 39 pub handle: String, 40 pub email: Option<String>, 41 pub password: String, 42 pub invite_code: Option<String>, 43 pub did: Option<String>, 44 pub did_type: Option<String>, 45 pub signing_key: Option<String>, 46 pub verification_channel: Option<String>, 47 pub discord_id: Option<String>, 48 pub telegram_username: Option<String>, 49 pub signal_number: Option<String>, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateAccountOutput { 55 pub handle: String, 56 pub did: String, 57 #[serde(skip_serializing_if = "Option::is_none")] 58 pub access_jwt: Option<String>, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub refresh_jwt: Option<String>, 61 pub verification_required: bool, 62 pub verification_channel: String, 63} 64 65pub async fn create_account( 66 State(state): State<AppState>, 67 headers: HeaderMap, 68 Json(input): Json<CreateAccountInput>, 69) -> Response { 70 info!("create_account called"); 71 let client_ip = extract_client_ip(&headers); 72 if !state 73 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 74 .await 75 { 76 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 77 return ( 78 StatusCode::TOO_MANY_REQUESTS, 79 Json(json!({ 80 "error": "RateLimitExceeded", 81 "message": "Too many account creation attempts. Please try again later." 82 })), 83 ) 84 .into_response(); 85 } 86 87 let migration_auth = if let Some(token) = 88 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 89 { 90 if is_service_token(&token) { 91 let verifier = ServiceTokenVerifier::new(); 92 match verifier 93 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 94 .await 95 { 96 Ok(claims) => { 97 debug!("Service token verified for migration: iss={}", claims.iss); 98 Some(claims.iss) 99 } 100 Err(e) => { 101 error!("Service token verification failed: {:?}", e); 102 return ( 103 StatusCode::UNAUTHORIZED, 104 Json(json!({ 105 "error": "AuthenticationFailed", 106 "message": format!("Service token verification failed: {}", e) 107 })), 108 ) 109 .into_response(); 110 } 111 } 112 } else { 113 None 114 } 115 } else { 116 None 117 }; 118 119 let is_migration = migration_auth.is_some() 120 && input 121 .did 122 .as_ref() 123 .map(|d| d.starts_with("did:plc:")) 124 .unwrap_or(false); 125 126 if is_migration { 127 let migration_did = input.did.as_ref().unwrap(); 128 let auth_did = migration_auth.as_ref().unwrap(); 129 if migration_did != auth_did { 130 return ( 131 StatusCode::FORBIDDEN, 132 Json(json!({ 133 "error": "AuthorizationError", 134 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 135 })), 136 ) 137 .into_response(); 138 } 139 info!(did = %migration_did, "Processing account migration"); 140 } 141 142 let hostname_for_validation = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 143 let pds_suffix = format!(".{}", hostname_for_validation); 144 145 let validated_short_handle = if !input.handle.contains('.') || input.handle.ends_with(&pds_suffix) { 146 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 147 input.handle.strip_suffix(&pds_suffix).unwrap_or(&input.handle) 148 } else { 149 &input.handle 150 }; 151 match crate::api::validation::validate_short_handle(handle_to_validate) { 152 Ok(h) => h, 153 Err(e) => { 154 return ( 155 StatusCode::BAD_REQUEST, 156 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 157 ) 158 .into_response(); 159 } 160 } 161 } else { 162 if input.handle.contains(' ') || input.handle.contains('\t') { 163 return ( 164 StatusCode::BAD_REQUEST, 165 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 166 ) 167 .into_response(); 168 } 169 input.handle.to_lowercase() 170 }; 171 let email: Option<String> = input 172 .email 173 .as_ref() 174 .map(|e| e.trim().to_string()) 175 .filter(|e| !e.is_empty()); 176 if let Some(ref email) = email 177 && !crate::api::validation::is_valid_email(email) 178 { 179 return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 182 ) 183 .into_response(); 184 } 185 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 186 let valid_channels = ["email", "discord", "telegram", "signal"]; 187 if !valid_channels.contains(&verification_channel) && !is_migration { 188 return ( 189 StatusCode::BAD_REQUEST, 190 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 191 ) 192 .into_response(); 193 } 194 let verification_recipient = if is_migration { 195 None 196 } else { 197 Some(match verification_channel { 198 "email" => match &input.email { 199 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 200 _ => return ( 201 StatusCode::BAD_REQUEST, 202 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 203 ).into_response(), 204 }, 205 "discord" => match &input.discord_id { 206 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 207 _ => return ( 208 StatusCode::BAD_REQUEST, 209 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 210 ).into_response(), 211 }, 212 "telegram" => match &input.telegram_username { 213 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 214 _ => return ( 215 StatusCode::BAD_REQUEST, 216 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 217 ).into_response(), 218 }, 219 "signal" => match &input.signal_number { 220 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 221 _ => return ( 222 StatusCode::BAD_REQUEST, 223 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 224 ).into_response(), 225 }, 226 _ => return ( 227 StatusCode::BAD_REQUEST, 228 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 229 ).into_response(), 230 }) 231 }; 232 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 233 let pds_endpoint = format!("https://{}", hostname); 234 let suffix = format!(".{}", hostname); 235 let handle = if input.handle.ends_with(&suffix) { 236 format!("{}.{}", validated_short_handle, hostname) 237 } else if input.handle.contains('.') { 238 validated_short_handle.clone() 239 } else { 240 format!("{}.{}", validated_short_handle, hostname) 241 }; 242 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 243 if let Some(signing_key_did) = &input.signing_key { 244 let reserved = sqlx::query!( 245 r#" 246 SELECT id, private_key_bytes 247 FROM reserved_signing_keys 248 WHERE public_key_did_key = $1 249 AND used_at IS NULL 250 AND expires_at > NOW() 251 FOR UPDATE 252 "#, 253 signing_key_did 254 ) 255 .fetch_optional(&state.db) 256 .await; 257 match reserved { 258 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 259 Ok(None) => { 260 return ( 261 StatusCode::BAD_REQUEST, 262 Json(json!({ 263 "error": "InvalidSigningKey", 264 "message": "Signing key not found, already used, or expired" 265 })), 266 ) 267 .into_response(); 268 } 269 Err(e) => { 270 error!("Error looking up reserved signing key: {:?}", e); 271 return ( 272 StatusCode::INTERNAL_SERVER_ERROR, 273 Json(json!({"error": "InternalError"})), 274 ) 275 .into_response(); 276 } 277 } 278 } else { 279 let secret_key = SecretKey::random(&mut OsRng); 280 (secret_key.to_bytes().to_vec(), None) 281 }; 282 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 283 Ok(k) => k, 284 Err(e) => { 285 error!("Error creating signing key: {:?}", e); 286 return ( 287 StatusCode::INTERNAL_SERVER_ERROR, 288 Json(json!({"error": "InternalError"})), 289 ) 290 .into_response(); 291 } 292 }; 293 let did_type = input.did_type.as_deref().unwrap_or("plc"); 294 let did = match did_type { 295 "web" => { 296 let subdomain_host = format!("{}.{}", input.handle, hostname); 297 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 298 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 299 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 300 self_hosted_did 301 } 302 "web-external" => { 303 let d = match &input.did { 304 Some(d) if !d.trim().is_empty() => d, 305 _ => { 306 return ( 307 StatusCode::BAD_REQUEST, 308 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 309 ) 310 .into_response(); 311 } 312 }; 313 if !d.starts_with("did:web:") { 314 return ( 315 StatusCode::BAD_REQUEST, 316 Json( 317 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 318 ), 319 ) 320 .into_response(); 321 } 322 if let Err(e) = verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await { 323 return ( 324 StatusCode::BAD_REQUEST, 325 Json(json!({"error": "InvalidDid", "message": e})), 326 ) 327 .into_response(); 328 } 329 info!(did = %d, "Creating external did:web account"); 330 d.clone() 331 } 332 _ => { 333 if let Some(d) = &input.did { 334 if d.starts_with("did:plc:") && is_migration { 335 info!(did = %d, "Migration with existing did:plc"); 336 d.clone() 337 } else if d.starts_with("did:web:") { 338 if let Err(e) = verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await { 339 return ( 340 StatusCode::BAD_REQUEST, 341 Json(json!({"error": "InvalidDid", "message": e})), 342 ) 343 .into_response(); 344 } 345 d.clone() 346 } else if !d.trim().is_empty() { 347 return ( 348 StatusCode::BAD_REQUEST, 349 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 350 ) 351 .into_response(); 352 } else { 353 let rotation_key = std::env::var("PLC_ROTATION_KEY") 354 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 355 let genesis_result = match create_genesis_operation( 356 &signing_key, 357 &rotation_key, 358 &handle, 359 &pds_endpoint, 360 ) { 361 Ok(r) => r, 362 Err(e) => { 363 error!("Error creating PLC genesis operation: {:?}", e); 364 return ( 365 StatusCode::INTERNAL_SERVER_ERROR, 366 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 367 ) 368 .into_response(); 369 } 370 }; 371 let plc_client = PlcClient::new(None); 372 if let Err(e) = plc_client 373 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 374 .await 375 { 376 error!("Failed to submit PLC genesis operation: {:?}", e); 377 return ( 378 StatusCode::BAD_GATEWAY, 379 Json(json!({ 380 "error": "UpstreamError", 381 "message": format!("Failed to register DID with PLC directory: {}", e) 382 })), 383 ) 384 .into_response(); 385 } 386 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 387 genesis_result.did 388 } 389 } else { 390 let rotation_key = std::env::var("PLC_ROTATION_KEY") 391 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 392 let genesis_result = match create_genesis_operation( 393 &signing_key, 394 &rotation_key, 395 &handle, 396 &pds_endpoint, 397 ) { 398 Ok(r) => r, 399 Err(e) => { 400 error!("Error creating PLC genesis operation: {:?}", e); 401 return ( 402 StatusCode::INTERNAL_SERVER_ERROR, 403 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 404 ) 405 .into_response(); 406 } 407 }; 408 let plc_client = PlcClient::new(None); 409 if let Err(e) = plc_client 410 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 411 .await 412 { 413 error!("Failed to submit PLC genesis operation: {:?}", e); 414 return ( 415 StatusCode::BAD_GATEWAY, 416 Json(json!({ 417 "error": "UpstreamError", 418 "message": format!("Failed to register DID with PLC directory: {}", e) 419 })), 420 ) 421 .into_response(); 422 } 423 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 424 genesis_result.did 425 } 426 } 427 }; 428 let mut tx = match state.db.begin().await { 429 Ok(tx) => tx, 430 Err(e) => { 431 error!("Error starting transaction: {:?}", e); 432 return ( 433 StatusCode::INTERNAL_SERVER_ERROR, 434 Json(json!({"error": "InternalError"})), 435 ) 436 .into_response(); 437 } 438 }; 439 if is_migration { 440 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 441 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 442 .bind(&did) 443 .fetch_optional(&mut *tx) 444 .await 445 .unwrap_or(None); 446 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 447 if deactivated_at.is_some() { 448 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 449 let update_result: Result<_, sqlx::Error> = 450 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 451 .bind(&handle) 452 .bind(account_id) 453 .execute(&mut *tx) 454 .await; 455 if let Err(e) = update_result { 456 if let Some(db_err) = e.as_database_error() 457 && db_err 458 .constraint() 459 .map(|c| c.contains("handle")) 460 .unwrap_or(false) 461 { 462 return ( 463 StatusCode::BAD_REQUEST, 464 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 465 ) 466 .into_response(); 467 } 468 error!("Error reactivating account: {:?}", e); 469 return ( 470 StatusCode::INTERNAL_SERVER_ERROR, 471 Json(json!({"error": "InternalError"})), 472 ) 473 .into_response(); 474 } 475 if let Err(e) = tx.commit().await { 476 error!("Error committing reactivation: {:?}", e); 477 return ( 478 StatusCode::INTERNAL_SERVER_ERROR, 479 Json(json!({"error": "InternalError"})), 480 ) 481 .into_response(); 482 } 483 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 484 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 485 ) 486 .bind(account_id) 487 .fetch_optional(&state.db) 488 .await 489 .unwrap_or(None); 490 let secret_key_bytes = match key_row { 491 Some((key_bytes, encryption_version)) => { 492 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 493 Ok(k) => k, 494 Err(e) => { 495 error!("Error decrypting key for reactivated account: {:?}", e); 496 return ( 497 StatusCode::INTERNAL_SERVER_ERROR, 498 Json(json!({"error": "InternalError"})), 499 ) 500 .into_response(); 501 } 502 } 503 } 504 None => { 505 error!("No signing key found for reactivated account"); 506 return ( 507 StatusCode::INTERNAL_SERVER_ERROR, 508 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 509 ) 510 .into_response(); 511 } 512 }; 513 let access_meta = 514 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 515 Ok(m) => m, 516 Err(e) => { 517 error!("Error creating access token: {:?}", e); 518 return ( 519 StatusCode::INTERNAL_SERVER_ERROR, 520 Json(json!({"error": "InternalError"})), 521 ) 522 .into_response(); 523 } 524 }; 525 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 526 &did, 527 &secret_key_bytes, 528 ) { 529 Ok(m) => m, 530 Err(e) => { 531 error!("Error creating refresh token: {:?}", e); 532 return ( 533 StatusCode::INTERNAL_SERVER_ERROR, 534 Json(json!({"error": "InternalError"})), 535 ) 536 .into_response(); 537 } 538 }; 539 let session_result: Result<_, sqlx::Error> = sqlx::query( 540 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 541 ) 542 .bind(&did) 543 .bind(&access_meta.jti) 544 .bind(&refresh_meta.jti) 545 .bind(access_meta.expires_at) 546 .bind(refresh_meta.expires_at) 547 .execute(&state.db) 548 .await; 549 if let Err(e) = session_result { 550 error!("Error creating session: {:?}", e); 551 return ( 552 StatusCode::INTERNAL_SERVER_ERROR, 553 Json(json!({"error": "InternalError"})), 554 ) 555 .into_response(); 556 } 557 return ( 558 StatusCode::OK, 559 Json(CreateAccountOutput { 560 handle: handle.clone(), 561 did, 562 access_jwt: Some(access_meta.token), 563 refresh_jwt: Some(refresh_meta.token), 564 verification_required: false, 565 verification_channel: "email".to_string(), 566 }), 567 ) 568 .into_response(); 569 } else { 570 return ( 571 StatusCode::BAD_REQUEST, 572 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 573 ) 574 .into_response(); 575 } 576 } 577 } 578 let exists_result: Option<(i32,)> = 579 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 580 .bind(&handle) 581 .fetch_optional(&mut *tx) 582 .await 583 .unwrap_or(None); 584 if exists_result.is_some() { 585 return ( 586 StatusCode::BAD_REQUEST, 587 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 588 ) 589 .into_response(); 590 } 591 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 592 .map(|v| v == "true" || v == "1") 593 .unwrap_or(false); 594 if invite_code_required 595 && input 596 .invite_code 597 .as_ref() 598 .map(|c| c.trim().is_empty()) 599 .unwrap_or(true) 600 { 601 return ( 602 StatusCode::BAD_REQUEST, 603 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 604 ) 605 .into_response(); 606 } 607 if let Some(code) = &input.invite_code 608 && !code.trim().is_empty() 609 { 610 let invite_query = sqlx::query!( 611 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 612 code 613 ) 614 .fetch_optional(&mut *tx) 615 .await; 616 match invite_query { 617 Ok(Some(row)) => { 618 if row.available_uses <= 0 { 619 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 620 } 621 let update_invite = sqlx::query!( 622 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 623 code 624 ) 625 .execute(&mut *tx) 626 .await; 627 if let Err(e) = update_invite { 628 error!("Error updating invite code: {:?}", e); 629 return ( 630 StatusCode::INTERNAL_SERVER_ERROR, 631 Json(json!({"error": "InternalError"})), 632 ) 633 .into_response(); 634 } 635 } 636 Ok(None) => { 637 return ( 638 StatusCode::BAD_REQUEST, 639 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 640 ) 641 .into_response(); 642 } 643 Err(e) => { 644 error!("Error checking invite code: {:?}", e); 645 return ( 646 StatusCode::INTERNAL_SERVER_ERROR, 647 Json(json!({"error": "InternalError"})), 648 ) 649 .into_response(); 650 } 651 } 652 } 653 let password_hash = match hash(&input.password, DEFAULT_COST) { 654 Ok(h) => h, 655 Err(e) => { 656 error!("Error hashing password: {:?}", e); 657 return ( 658 StatusCode::INTERNAL_SERVER_ERROR, 659 Json(json!({"error": "InternalError"})), 660 ) 661 .into_response(); 662 } 663 }; 664 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 665 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 666 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 667 .fetch_one(&mut *tx) 668 .await 669 .map(|c| c.unwrap_or(0) == 0) 670 .unwrap_or(false); 671 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 672 Some(chrono::Utc::now()) 673 } else { 674 None 675 }; 676 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 677 r#"INSERT INTO users ( 678 handle, email, did, password_hash, 679 preferred_comms_channel, 680 discord_id, telegram_username, signal_number, 681 is_admin, deactivated_at, email_verified 682 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 683 ) 684 .bind(&handle) 685 .bind(&email) 686 .bind(&did) 687 .bind(&password_hash) 688 .bind(verification_channel) 689 .bind( 690 input 691 .discord_id 692 .as_deref() 693 .map(|s| s.trim()) 694 .filter(|s| !s.is_empty()), 695 ) 696 .bind( 697 input 698 .telegram_username 699 .as_deref() 700 .map(|s| s.trim()) 701 .filter(|s| !s.is_empty()), 702 ) 703 .bind( 704 input 705 .signal_number 706 .as_deref() 707 .map(|s| s.trim()) 708 .filter(|s| !s.is_empty()), 709 ) 710 .bind(is_first_user) 711 .bind(deactivated_at) 712 .bind(is_migration) 713 .fetch_one(&mut *tx) 714 .await; 715 let user_id = match user_insert { 716 Ok((id,)) => id, 717 Err(e) => { 718 if let Some(db_err) = e.as_database_error() 719 && db_err.code().as_deref() == Some("23505") 720 { 721 let constraint = db_err.constraint().unwrap_or(""); 722 if constraint.contains("handle") || constraint.contains("users_handle") { 723 return ( 724 StatusCode::BAD_REQUEST, 725 Json(json!({ 726 "error": "HandleNotAvailable", 727 "message": "Handle already taken" 728 })), 729 ) 730 .into_response(); 731 } else if constraint.contains("email") || constraint.contains("users_email") { 732 return ( 733 StatusCode::BAD_REQUEST, 734 Json(json!({ 735 "error": "InvalidEmail", 736 "message": "Email already registered" 737 })), 738 ) 739 .into_response(); 740 } else if constraint.contains("did") || constraint.contains("users_did") { 741 return ( 742 StatusCode::BAD_REQUEST, 743 Json(json!({ 744 "error": "AccountAlreadyExists", 745 "message": "An account with this DID already exists" 746 })), 747 ) 748 .into_response(); 749 } 750 } 751 error!("Error inserting user: {:?}", e); 752 return ( 753 StatusCode::INTERNAL_SERVER_ERROR, 754 Json(json!({"error": "InternalError"})), 755 ) 756 .into_response(); 757 } 758 }; 759 760 if !is_migration 761 && let Err(e) = sqlx::query!( 762 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)", 763 user_id, 764 verification_code, 765 email, 766 code_expires_at 767 ) 768 .execute(&mut *tx) 769 .await { 770 error!("Error inserting verification code: {:?}", e); 771 return ( 772 StatusCode::INTERNAL_SERVER_ERROR, 773 Json(json!({"error": "InternalError"})), 774 ) 775 .into_response(); 776 } 777 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 778 Ok(enc) => enc, 779 Err(e) => { 780 error!("Error encrypting user key: {:?}", e); 781 return ( 782 StatusCode::INTERNAL_SERVER_ERROR, 783 Json(json!({"error": "InternalError"})), 784 ) 785 .into_response(); 786 } 787 }; 788 let key_insert = sqlx::query!( 789 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 790 user_id, 791 &encrypted_key_bytes[..], 792 crate::config::ENCRYPTION_VERSION 793 ) 794 .execute(&mut *tx) 795 .await; 796 if let Err(e) = key_insert { 797 error!("Error inserting user key: {:?}", e); 798 return ( 799 StatusCode::INTERNAL_SERVER_ERROR, 800 Json(json!({"error": "InternalError"})), 801 ) 802 .into_response(); 803 } 804 if let Some(key_id) = reserved_key_id { 805 let mark_used = sqlx::query!( 806 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 807 key_id 808 ) 809 .execute(&mut *tx) 810 .await; 811 if let Err(e) = mark_used { 812 error!("Error marking reserved key as used: {:?}", e); 813 return ( 814 StatusCode::INTERNAL_SERVER_ERROR, 815 Json(json!({"error": "InternalError"})), 816 ) 817 .into_response(); 818 } 819 } 820 let mst = Mst::new(Arc::new(state.block_store.clone())); 821 let mst_root = match mst.persist().await { 822 Ok(c) => c, 823 Err(e) => { 824 error!("Error persisting MST: {:?}", e); 825 return ( 826 StatusCode::INTERNAL_SERVER_ERROR, 827 Json(json!({"error": "InternalError"})), 828 ) 829 .into_response(); 830 } 831 }; 832 let did_obj = match Did::new(&did) { 833 Ok(d) => d, 834 Err(_) => { 835 return ( 836 StatusCode::INTERNAL_SERVER_ERROR, 837 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 838 ) 839 .into_response(); 840 } 841 }; 842 let rev = Tid::now(LimitedU32::MIN); 843 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 844 let signed_commit = match unsigned_commit.sign(&signing_key) { 845 Ok(c) => c, 846 Err(e) => { 847 error!("Error signing genesis commit: {:?}", e); 848 return ( 849 StatusCode::INTERNAL_SERVER_ERROR, 850 Json(json!({"error": "InternalError"})), 851 ) 852 .into_response(); 853 } 854 }; 855 let commit_bytes = match signed_commit.to_cbor() { 856 Ok(b) => b, 857 Err(e) => { 858 error!("Error serializing genesis commit: {:?}", e); 859 return ( 860 StatusCode::INTERNAL_SERVER_ERROR, 861 Json(json!({"error": "InternalError"})), 862 ) 863 .into_response(); 864 } 865 }; 866 let commit_cid = match state.block_store.put(&commit_bytes).await { 867 Ok(c) => c, 868 Err(e) => { 869 error!("Error saving genesis commit: {:?}", e); 870 return ( 871 StatusCode::INTERNAL_SERVER_ERROR, 872 Json(json!({"error": "InternalError"})), 873 ) 874 .into_response(); 875 } 876 }; 877 let commit_cid_str = commit_cid.to_string(); 878 let repo_insert = sqlx::query!( 879 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 880 user_id, 881 commit_cid_str 882 ) 883 .execute(&mut *tx) 884 .await; 885 if let Err(e) = repo_insert { 886 error!("Error initializing repo: {:?}", e); 887 return ( 888 StatusCode::INTERNAL_SERVER_ERROR, 889 Json(json!({"error": "InternalError"})), 890 ) 891 .into_response(); 892 } 893 if let Some(code) = &input.invite_code 894 && !code.trim().is_empty() 895 { 896 let use_insert = sqlx::query!( 897 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 898 code, 899 user_id 900 ) 901 .execute(&mut *tx) 902 .await; 903 if let Err(e) = use_insert { 904 error!("Error recording invite usage: {:?}", e); 905 return ( 906 StatusCode::INTERNAL_SERVER_ERROR, 907 Json(json!({"error": "InternalError"})), 908 ) 909 .into_response(); 910 } 911 } 912 if let Err(e) = tx.commit().await { 913 error!("Error committing transaction: {:?}", e); 914 return ( 915 StatusCode::INTERNAL_SERVER_ERROR, 916 Json(json!({"error": "InternalError"})), 917 ) 918 .into_response(); 919 } 920 if !is_migration { 921 if let Err(e) = 922 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)) 923 .await 924 { 925 warn!("Failed to sequence identity event for {}: {}", did, e); 926 } 927 if let Err(e) = 928 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 929 { 930 warn!("Failed to sequence account event for {}: {}", did, e); 931 } 932 let profile_record = json!({ 933 "$type": "app.bsky.actor.profile", 934 "displayName": input.handle 935 }); 936 if let Err(e) = crate::api::repo::record::create_record_internal( 937 &state, 938 &did, 939 "app.bsky.actor.profile", 940 "self", 941 &profile_record, 942 ) 943 .await 944 { 945 warn!("Failed to create default profile for {}: {}", did, e); 946 } 947 if let Some(ref recipient) = verification_recipient 948 && let Err(e) = crate::comms::enqueue_signup_verification( 949 &state.db, 950 user_id, 951 verification_channel, 952 recipient, 953 &verification_code, 954 ) 955 .await 956 { 957 warn!( 958 "Failed to enqueue signup verification notification: {:?}", 959 e 960 ); 961 } 962 } 963 964 let (access_jwt, refresh_jwt) = if is_migration { 965 let access_meta = 966 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 967 Ok(m) => m, 968 Err(e) => { 969 error!("Error creating access token for migration: {:?}", e); 970 return ( 971 StatusCode::INTERNAL_SERVER_ERROR, 972 Json(json!({"error": "InternalError"})), 973 ) 974 .into_response(); 975 } 976 }; 977 let refresh_meta = 978 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 979 Ok(m) => m, 980 Err(e) => { 981 error!("Error creating refresh token for migration: {:?}", e); 982 return ( 983 StatusCode::INTERNAL_SERVER_ERROR, 984 Json(json!({"error": "InternalError"})), 985 ) 986 .into_response(); 987 } 988 }; 989 if let Err(e) = sqlx::query!( 990 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 991 did, 992 access_meta.jti, 993 refresh_meta.jti, 994 access_meta.expires_at, 995 refresh_meta.expires_at 996 ) 997 .execute(&state.db) 998 .await 999 { 1000 error!("Error creating session for migration: {:?}", e); 1001 return ( 1002 StatusCode::INTERNAL_SERVER_ERROR, 1003 Json(json!({"error": "InternalError"})), 1004 ) 1005 .into_response(); 1006 } 1007 (Some(access_meta.token), Some(refresh_meta.token)) 1008 } else { 1009 (None, None) 1010 }; 1011 1012 ( 1013 StatusCode::OK, 1014 Json(CreateAccountOutput { 1015 handle: handle.clone(), 1016 did, 1017 access_jwt, 1018 refresh_jwt, 1019 verification_required: !is_migration, 1020 verification_channel: verification_channel.to_string(), 1021 }), 1022 ) 1023 .into_response() 1024}