this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use crate::validation::validate_password; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use bcrypt::{DEFAULT_COST, hash}; 13use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 14use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 15use k256::{SecretKey, ecdsa::SigningKey}; 16use rand::rngs::OsRng; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::sync::Arc; 20use tracing::{debug, error, info, warn}; 21 22fn extract_client_ip(headers: &HeaderMap) -> String { 23 if let Some(forwarded) = headers.get("x-forwarded-for") 24 && let Ok(value) = forwarded.to_str() 25 && let Some(first_ip) = value.split(',').next() 26 { 27 return first_ip.trim().to_string(); 28 } 29 if let Some(real_ip) = headers.get("x-real-ip") 30 && let Ok(value) = real_ip.to_str() 31 { 32 return value.trim().to_string(); 33 } 34 "unknown".to_string() 35} 36 37#[derive(Deserialize)] 38#[serde(rename_all = "camelCase")] 39pub struct CreateAccountInput { 40 pub handle: String, 41 pub email: Option<String>, 42 pub password: String, 43 pub invite_code: Option<String>, 44 pub did: Option<String>, 45 pub did_type: Option<String>, 46 pub signing_key: Option<String>, 47 pub verification_channel: Option<String>, 48 pub discord_id: Option<String>, 49 pub telegram_username: Option<String>, 50 pub signal_number: Option<String>, 51} 52 53#[derive(Serialize)] 54#[serde(rename_all = "camelCase")] 55pub struct CreateAccountOutput { 56 pub handle: String, 57 pub did: String, 58 #[serde(skip_serializing_if = "Option::is_none")] 59 pub access_jwt: Option<String>, 60 #[serde(skip_serializing_if = "Option::is_none")] 61 pub refresh_jwt: Option<String>, 62 pub verification_required: bool, 63 pub verification_channel: String, 64} 65 66pub async fn create_account( 67 State(state): State<AppState>, 68 headers: HeaderMap, 69 Json(input): Json<CreateAccountInput>, 70) -> Response { 71 info!("create_account called"); 72 let client_ip = extract_client_ip(&headers); 73 if !state 74 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 75 .await 76 { 77 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 78 return ( 79 StatusCode::TOO_MANY_REQUESTS, 80 Json(json!({ 81 "error": "RateLimitExceeded", 82 "message": "Too many account creation attempts. Please try again later." 83 })), 84 ) 85 .into_response(); 86 } 87 88 let migration_auth = if let Some(token) = 89 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 90 { 91 if is_service_token(&token) { 92 let verifier = ServiceTokenVerifier::new(); 93 match verifier 94 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 95 .await 96 { 97 Ok(claims) => { 98 debug!("Service token verified for migration: iss={}", claims.iss); 99 Some(claims.iss) 100 } 101 Err(e) => { 102 error!("Service token verification failed: {:?}", e); 103 return ( 104 StatusCode::UNAUTHORIZED, 105 Json(json!({ 106 "error": "AuthenticationFailed", 107 "message": format!("Service token verification failed: {}", e) 108 })), 109 ) 110 .into_response(); 111 } 112 } 113 } else { 114 None 115 } 116 } else { 117 None 118 }; 119 120 let is_migration = migration_auth.is_some() 121 && input 122 .did 123 .as_ref() 124 .map(|d| d.starts_with("did:plc:")) 125 .unwrap_or(false); 126 127 if is_migration { 128 if let (Some(migration_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref()) 129 { 130 if migration_did != auth_did { 131 return ( 132 StatusCode::FORBIDDEN, 133 Json(json!({ 134 "error": "AuthorizationError", 135 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 136 })), 137 ) 138 .into_response(); 139 } 140 info!(did = %migration_did, "Processing account migration"); 141 } 142 } 143 144 let hostname_for_validation = 145 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 146 let pds_suffix = format!(".{}", hostname_for_validation); 147 148 let validated_short_handle = if !input.handle.contains('.') 149 || input.handle.ends_with(&pds_suffix) 150 { 151 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 152 input 153 .handle 154 .strip_suffix(&pds_suffix) 155 .unwrap_or(&input.handle) 156 } else { 157 &input.handle 158 }; 159 match crate::api::validation::validate_short_handle(handle_to_validate) { 160 Ok(h) => h, 161 Err(e) => { 162 return ( 163 StatusCode::BAD_REQUEST, 164 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 165 ) 166 .into_response(); 167 } 168 } 169 } else { 170 if input.handle.contains(' ') || input.handle.contains('\t') { 171 return ( 172 StatusCode::BAD_REQUEST, 173 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 174 ) 175 .into_response(); 176 } 177 for c in input.handle.chars() { 178 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 179 return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 182 ) 183 .into_response(); 184 } 185 } 186 input.handle.to_lowercase() 187 }; 188 let email: Option<String> = input 189 .email 190 .as_ref() 191 .map(|e| e.trim().to_string()) 192 .filter(|e| !e.is_empty()); 193 if let Some(ref email) = email 194 && !crate::api::validation::is_valid_email(email) 195 { 196 return ( 197 StatusCode::BAD_REQUEST, 198 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 199 ) 200 .into_response(); 201 } 202 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 203 let valid_channels = ["email", "discord", "telegram", "signal"]; 204 if !valid_channels.contains(&verification_channel) && !is_migration { 205 return ( 206 StatusCode::BAD_REQUEST, 207 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 208 ) 209 .into_response(); 210 } 211 let verification_recipient = if is_migration { 212 None 213 } else { 214 Some(match verification_channel { 215 "email" => match &input.email { 216 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 217 _ => return ( 218 StatusCode::BAD_REQUEST, 219 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 220 ).into_response(), 221 }, 222 "discord" => match &input.discord_id { 223 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 224 _ => return ( 225 StatusCode::BAD_REQUEST, 226 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 227 ).into_response(), 228 }, 229 "telegram" => match &input.telegram_username { 230 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 231 _ => return ( 232 StatusCode::BAD_REQUEST, 233 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 234 ).into_response(), 235 }, 236 "signal" => match &input.signal_number { 237 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 238 _ => return ( 239 StatusCode::BAD_REQUEST, 240 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 241 ).into_response(), 242 }, 243 _ => return ( 244 StatusCode::BAD_REQUEST, 245 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 246 ).into_response(), 247 }) 248 }; 249 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 250 let pds_endpoint = format!("https://{}", hostname); 251 let suffix = format!(".{}", hostname); 252 let handle = if input.handle.ends_with(&suffix) { 253 format!("{}.{}", validated_short_handle, hostname) 254 } else if input.handle.contains('.') { 255 validated_short_handle.clone() 256 } else { 257 format!("{}.{}", validated_short_handle, hostname) 258 }; 259 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 260 if let Some(signing_key_did) = &input.signing_key { 261 let reserved = sqlx::query!( 262 r#" 263 SELECT id, private_key_bytes 264 FROM reserved_signing_keys 265 WHERE public_key_did_key = $1 266 AND used_at IS NULL 267 AND expires_at > NOW() 268 FOR UPDATE 269 "#, 270 signing_key_did 271 ) 272 .fetch_optional(&state.db) 273 .await; 274 match reserved { 275 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 276 Ok(None) => { 277 return ( 278 StatusCode::BAD_REQUEST, 279 Json(json!({ 280 "error": "InvalidSigningKey", 281 "message": "Signing key not found, already used, or expired" 282 })), 283 ) 284 .into_response(); 285 } 286 Err(e) => { 287 error!("Error looking up reserved signing key: {:?}", e); 288 return ( 289 StatusCode::INTERNAL_SERVER_ERROR, 290 Json(json!({"error": "InternalError"})), 291 ) 292 .into_response(); 293 } 294 } 295 } else { 296 let secret_key = SecretKey::random(&mut OsRng); 297 (secret_key.to_bytes().to_vec(), None) 298 }; 299 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 300 Ok(k) => k, 301 Err(e) => { 302 error!("Error creating signing key: {:?}", e); 303 return ( 304 StatusCode::INTERNAL_SERVER_ERROR, 305 Json(json!({"error": "InternalError"})), 306 ) 307 .into_response(); 308 } 309 }; 310 let did_type = input.did_type.as_deref().unwrap_or("plc"); 311 let did = match did_type { 312 "web" => { 313 let subdomain_host = format!("{}.{}", input.handle, hostname); 314 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 315 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 316 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 317 self_hosted_did 318 } 319 "web-external" => { 320 let d = match &input.did { 321 Some(d) if !d.trim().is_empty() => d, 322 _ => { 323 return ( 324 StatusCode::BAD_REQUEST, 325 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 326 ) 327 .into_response(); 328 } 329 }; 330 if !d.starts_with("did:web:") { 331 return ( 332 StatusCode::BAD_REQUEST, 333 Json( 334 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 335 ), 336 ) 337 .into_response(); 338 } 339 if let Err(e) = 340 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 341 { 342 return ( 343 StatusCode::BAD_REQUEST, 344 Json(json!({"error": "InvalidDid", "message": e})), 345 ) 346 .into_response(); 347 } 348 info!(did = %d, "Creating external did:web account"); 349 d.clone() 350 } 351 _ => { 352 if let Some(d) = &input.did { 353 if d.starts_with("did:plc:") && is_migration { 354 info!(did = %d, "Migration with existing did:plc"); 355 d.clone() 356 } else if d.starts_with("did:web:") { 357 if let Err(e) = 358 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()) 359 .await 360 { 361 return ( 362 StatusCode::BAD_REQUEST, 363 Json(json!({"error": "InvalidDid", "message": e})), 364 ) 365 .into_response(); 366 } 367 d.clone() 368 } else if !d.trim().is_empty() { 369 return ( 370 StatusCode::BAD_REQUEST, 371 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 372 ) 373 .into_response(); 374 } else { 375 let rotation_key = std::env::var("PLC_ROTATION_KEY") 376 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 377 let genesis_result = match create_genesis_operation( 378 &signing_key, 379 &rotation_key, 380 &handle, 381 &pds_endpoint, 382 ) { 383 Ok(r) => r, 384 Err(e) => { 385 error!("Error creating PLC genesis operation: {:?}", e); 386 return ( 387 StatusCode::INTERNAL_SERVER_ERROR, 388 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 389 ) 390 .into_response(); 391 } 392 }; 393 let plc_client = PlcClient::new(None); 394 if let Err(e) = plc_client 395 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 396 .await 397 { 398 error!("Failed to submit PLC genesis operation: {:?}", e); 399 return ( 400 StatusCode::BAD_GATEWAY, 401 Json(json!({ 402 "error": "UpstreamError", 403 "message": format!("Failed to register DID with PLC directory: {}", e) 404 })), 405 ) 406 .into_response(); 407 } 408 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 409 genesis_result.did 410 } 411 } else { 412 let rotation_key = std::env::var("PLC_ROTATION_KEY") 413 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 414 let genesis_result = match create_genesis_operation( 415 &signing_key, 416 &rotation_key, 417 &handle, 418 &pds_endpoint, 419 ) { 420 Ok(r) => r, 421 Err(e) => { 422 error!("Error creating PLC genesis operation: {:?}", e); 423 return ( 424 StatusCode::INTERNAL_SERVER_ERROR, 425 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 426 ) 427 .into_response(); 428 } 429 }; 430 let plc_client = PlcClient::new(None); 431 if let Err(e) = plc_client 432 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 433 .await 434 { 435 error!("Failed to submit PLC genesis operation: {:?}", e); 436 return ( 437 StatusCode::BAD_GATEWAY, 438 Json(json!({ 439 "error": "UpstreamError", 440 "message": format!("Failed to register DID with PLC directory: {}", e) 441 })), 442 ) 443 .into_response(); 444 } 445 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 446 genesis_result.did 447 } 448 } 449 }; 450 let mut tx = match state.db.begin().await { 451 Ok(tx) => tx, 452 Err(e) => { 453 error!("Error starting transaction: {:?}", e); 454 return ( 455 StatusCode::INTERNAL_SERVER_ERROR, 456 Json(json!({"error": "InternalError"})), 457 ) 458 .into_response(); 459 } 460 }; 461 if is_migration { 462 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 463 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 464 .bind(&did) 465 .fetch_optional(&mut *tx) 466 .await 467 .unwrap_or(None); 468 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 469 if deactivated_at.is_some() { 470 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 471 let update_result: Result<_, sqlx::Error> = 472 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 473 .bind(&handle) 474 .bind(account_id) 475 .execute(&mut *tx) 476 .await; 477 if let Err(e) = update_result { 478 if let Some(db_err) = e.as_database_error() 479 && db_err 480 .constraint() 481 .map(|c| c.contains("handle")) 482 .unwrap_or(false) 483 { 484 return ( 485 StatusCode::BAD_REQUEST, 486 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 487 ) 488 .into_response(); 489 } 490 error!("Error reactivating account: {:?}", e); 491 return ( 492 StatusCode::INTERNAL_SERVER_ERROR, 493 Json(json!({"error": "InternalError"})), 494 ) 495 .into_response(); 496 } 497 if let Err(e) = tx.commit().await { 498 error!("Error committing reactivation: {:?}", e); 499 return ( 500 StatusCode::INTERNAL_SERVER_ERROR, 501 Json(json!({"error": "InternalError"})), 502 ) 503 .into_response(); 504 } 505 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 506 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 507 ) 508 .bind(account_id) 509 .fetch_optional(&state.db) 510 .await 511 .unwrap_or(None); 512 let secret_key_bytes = match key_row { 513 Some((key_bytes, encryption_version)) => { 514 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 515 Ok(k) => k, 516 Err(e) => { 517 error!("Error decrypting key for reactivated account: {:?}", e); 518 return ( 519 StatusCode::INTERNAL_SERVER_ERROR, 520 Json(json!({"error": "InternalError"})), 521 ) 522 .into_response(); 523 } 524 } 525 } 526 None => { 527 error!("No signing key found for reactivated account"); 528 return ( 529 StatusCode::INTERNAL_SERVER_ERROR, 530 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 531 ) 532 .into_response(); 533 } 534 }; 535 let access_meta = 536 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 537 Ok(m) => m, 538 Err(e) => { 539 error!("Error creating access token: {:?}", e); 540 return ( 541 StatusCode::INTERNAL_SERVER_ERROR, 542 Json(json!({"error": "InternalError"})), 543 ) 544 .into_response(); 545 } 546 }; 547 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 548 &did, 549 &secret_key_bytes, 550 ) { 551 Ok(m) => m, 552 Err(e) => { 553 error!("Error creating refresh token: {:?}", e); 554 return ( 555 StatusCode::INTERNAL_SERVER_ERROR, 556 Json(json!({"error": "InternalError"})), 557 ) 558 .into_response(); 559 } 560 }; 561 let session_result: Result<_, sqlx::Error> = sqlx::query( 562 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 563 ) 564 .bind(&did) 565 .bind(&access_meta.jti) 566 .bind(&refresh_meta.jti) 567 .bind(access_meta.expires_at) 568 .bind(refresh_meta.expires_at) 569 .execute(&state.db) 570 .await; 571 if let Err(e) = session_result { 572 error!("Error creating session: {:?}", e); 573 return ( 574 StatusCode::INTERNAL_SERVER_ERROR, 575 Json(json!({"error": "InternalError"})), 576 ) 577 .into_response(); 578 } 579 return ( 580 StatusCode::OK, 581 Json(CreateAccountOutput { 582 handle: handle.clone(), 583 did, 584 access_jwt: Some(access_meta.token), 585 refresh_jwt: Some(refresh_meta.token), 586 verification_required: false, 587 verification_channel: "email".to_string(), 588 }), 589 ) 590 .into_response(); 591 } else { 592 return ( 593 StatusCode::BAD_REQUEST, 594 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 595 ) 596 .into_response(); 597 } 598 } 599 } 600 let exists_result: Option<(i32,)> = 601 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 602 .bind(&handle) 603 .fetch_optional(&mut *tx) 604 .await 605 .unwrap_or(None); 606 if exists_result.is_some() { 607 return ( 608 StatusCode::BAD_REQUEST, 609 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 610 ) 611 .into_response(); 612 } 613 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 614 .map(|v| v == "true" || v == "1") 615 .unwrap_or(false); 616 if invite_code_required 617 && input 618 .invite_code 619 .as_ref() 620 .map(|c| c.trim().is_empty()) 621 .unwrap_or(true) 622 { 623 return ( 624 StatusCode::BAD_REQUEST, 625 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 626 ) 627 .into_response(); 628 } 629 if let Some(code) = &input.invite_code 630 && !code.trim().is_empty() 631 { 632 let invite_query = sqlx::query!( 633 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 634 code 635 ) 636 .fetch_optional(&mut *tx) 637 .await; 638 match invite_query { 639 Ok(Some(row)) => { 640 if row.available_uses <= 0 { 641 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 642 } 643 let update_invite = sqlx::query!( 644 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 645 code 646 ) 647 .execute(&mut *tx) 648 .await; 649 if let Err(e) = update_invite { 650 error!("Error updating invite code: {:?}", e); 651 return ( 652 StatusCode::INTERNAL_SERVER_ERROR, 653 Json(json!({"error": "InternalError"})), 654 ) 655 .into_response(); 656 } 657 } 658 Ok(None) => { 659 return ( 660 StatusCode::BAD_REQUEST, 661 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 662 ) 663 .into_response(); 664 } 665 Err(e) => { 666 error!("Error checking invite code: {:?}", e); 667 return ( 668 StatusCode::INTERNAL_SERVER_ERROR, 669 Json(json!({"error": "InternalError"})), 670 ) 671 .into_response(); 672 } 673 } 674 } 675 if let Err(e) = validate_password(&input.password) { 676 return ( 677 StatusCode::BAD_REQUEST, 678 Json(json!({ 679 "error": "InvalidPassword", 680 "message": e.to_string() 681 })), 682 ) 683 .into_response(); 684 } 685 686 let password_hash = match hash(&input.password, DEFAULT_COST) { 687 Ok(h) => h, 688 Err(e) => { 689 error!("Error hashing password: {:?}", e); 690 return ( 691 StatusCode::INTERNAL_SERVER_ERROR, 692 Json(json!({"error": "InternalError"})), 693 ) 694 .into_response(); 695 } 696 }; 697 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 698 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 699 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 700 .fetch_one(&mut *tx) 701 .await 702 .map(|c| c.unwrap_or(0) == 0) 703 .unwrap_or(false); 704 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 705 Some(chrono::Utc::now()) 706 } else { 707 None 708 }; 709 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 710 r#"INSERT INTO users ( 711 handle, email, did, password_hash, 712 preferred_comms_channel, 713 discord_id, telegram_username, signal_number, 714 is_admin, deactivated_at, email_verified 715 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 716 ) 717 .bind(&handle) 718 .bind(&email) 719 .bind(&did) 720 .bind(&password_hash) 721 .bind(verification_channel) 722 .bind( 723 input 724 .discord_id 725 .as_deref() 726 .map(|s| s.trim()) 727 .filter(|s| !s.is_empty()), 728 ) 729 .bind( 730 input 731 .telegram_username 732 .as_deref() 733 .map(|s| s.trim()) 734 .filter(|s| !s.is_empty()), 735 ) 736 .bind( 737 input 738 .signal_number 739 .as_deref() 740 .map(|s| s.trim()) 741 .filter(|s| !s.is_empty()), 742 ) 743 .bind(is_first_user) 744 .bind(deactivated_at) 745 .bind(is_migration) 746 .fetch_one(&mut *tx) 747 .await; 748 let user_id = match user_insert { 749 Ok((id,)) => id, 750 Err(e) => { 751 if let Some(db_err) = e.as_database_error() 752 && db_err.code().as_deref() == Some("23505") 753 { 754 let constraint = db_err.constraint().unwrap_or(""); 755 if constraint.contains("handle") || constraint.contains("users_handle") { 756 return ( 757 StatusCode::BAD_REQUEST, 758 Json(json!({ 759 "error": "HandleNotAvailable", 760 "message": "Handle already taken" 761 })), 762 ) 763 .into_response(); 764 } else if constraint.contains("email") || constraint.contains("users_email") { 765 return ( 766 StatusCode::BAD_REQUEST, 767 Json(json!({ 768 "error": "InvalidEmail", 769 "message": "Email already registered" 770 })), 771 ) 772 .into_response(); 773 } else if constraint.contains("did") || constraint.contains("users_did") { 774 return ( 775 StatusCode::BAD_REQUEST, 776 Json(json!({ 777 "error": "AccountAlreadyExists", 778 "message": "An account with this DID already exists" 779 })), 780 ) 781 .into_response(); 782 } 783 } 784 error!("Error inserting user: {:?}", e); 785 return ( 786 StatusCode::INTERNAL_SERVER_ERROR, 787 Json(json!({"error": "InternalError"})), 788 ) 789 .into_response(); 790 } 791 }; 792 793 if !is_migration 794 && let Some(ref recipient) = verification_recipient 795 && let Err(e) = sqlx::query!( 796 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)", 797 user_id, 798 verification_channel as _, 799 verification_code, 800 recipient, 801 code_expires_at 802 ) 803 .execute(&mut *tx) 804 .await { 805 error!("Error inserting verification code: {:?}", e); 806 return ( 807 StatusCode::INTERNAL_SERVER_ERROR, 808 Json(json!({"error": "InternalError"})), 809 ) 810 .into_response(); 811 } 812 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 813 Ok(enc) => enc, 814 Err(e) => { 815 error!("Error encrypting user key: {:?}", e); 816 return ( 817 StatusCode::INTERNAL_SERVER_ERROR, 818 Json(json!({"error": "InternalError"})), 819 ) 820 .into_response(); 821 } 822 }; 823 let key_insert = sqlx::query!( 824 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 825 user_id, 826 &encrypted_key_bytes[..], 827 crate::config::ENCRYPTION_VERSION 828 ) 829 .execute(&mut *tx) 830 .await; 831 if let Err(e) = key_insert { 832 error!("Error inserting user key: {:?}", e); 833 return ( 834 StatusCode::INTERNAL_SERVER_ERROR, 835 Json(json!({"error": "InternalError"})), 836 ) 837 .into_response(); 838 } 839 if let Some(key_id) = reserved_key_id { 840 let mark_used = sqlx::query!( 841 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 842 key_id 843 ) 844 .execute(&mut *tx) 845 .await; 846 if let Err(e) = mark_used { 847 error!("Error marking reserved key as used: {:?}", e); 848 return ( 849 StatusCode::INTERNAL_SERVER_ERROR, 850 Json(json!({"error": "InternalError"})), 851 ) 852 .into_response(); 853 } 854 } 855 let mst = Mst::new(Arc::new(state.block_store.clone())); 856 let mst_root = match mst.persist().await { 857 Ok(c) => c, 858 Err(e) => { 859 error!("Error persisting MST: {:?}", e); 860 return ( 861 StatusCode::INTERNAL_SERVER_ERROR, 862 Json(json!({"error": "InternalError"})), 863 ) 864 .into_response(); 865 } 866 }; 867 let did_obj = match Did::new(&did) { 868 Ok(d) => d, 869 Err(_) => { 870 return ( 871 StatusCode::INTERNAL_SERVER_ERROR, 872 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 873 ) 874 .into_response(); 875 } 876 }; 877 let rev = Tid::now(LimitedU32::MIN); 878 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 879 let signed_commit = match unsigned_commit.sign(&signing_key) { 880 Ok(c) => c, 881 Err(e) => { 882 error!("Error signing genesis commit: {:?}", e); 883 return ( 884 StatusCode::INTERNAL_SERVER_ERROR, 885 Json(json!({"error": "InternalError"})), 886 ) 887 .into_response(); 888 } 889 }; 890 let commit_bytes = match signed_commit.to_cbor() { 891 Ok(b) => b, 892 Err(e) => { 893 error!("Error serializing genesis commit: {:?}", e); 894 return ( 895 StatusCode::INTERNAL_SERVER_ERROR, 896 Json(json!({"error": "InternalError"})), 897 ) 898 .into_response(); 899 } 900 }; 901 let commit_cid = match state.block_store.put(&commit_bytes).await { 902 Ok(c) => c, 903 Err(e) => { 904 error!("Error saving genesis commit: {:?}", e); 905 return ( 906 StatusCode::INTERNAL_SERVER_ERROR, 907 Json(json!({"error": "InternalError"})), 908 ) 909 .into_response(); 910 } 911 }; 912 let commit_cid_str = commit_cid.to_string(); 913 let repo_insert = sqlx::query!( 914 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 915 user_id, 916 commit_cid_str 917 ) 918 .execute(&mut *tx) 919 .await; 920 if let Err(e) = repo_insert { 921 error!("Error initializing repo: {:?}", e); 922 return ( 923 StatusCode::INTERNAL_SERVER_ERROR, 924 Json(json!({"error": "InternalError"})), 925 ) 926 .into_response(); 927 } 928 if let Some(code) = &input.invite_code 929 && !code.trim().is_empty() 930 { 931 let use_insert = sqlx::query!( 932 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 933 code, 934 user_id 935 ) 936 .execute(&mut *tx) 937 .await; 938 if let Err(e) = use_insert { 939 error!("Error recording invite usage: {:?}", e); 940 return ( 941 StatusCode::INTERNAL_SERVER_ERROR, 942 Json(json!({"error": "InternalError"})), 943 ) 944 .into_response(); 945 } 946 } 947 if let Err(e) = tx.commit().await { 948 error!("Error committing transaction: {:?}", e); 949 return ( 950 StatusCode::INTERNAL_SERVER_ERROR, 951 Json(json!({"error": "InternalError"})), 952 ) 953 .into_response(); 954 } 955 if !is_migration { 956 if let Err(e) = 957 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 958 { 959 warn!("Failed to sequence identity event for {}: {}", did, e); 960 } 961 if let Err(e) = 962 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 963 { 964 warn!("Failed to sequence account event for {}: {}", did, e); 965 } 966 let profile_record = json!({ 967 "$type": "app.bsky.actor.profile", 968 "displayName": input.handle 969 }); 970 if let Err(e) = crate::api::repo::record::create_record_internal( 971 &state, 972 &did, 973 "app.bsky.actor.profile", 974 "self", 975 &profile_record, 976 ) 977 .await 978 { 979 warn!("Failed to create default profile for {}: {}", did, e); 980 } 981 if let Some(ref recipient) = verification_recipient 982 && let Err(e) = crate::comms::enqueue_signup_verification( 983 &state.db, 984 user_id, 985 verification_channel, 986 recipient, 987 &verification_code, 988 ) 989 .await 990 { 991 warn!( 992 "Failed to enqueue signup verification notification: {:?}", 993 e 994 ); 995 } 996 } 997 998 let (access_jwt, refresh_jwt) = if is_migration { 999 let access_meta = 1000 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 1001 Ok(m) => m, 1002 Err(e) => { 1003 error!("Error creating access token for migration: {:?}", e); 1004 return ( 1005 StatusCode::INTERNAL_SERVER_ERROR, 1006 Json(json!({"error": "InternalError"})), 1007 ) 1008 .into_response(); 1009 } 1010 }; 1011 let refresh_meta = 1012 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1013 Ok(m) => m, 1014 Err(e) => { 1015 error!("Error creating refresh token for migration: {:?}", e); 1016 return ( 1017 StatusCode::INTERNAL_SERVER_ERROR, 1018 Json(json!({"error": "InternalError"})), 1019 ) 1020 .into_response(); 1021 } 1022 }; 1023 if let Err(e) = sqlx::query!( 1024 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1025 did, 1026 access_meta.jti, 1027 refresh_meta.jti, 1028 access_meta.expires_at, 1029 refresh_meta.expires_at 1030 ) 1031 .execute(&state.db) 1032 .await 1033 { 1034 error!("Error creating session for migration: {:?}", e); 1035 return ( 1036 StatusCode::INTERNAL_SERVER_ERROR, 1037 Json(json!({"error": "InternalError"})), 1038 ) 1039 .into_response(); 1040 } 1041 (Some(access_meta.token), Some(refresh_meta.token)) 1042 } else { 1043 (None, None) 1044 }; 1045 1046 ( 1047 StatusCode::OK, 1048 Json(CreateAccountOutput { 1049 handle: handle.clone(), 1050 did, 1051 access_jwt, 1052 refresh_jwt, 1053 verification_required: !is_migration, 1054 verification_channel: verification_channel.to_string(), 1055 }), 1056 ) 1057 .into_response() 1058}