this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::{DEFAULT_COST, hash}; 12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use k256::{SecretKey, ecdsa::SigningKey}; 15use rand::rngs::OsRng; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::sync::Arc; 19use tracing::{debug, error, info, warn}; 20 21fn extract_client_ip(headers: &HeaderMap) -> String { 22 if let Some(forwarded) = headers.get("x-forwarded-for") 23 && let Ok(value) = forwarded.to_str() 24 && let Some(first_ip) = value.split(',').next() { 25 return first_ip.trim().to_string(); 26 } 27 if let Some(real_ip) = headers.get("x-real-ip") 28 && let Ok(value) = real_ip.to_str() { 29 return value.trim().to_string(); 30 } 31 "unknown".to_string() 32} 33 34#[derive(Deserialize)] 35#[serde(rename_all = "camelCase")] 36pub struct CreateAccountInput { 37 pub handle: String, 38 pub email: Option<String>, 39 pub password: String, 40 pub invite_code: Option<String>, 41 pub did: Option<String>, 42 pub signing_key: Option<String>, 43 pub verification_channel: Option<String>, 44 pub discord_id: Option<String>, 45 pub telegram_username: Option<String>, 46 pub signal_number: Option<String>, 47} 48 49#[derive(Serialize)] 50#[serde(rename_all = "camelCase")] 51pub struct CreateAccountOutput { 52 pub handle: String, 53 pub did: String, 54 #[serde(skip_serializing_if = "Option::is_none")] 55 pub access_jwt: Option<String>, 56 #[serde(skip_serializing_if = "Option::is_none")] 57 pub refresh_jwt: Option<String>, 58 pub verification_required: bool, 59 pub verification_channel: String, 60} 61 62pub async fn create_account( 63 State(state): State<AppState>, 64 headers: HeaderMap, 65 Json(input): Json<CreateAccountInput>, 66) -> Response { 67 info!("create_account called"); 68 let client_ip = extract_client_ip(&headers); 69 if !state 70 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 71 .await 72 { 73 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 74 return ( 75 StatusCode::TOO_MANY_REQUESTS, 76 Json(json!({ 77 "error": "RateLimitExceeded", 78 "message": "Too many account creation attempts. Please try again later." 79 })), 80 ) 81 .into_response(); 82 } 83 84 let migration_auth = if let Some(token) = 85 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 86 { 87 if is_service_token(&token) { 88 let verifier = ServiceTokenVerifier::new(); 89 match verifier 90 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 91 .await 92 { 93 Ok(claims) => { 94 debug!("Service token verified for migration: iss={}", claims.iss); 95 Some(claims.iss) 96 } 97 Err(e) => { 98 error!("Service token verification failed: {:?}", e); 99 return ( 100 StatusCode::UNAUTHORIZED, 101 Json(json!({ 102 "error": "AuthenticationFailed", 103 "message": format!("Service token verification failed: {}", e) 104 })), 105 ) 106 .into_response(); 107 } 108 } 109 } else { 110 None 111 } 112 } else { 113 None 114 }; 115 116 let is_migration = migration_auth.is_some() 117 && input.did.as_ref().map(|d| d.starts_with("did:plc:")).unwrap_or(false); 118 119 if is_migration { 120 let migration_did = input.did.as_ref().unwrap(); 121 let auth_did = migration_auth.as_ref().unwrap(); 122 if migration_did != auth_did { 123 return ( 124 StatusCode::FORBIDDEN, 125 Json(json!({ 126 "error": "AuthorizationError", 127 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 128 })), 129 ) 130 .into_response(); 131 } 132 info!(did = %migration_did, "Processing account migration"); 133 } 134 135 if input.handle.contains('!') || input.handle.contains('@') { 136 return ( 137 StatusCode::BAD_REQUEST, 138 Json( 139 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}), 140 ), 141 ) 142 .into_response(); 143 } 144 let email: Option<String> = input 145 .email 146 .as_ref() 147 .map(|e| e.trim().to_string()) 148 .filter(|e| !e.is_empty()); 149 if let Some(ref email) = email 150 && !crate::api::validation::is_valid_email(email) { 151 return ( 152 StatusCode::BAD_REQUEST, 153 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 154 ) 155 .into_response(); 156 } 157 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 158 let valid_channels = ["email", "discord", "telegram", "signal"]; 159 if !valid_channels.contains(&verification_channel) && !is_migration { 160 return ( 161 StatusCode::BAD_REQUEST, 162 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 163 ) 164 .into_response(); 165 } 166 let verification_recipient = if is_migration { 167 None 168 } else { 169 Some(match verification_channel { 170 "email" => match &input.email { 171 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 172 _ => return ( 173 StatusCode::BAD_REQUEST, 174 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 175 ).into_response(), 176 }, 177 "discord" => match &input.discord_id { 178 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 179 _ => return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 182 ).into_response(), 183 }, 184 "telegram" => match &input.telegram_username { 185 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 186 _ => return ( 187 StatusCode::BAD_REQUEST, 188 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 189 ).into_response(), 190 }, 191 "signal" => match &input.signal_number { 192 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 193 _ => return ( 194 StatusCode::BAD_REQUEST, 195 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 196 ).into_response(), 197 }, 198 _ => return ( 199 StatusCode::BAD_REQUEST, 200 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 201 ).into_response(), 202 }) 203 }; 204 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 205 let pds_endpoint = format!("https://{}", hostname); 206 let suffix = format!(".{}", hostname); 207 let short_handle = if input.handle.ends_with(&suffix) { 208 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle) 209 } else { 210 &input.handle 211 }; 212 let full_handle = format!("{}.{}", short_handle, hostname); 213 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 214 if let Some(signing_key_did) = &input.signing_key { 215 let reserved = sqlx::query!( 216 r#" 217 SELECT id, private_key_bytes 218 FROM reserved_signing_keys 219 WHERE public_key_did_key = $1 220 AND used_at IS NULL 221 AND expires_at > NOW() 222 FOR UPDATE 223 "#, 224 signing_key_did 225 ) 226 .fetch_optional(&state.db) 227 .await; 228 match reserved { 229 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 230 Ok(None) => { 231 return ( 232 StatusCode::BAD_REQUEST, 233 Json(json!({ 234 "error": "InvalidSigningKey", 235 "message": "Signing key not found, already used, or expired" 236 })), 237 ) 238 .into_response(); 239 } 240 Err(e) => { 241 error!("Error looking up reserved signing key: {:?}", e); 242 return ( 243 StatusCode::INTERNAL_SERVER_ERROR, 244 Json(json!({"error": "InternalError"})), 245 ) 246 .into_response(); 247 } 248 } 249 } else { 250 let secret_key = SecretKey::random(&mut OsRng); 251 (secret_key.to_bytes().to_vec(), None) 252 }; 253 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 254 Ok(k) => k, 255 Err(e) => { 256 error!("Error creating signing key: {:?}", e); 257 return ( 258 StatusCode::INTERNAL_SERVER_ERROR, 259 Json(json!({"error": "InternalError"})), 260 ) 261 .into_response(); 262 } 263 }; 264 let did = if let Some(d) = &input.did { 265 if d.trim().is_empty() { 266 let rotation_key = std::env::var("PLC_ROTATION_KEY") 267 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 268 let genesis_result = match create_genesis_operation( 269 &signing_key, 270 &rotation_key, 271 &full_handle, 272 &pds_endpoint, 273 ) { 274 Ok(r) => r, 275 Err(e) => { 276 error!("Error creating PLC genesis operation: {:?}", e); 277 return ( 278 StatusCode::INTERNAL_SERVER_ERROR, 279 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 280 ) 281 .into_response(); 282 } 283 }; 284 let plc_client = PlcClient::new(None); 285 if let Err(e) = plc_client 286 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 287 .await 288 { 289 error!("Failed to submit PLC genesis operation: {:?}", e); 290 return ( 291 StatusCode::BAD_GATEWAY, 292 Json(json!({ 293 "error": "UpstreamError", 294 "message": format!("Failed to register DID with PLC directory: {}", e) 295 })), 296 ) 297 .into_response(); 298 } 299 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 300 genesis_result.did 301 } else if d.starts_with("did:web:") { 302 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await { 303 return ( 304 StatusCode::BAD_REQUEST, 305 Json(json!({"error": "InvalidDid", "message": e})), 306 ) 307 .into_response(); 308 } 309 d.clone() 310 } else if d.starts_with("did:plc:") && is_migration { 311 d.clone() 312 } else { 313 return ( 314 StatusCode::BAD_REQUEST, 315 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 316 ) 317 .into_response(); 318 } 319 } else { 320 let rotation_key = std::env::var("PLC_ROTATION_KEY") 321 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 322 let genesis_result = match create_genesis_operation( 323 &signing_key, 324 &rotation_key, 325 &full_handle, 326 &pds_endpoint, 327 ) { 328 Ok(r) => r, 329 Err(e) => { 330 error!("Error creating PLC genesis operation: {:?}", e); 331 return ( 332 StatusCode::INTERNAL_SERVER_ERROR, 333 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 334 ) 335 .into_response(); 336 } 337 }; 338 let plc_client = PlcClient::new(None); 339 if let Err(e) = plc_client 340 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 341 .await 342 { 343 error!("Failed to submit PLC genesis operation: {:?}", e); 344 return ( 345 StatusCode::BAD_GATEWAY, 346 Json(json!({ 347 "error": "UpstreamError", 348 "message": format!("Failed to register DID with PLC directory: {}", e) 349 })), 350 ) 351 .into_response(); 352 } 353 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 354 genesis_result.did 355 }; 356 let mut tx = match state.db.begin().await { 357 Ok(tx) => tx, 358 Err(e) => { 359 error!("Error starting transaction: {:?}", e); 360 return ( 361 StatusCode::INTERNAL_SERVER_ERROR, 362 Json(json!({"error": "InternalError"})), 363 ) 364 .into_response(); 365 } 366 }; 367 if is_migration { 368 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> = 369 sqlx::query_as( 370 "SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE", 371 ) 372 .bind(&did) 373 .fetch_optional(&mut *tx) 374 .await 375 .unwrap_or(None); 376 if let Some((account_id, old_handle, deactivated_at)) = existing_account { 377 if deactivated_at.is_some() { 378 info!(did = %did, old_handle = %old_handle, new_handle = %short_handle, "Preparing existing account for inbound migration"); 379 let update_result: Result<_, sqlx::Error> = sqlx::query( 380 "UPDATE users SET handle = $1 WHERE id = $2", 381 ) 382 .bind(short_handle) 383 .bind(account_id) 384 .execute(&mut *tx) 385 .await; 386 if let Err(e) = update_result { 387 if let Some(db_err) = e.as_database_error() { 388 if db_err.constraint().map(|c| c.contains("handle")).unwrap_or(false) { 389 return ( 390 StatusCode::BAD_REQUEST, 391 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})), 392 ) 393 .into_response(); 394 } 395 } 396 error!("Error reactivating account: {:?}", e); 397 return ( 398 StatusCode::INTERNAL_SERVER_ERROR, 399 Json(json!({"error": "InternalError"})), 400 ) 401 .into_response(); 402 } 403 if let Err(e) = tx.commit().await { 404 error!("Error committing reactivation: {:?}", e); 405 return ( 406 StatusCode::INTERNAL_SERVER_ERROR, 407 Json(json!({"error": "InternalError"})), 408 ) 409 .into_response(); 410 } 411 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as( 412 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 413 ) 414 .bind(account_id) 415 .fetch_optional(&state.db) 416 .await 417 .unwrap_or(None); 418 let secret_key_bytes = match key_row { 419 Some((key_bytes, encryption_version)) => { 420 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) { 421 Ok(k) => k, 422 Err(e) => { 423 error!("Error decrypting key for reactivated account: {:?}", e); 424 return ( 425 StatusCode::INTERNAL_SERVER_ERROR, 426 Json(json!({"error": "InternalError"})), 427 ) 428 .into_response(); 429 } 430 } 431 } 432 None => { 433 error!("No signing key found for reactivated account"); 434 return ( 435 StatusCode::INTERNAL_SERVER_ERROR, 436 Json(json!({"error": "InternalError", "message": "Account signing key not found"})), 437 ) 438 .into_response(); 439 } 440 }; 441 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 442 Ok(m) => m, 443 Err(e) => { 444 error!("Error creating access token: {:?}", e); 445 return ( 446 StatusCode::INTERNAL_SERVER_ERROR, 447 Json(json!({"error": "InternalError"})), 448 ) 449 .into_response(); 450 } 451 }; 452 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 453 Ok(m) => m, 454 Err(e) => { 455 error!("Error creating refresh token: {:?}", e); 456 return ( 457 StatusCode::INTERNAL_SERVER_ERROR, 458 Json(json!({"error": "InternalError"})), 459 ) 460 .into_response(); 461 } 462 }; 463 let session_result: Result<_, sqlx::Error> = sqlx::query( 464 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 465 ) 466 .bind(&did) 467 .bind(&access_meta.jti) 468 .bind(&refresh_meta.jti) 469 .bind(access_meta.expires_at) 470 .bind(refresh_meta.expires_at) 471 .execute(&state.db) 472 .await; 473 if let Err(e) = session_result { 474 error!("Error creating session: {:?}", e); 475 return ( 476 StatusCode::INTERNAL_SERVER_ERROR, 477 Json(json!({"error": "InternalError"})), 478 ) 479 .into_response(); 480 } 481 return ( 482 StatusCode::OK, 483 Json(CreateAccountOutput { 484 handle: full_handle.clone(), 485 did, 486 access_jwt: Some(access_meta.token), 487 refresh_jwt: Some(refresh_meta.token), 488 verification_required: false, 489 verification_channel: "email".to_string(), 490 }), 491 ) 492 .into_response(); 493 } else { 494 return ( 495 StatusCode::BAD_REQUEST, 496 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})), 497 ) 498 .into_response(); 499 } 500 } 501 } 502 let exists_result: Option<(i32,)> = sqlx::query_as( 503 "SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL", 504 ) 505 .bind(short_handle) 506 .fetch_optional(&mut *tx) 507 .await 508 .unwrap_or(None); 509 if exists_result.is_some() { 510 return ( 511 StatusCode::BAD_REQUEST, 512 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 513 ) 514 .into_response(); 515 } 516 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 517 .map(|v| v == "true" || v == "1") 518 .unwrap_or(false); 519 if invite_code_required && input.invite_code.as_ref().map(|c| c.trim().is_empty()).unwrap_or(true) { 520 return ( 521 StatusCode::BAD_REQUEST, 522 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 523 ) 524 .into_response(); 525 } 526 if let Some(code) = &input.invite_code { 527 if !code.trim().is_empty() { 528 let invite_query = sqlx::query!( 529 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 530 code 531 ) 532 .fetch_optional(&mut *tx) 533 .await; 534 match invite_query { 535 Ok(Some(row)) => { 536 if row.available_uses <= 0 { 537 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 538 } 539 let update_invite = sqlx::query!( 540 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 541 code 542 ) 543 .execute(&mut *tx) 544 .await; 545 if let Err(e) = update_invite { 546 error!("Error updating invite code: {:?}", e); 547 return ( 548 StatusCode::INTERNAL_SERVER_ERROR, 549 Json(json!({"error": "InternalError"})), 550 ) 551 .into_response(); 552 } 553 } 554 Ok(None) => { 555 return ( 556 StatusCode::BAD_REQUEST, 557 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 558 ) 559 .into_response(); 560 } 561 Err(e) => { 562 error!("Error checking invite code: {:?}", e); 563 return ( 564 StatusCode::INTERNAL_SERVER_ERROR, 565 Json(json!({"error": "InternalError"})), 566 ) 567 .into_response(); 568 } 569 } 570 } 571 } 572 let password_hash = match hash(&input.password, DEFAULT_COST) { 573 Ok(h) => h, 574 Err(e) => { 575 error!("Error hashing password: {:?}", e); 576 return ( 577 StatusCode::INTERNAL_SERVER_ERROR, 578 Json(json!({"error": "InternalError"})), 579 ) 580 .into_response(); 581 } 582 }; 583 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 584 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 585 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 586 .fetch_one(&mut *tx) 587 .await 588 .map(|c| c.unwrap_or(0) == 0) 589 .unwrap_or(false); 590 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 591 Some(chrono::Utc::now()) 592 } else { 593 None 594 }; 595 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 596 r#"INSERT INTO users ( 597 handle, email, did, password_hash, 598 preferred_comms_channel, 599 discord_id, telegram_username, signal_number, 600 is_admin, deactivated_at, email_verified 601 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 602 ) 603 .bind(short_handle) 604 .bind(&email) 605 .bind(&did) 606 .bind(&password_hash) 607 .bind(verification_channel) 608 .bind( 609 input 610 .discord_id 611 .as_deref() 612 .map(|s| s.trim()) 613 .filter(|s| !s.is_empty()), 614 ) 615 .bind( 616 input 617 .telegram_username 618 .as_deref() 619 .map(|s| s.trim()) 620 .filter(|s| !s.is_empty()), 621 ) 622 .bind( 623 input 624 .signal_number 625 .as_deref() 626 .map(|s| s.trim()) 627 .filter(|s| !s.is_empty()), 628 ) 629 .bind(is_first_user) 630 .bind(deactivated_at) 631 .bind(is_migration) 632 .fetch_one(&mut *tx) 633 .await; 634 let user_id = match user_insert { 635 Ok((id,)) => id, 636 Err(e) => { 637 if let Some(db_err) = e.as_database_error() 638 && db_err.code().as_deref() == Some("23505") { 639 let constraint = db_err.constraint().unwrap_or(""); 640 if constraint.contains("handle") || constraint.contains("users_handle") { 641 return ( 642 StatusCode::BAD_REQUEST, 643 Json(json!({ 644 "error": "HandleNotAvailable", 645 "message": "Handle already taken" 646 })), 647 ) 648 .into_response(); 649 } else if constraint.contains("email") || constraint.contains("users_email") { 650 return ( 651 StatusCode::BAD_REQUEST, 652 Json(json!({ 653 "error": "InvalidEmail", 654 "message": "Email already registered" 655 })), 656 ) 657 .into_response(); 658 } else if constraint.contains("did") || constraint.contains("users_did") { 659 return ( 660 StatusCode::BAD_REQUEST, 661 Json(json!({ 662 "error": "AccountAlreadyExists", 663 "message": "An account with this DID already exists" 664 })), 665 ) 666 .into_response(); 667 } 668 } 669 error!("Error inserting user: {:?}", e); 670 return ( 671 StatusCode::INTERNAL_SERVER_ERROR, 672 Json(json!({"error": "InternalError"})), 673 ) 674 .into_response(); 675 } 676 }; 677 678 if !is_migration { 679 if let Err(e) = sqlx::query!( 680 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)", 681 user_id, 682 verification_code, 683 email, 684 code_expires_at 685 ) 686 .execute(&mut *tx) 687 .await { 688 error!("Error inserting verification code: {:?}", e); 689 return ( 690 StatusCode::INTERNAL_SERVER_ERROR, 691 Json(json!({"error": "InternalError"})), 692 ) 693 .into_response(); 694 } 695 } 696 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 697 Ok(enc) => enc, 698 Err(e) => { 699 error!("Error encrypting user key: {:?}", e); 700 return ( 701 StatusCode::INTERNAL_SERVER_ERROR, 702 Json(json!({"error": "InternalError"})), 703 ) 704 .into_response(); 705 } 706 }; 707 let key_insert = sqlx::query!( 708 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 709 user_id, 710 &encrypted_key_bytes[..], 711 crate::config::ENCRYPTION_VERSION 712 ) 713 .execute(&mut *tx) 714 .await; 715 if let Err(e) = key_insert { 716 error!("Error inserting user key: {:?}", e); 717 return ( 718 StatusCode::INTERNAL_SERVER_ERROR, 719 Json(json!({"error": "InternalError"})), 720 ) 721 .into_response(); 722 } 723 if let Some(key_id) = reserved_key_id { 724 let mark_used = sqlx::query!( 725 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 726 key_id 727 ) 728 .execute(&mut *tx) 729 .await; 730 if let Err(e) = mark_used { 731 error!("Error marking reserved key as used: {:?}", e); 732 return ( 733 StatusCode::INTERNAL_SERVER_ERROR, 734 Json(json!({"error": "InternalError"})), 735 ) 736 .into_response(); 737 } 738 } 739 let mst = Mst::new(Arc::new(state.block_store.clone())); 740 let mst_root = match mst.persist().await { 741 Ok(c) => c, 742 Err(e) => { 743 error!("Error persisting MST: {:?}", e); 744 return ( 745 StatusCode::INTERNAL_SERVER_ERROR, 746 Json(json!({"error": "InternalError"})), 747 ) 748 .into_response(); 749 } 750 }; 751 let did_obj = match Did::new(&did) { 752 Ok(d) => d, 753 Err(_) => { 754 return ( 755 StatusCode::INTERNAL_SERVER_ERROR, 756 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 757 ) 758 .into_response(); 759 } 760 }; 761 let rev = Tid::now(LimitedU32::MIN); 762 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 763 let signed_commit = match unsigned_commit.sign(&signing_key) { 764 Ok(c) => c, 765 Err(e) => { 766 error!("Error signing genesis commit: {:?}", e); 767 return ( 768 StatusCode::INTERNAL_SERVER_ERROR, 769 Json(json!({"error": "InternalError"})), 770 ) 771 .into_response(); 772 } 773 }; 774 let commit_bytes = match signed_commit.to_cbor() { 775 Ok(b) => b, 776 Err(e) => { 777 error!("Error serializing genesis commit: {:?}", e); 778 return ( 779 StatusCode::INTERNAL_SERVER_ERROR, 780 Json(json!({"error": "InternalError"})), 781 ) 782 .into_response(); 783 } 784 }; 785 let commit_cid = match state.block_store.put(&commit_bytes).await { 786 Ok(c) => c, 787 Err(e) => { 788 error!("Error saving genesis commit: {:?}", e); 789 return ( 790 StatusCode::INTERNAL_SERVER_ERROR, 791 Json(json!({"error": "InternalError"})), 792 ) 793 .into_response(); 794 } 795 }; 796 let commit_cid_str = commit_cid.to_string(); 797 let repo_insert = sqlx::query!( 798 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 799 user_id, 800 commit_cid_str 801 ) 802 .execute(&mut *tx) 803 .await; 804 if let Err(e) = repo_insert { 805 error!("Error initializing repo: {:?}", e); 806 return ( 807 StatusCode::INTERNAL_SERVER_ERROR, 808 Json(json!({"error": "InternalError"})), 809 ) 810 .into_response(); 811 } 812 if let Some(code) = &input.invite_code { 813 if !code.trim().is_empty() { 814 let use_insert = sqlx::query!( 815 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 816 code, 817 user_id 818 ) 819 .execute(&mut *tx) 820 .await; 821 if let Err(e) = use_insert { 822 error!("Error recording invite usage: {:?}", e); 823 return ( 824 StatusCode::INTERNAL_SERVER_ERROR, 825 Json(json!({"error": "InternalError"})), 826 ) 827 .into_response(); 828 } 829 } 830 } 831 if let Err(e) = tx.commit().await { 832 error!("Error committing transaction: {:?}", e); 833 return ( 834 StatusCode::INTERNAL_SERVER_ERROR, 835 Json(json!({"error": "InternalError"})), 836 ) 837 .into_response(); 838 } 839 if !is_migration { 840 if let Err(e) = 841 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle)).await 842 { 843 warn!("Failed to sequence identity event for {}: {}", did, e); 844 } 845 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 846 { 847 warn!("Failed to sequence account event for {}: {}", did, e); 848 } 849 let profile_record = json!({ 850 "$type": "app.bsky.actor.profile", 851 "displayName": input.handle 852 }); 853 if let Err(e) = crate::api::repo::record::create_record_internal( 854 &state, 855 &did, 856 "app.bsky.actor.profile", 857 "self", 858 &profile_record, 859 ) 860 .await 861 { 862 warn!("Failed to create default profile for {}: {}", did, e); 863 } 864 if let Some(ref recipient) = verification_recipient { 865 if let Err(e) = crate::comms::enqueue_signup_verification( 866 &state.db, 867 user_id, 868 verification_channel, 869 recipient, 870 &verification_code, 871 ) 872 .await 873 { 874 warn!( 875 "Failed to enqueue signup verification notification: {:?}", 876 e 877 ); 878 } 879 } 880 } 881 882 let (access_jwt, refresh_jwt) = if is_migration { 883 let access_meta = 884 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 885 Ok(m) => m, 886 Err(e) => { 887 error!("Error creating access token for migration: {:?}", e); 888 return ( 889 StatusCode::INTERNAL_SERVER_ERROR, 890 Json(json!({"error": "InternalError"})), 891 ) 892 .into_response(); 893 } 894 }; 895 let refresh_meta = 896 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 897 Ok(m) => m, 898 Err(e) => { 899 error!("Error creating refresh token for migration: {:?}", e); 900 return ( 901 StatusCode::INTERNAL_SERVER_ERROR, 902 Json(json!({"error": "InternalError"})), 903 ) 904 .into_response(); 905 } 906 }; 907 if let Err(e) = sqlx::query!( 908 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 909 did, 910 access_meta.jti, 911 refresh_meta.jti, 912 access_meta.expires_at, 913 refresh_meta.expires_at 914 ) 915 .execute(&state.db) 916 .await 917 { 918 error!("Error creating session for migration: {:?}", e); 919 return ( 920 StatusCode::INTERNAL_SERVER_ERROR, 921 Json(json!({"error": "InternalError"})), 922 ) 923 .into_response(); 924 } 925 (Some(access_meta.token), Some(refresh_meta.token)) 926 } else { 927 (None, None) 928 }; 929 930 ( 931 StatusCode::OK, 932 Json(CreateAccountOutput { 933 handle: full_handle.clone(), 934 did, 935 access_jwt, 936 refresh_jwt, 937 verification_required: !is_migration, 938 verification_channel: verification_channel.to_string(), 939 }), 940 ) 941 .into_response() 942}