this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::{DEFAULT_COST, hash}; 12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use k256::{SecretKey, ecdsa::SigningKey}; 15use rand::rngs::OsRng; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::sync::Arc; 19use tracing::{debug, error, info, warn}; 20 21fn extract_client_ip(headers: &HeaderMap) -> String { 22 if let Some(forwarded) = headers.get("x-forwarded-for") 23 && let Ok(value) = forwarded.to_str() 24 && let Some(first_ip) = value.split(',').next() 25 { 26 return first_ip.trim().to_string(); 27 } 28 if let Some(real_ip) = headers.get("x-real-ip") 29 && let Ok(value) = real_ip.to_str() 30 { 31 return value.trim().to_string(); 32 } 33 "unknown".to_string() 34} 35 36#[derive(Deserialize)] 37#[serde(rename_all = "camelCase")] 38pub struct CreateAccountInput { 39 pub handle: String, 40 pub email: Option<String>, 41 pub password: String, 42 pub invite_code: Option<String>, 43 pub did: Option<String>, 44 pub signing_key: Option<String>, 45 pub verification_channel: Option<String>, 46 pub discord_id: Option<String>, 47 pub telegram_username: Option<String>, 48 pub signal_number: Option<String>, 49} 50 51#[derive(Serialize)] 52#[serde(rename_all = "camelCase")] 53pub struct CreateAccountOutput { 54 pub handle: String, 55 pub did: String, 56 #[serde(skip_serializing_if = "Option::is_none")] 57 pub access_jwt: Option<String>, 58 #[serde(skip_serializing_if = "Option::is_none")] 59 pub refresh_jwt: Option<String>, 60 pub verification_required: bool, 61 pub verification_channel: String, 62} 63 64pub async fn create_account( 65 State(state): State<AppState>, 66 headers: HeaderMap, 67 Json(input): Json<CreateAccountInput>, 68) -> Response { 69 info!("create_account called"); 70 let client_ip = extract_client_ip(&headers); 71 if !state 72 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 73 .await 74 { 75 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 76 return ( 77 StatusCode::TOO_MANY_REQUESTS, 78 Json(json!({ 79 "error": "RateLimitExceeded", 80 "message": "Too many account creation attempts. Please try again later." 81 })), 82 ) 83 .into_response(); 84 } 85 86 let migration_auth = if let Some(token) = 87 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 88 { 89 if is_service_token(&token) { 90 let verifier = ServiceTokenVerifier::new(); 91 match verifier 92 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 93 .await 94 { 95 Ok(claims) => { 96 debug!("Service token verified for migration: iss={}", claims.iss); 97 Some(claims.iss) 98 } 99 Err(e) => { 100 error!("Service token verification failed: {:?}", e); 101 return ( 102 StatusCode::UNAUTHORIZED, 103 Json(json!({ 104 "error": "AuthenticationFailed", 105 "message": format!("Service token verification failed: {}", e) 106 })), 107 ) 108 .into_response(); 109 } 110 } 111 } else { 112 None 113 } 114 } else { 115 None 116 }; 117 118 let is_migration = migration_auth.is_some() 119 && input 120 .did 121 .as_ref() 122 .map(|d| d.starts_with("did:plc:")) 123 .unwrap_or(false); 124 125 if is_migration { 126 let migration_did = input.did.as_ref().unwrap(); 127 let auth_did = migration_auth.as_ref().unwrap(); 128 if migration_did != auth_did { 129 return ( 130 StatusCode::FORBIDDEN, 131 Json(json!({ 132 "error": "AuthorizationError", 133 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 134 })), 135 ) 136 .into_response(); 137 } 138 info!(did = %migration_did, "Processing account migration"); 139 } 140 141 if input.handle.contains('!') || input.handle.contains('@') { 142 return ( 143 StatusCode::BAD_REQUEST, 144 Json( 145 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}), 146 ), 147 ) 148 .into_response(); 149 } 150 let email: Option<String> = input 151 .email 152 .as_ref() 153 .map(|e| e.trim().to_string()) 154 .filter(|e| !e.is_empty()); 155 if let Some(ref email) = email 156 && !crate::api::validation::is_valid_email(email) 157 { 158 return ( 159 StatusCode::BAD_REQUEST, 160 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 161 ) 162 .into_response(); 163 } 164 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 165 let valid_channels = ["email", "discord", "telegram", "signal"]; 166 if !valid_channels.contains(&verification_channel) && !is_migration { 167 return ( 168 StatusCode::BAD_REQUEST, 169 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 170 ) 171 .into_response(); 172 } 173 let verification_recipient = if is_migration { 174 None 175 } else { 176 Some(match verification_channel { 177 "email" => match &input.email { 178 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 179 _ => return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 182 ).into_response(), 183 }, 184 "discord" => match &input.discord_id { 185 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 186 _ => return ( 187 StatusCode::BAD_REQUEST, 188 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 189 ).into_response(), 190 }, 191 "telegram" => match &input.telegram_username { 192 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 193 _ => return ( 194 StatusCode::BAD_REQUEST, 195 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 196 ).into_response(), 197 }, 198 "signal" => match &input.signal_number { 199 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 200 _ => return ( 201 StatusCode::BAD_REQUEST, 202 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 203 ).into_response(), 204 }, 205 _ => return ( 206 StatusCode::BAD_REQUEST, 207 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 208 ).into_response(), 209 }) 210 }; 211 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 212 let pds_endpoint = format!("https://{}", hostname); 213 let suffix = format!(".{}", hostname); 214 let short_handle = if input.handle.ends_with(&suffix) { 215 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle) 216 } else { 217 &input.handle 218 }; 219 let full_handle = format!("{}.{}", short_handle, hostname); 220 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 221 if let Some(signing_key_did) = &input.signing_key { 222 let reserved = sqlx::query!( 223 r#" 224 SELECT id, private_key_bytes 225 FROM reserved_signing_keys 226 WHERE public_key_did_key = $1 227 AND used_at IS NULL 228 AND expires_at > NOW() 229 FOR UPDATE 230 "#, 231 signing_key_did 232 ) 233 .fetch_optional(&state.db) 234 .await; 235 match reserved { 236 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 237 Ok(None) => { 238 return ( 239 StatusCode::BAD_REQUEST, 240 Json(json!({ 241 "error": "InvalidSigningKey", 242 "message": "Signing key not found, already used, or expired" 243 })), 244 ) 245 .into_response(); 246 } 247 Err(e) => { 248 error!("Error looking up reserved signing key: {:?}", e); 249 return ( 250 StatusCode::INTERNAL_SERVER_ERROR, 251 Json(json!({"error": "InternalError"})), 252 ) 253 .into_response(); 254 } 255 } 256 } else { 257 let secret_key = SecretKey::random(&mut OsRng); 258 (secret_key.to_bytes().to_vec(), None) 259 }; 260 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 261 Ok(k) => k, 262 Err(e) => { 263 error!("Error creating signing key: {:?}", e); 264 return ( 265 StatusCode::INTERNAL_SERVER_ERROR, 266 Json(json!({"error": "InternalError"})), 267 ) 268 .into_response(); 269 } 270 }; 271 let did = if let Some(d) = &input.did { 272 if d.trim().is_empty() { 273 let rotation_key = std::env::var("PLC_ROTATION_KEY") 274 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 275 let genesis_result = match create_genesis_operation( 276 &signing_key, 277 &rotation_key, 278 &full_handle, 279 &pds_endpoint, 280 ) { 281 Ok(r) => r, 282 Err(e) => { 283 error!("Error creating PLC genesis operation: {:?}", e); 284 return ( 285 StatusCode::INTERNAL_SERVER_ERROR, 286 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 287 ) 288 .into_response(); 289 } 290 }; 291 let plc_client = PlcClient::new(None); 292 if let Err(e) = plc_client 293 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 294 .await 295 { 296 error!("Failed to submit PLC genesis operation: {:?}", e); 297 return ( 298 StatusCode::BAD_GATEWAY, 299 Json(json!({ 300 "error": "UpstreamError", 301 "message": format!("Failed to register DID with PLC directory: {}", e) 302 })), 303 ) 304 .into_response(); 305 } 306 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 307 genesis_result.did 308 } else if d.starts_with("did:web:") { 309 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await { 310 return ( 311 StatusCode::BAD_REQUEST, 312 Json(json!({"error": "InvalidDid", "message": e})), 313 ) 314 .into_response(); 315 } 316 d.clone() 317 } else if d.starts_with("did:plc:") && is_migration { 318 d.clone() 319 } else { 320 return ( 321 StatusCode::BAD_REQUEST, 322 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 323 ) 324 .into_response(); 325 } 326 } else { 327 let rotation_key = std::env::var("PLC_ROTATION_KEY") 328 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 329 let genesis_result = match create_genesis_operation( 330 &signing_key, 331 &rotation_key, 332 &full_handle, 333 &pds_endpoint, 334 ) { 335 Ok(r) => r, 336 Err(e) => { 337 error!("Error creating PLC genesis operation: {:?}", e); 338 return ( 339 StatusCode::INTERNAL_SERVER_ERROR, 340 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 341 ) 342 .into_response(); 343 } 344 }; 345 let plc_client = PlcClient::new(None); 346 if let Err(e) = plc_client 347 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 348 .await 349 { 350 error!("Failed to submit PLC genesis operation: {:?}", e); 351 return ( 352 StatusCode::BAD_GATEWAY, 353 Json(json!({ 354 "error": "UpstreamError", 355 "message": format!("Failed to register DID with PLC directory: {}", e) 356 })), 357 ) 358 .into_response(); 359 } 360 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 361 genesis_result.did 362 }; 363 let mut tx = match state.db.begin().await { 364 Ok(tx) => tx, 365 Err(e) => { 366 error!("Error starting transaction: {:?}", e); 367 return ( 368 StatusCode::INTERNAL_SERVER_ERROR, 369 Json(json!({"error": "InternalError"})), 370 ) 371 .into_response(); 372 } 373 }; 374 if is_migration { 375 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 376 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE") 377 .bind(&did) 378 .fetch_optional(&mut *tx) 379 .await 380 .unwrap_or(None); 381 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 382 if deactivated_at.is_some() { 383 info!(did = %did, old_handle = %old_handle, new_handle = %short_handle, "Preparing existing account for inbound migration"); 384 let update_result: Result<_, sqlx::Error> = 385 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") 386 .bind(short_handle) 387 .bind(account_id) 388 .execute(&mut *tx) 389 .await; 390 if let Err(e) = update_result { 391 if let Some(db_err) = e.as_database_error() 392 && db_err 393 .constraint() 394 .map(|c| c.contains("handle")) 395 .unwrap_or(false) 396 { 397 return ( 398 StatusCode::BAD_REQUEST, 399 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 400 ) 401 .into_response(); 402 } 403 error!("Error reactivating account: {:?}", e); 404 return ( 405 StatusCode::INTERNAL_SERVER_ERROR, 406 Json(json!({"error": "InternalError"})), 407 ) 408 .into_response(); 409 } 410 if let Err(e) = tx.commit().await { 411 error!("Error committing reactivation: {:?}", e); 412 return ( 413 StatusCode::INTERNAL_SERVER_ERROR, 414 Json(json!({"error": "InternalError"})), 415 ) 416 .into_response(); 417 } 418 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 419 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 420 ) 421 .bind(account_id) 422 .fetch_optional(&state.db) 423 .await 424 .unwrap_or(None); 425 let secret_key_bytes = match key_row { 426 Some((key_bytes, encryption_version)) => { 427 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 428 Ok(k) => k, 429 Err(e) => { 430 error!("Error decrypting key for reactivated account: {:?}", e); 431 return ( 432 StatusCode::INTERNAL_SERVER_ERROR, 433 Json(json!({"error": "InternalError"})), 434 ) 435 .into_response(); 436 } 437 } 438 } 439 None => { 440 error!("No signing key found for reactivated account"); 441 return ( 442 StatusCode::INTERNAL_SERVER_ERROR, 443 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 444 ) 445 .into_response(); 446 } 447 }; 448 let access_meta = 449 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 450 Ok(m) => m, 451 Err(e) => { 452 error!("Error creating access token: {:?}", e); 453 return ( 454 StatusCode::INTERNAL_SERVER_ERROR, 455 Json(json!({"error": "InternalError"})), 456 ) 457 .into_response(); 458 } 459 }; 460 let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 461 &did, 462 &secret_key_bytes, 463 ) { 464 Ok(m) => m, 465 Err(e) => { 466 error!("Error creating refresh token: {:?}", e); 467 return ( 468 StatusCode::INTERNAL_SERVER_ERROR, 469 Json(json!({"error": "InternalError"})), 470 ) 471 .into_response(); 472 } 473 }; 474 let session_result: Result<_, sqlx::Error> = sqlx::query( 475 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 476 ) 477 .bind(&did) 478 .bind(&access_meta.jti) 479 .bind(&refresh_meta.jti) 480 .bind(access_meta.expires_at) 481 .bind(refresh_meta.expires_at) 482 .execute(&state.db) 483 .await; 484 if let Err(e) = session_result { 485 error!("Error creating session: {:?}", e); 486 return ( 487 StatusCode::INTERNAL_SERVER_ERROR, 488 Json(json!({"error": "InternalError"})), 489 ) 490 .into_response(); 491 } 492 return ( 493 StatusCode::OK, 494 Json(CreateAccountOutput { 495 handle: full_handle.clone(), 496 did, 497 access_jwt: Some(access_meta.token), 498 refresh_jwt: Some(refresh_meta.token), 499 verification_required: false, 500 verification_channel: "email".to_string(), 501 }), 502 ) 503 .into_response(); 504 } else { 505 return ( 506 StatusCode::BAD_REQUEST, 507 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 508 ) 509 .into_response(); 510 } 511 } 512 } 513 let exists_result: Option<(i32,)> = 514 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL") 515 .bind(short_handle) 516 .fetch_optional(&mut *tx) 517 .await 518 .unwrap_or(None); 519 if exists_result.is_some() { 520 return ( 521 StatusCode::BAD_REQUEST, 522 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 523 ) 524 .into_response(); 525 } 526 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 527 .map(|v| v == "true" || v == "1") 528 .unwrap_or(false); 529 if invite_code_required 530 && input 531 .invite_code 532 .as_ref() 533 .map(|c| c.trim().is_empty()) 534 .unwrap_or(true) 535 { 536 return ( 537 StatusCode::BAD_REQUEST, 538 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 539 ) 540 .into_response(); 541 } 542 if let Some(code) = &input.invite_code 543 && !code.trim().is_empty() 544 { 545 let invite_query = sqlx::query!( 546 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 547 code 548 ) 549 .fetch_optional(&mut *tx) 550 .await; 551 match invite_query { 552 Ok(Some(row)) => { 553 if row.available_uses <= 0 { 554 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 555 } 556 let update_invite = sqlx::query!( 557 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 558 code 559 ) 560 .execute(&mut *tx) 561 .await; 562 if let Err(e) = update_invite { 563 error!("Error updating invite code: {:?}", e); 564 return ( 565 StatusCode::INTERNAL_SERVER_ERROR, 566 Json(json!({"error": "InternalError"})), 567 ) 568 .into_response(); 569 } 570 } 571 Ok(None) => { 572 return ( 573 StatusCode::BAD_REQUEST, 574 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 575 ) 576 .into_response(); 577 } 578 Err(e) => { 579 error!("Error checking invite code: {:?}", e); 580 return ( 581 StatusCode::INTERNAL_SERVER_ERROR, 582 Json(json!({"error": "InternalError"})), 583 ) 584 .into_response(); 585 } 586 } 587 } 588 let password_hash = match hash(&input.password, DEFAULT_COST) { 589 Ok(h) => h, 590 Err(e) => { 591 error!("Error hashing password: {:?}", e); 592 return ( 593 StatusCode::INTERNAL_SERVER_ERROR, 594 Json(json!({"error": "InternalError"})), 595 ) 596 .into_response(); 597 } 598 }; 599 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 600 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 601 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 602 .fetch_one(&mut *tx) 603 .await 604 .map(|c| c.unwrap_or(0) == 0) 605 .unwrap_or(false); 606 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 607 Some(chrono::Utc::now()) 608 } else { 609 None 610 }; 611 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 612 r#"INSERT INTO users ( 613 handle, email, did, password_hash, 614 preferred_comms_channel, 615 discord_id, telegram_username, signal_number, 616 is_admin, deactivated_at, email_verified 617 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 618 ) 619 .bind(short_handle) 620 .bind(&email) 621 .bind(&did) 622 .bind(&password_hash) 623 .bind(verification_channel) 624 .bind( 625 input 626 .discord_id 627 .as_deref() 628 .map(|s| s.trim()) 629 .filter(|s| !s.is_empty()), 630 ) 631 .bind( 632 input 633 .telegram_username 634 .as_deref() 635 .map(|s| s.trim()) 636 .filter(|s| !s.is_empty()), 637 ) 638 .bind( 639 input 640 .signal_number 641 .as_deref() 642 .map(|s| s.trim()) 643 .filter(|s| !s.is_empty()), 644 ) 645 .bind(is_first_user) 646 .bind(deactivated_at) 647 .bind(is_migration) 648 .fetch_one(&mut *tx) 649 .await; 650 let user_id = match user_insert { 651 Ok((id,)) => id, 652 Err(e) => { 653 if let Some(db_err) = e.as_database_error() 654 && db_err.code().as_deref() == Some("23505") 655 { 656 let constraint = db_err.constraint().unwrap_or(""); 657 if constraint.contains("handle") || constraint.contains("users_handle") { 658 return ( 659 StatusCode::BAD_REQUEST, 660 Json(json!({ 661 "error": "HandleNotAvailable", 662 "message": "Handle already taken" 663 })), 664 ) 665 .into_response(); 666 } else if constraint.contains("email") || constraint.contains("users_email") { 667 return ( 668 StatusCode::BAD_REQUEST, 669 Json(json!({ 670 "error": "InvalidEmail", 671 "message": "Email already registered" 672 })), 673 ) 674 .into_response(); 675 } else if constraint.contains("did") || constraint.contains("users_did") { 676 return ( 677 StatusCode::BAD_REQUEST, 678 Json(json!({ 679 "error": "AccountAlreadyExists", 680 "message": "An account with this DID already exists" 681 })), 682 ) 683 .into_response(); 684 } 685 } 686 error!("Error inserting user: {:?}", e); 687 return ( 688 StatusCode::INTERNAL_SERVER_ERROR, 689 Json(json!({"error": "InternalError"})), 690 ) 691 .into_response(); 692 } 693 }; 694 695 if !is_migration 696 && let Err(e) = sqlx::query!( 697 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)", 698 user_id, 699 verification_code, 700 email, 701 code_expires_at 702 ) 703 .execute(&mut *tx) 704 .await { 705 error!("Error inserting verification code: {:?}", e); 706 return ( 707 StatusCode::INTERNAL_SERVER_ERROR, 708 Json(json!({"error": "InternalError"})), 709 ) 710 .into_response(); 711 } 712 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 713 Ok(enc) => enc, 714 Err(e) => { 715 error!("Error encrypting user key: {:?}", e); 716 return ( 717 StatusCode::INTERNAL_SERVER_ERROR, 718 Json(json!({"error": "InternalError"})), 719 ) 720 .into_response(); 721 } 722 }; 723 let key_insert = sqlx::query!( 724 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 725 user_id, 726 &encrypted_key_bytes[..], 727 crate::config::ENCRYPTION_VERSION 728 ) 729 .execute(&mut *tx) 730 .await; 731 if let Err(e) = key_insert { 732 error!("Error inserting user key: {:?}", e); 733 return ( 734 StatusCode::INTERNAL_SERVER_ERROR, 735 Json(json!({"error": "InternalError"})), 736 ) 737 .into_response(); 738 } 739 if let Some(key_id) = reserved_key_id { 740 let mark_used = sqlx::query!( 741 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 742 key_id 743 ) 744 .execute(&mut *tx) 745 .await; 746 if let Err(e) = mark_used { 747 error!("Error marking reserved key as used: {:?}", e); 748 return ( 749 StatusCode::INTERNAL_SERVER_ERROR, 750 Json(json!({"error": "InternalError"})), 751 ) 752 .into_response(); 753 } 754 } 755 let mst = Mst::new(Arc::new(state.block_store.clone())); 756 let mst_root = match mst.persist().await { 757 Ok(c) => c, 758 Err(e) => { 759 error!("Error persisting MST: {:?}", e); 760 return ( 761 StatusCode::INTERNAL_SERVER_ERROR, 762 Json(json!({"error": "InternalError"})), 763 ) 764 .into_response(); 765 } 766 }; 767 let did_obj = match Did::new(&did) { 768 Ok(d) => d, 769 Err(_) => { 770 return ( 771 StatusCode::INTERNAL_SERVER_ERROR, 772 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 773 ) 774 .into_response(); 775 } 776 }; 777 let rev = Tid::now(LimitedU32::MIN); 778 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 779 let signed_commit = match unsigned_commit.sign(&signing_key) { 780 Ok(c) => c, 781 Err(e) => { 782 error!("Error signing genesis commit: {:?}", e); 783 return ( 784 StatusCode::INTERNAL_SERVER_ERROR, 785 Json(json!({"error": "InternalError"})), 786 ) 787 .into_response(); 788 } 789 }; 790 let commit_bytes = match signed_commit.to_cbor() { 791 Ok(b) => b, 792 Err(e) => { 793 error!("Error serializing genesis commit: {:?}", e); 794 return ( 795 StatusCode::INTERNAL_SERVER_ERROR, 796 Json(json!({"error": "InternalError"})), 797 ) 798 .into_response(); 799 } 800 }; 801 let commit_cid = match state.block_store.put(&commit_bytes).await { 802 Ok(c) => c, 803 Err(e) => { 804 error!("Error saving genesis commit: {:?}", e); 805 return ( 806 StatusCode::INTERNAL_SERVER_ERROR, 807 Json(json!({"error": "InternalError"})), 808 ) 809 .into_response(); 810 } 811 }; 812 let commit_cid_str = commit_cid.to_string(); 813 let repo_insert = sqlx::query!( 814 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 815 user_id, 816 commit_cid_str 817 ) 818 .execute(&mut *tx) 819 .await; 820 if let Err(e) = repo_insert { 821 error!("Error initializing repo: {:?}", e); 822 return ( 823 StatusCode::INTERNAL_SERVER_ERROR, 824 Json(json!({"error": "InternalError"})), 825 ) 826 .into_response(); 827 } 828 if let Some(code) = &input.invite_code 829 && !code.trim().is_empty() 830 { 831 let use_insert = sqlx::query!( 832 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 833 code, 834 user_id 835 ) 836 .execute(&mut *tx) 837 .await; 838 if let Err(e) = use_insert { 839 error!("Error recording invite usage: {:?}", e); 840 return ( 841 StatusCode::INTERNAL_SERVER_ERROR, 842 Json(json!({"error": "InternalError"})), 843 ) 844 .into_response(); 845 } 846 } 847 if let Err(e) = tx.commit().await { 848 error!("Error committing transaction: {:?}", e); 849 return ( 850 StatusCode::INTERNAL_SERVER_ERROR, 851 Json(json!({"error": "InternalError"})), 852 ) 853 .into_response(); 854 } 855 if !is_migration { 856 if let Err(e) = 857 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle)) 858 .await 859 { 860 warn!("Failed to sequence identity event for {}: {}", did, e); 861 } 862 if let Err(e) = 863 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 864 { 865 warn!("Failed to sequence account event for {}: {}", did, e); 866 } 867 let profile_record = json!({ 868 "$type": "app.bsky.actor.profile", 869 "displayName": input.handle 870 }); 871 if let Err(e) = crate::api::repo::record::create_record_internal( 872 &state, 873 &did, 874 "app.bsky.actor.profile", 875 "self", 876 &profile_record, 877 ) 878 .await 879 { 880 warn!("Failed to create default profile for {}: {}", did, e); 881 } 882 if let Some(ref recipient) = verification_recipient 883 && let Err(e) = crate::comms::enqueue_signup_verification( 884 &state.db, 885 user_id, 886 verification_channel, 887 recipient, 888 &verification_code, 889 ) 890 .await 891 { 892 warn!( 893 "Failed to enqueue signup verification notification: {:?}", 894 e 895 ); 896 } 897 } 898 899 let (access_jwt, refresh_jwt) = if is_migration { 900 let access_meta = 901 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 902 Ok(m) => m, 903 Err(e) => { 904 error!("Error creating access token for migration: {:?}", e); 905 return ( 906 StatusCode::INTERNAL_SERVER_ERROR, 907 Json(json!({"error": "InternalError"})), 908 ) 909 .into_response(); 910 } 911 }; 912 let refresh_meta = 913 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 914 Ok(m) => m, 915 Err(e) => { 916 error!("Error creating refresh token for migration: {:?}", e); 917 return ( 918 StatusCode::INTERNAL_SERVER_ERROR, 919 Json(json!({"error": "InternalError"})), 920 ) 921 .into_response(); 922 } 923 }; 924 if let Err(e) = sqlx::query!( 925 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 926 did, 927 access_meta.jti, 928 refresh_meta.jti, 929 access_meta.expires_at, 930 refresh_meta.expires_at 931 ) 932 .execute(&state.db) 933 .await 934 { 935 error!("Error creating session for migration: {:?}", e); 936 return ( 937 StatusCode::INTERNAL_SERVER_ERROR, 938 Json(json!({"error": "InternalError"})), 939 ) 940 .into_response(); 941 } 942 (Some(access_meta.token), Some(refresh_meta.token)) 943 } else { 944 (None, None) 945 }; 946 947 ( 948 StatusCode::OK, 949 Json(CreateAccountOutput { 950 handle: full_handle.clone(), 951 did, 952 access_jwt, 953 refresh_jwt, 954 verification_required: !is_migration, 955 verification_channel: verification_channel.to_string(), 956 }), 957 ) 958 .into_response() 959}