this repo has no description
1use super::did::verify_did_web; 2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token}; 3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 4use crate::state::{AppState, RateLimitKind}; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use bcrypt::{DEFAULT_COST, hash}; 12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use k256::{SecretKey, ecdsa::SigningKey}; 15use rand::rngs::OsRng; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::sync::Arc; 19use tracing::{debug, error, info, warn}; 20 21fn extract_client_ip(headers: &HeaderMap) -> String { 22 if let Some(forwarded) = headers.get("x-forwarded-for") 23 && let Ok(value) = forwarded.to_str() 24 && let Some(first_ip) = value.split(',').next() { 25 return first_ip.trim().to_string(); 26 } 27 if let Some(real_ip) = headers.get("x-real-ip") 28 && let Ok(value) = real_ip.to_str() { 29 return value.trim().to_string(); 30 } 31 "unknown".to_string() 32} 33 34#[derive(Deserialize)] 35#[serde(rename_all = "camelCase")] 36pub struct CreateAccountInput { 37 pub handle: String, 38 pub email: Option<String>, 39 pub password: String, 40 pub invite_code: Option<String>, 41 pub did: Option<String>, 42 pub signing_key: Option<String>, 43 pub verification_channel: Option<String>, 44 pub discord_id: Option<String>, 45 pub telegram_username: Option<String>, 46 pub signal_number: Option<String>, 47} 48 49#[derive(Serialize)] 50#[serde(rename_all = "camelCase")] 51pub struct CreateAccountOutput { 52 pub handle: String, 53 pub did: String, 54 #[serde(skip_serializing_if = "Option::is_none")] 55 pub access_jwt: Option<String>, 56 #[serde(skip_serializing_if = "Option::is_none")] 57 pub refresh_jwt: Option<String>, 58 pub verification_required: bool, 59 pub verification_channel: String, 60} 61 62pub async fn create_account( 63 State(state): State<AppState>, 64 headers: HeaderMap, 65 Json(input): Json<CreateAccountInput>, 66) -> Response { 67 info!("create_account called"); 68 let client_ip = extract_client_ip(&headers); 69 if !state 70 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 71 .await 72 { 73 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 74 return ( 75 StatusCode::TOO_MANY_REQUESTS, 76 Json(json!({ 77 "error": "RateLimitExceeded", 78 "message": "Too many account creation attempts. Please try again later." 79 })), 80 ) 81 .into_response(); 82 } 83 84 let migration_auth = if let Some(token) = 85 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok())) 86 { 87 if is_service_token(&token) { 88 let verifier = ServiceTokenVerifier::new(); 89 match verifier 90 .verify_service_token(&token, Some("com.atproto.server.createAccount")) 91 .await 92 { 93 Ok(claims) => { 94 debug!("Service token verified for migration: iss={}", claims.iss); 95 Some(claims.iss) 96 } 97 Err(e) => { 98 error!("Service token verification failed: {:?}", e); 99 return ( 100 StatusCode::UNAUTHORIZED, 101 Json(json!({ 102 "error": "AuthenticationFailed", 103 "message": format!("Service token verification failed: {}", e) 104 })), 105 ) 106 .into_response(); 107 } 108 } 109 } else { 110 None 111 } 112 } else { 113 None 114 }; 115 116 let is_migration = migration_auth.is_some() 117 && input.did.as_ref().map(|d| d.starts_with("did:plc:")).unwrap_or(false); 118 119 if is_migration { 120 let migration_did = input.did.as_ref().unwrap(); 121 let auth_did = migration_auth.as_ref().unwrap(); 122 if migration_did != auth_did { 123 return ( 124 StatusCode::FORBIDDEN, 125 Json(json!({ 126 "error": "AuthorizationError", 127 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did) 128 })), 129 ) 130 .into_response(); 131 } 132 info!(did = %migration_did, "Processing account migration"); 133 } 134 135 if input.handle.contains('!') || input.handle.contains('@') { 136 return ( 137 StatusCode::BAD_REQUEST, 138 Json( 139 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}), 140 ), 141 ) 142 .into_response(); 143 } 144 let email: Option<String> = input 145 .email 146 .as_ref() 147 .map(|e| e.trim().to_string()) 148 .filter(|e| !e.is_empty()); 149 if let Some(ref email) = email 150 && !crate::api::validation::is_valid_email(email) { 151 return ( 152 StatusCode::BAD_REQUEST, 153 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 154 ) 155 .into_response(); 156 } 157 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 158 let valid_channels = ["email", "discord", "telegram", "signal"]; 159 if !valid_channels.contains(&verification_channel) && !is_migration { 160 return ( 161 StatusCode::BAD_REQUEST, 162 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})), 163 ) 164 .into_response(); 165 } 166 let verification_recipient = if is_migration { 167 None 168 } else { 169 Some(match verification_channel { 170 "email" => match &input.email { 171 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 172 _ => return ( 173 StatusCode::BAD_REQUEST, 174 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 175 ).into_response(), 176 }, 177 "discord" => match &input.discord_id { 178 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 179 _ => return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 182 ).into_response(), 183 }, 184 "telegram" => match &input.telegram_username { 185 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 186 _ => return ( 187 StatusCode::BAD_REQUEST, 188 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 189 ).into_response(), 190 }, 191 "signal" => match &input.signal_number { 192 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 193 _ => return ( 194 StatusCode::BAD_REQUEST, 195 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 196 ).into_response(), 197 }, 198 _ => return ( 199 StatusCode::BAD_REQUEST, 200 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 201 ).into_response(), 202 }) 203 }; 204 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 205 let pds_endpoint = format!("https://{}", hostname); 206 let suffix = format!(".{}", hostname); 207 let short_handle = if input.handle.ends_with(&suffix) { 208 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle) 209 } else { 210 &input.handle 211 }; 212 let full_handle = format!("{}.{}", short_handle, hostname); 213 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 214 if let Some(signing_key_did) = &input.signing_key { 215 let reserved = sqlx::query!( 216 r#" 217 SELECT id, private_key_bytes 218 FROM reserved_signing_keys 219 WHERE public_key_did_key = $1 220 AND used_at IS NULL 221 AND expires_at > NOW() 222 FOR UPDATE 223 "#, 224 signing_key_did 225 ) 226 .fetch_optional(&state.db) 227 .await; 228 match reserved { 229 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 230 Ok(None) => { 231 return ( 232 StatusCode::BAD_REQUEST, 233 Json(json!({ 234 "error": "InvalidSigningKey", 235 "message": "Signing key not found, already used, or expired" 236 })), 237 ) 238 .into_response(); 239 } 240 Err(e) => { 241 error!("Error looking up reserved signing key: {:?}", e); 242 return ( 243 StatusCode::INTERNAL_SERVER_ERROR, 244 Json(json!({"error": "InternalError"})), 245 ) 246 .into_response(); 247 } 248 } 249 } else { 250 let secret_key = SecretKey::random(&mut OsRng); 251 (secret_key.to_bytes().to_vec(), None) 252 }; 253 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 254 Ok(k) => k, 255 Err(e) => { 256 error!("Error creating signing key: {:?}", e); 257 return ( 258 StatusCode::INTERNAL_SERVER_ERROR, 259 Json(json!({"error": "InternalError"})), 260 ) 261 .into_response(); 262 } 263 }; 264 let did = if let Some(d) = &input.did { 265 if d.trim().is_empty() { 266 let rotation_key = std::env::var("PLC_ROTATION_KEY") 267 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 268 let genesis_result = match create_genesis_operation( 269 &signing_key, 270 &rotation_key, 271 &full_handle, 272 &pds_endpoint, 273 ) { 274 Ok(r) => r, 275 Err(e) => { 276 error!("Error creating PLC genesis operation: {:?}", e); 277 return ( 278 StatusCode::INTERNAL_SERVER_ERROR, 279 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 280 ) 281 .into_response(); 282 } 283 }; 284 let plc_client = PlcClient::new(None); 285 if let Err(e) = plc_client 286 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 287 .await 288 { 289 error!("Failed to submit PLC genesis operation: {:?}", e); 290 return ( 291 StatusCode::BAD_GATEWAY, 292 Json(json!({ 293 "error": "UpstreamError", 294 "message": format!("Failed to register DID with PLC directory: {}", e) 295 })), 296 ) 297 .into_response(); 298 } 299 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 300 genesis_result.did 301 } else if d.starts_with("did:web:") { 302 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await { 303 return ( 304 StatusCode::BAD_REQUEST, 305 Json(json!({"error": "InvalidDid", "message": e})), 306 ) 307 .into_response(); 308 } 309 d.clone() 310 } else if d.starts_with("did:plc:") && is_migration { 311 d.clone() 312 } else { 313 return ( 314 StatusCode::BAD_REQUEST, 315 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})), 316 ) 317 .into_response(); 318 } 319 } else { 320 let rotation_key = std::env::var("PLC_ROTATION_KEY") 321 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 322 let genesis_result = match create_genesis_operation( 323 &signing_key, 324 &rotation_key, 325 &full_handle, 326 &pds_endpoint, 327 ) { 328 Ok(r) => r, 329 Err(e) => { 330 error!("Error creating PLC genesis operation: {:?}", e); 331 return ( 332 StatusCode::INTERNAL_SERVER_ERROR, 333 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 334 ) 335 .into_response(); 336 } 337 }; 338 let plc_client = PlcClient::new(None); 339 if let Err(e) = plc_client 340 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 341 .await 342 { 343 error!("Failed to submit PLC genesis operation: {:?}", e); 344 return ( 345 StatusCode::BAD_GATEWAY, 346 Json(json!({ 347 "error": "UpstreamError", 348 "message": format!("Failed to register DID with PLC directory: {}", e) 349 })), 350 ) 351 .into_response(); 352 } 353 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 354 genesis_result.did 355 }; 356 let mut tx = match state.db.begin().await { 357 Ok(tx) => tx, 358 Err(e) => { 359 error!("Error starting transaction: {:?}", e); 360 return ( 361 StatusCode::INTERNAL_SERVER_ERROR, 362 Json(json!({"error": "InternalError"})), 363 ) 364 .into_response(); 365 } 366 }; 367 let exists_query = sqlx::query!("SELECT 1 as one FROM users WHERE handle = $1", short_handle) 368 .fetch_optional(&mut *tx) 369 .await; 370 match exists_query { 371 Ok(Some(_)) => { 372 return ( 373 StatusCode::BAD_REQUEST, 374 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 375 ) 376 .into_response(); 377 } 378 Err(e) => { 379 error!("Error checking handle: {:?}", e); 380 return ( 381 StatusCode::INTERNAL_SERVER_ERROR, 382 Json(json!({"error": "InternalError"})), 383 ) 384 .into_response(); 385 } 386 Ok(None) => {} 387 } 388 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 389 .map(|v| v == "true" || v == "1") 390 .unwrap_or(false); 391 if invite_code_required && input.invite_code.as_ref().map(|c| c.trim().is_empty()).unwrap_or(true) { 392 return ( 393 StatusCode::BAD_REQUEST, 394 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 395 ) 396 .into_response(); 397 } 398 if let Some(code) = &input.invite_code { 399 if !code.trim().is_empty() { 400 let invite_query = sqlx::query!( 401 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 402 code 403 ) 404 .fetch_optional(&mut *tx) 405 .await; 406 match invite_query { 407 Ok(Some(row)) => { 408 if row.available_uses <= 0 { 409 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 410 } 411 let update_invite = sqlx::query!( 412 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 413 code 414 ) 415 .execute(&mut *tx) 416 .await; 417 if let Err(e) = update_invite { 418 error!("Error updating invite code: {:?}", e); 419 return ( 420 StatusCode::INTERNAL_SERVER_ERROR, 421 Json(json!({"error": "InternalError"})), 422 ) 423 .into_response(); 424 } 425 } 426 Ok(None) => { 427 return ( 428 StatusCode::BAD_REQUEST, 429 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 430 ) 431 .into_response(); 432 } 433 Err(e) => { 434 error!("Error checking invite code: {:?}", e); 435 return ( 436 StatusCode::INTERNAL_SERVER_ERROR, 437 Json(json!({"error": "InternalError"})), 438 ) 439 .into_response(); 440 } 441 } 442 } 443 } 444 let password_hash = match hash(&input.password, DEFAULT_COST) { 445 Ok(h) => h, 446 Err(e) => { 447 error!("Error hashing password: {:?}", e); 448 return ( 449 StatusCode::INTERNAL_SERVER_ERROR, 450 Json(json!({"error": "InternalError"})), 451 ) 452 .into_response(); 453 } 454 }; 455 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 456 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 457 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 458 .fetch_one(&mut *tx) 459 .await 460 .map(|c| c.unwrap_or(0) == 0) 461 .unwrap_or(false); 462 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration { 463 Some(chrono::Utc::now()) 464 } else { 465 None 466 }; 467 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 468 r#"INSERT INTO users ( 469 handle, email, did, password_hash, 470 preferred_comms_channel, 471 discord_id, telegram_username, signal_number, 472 is_admin, deactivated_at, email_verified 473 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#, 474 ) 475 .bind(short_handle) 476 .bind(&email) 477 .bind(&did) 478 .bind(&password_hash) 479 .bind(verification_channel) 480 .bind( 481 input 482 .discord_id 483 .as_deref() 484 .map(|s| s.trim()) 485 .filter(|s| !s.is_empty()), 486 ) 487 .bind( 488 input 489 .telegram_username 490 .as_deref() 491 .map(|s| s.trim()) 492 .filter(|s| !s.is_empty()), 493 ) 494 .bind( 495 input 496 .signal_number 497 .as_deref() 498 .map(|s| s.trim()) 499 .filter(|s| !s.is_empty()), 500 ) 501 .bind(is_first_user) 502 .bind(deactivated_at) 503 .bind(is_migration) 504 .fetch_one(&mut *tx) 505 .await; 506 let user_id = match user_insert { 507 Ok((id,)) => id, 508 Err(e) => { 509 if let Some(db_err) = e.as_database_error() 510 && db_err.code().as_deref() == Some("23505") { 511 let constraint = db_err.constraint().unwrap_or(""); 512 if constraint.contains("handle") || constraint.contains("users_handle") { 513 return ( 514 StatusCode::BAD_REQUEST, 515 Json(json!({ 516 "error": "HandleNotAvailable", 517 "message": "Handle already taken" 518 })), 519 ) 520 .into_response(); 521 } else if constraint.contains("email") || constraint.contains("users_email") { 522 return ( 523 StatusCode::BAD_REQUEST, 524 Json(json!({ 525 "error": "InvalidEmail", 526 "message": "Email already registered" 527 })), 528 ) 529 .into_response(); 530 } else if constraint.contains("did") || constraint.contains("users_did") { 531 return ( 532 StatusCode::BAD_REQUEST, 533 Json(json!({ 534 "error": "AccountAlreadyExists", 535 "message": "An account with this DID already exists" 536 })), 537 ) 538 .into_response(); 539 } 540 } 541 error!("Error inserting user: {:?}", e); 542 return ( 543 StatusCode::INTERNAL_SERVER_ERROR, 544 Json(json!({"error": "InternalError"})), 545 ) 546 .into_response(); 547 } 548 }; 549 550 if !is_migration { 551 if let Err(e) = sqlx::query!( 552 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)", 553 user_id, 554 verification_code, 555 email, 556 code_expires_at 557 ) 558 .execute(&mut *tx) 559 .await { 560 error!("Error inserting verification code: {:?}", e); 561 return ( 562 StatusCode::INTERNAL_SERVER_ERROR, 563 Json(json!({"error": "InternalError"})), 564 ) 565 .into_response(); 566 } 567 } 568 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 569 Ok(enc) => enc, 570 Err(e) => { 571 error!("Error encrypting user key: {:?}", e); 572 return ( 573 StatusCode::INTERNAL_SERVER_ERROR, 574 Json(json!({"error": "InternalError"})), 575 ) 576 .into_response(); 577 } 578 }; 579 let key_insert = sqlx::query!( 580 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 581 user_id, 582 &encrypted_key_bytes[..], 583 crate::config::ENCRYPTION_VERSION 584 ) 585 .execute(&mut *tx) 586 .await; 587 if let Err(e) = key_insert { 588 error!("Error inserting user key: {:?}", e); 589 return ( 590 StatusCode::INTERNAL_SERVER_ERROR, 591 Json(json!({"error": "InternalError"})), 592 ) 593 .into_response(); 594 } 595 if let Some(key_id) = reserved_key_id { 596 let mark_used = sqlx::query!( 597 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 598 key_id 599 ) 600 .execute(&mut *tx) 601 .await; 602 if let Err(e) = mark_used { 603 error!("Error marking reserved key as used: {:?}", e); 604 return ( 605 StatusCode::INTERNAL_SERVER_ERROR, 606 Json(json!({"error": "InternalError"})), 607 ) 608 .into_response(); 609 } 610 } 611 let mst = Mst::new(Arc::new(state.block_store.clone())); 612 let mst_root = match mst.persist().await { 613 Ok(c) => c, 614 Err(e) => { 615 error!("Error persisting MST: {:?}", e); 616 return ( 617 StatusCode::INTERNAL_SERVER_ERROR, 618 Json(json!({"error": "InternalError"})), 619 ) 620 .into_response(); 621 } 622 }; 623 let did_obj = match Did::new(&did) { 624 Ok(d) => d, 625 Err(_) => { 626 return ( 627 StatusCode::INTERNAL_SERVER_ERROR, 628 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 629 ) 630 .into_response(); 631 } 632 }; 633 let rev = Tid::now(LimitedU32::MIN); 634 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 635 let signed_commit = match unsigned_commit.sign(&signing_key) { 636 Ok(c) => c, 637 Err(e) => { 638 error!("Error signing genesis commit: {:?}", e); 639 return ( 640 StatusCode::INTERNAL_SERVER_ERROR, 641 Json(json!({"error": "InternalError"})), 642 ) 643 .into_response(); 644 } 645 }; 646 let commit_bytes = match signed_commit.to_cbor() { 647 Ok(b) => b, 648 Err(e) => { 649 error!("Error serializing genesis commit: {:?}", e); 650 return ( 651 StatusCode::INTERNAL_SERVER_ERROR, 652 Json(json!({"error": "InternalError"})), 653 ) 654 .into_response(); 655 } 656 }; 657 let commit_cid = match state.block_store.put(&commit_bytes).await { 658 Ok(c) => c, 659 Err(e) => { 660 error!("Error saving genesis commit: {:?}", e); 661 return ( 662 StatusCode::INTERNAL_SERVER_ERROR, 663 Json(json!({"error": "InternalError"})), 664 ) 665 .into_response(); 666 } 667 }; 668 let commit_cid_str = commit_cid.to_string(); 669 let repo_insert = sqlx::query!( 670 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 671 user_id, 672 commit_cid_str 673 ) 674 .execute(&mut *tx) 675 .await; 676 if let Err(e) = repo_insert { 677 error!("Error initializing repo: {:?}", e); 678 return ( 679 StatusCode::INTERNAL_SERVER_ERROR, 680 Json(json!({"error": "InternalError"})), 681 ) 682 .into_response(); 683 } 684 if let Some(code) = &input.invite_code { 685 if !code.trim().is_empty() { 686 let use_insert = sqlx::query!( 687 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 688 code, 689 user_id 690 ) 691 .execute(&mut *tx) 692 .await; 693 if let Err(e) = use_insert { 694 error!("Error recording invite usage: {:?}", e); 695 return ( 696 StatusCode::INTERNAL_SERVER_ERROR, 697 Json(json!({"error": "InternalError"})), 698 ) 699 .into_response(); 700 } 701 } 702 } 703 if let Err(e) = tx.commit().await { 704 error!("Error committing transaction: {:?}", e); 705 return ( 706 StatusCode::INTERNAL_SERVER_ERROR, 707 Json(json!({"error": "InternalError"})), 708 ) 709 .into_response(); 710 } 711 if !is_migration { 712 if let Err(e) = 713 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle)).await 714 { 715 warn!("Failed to sequence identity event for {}: {}", did, e); 716 } 717 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 718 { 719 warn!("Failed to sequence account event for {}: {}", did, e); 720 } 721 let profile_record = json!({ 722 "$type": "app.bsky.actor.profile", 723 "displayName": input.handle 724 }); 725 if let Err(e) = crate::api::repo::record::create_record_internal( 726 &state, 727 &did, 728 "app.bsky.actor.profile", 729 "self", 730 &profile_record, 731 ) 732 .await 733 { 734 warn!("Failed to create default profile for {}: {}", did, e); 735 } 736 if let Some(ref recipient) = verification_recipient { 737 if let Err(e) = crate::comms::enqueue_signup_verification( 738 &state.db, 739 user_id, 740 verification_channel, 741 recipient, 742 &verification_code, 743 ) 744 .await 745 { 746 warn!( 747 "Failed to enqueue signup verification notification: {:?}", 748 e 749 ); 750 } 751 } 752 } 753 754 let (access_jwt, refresh_jwt) = if is_migration { 755 let access_meta = 756 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) { 757 Ok(m) => m, 758 Err(e) => { 759 error!("Error creating access token for migration: {:?}", e); 760 return ( 761 StatusCode::INTERNAL_SERVER_ERROR, 762 Json(json!({"error": "InternalError"})), 763 ) 764 .into_response(); 765 } 766 }; 767 let refresh_meta = 768 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 769 Ok(m) => m, 770 Err(e) => { 771 error!("Error creating refresh token for migration: {:?}", e); 772 return ( 773 StatusCode::INTERNAL_SERVER_ERROR, 774 Json(json!({"error": "InternalError"})), 775 ) 776 .into_response(); 777 } 778 }; 779 if let Err(e) = sqlx::query!( 780 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 781 did, 782 access_meta.jti, 783 refresh_meta.jti, 784 access_meta.expires_at, 785 refresh_meta.expires_at 786 ) 787 .execute(&state.db) 788 .await 789 { 790 error!("Error creating session for migration: {:?}", e); 791 return ( 792 StatusCode::INTERNAL_SERVER_ERROR, 793 Json(json!({"error": "InternalError"})), 794 ) 795 .into_response(); 796 } 797 (Some(access_meta.token), Some(refresh_meta.token)) 798 } else { 799 (None, None) 800 }; 801 802 ( 803 StatusCode::OK, 804 Json(CreateAccountOutput { 805 handle: full_handle.clone(), 806 did, 807 access_jwt, 808 refresh_jwt, 809 verification_required: !is_migration, 810 verification_channel: verification_channel.to_string(), 811 }), 812 ) 813 .into_response() 814}