this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::{DEFAULT_COST, hash}; 12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use k256::{SecretKey, ecdsa::SigningKey}; 15use rand::rngs::OsRng; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::sync::Arc; 19use tracing::{debug, error, info, warn}; 20 21fn extract_client_ip(headers: &HeaderMap) -> String { 22 if let Some(forwarded) = headers.get("x-forwarded-for") 23 && let Ok(value) = forwarded.to_str() 24 && let Some(first_ip) = value.split(',').next() 25 { 26 return first_ip.trim().to_string(); 27 } 28 if let Some(real_ip) = headers.get("x-real-ip") 29 && let Ok(value) = real_ip.to_str() 30 { 31 return value.trim().to_string(); 32 } 33 "unknown".to_string() 34} 35 36#[derive(Deserialize)] 37#[serde(rename_all = "camelCase")] 38pub struct CreateAccountInput { 39 pub handle: String, 40 pub email: Option<String>, 41 pub password: String, 42 pub invite_code: Option<String>, 43 pub did: Option<String>, 44 pub did_type: Option<String>, 45 pub signing_key: Option<String>, 46 pub verification_channel: Option<String>, 47 pub discord_id: Option<String>, 48 pub telegram_username: Option<String>, 49 pub signal_number: Option<String>, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateAccountOutput { 55 pub handle: String, 56 pub did: String, 57 #[serde(skip_serializing_if = "Option::is_none")] 58 pub access_jwt: Option<String>, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 pub refresh_jwt: Option<String>, 61 pub verification_required: bool, 62 pub verification_channel: String, 63} 64 65pub async fn create_account( 66 State(state): State<AppState>, 67 headers: HeaderMap, 68 Json(input): Json<CreateAccountInput>, 69) -> Response { 70 info!("create_account called"); 71 let client_ip = extract_client_ip(&headers); 72 if !state 73 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 74 .await 75 { 76 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 77 return ( 78 StatusCode::TOO_MANY_REQUESTS, 79 Json(json!({ 80 "error": "RateLimitExceeded", 81 "message": "Too many account creation attempts. Please try again later." 82 })), 83 ) 84 .into_response(); 85 } 86 87 let migration_auth = if let Some(token) = 88 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 89 { 90 if is_service_token(&token) { 91 let verifier = ServiceTokenVerifier::new(); 92 match verifier 93 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 94 .await 95 { 96 Ok(claims) => { 97 debug!("Service token verified for migration: iss={}", claims.iss); 98 Some(claims.iss) 99 } 100 Err(e) => { 101 error!("Service token verification failed: {:?}", e); 102 return ( 103 StatusCode::UNAUTHORIZED, 104 Json(json!({ 105 "error": "AuthenticationFailed", 106 "message": format!("Service token verification failed: {}", e) 107 })), 108 ) 109 .into_response(); 110 } 111 } 112 } else { 113 None 114 } 115 } else { 116 None 117 }; 118 119 let is_migration = migration_auth.is_some() 120 && input 121 .did 122 .as_ref() 123 .map(|d| d.starts_with("did:plc:")) 124 .unwrap_or(false); 125 126 if is_migration { 127 let migration_did = input.did.as_ref().unwrap(); 128 let auth_did = migration_auth.as_ref().unwrap(); 129 if migration_did != auth_did { 130 return ( 131 StatusCode::FORBIDDEN, 132 Json(json!({ 133 "error": "AuthorizationError", 134 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 135 })), 136 ) 137 .into_response(); 138 } 139 info!(did = %migration_did, "Processing account migration"); 140 } 141 142 let hostname_for_validation = 143 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 144 let pds_suffix = format!(".{}", hostname_for_validation); 145 146 let validated_short_handle = if !input.handle.contains('.') 147 || input.handle.ends_with(&pds_suffix) 148 { 149 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 150 input 151 .handle 152 .strip_suffix(&pds_suffix) 153 .unwrap_or(&input.handle) 154 } else { 155 &input.handle 156 }; 157 match crate::api::validation::validate_short_handle(handle_to_validate) { 158 Ok(h) => h, 159 Err(e) => { 160 return ( 161 StatusCode::BAD_REQUEST, 162 Json(json!({"error": "InvalidHandle", "message": e.to_string()})), 163 ) 164 .into_response(); 165 } 166 } 167 } else { 168 if input.handle.contains(' ') || input.handle.contains('\t') { 169 return ( 170 StatusCode::BAD_REQUEST, 171 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})), 172 ) 173 .into_response(); 174 } 175 for c in input.handle.chars() { 176 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 177 return ( 178 StatusCode::BAD_REQUEST, 179 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})), 180 ) 181 .into_response(); 182 } 183 } 184 input.handle.to_lowercase() 185 }; 186 let email: Option<String> = input 187 .email 188 .as_ref() 189 .map(|e| e.trim().to_string()) 190 .filter(|e| !e.is_empty()); 191 if let Some(ref email) = email 192 && !crate::api::validation::is_valid_email(email) 193 { 194 return ( 195 StatusCode::BAD_REQUEST, 196 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 197 ) 198 .into_response(); 199 } 200 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 201 let valid_channels = ["email", "discord", "telegram", "signal"]; 202 if !valid_channels.contains(&verification_channel) && !is_migration { 203 return ( 204 StatusCode::BAD_REQUEST, 205 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 206 ) 207 .into_response(); 208 } 209 let verification_recipient = if is_migration { 210 None 211 } else { 212 Some(match verification_channel { 213 "email" => match &input.email { 214 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 215 _ => return ( 216 StatusCode::BAD_REQUEST, 217 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 218 ).into_response(), 219 }, 220 "discord" => match &input.discord_id { 221 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 222 _ => return ( 223 StatusCode::BAD_REQUEST, 224 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 225 ).into_response(), 226 }, 227 "telegram" => match &input.telegram_username { 228 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 229 _ => return ( 230 StatusCode::BAD_REQUEST, 231 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 232 ).into_response(), 233 }, 234 "signal" => match &input.signal_number { 235 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 236 _ => return ( 237 StatusCode::BAD_REQUEST, 238 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 239 ).into_response(), 240 }, 241 _ => return ( 242 StatusCode::BAD_REQUEST, 243 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 244 ).into_response(), 245 }) 246 }; 247 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 248 let pds_endpoint = format!("https://{}", hostname); 249 let suffix = format!(".{}", hostname); 250 let handle = if input.handle.ends_with(&suffix) { 251 format!("{}.{}", validated_short_handle, hostname) 252 } else if input.handle.contains('.') { 253 validated_short_handle.clone() 254 } else { 255 format!("{}.{}", validated_short_handle, hostname) 256 }; 257 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 258 if let Some(signing_key_did) = &input.signing_key { 259 let reserved = sqlx::query!( 260 r#" 261 SELECT id, private_key_bytes 262 FROM reserved_signing_keys 263 WHERE public_key_did_key = $1 264 AND used_at IS NULL 265 AND expires_at > NOW() 266 FOR UPDATE 267 "#, 268 signing_key_did 269 ) 270 .fetch_optional(&state.db) 271 .await; 272 match reserved { 273 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 274 Ok(None) => { 275 return ( 276 StatusCode::BAD_REQUEST, 277 Json(json!({ 278 "error": "InvalidSigningKey", 279 "message": "Signing key not found, already used, or expired" 280 })), 281 ) 282 .into_response(); 283 } 284 Err(e) => { 285 error!("Error looking up reserved signing key: {:?}", e); 286 return ( 287 StatusCode::INTERNAL_SERVER_ERROR, 288 Json(json!({"error": "InternalError"})), 289 ) 290 .into_response(); 291 } 292 } 293 } else { 294 let secret_key = SecretKey::random(&mut OsRng); 295 (secret_key.to_bytes().to_vec(), None) 296 }; 297 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 298 Ok(k) => k, 299 Err(e) => { 300 error!("Error creating signing key: {:?}", e); 301 return ( 302 StatusCode::INTERNAL_SERVER_ERROR, 303 Json(json!({"error": "InternalError"})), 304 ) 305 .into_response(); 306 } 307 }; 308 let did_type = input.did_type.as_deref().unwrap_or("plc"); 309 let did = match did_type { 310 "web" => { 311 let subdomain_host = format!("{}.{}", input.handle, hostname); 312 let encoded_subdomain = subdomain_host.replace(':', "%3A"); 313 let self_hosted_did = format!("did:web:{}", encoded_subdomain); 314 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)"); 315 self_hosted_did 316 } 317 "web-external" => { 318 let d = match &input.did { 319 Some(d) if !d.trim().is_empty() => d, 320 _ => { 321 return ( 322 StatusCode::BAD_REQUEST, 323 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})), 324 ) 325 .into_response(); 326 } 327 }; 328 if !d.starts_with("did:web:") { 329 return ( 330 StatusCode::BAD_REQUEST, 331 Json( 332 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}), 333 ), 334 ) 335 .into_response(); 336 } 337 if let Err(e) = 338 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await 339 { 340 return ( 341 StatusCode::BAD_REQUEST, 342 Json(json!({"error": "InvalidDid", "message": e})), 343 ) 344 .into_response(); 345 } 346 info!(did = %d, "Creating external did:web account"); 347 d.clone() 348 } 349 _ => { 350 if let Some(d) = &input.did { 351 if d.starts_with("did:plc:") && is_migration { 352 info!(did = %d, "Migration with existing did:plc"); 353 d.clone() 354 } else if d.starts_with("did:web:") { 355 if let Err(e) = 356 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()) 357 .await 358 { 359 return ( 360 StatusCode::BAD_REQUEST, 361 Json(json!({"error": "InvalidDid", "message": e})), 362 ) 363 .into_response(); 364 } 365 d.clone() 366 } else if !d.trim().is_empty() { 367 return ( 368 StatusCode::BAD_REQUEST, 369 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 370 ) 371 .into_response(); 372 } else { 373 let rotation_key = std::env::var("PLC_ROTATION_KEY") 374 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 375 let genesis_result = match create_genesis_operation( 376 &signing_key, 377 &rotation_key, 378 &handle, 379 &pds_endpoint, 380 ) { 381 Ok(r) => r, 382 Err(e) => { 383 error!("Error creating PLC genesis operation: {:?}", e); 384 return ( 385 StatusCode::INTERNAL_SERVER_ERROR, 386 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 387 ) 388 .into_response(); 389 } 390 }; 391 let plc_client = PlcClient::new(None); 392 if let Err(e) = plc_client 393 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 394 .await 395 { 396 error!("Failed to submit PLC genesis operation: {:?}", e); 397 return ( 398 StatusCode::BAD_GATEWAY, 399 Json(json!({ 400 "error": "UpstreamError", 401 "message": format!("Failed to register DID with PLC directory: {}", e) 402 })), 403 ) 404 .into_response(); 405 } 406 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 407 genesis_result.did 408 } 409 } else { 410 let rotation_key = std::env::var("PLC_ROTATION_KEY") 411 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 412 let genesis_result = match create_genesis_operation( 413 &signing_key, 414 &rotation_key, 415 &handle, 416 &pds_endpoint, 417 ) { 418 Ok(r) => r, 419 Err(e) => { 420 error!("Error creating PLC genesis operation: {:?}", e); 421 return ( 422 StatusCode::INTERNAL_SERVER_ERROR, 423 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 424 ) 425 .into_response(); 426 } 427 }; 428 let plc_client = PlcClient::new(None); 429 if let Err(e) = plc_client 430 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 431 .await 432 { 433 error!("Failed to submit PLC genesis operation: {:?}", e); 434 return ( 435 StatusCode::BAD_GATEWAY, 436 Json(json!({ 437 "error": "UpstreamError", 438 "message": format!("Failed to register DID with PLC directory: {}", e) 439 })), 440 ) 441 .into_response(); 442 } 443 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 444 genesis_result.did 445 } 446 } 447 }; 448 let mut tx = match state.db.begin().await { 449 Ok(tx) => tx, 450 Err(e) => { 451 error!("Error starting transaction: {:?}", e); 452 return ( 453 StatusCode::INTERNAL_SERVER_ERROR, 454 Json(json!({"error": "InternalError"})), 455 ) 456 .into_response(); 457 } 458 }; 459 if is_migration { 460 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 461 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 462 .bind(&did) 463 .fetch_optional(&mut *tx) 464 .await 465 .unwrap_or(None); 466 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 467 if deactivated_at.is_some() { 468 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration"); 469 let update_result: Result<_, sqlx::Error> = 470 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 471 .bind(&handle) 472 .bind(account_id) 473 .execute(&mut *tx) 474 .await; 475 if let Err(e) = update_result { 476 if let Some(db_err) = e.as_database_error() 477 && db_err 478 .constraint() 479 .map(|c| c.contains("handle")) 480 .unwrap_or(false) 481 { 482 return ( 483 StatusCode::BAD_REQUEST, 484 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 485 ) 486 .into_response(); 487 } 488 error!("Error reactivating account: {:?}", e); 489 return ( 490 StatusCode::INTERNAL_SERVER_ERROR, 491 Json(json!({"error": "InternalError"})), 492 ) 493 .into_response(); 494 } 495 if let Err(e) = tx.commit().await { 496 error!("Error committing reactivation: {:?}", e); 497 return ( 498 StatusCode::INTERNAL_SERVER_ERROR, 499 Json(json!({"error": "InternalError"})), 500 ) 501 .into_response(); 502 } 503 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 504 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 505 ) 506 .bind(account_id) 507 .fetch_optional(&state.db) 508 .await 509 .unwrap_or(None); 510 let secret_key_bytes = match key_row { 511 Some((key_bytes, encryption_version)) => { 512 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 513 Ok(k) => k, 514 Err(e) => { 515 error!("Error decrypting key for reactivated account: {:?}", e); 516 return ( 517 StatusCode::INTERNAL_SERVER_ERROR, 518 Json(json!({"error": "InternalError"})), 519 ) 520 .into_response(); 521 } 522 } 523 } 524 None => { 525 error!("No signing key found for reactivated account"); 526 return ( 527 StatusCode::INTERNAL_SERVER_ERROR, 528 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 529 ) 530 .into_response(); 531 } 532 }; 533 let access_meta = 534 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 535 Ok(m) => m, 536 Err(e) => { 537 error!("Error creating access token: {:?}", e); 538 return ( 539 StatusCode::INTERNAL_SERVER_ERROR, 540 Json(json!({"error": "InternalError"})), 541 ) 542 .into_response(); 543 } 544 }; 545 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 546 &did, 547 &secret_key_bytes, 548 ) { 549 Ok(m) => m, 550 Err(e) => { 551 error!("Error creating refresh token: {:?}", e); 552 return ( 553 StatusCode::INTERNAL_SERVER_ERROR, 554 Json(json!({"error": "InternalError"})), 555 ) 556 .into_response(); 557 } 558 }; 559 let session_result: Result<_, sqlx::Error> = sqlx::query( 560 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 561 ) 562 .bind(&did) 563 .bind(&access_meta.jti) 564 .bind(&refresh_meta.jti) 565 .bind(access_meta.expires_at) 566 .bind(refresh_meta.expires_at) 567 .execute(&state.db) 568 .await; 569 if let Err(e) = session_result { 570 error!("Error creating session: {:?}", e); 571 return ( 572 StatusCode::INTERNAL_SERVER_ERROR, 573 Json(json!({"error": "InternalError"})), 574 ) 575 .into_response(); 576 } 577 return ( 578 StatusCode::OK, 579 Json(CreateAccountOutput { 580 handle: handle.clone(), 581 did, 582 access_jwt: Some(access_meta.token), 583 refresh_jwt: Some(refresh_meta.token), 584 verification_required: false, 585 verification_channel: "email".to_string(), 586 }), 587 ) 588 .into_response(); 589 } else { 590 return ( 591 StatusCode::BAD_REQUEST, 592 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 593 ) 594 .into_response(); 595 } 596 } 597 } 598 let exists_result: Option<(i32,)> = 599 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 600 .bind(&handle) 601 .fetch_optional(&mut *tx) 602 .await 603 .unwrap_or(None); 604 if exists_result.is_some() { 605 return ( 606 StatusCode::BAD_REQUEST, 607 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 608 ) 609 .into_response(); 610 } 611 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 612 .map(|v| v == "true" || v == "1") 613 .unwrap_or(false); 614 if invite_code_required 615 && input 616 .invite_code 617 .as_ref() 618 .map(|c| c.trim().is_empty()) 619 .unwrap_or(true) 620 { 621 return ( 622 StatusCode::BAD_REQUEST, 623 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 624 ) 625 .into_response(); 626 } 627 if let Some(code) = &input.invite_code 628 && !code.trim().is_empty() 629 { 630 let invite_query = sqlx::query!( 631 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 632 code 633 ) 634 .fetch_optional(&mut *tx) 635 .await; 636 match invite_query { 637 Ok(Some(row)) => { 638 if row.available_uses <= 0 { 639 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 640 } 641 let update_invite = sqlx::query!( 642 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 643 code 644 ) 645 .execute(&mut *tx) 646 .await; 647 if let Err(e) = update_invite { 648 error!("Error updating invite code: {:?}", e); 649 return ( 650 StatusCode::INTERNAL_SERVER_ERROR, 651 Json(json!({"error": "InternalError"})), 652 ) 653 .into_response(); 654 } 655 } 656 Ok(None) => { 657 return ( 658 StatusCode::BAD_REQUEST, 659 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 660 ) 661 .into_response(); 662 } 663 Err(e) => { 664 error!("Error checking invite code: {:?}", e); 665 return ( 666 StatusCode::INTERNAL_SERVER_ERROR, 667 Json(json!({"error": "InternalError"})), 668 ) 669 .into_response(); 670 } 671 } 672 } 673 let password_hash = match hash(&input.password, DEFAULT_COST) { 674 Ok(h) => h, 675 Err(e) => { 676 error!("Error hashing password: {:?}", e); 677 return ( 678 StatusCode::INTERNAL_SERVER_ERROR, 679 Json(json!({"error": "InternalError"})), 680 ) 681 .into_response(); 682 } 683 }; 684 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 685 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 686 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 687 .fetch_one(&mut *tx) 688 .await 689 .map(|c| c.unwrap_or(0) == 0) 690 .unwrap_or(false); 691 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 692 Some(chrono::Utc::now()) 693 } else { 694 None 695 }; 696 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 697 r#"INSERT INTO users ( 698 handle, email, did, password_hash, 699 preferred_comms_channel, 700 discord_id, telegram_username, signal_number, 701 is_admin, deactivated_at, email_verified 702 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 703 ) 704 .bind(&handle) 705 .bind(&email) 706 .bind(&did) 707 .bind(&password_hash) 708 .bind(verification_channel) 709 .bind( 710 input 711 .discord_id 712 .as_deref() 713 .map(|s| s.trim()) 714 .filter(|s| !s.is_empty()), 715 ) 716 .bind( 717 input 718 .telegram_username 719 .as_deref() 720 .map(|s| s.trim()) 721 .filter(|s| !s.is_empty()), 722 ) 723 .bind( 724 input 725 .signal_number 726 .as_deref() 727 .map(|s| s.trim()) 728 .filter(|s| !s.is_empty()), 729 ) 730 .bind(is_first_user) 731 .bind(deactivated_at) 732 .bind(is_migration) 733 .fetch_one(&mut *tx) 734 .await; 735 let user_id = match user_insert { 736 Ok((id,)) => id, 737 Err(e) => { 738 if let Some(db_err) = e.as_database_error() 739 && db_err.code().as_deref() == Some("23505") 740 { 741 let constraint = db_err.constraint().unwrap_or(""); 742 if constraint.contains("handle") || constraint.contains("users_handle") { 743 return ( 744 StatusCode::BAD_REQUEST, 745 Json(json!({ 746 "error": "HandleNotAvailable", 747 "message": "Handle already taken" 748 })), 749 ) 750 .into_response(); 751 } else if constraint.contains("email") || constraint.contains("users_email") { 752 return ( 753 StatusCode::BAD_REQUEST, 754 Json(json!({ 755 "error": "InvalidEmail", 756 "message": "Email already registered" 757 })), 758 ) 759 .into_response(); 760 } else if constraint.contains("did") || constraint.contains("users_did") { 761 return ( 762 StatusCode::BAD_REQUEST, 763 Json(json!({ 764 "error": "AccountAlreadyExists", 765 "message": "An account with this DID already exists" 766 })), 767 ) 768 .into_response(); 769 } 770 } 771 error!("Error inserting user: {:?}", e); 772 return ( 773 StatusCode::INTERNAL_SERVER_ERROR, 774 Json(json!({"error": "InternalError"})), 775 ) 776 .into_response(); 777 } 778 }; 779 780 if !is_migration 781 && let Some(ref recipient) = verification_recipient 782 && let Err(e) = sqlx::query!( 783 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)", 784 user_id, 785 verification_channel as _, 786 verification_code, 787 recipient, 788 code_expires_at 789 ) 790 .execute(&mut *tx) 791 .await { 792 error!("Error inserting verification code: {:?}", e); 793 return ( 794 StatusCode::INTERNAL_SERVER_ERROR, 795 Json(json!({"error": "InternalError"})), 796 ) 797 .into_response(); 798 } 799 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 800 Ok(enc) => enc, 801 Err(e) => { 802 error!("Error encrypting user key: {:?}", e); 803 return ( 804 StatusCode::INTERNAL_SERVER_ERROR, 805 Json(json!({"error": "InternalError"})), 806 ) 807 .into_response(); 808 } 809 }; 810 let key_insert = sqlx::query!( 811 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 812 user_id, 813 &encrypted_key_bytes[..], 814 crate::config::ENCRYPTION_VERSION 815 ) 816 .execute(&mut *tx) 817 .await; 818 if let Err(e) = key_insert { 819 error!("Error inserting user key: {:?}", e); 820 return ( 821 StatusCode::INTERNAL_SERVER_ERROR, 822 Json(json!({"error": "InternalError"})), 823 ) 824 .into_response(); 825 } 826 if let Some(key_id) = reserved_key_id { 827 let mark_used = sqlx::query!( 828 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 829 key_id 830 ) 831 .execute(&mut *tx) 832 .await; 833 if let Err(e) = mark_used { 834 error!("Error marking reserved key as used: {:?}", e); 835 return ( 836 StatusCode::INTERNAL_SERVER_ERROR, 837 Json(json!({"error": "InternalError"})), 838 ) 839 .into_response(); 840 } 841 } 842 let mst = Mst::new(Arc::new(state.block_store.clone())); 843 let mst_root = match mst.persist().await { 844 Ok(c) => c, 845 Err(e) => { 846 error!("Error persisting MST: {:?}", e); 847 return ( 848 StatusCode::INTERNAL_SERVER_ERROR, 849 Json(json!({"error": "InternalError"})), 850 ) 851 .into_response(); 852 } 853 }; 854 let did_obj = match Did::new(&did) { 855 Ok(d) => d, 856 Err(_) => { 857 return ( 858 StatusCode::INTERNAL_SERVER_ERROR, 859 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 860 ) 861 .into_response(); 862 } 863 }; 864 let rev = Tid::now(LimitedU32::MIN); 865 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 866 let signed_commit = match unsigned_commit.sign(&signing_key) { 867 Ok(c) => c, 868 Err(e) => { 869 error!("Error signing genesis commit: {:?}", e); 870 return ( 871 StatusCode::INTERNAL_SERVER_ERROR, 872 Json(json!({"error": "InternalError"})), 873 ) 874 .into_response(); 875 } 876 }; 877 let commit_bytes = match signed_commit.to_cbor() { 878 Ok(b) => b, 879 Err(e) => { 880 error!("Error serializing genesis commit: {:?}", e); 881 return ( 882 StatusCode::INTERNAL_SERVER_ERROR, 883 Json(json!({"error": "InternalError"})), 884 ) 885 .into_response(); 886 } 887 }; 888 let commit_cid = match state.block_store.put(&commit_bytes).await { 889 Ok(c) => c, 890 Err(e) => { 891 error!("Error saving genesis commit: {:?}", e); 892 return ( 893 StatusCode::INTERNAL_SERVER_ERROR, 894 Json(json!({"error": "InternalError"})), 895 ) 896 .into_response(); 897 } 898 }; 899 let commit_cid_str = commit_cid.to_string(); 900 let repo_insert = sqlx::query!( 901 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 902 user_id, 903 commit_cid_str 904 ) 905 .execute(&mut *tx) 906 .await; 907 if let Err(e) = repo_insert { 908 error!("Error initializing repo: {:?}", e); 909 return ( 910 StatusCode::INTERNAL_SERVER_ERROR, 911 Json(json!({"error": "InternalError"})), 912 ) 913 .into_response(); 914 } 915 if let Some(code) = &input.invite_code 916 && !code.trim().is_empty() 917 { 918 let use_insert = sqlx::query!( 919 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 920 code, 921 user_id 922 ) 923 .execute(&mut *tx) 924 .await; 925 if let Err(e) = use_insert { 926 error!("Error recording invite usage: {:?}", e); 927 return ( 928 StatusCode::INTERNAL_SERVER_ERROR, 929 Json(json!({"error": "InternalError"})), 930 ) 931 .into_response(); 932 } 933 } 934 if let Err(e) = tx.commit().await { 935 error!("Error committing transaction: {:?}", e); 936 return ( 937 StatusCode::INTERNAL_SERVER_ERROR, 938 Json(json!({"error": "InternalError"})), 939 ) 940 .into_response(); 941 } 942 if !is_migration { 943 if let Err(e) = 944 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 945 { 946 warn!("Failed to sequence identity event for {}: {}", did, e); 947 } 948 if let Err(e) = 949 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 950 { 951 warn!("Failed to sequence account event for {}: {}", did, e); 952 } 953 let profile_record = json!({ 954 "$type": "app.bsky.actor.profile", 955 "displayName": input.handle 956 }); 957 if let Err(e) = crate::api::repo::record::create_record_internal( 958 &state, 959 &did, 960 "app.bsky.actor.profile", 961 "self", 962 &profile_record, 963 ) 964 .await 965 { 966 warn!("Failed to create default profile for {}: {}", did, e); 967 } 968 if let Some(ref recipient) = verification_recipient 969 && let Err(e) = crate::comms::enqueue_signup_verification( 970 &state.db, 971 user_id, 972 verification_channel, 973 recipient, 974 &verification_code, 975 ) 976 .await 977 { 978 warn!( 979 "Failed to enqueue signup verification notification: {:?}", 980 e 981 ); 982 } 983 } 984 985 let (access_jwt, refresh_jwt) = if is_migration { 986 let access_meta = 987 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 988 Ok(m) => m, 989 Err(e) => { 990 error!("Error creating access token for migration: {:?}", e); 991 return ( 992 StatusCode::INTERNAL_SERVER_ERROR, 993 Json(json!({"error": "InternalError"})), 994 ) 995 .into_response(); 996 } 997 }; 998 let refresh_meta = 999 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1000 Ok(m) => m, 1001 Err(e) => { 1002 error!("Error creating refresh token for migration: {:?}", e); 1003 return ( 1004 StatusCode::INTERNAL_SERVER_ERROR, 1005 Json(json!({"error": "InternalError"})), 1006 ) 1007 .into_response(); 1008 } 1009 }; 1010 if let Err(e) = sqlx::query!( 1011 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1012 did, 1013 access_meta.jti, 1014 refresh_meta.jti, 1015 access_meta.expires_at, 1016 refresh_meta.expires_at 1017 ) 1018 .execute(&state.db) 1019 .await 1020 { 1021 error!("Error creating session for migration: {:?}", e); 1022 return ( 1023 StatusCode::INTERNAL_SERVER_ERROR, 1024 Json(json!({"error": "InternalError"})), 1025 ) 1026 .into_response(); 1027 } 1028 (Some(access_meta.token), Some(refresh_meta.token)) 1029 } else { 1030 (None, None) 1031 }; 1032 1033 ( 1034 StatusCode::OK, 1035 Json(CreateAccountOutput { 1036 handle: handle.clone(), 1037 did, 1038 access_jwt, 1039 refresh_jwt, 1040 verification_required: !is_migration, 1041 verification_channel: verification_channel.to_string(), 1042 }), 1043 ) 1044 .into_response() 1045}