// This repo has no description.
1use super::did::verify_did_web; 2use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::{DEFAULT_COST, hash}; 11use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 12use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 13use k256::{SecretKey, ecdsa::SigningKey}; 14use rand::rngs::OsRng; 15use serde::{Deserialize, Serialize}; 16use serde_json::json; 17use std::sync::Arc; 18use tracing::{error, info, warn}; 19 20fn extract_client_ip(headers: &HeaderMap) -> String { 21 if let Some(forwarded) = headers.get("x-forwarded-for") 22 && let Ok(value) = forwarded.to_str() 23 && let Some(first_ip) = value.split(',').next() { 24 return first_ip.trim().to_string(); 25 } 26 if let Some(real_ip) = headers.get("x-real-ip") 27 && let Ok(value) = real_ip.to_str() { 28 return value.trim().to_string(); 29 } 30 "unknown".to_string() 31} 32 33#[derive(Deserialize)] 34#[serde(rename_all = "camelCase")] 35pub struct CreateAccountInput { 36 pub handle: String, 37 pub email: Option<String>, 38 pub password: String, 39 pub invite_code: Option<String>, 40 pub did: Option<String>, 41 pub signing_key: Option<String>, 42 pub verification_channel: Option<String>, 43 pub discord_id: Option<String>, 44 pub telegram_username: Option<String>, 45 pub signal_number: Option<String>, 46} 47 48#[derive(Serialize)] 49#[serde(rename_all = "camelCase")] 50pub struct CreateAccountOutput { 51 pub handle: String, 52 pub did: String, 53 pub verification_required: bool, 54 pub verification_channel: String, 55} 56 57pub async fn create_account( 58 State(state): State<AppState>, 59 headers: HeaderMap, 60 Json(input): Json<CreateAccountInput>, 61) -> Response { 62 info!("create_account called"); 63 let client_ip = extract_client_ip(&headers); 64 if !state 65 
.check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 66 .await 67 { 68 warn!(ip = %client_ip, "Account creation rate limit exceeded"); 69 return ( 70 StatusCode::TOO_MANY_REQUESTS, 71 Json(json!({ 72 "error": "RateLimitExceeded", 73 "message": "Too many account creation attempts. Please try again later." 74 })), 75 ) 76 .into_response(); 77 } 78 if input.handle.contains('!') || input.handle.contains('@') { 79 return ( 80 StatusCode::BAD_REQUEST, 81 Json( 82 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}), 83 ), 84 ) 85 .into_response(); 86 } 87 let email: Option<String> = input 88 .email 89 .as_ref() 90 .map(|e| e.trim().to_string()) 91 .filter(|e| !e.is_empty()); 92 if let Some(ref email) = email 93 && !crate::api::validation::is_valid_email(email) { 94 return ( 95 StatusCode::BAD_REQUEST, 96 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})), 97 ) 98 .into_response(); 99 } 100 let verification_channel = input.verification_channel.as_deref().unwrap_or("email"); 101 let valid_channels = ["email", "discord", "telegram", "signal"]; 102 if !valid_channels.contains(&verification_channel) { 103 return ( 104 StatusCode::BAD_REQUEST, 105 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. 
Must be one of: email, discord, telegram, signal"})), 106 ) 107 .into_response(); 108 } 109 let verification_recipient = match verification_channel { 110 "email" => match &input.email { 111 Some(email) if !email.trim().is_empty() => email.trim().to_string(), 112 _ => return ( 113 StatusCode::BAD_REQUEST, 114 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})), 115 ).into_response(), 116 }, 117 "discord" => match &input.discord_id { 118 Some(id) if !id.trim().is_empty() => id.trim().to_string(), 119 _ => return ( 120 StatusCode::BAD_REQUEST, 121 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})), 122 ).into_response(), 123 }, 124 "telegram" => match &input.telegram_username { 125 Some(username) if !username.trim().is_empty() => username.trim().to_string(), 126 _ => return ( 127 StatusCode::BAD_REQUEST, 128 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})), 129 ).into_response(), 130 }, 131 "signal" => match &input.signal_number { 132 Some(number) if !number.trim().is_empty() => number.trim().to_string(), 133 _ => return ( 134 StatusCode::BAD_REQUEST, 135 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})), 136 ).into_response(), 137 }, 138 _ => return ( 139 StatusCode::BAD_REQUEST, 140 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})), 141 ).into_response(), 142 }; 143 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 144 let pds_endpoint = format!("https://{}", hostname); 145 let suffix = format!(".{}", hostname); 146 let short_handle = if input.handle.ends_with(&suffix) { 147 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle) 148 } else { 149 &input.handle 150 }; 151 let full_handle = format!("{}.{}", short_handle, 
hostname); 152 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) = 153 if let Some(signing_key_did) = &input.signing_key { 154 let reserved = sqlx::query!( 155 r#" 156 SELECT id, private_key_bytes 157 FROM reserved_signing_keys 158 WHERE public_key_did_key = $1 159 AND used_at IS NULL 160 AND expires_at > NOW() 161 FOR UPDATE 162 "#, 163 signing_key_did 164 ) 165 .fetch_optional(&state.db) 166 .await; 167 match reserved { 168 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)), 169 Ok(None) => { 170 return ( 171 StatusCode::BAD_REQUEST, 172 Json(json!({ 173 "error": "InvalidSigningKey", 174 "message": "Signing key not found, already used, or expired" 175 })), 176 ) 177 .into_response(); 178 } 179 Err(e) => { 180 error!("Error looking up reserved signing key: {:?}", e); 181 return ( 182 StatusCode::INTERNAL_SERVER_ERROR, 183 Json(json!({"error": "InternalError"})), 184 ) 185 .into_response(); 186 } 187 } 188 } else { 189 let secret_key = SecretKey::random(&mut OsRng); 190 (secret_key.to_bytes().to_vec(), None) 191 }; 192 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 193 Ok(k) => k, 194 Err(e) => { 195 error!("Error creating signing key: {:?}", e); 196 return ( 197 StatusCode::INTERNAL_SERVER_ERROR, 198 Json(json!({"error": "InternalError"})), 199 ) 200 .into_response(); 201 } 202 }; 203 let did = if let Some(d) = &input.did { 204 if d.trim().is_empty() { 205 let rotation_key = std::env::var("PLC_ROTATION_KEY") 206 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 207 let genesis_result = match create_genesis_operation( 208 &signing_key, 209 &rotation_key, 210 &full_handle, 211 &pds_endpoint, 212 ) { 213 Ok(r) => r, 214 Err(e) => { 215 error!("Error creating PLC genesis operation: {:?}", e); 216 return ( 217 StatusCode::INTERNAL_SERVER_ERROR, 218 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 219 ) 220 .into_response(); 221 } 222 }; 223 let plc_client = PlcClient::new(None); 
224 if let Err(e) = plc_client 225 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 226 .await 227 { 228 error!("Failed to submit PLC genesis operation: {:?}", e); 229 return ( 230 StatusCode::BAD_GATEWAY, 231 Json(json!({ 232 "error": "UpstreamError", 233 "message": format!("Failed to register DID with PLC directory: {}", e) 234 })), 235 ) 236 .into_response(); 237 } 238 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory"); 239 genesis_result.did 240 } else if d.starts_with("did:web:") { 241 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await { 242 return ( 243 StatusCode::BAD_REQUEST, 244 Json(json!({"error": "InvalidDid", "message": e})), 245 ) 246 .into_response(); 247 } 248 d.clone() 249 } else { 250 return ( 251 StatusCode::BAD_REQUEST, 252 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc"})), 253 ) 254 .into_response(); 255 } 256 } else { 257 let rotation_key = std::env::var("PLC_ROTATION_KEY") 258 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key)); 259 let genesis_result = match create_genesis_operation( 260 &signing_key, 261 &rotation_key, 262 &full_handle, 263 &pds_endpoint, 264 ) { 265 Ok(r) => r, 266 Err(e) => { 267 error!("Error creating PLC genesis operation: {:?}", e); 268 return ( 269 StatusCode::INTERNAL_SERVER_ERROR, 270 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})), 271 ) 272 .into_response(); 273 } 274 }; 275 let plc_client = PlcClient::new(None); 276 if let Err(e) = plc_client 277 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 278 .await 279 { 280 error!("Failed to submit PLC genesis operation: {:?}", e); 281 return ( 282 StatusCode::BAD_GATEWAY, 283 Json(json!({ 284 "error": "UpstreamError", 285 "message": format!("Failed to register DID with PLC directory: {}", e) 286 })), 287 ) 288 .into_response(); 289 } 290 info!(did = %genesis_result.did, 
"Successfully registered DID with PLC directory"); 291 genesis_result.did 292 }; 293 let mut tx = match state.db.begin().await { 294 Ok(tx) => tx, 295 Err(e) => { 296 error!("Error starting transaction: {:?}", e); 297 return ( 298 StatusCode::INTERNAL_SERVER_ERROR, 299 Json(json!({"error": "InternalError"})), 300 ) 301 .into_response(); 302 } 303 }; 304 let exists_query = sqlx::query!("SELECT 1 as one FROM users WHERE handle = $1", short_handle) 305 .fetch_optional(&mut *tx) 306 .await; 307 match exists_query { 308 Ok(Some(_)) => { 309 return ( 310 StatusCode::BAD_REQUEST, 311 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})), 312 ) 313 .into_response(); 314 } 315 Err(e) => { 316 error!("Error checking handle: {:?}", e); 317 return ( 318 StatusCode::INTERNAL_SERVER_ERROR, 319 Json(json!({"error": "InternalError"})), 320 ) 321 .into_response(); 322 } 323 Ok(None) => {} 324 } 325 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 326 .map(|v| v == "true" || v == "1") 327 .unwrap_or(false); 328 if invite_code_required && input.invite_code.as_ref().map(|c| c.trim().is_empty()).unwrap_or(true) { 329 return ( 330 StatusCode::BAD_REQUEST, 331 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})), 332 ) 333 .into_response(); 334 } 335 if let Some(code) = &input.invite_code { 336 if !code.trim().is_empty() { 337 let invite_query = sqlx::query!( 338 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE", 339 code 340 ) 341 .fetch_optional(&mut *tx) 342 .await; 343 match invite_query { 344 Ok(Some(row)) => { 345 if row.available_uses <= 0 { 346 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response(); 347 } 348 let update_invite = sqlx::query!( 349 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 350 code 351 ) 352 .execute(&mut *tx) 353 .await; 354 if let Err(e) = update_invite { 355 
error!("Error updating invite code: {:?}", e); 356 return ( 357 StatusCode::INTERNAL_SERVER_ERROR, 358 Json(json!({"error": "InternalError"})), 359 ) 360 .into_response(); 361 } 362 } 363 Ok(None) => { 364 return ( 365 StatusCode::BAD_REQUEST, 366 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})), 367 ) 368 .into_response(); 369 } 370 Err(e) => { 371 error!("Error checking invite code: {:?}", e); 372 return ( 373 StatusCode::INTERNAL_SERVER_ERROR, 374 Json(json!({"error": "InternalError"})), 375 ) 376 .into_response(); 377 } 378 } 379 } 380 } 381 let password_hash = match hash(&input.password, DEFAULT_COST) { 382 Ok(h) => h, 383 Err(e) => { 384 error!("Error hashing password: {:?}", e); 385 return ( 386 StatusCode::INTERNAL_SERVER_ERROR, 387 Json(json!({"error": "InternalError"})), 388 ) 389 .into_response(); 390 } 391 }; 392 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 393 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30); 394 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users") 395 .fetch_one(&mut *tx) 396 .await 397 .map(|c| c.unwrap_or(0) == 0) 398 .unwrap_or(false); 399 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 400 r#"INSERT INTO users ( 401 handle, email, did, password_hash, 402 preferred_comms_channel, 403 discord_id, telegram_username, signal_number, 404 is_admin 405 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9) RETURNING id"#, 406 ) 407 .bind(short_handle) 408 .bind(&email) 409 .bind(&did) 410 .bind(&password_hash) 411 .bind(verification_channel) 412 .bind( 413 input 414 .discord_id 415 .as_deref() 416 .map(|s| s.trim()) 417 .filter(|s| !s.is_empty()), 418 ) 419 .bind( 420 input 421 .telegram_username 422 .as_deref() 423 .map(|s| s.trim()) 424 .filter(|s| !s.is_empty()), 425 ) 426 .bind( 427 input 428 .signal_number 429 .as_deref() 430 .map(|s| s.trim()) 431 .filter(|s| !s.is_empty()), 432 ) 433 
.bind(is_first_user) 434 .fetch_one(&mut *tx) 435 .await; 436 let user_id = match user_insert { 437 Ok((id,)) => id, 438 Err(e) => { 439 if let Some(db_err) = e.as_database_error() 440 && db_err.code().as_deref() == Some("23505") { 441 let constraint = db_err.constraint().unwrap_or(""); 442 if constraint.contains("handle") || constraint.contains("users_handle") { 443 return ( 444 StatusCode::BAD_REQUEST, 445 Json(json!({ 446 "error": "HandleNotAvailable", 447 "message": "Handle already taken" 448 })), 449 ) 450 .into_response(); 451 } else if constraint.contains("email") || constraint.contains("users_email") { 452 return ( 453 StatusCode::BAD_REQUEST, 454 Json(json!({ 455 "error": "InvalidEmail", 456 "message": "Email already registered" 457 })), 458 ) 459 .into_response(); 460 } else if constraint.contains("did") || constraint.contains("users_did") { 461 return ( 462 StatusCode::BAD_REQUEST, 463 Json(json!({ 464 "error": "AccountAlreadyExists", 465 "message": "An account with this DID already exists" 466 })), 467 ) 468 .into_response(); 469 } 470 } 471 error!("Error inserting user: {:?}", e); 472 return ( 473 StatusCode::INTERNAL_SERVER_ERROR, 474 Json(json!({"error": "InternalError"})), 475 ) 476 .into_response(); 477 } 478 }; 479 480 if let Err(e) = sqlx::query!( 481 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)", 482 user_id, 483 verification_code, 484 email, 485 code_expires_at 486 ) 487 .execute(&mut *tx) 488 .await { 489 error!("Error inserting verification code: {:?}", e); 490 return ( 491 StatusCode::INTERNAL_SERVER_ERROR, 492 Json(json!({"error": "InternalError"})), 493 ) 494 .into_response(); 495 } 496 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 497 Ok(enc) => enc, 498 Err(e) => { 499 error!("Error encrypting user key: {:?}", e); 500 return ( 501 StatusCode::INTERNAL_SERVER_ERROR, 502 Json(json!({"error": "InternalError"})), 503 ) 504 
.into_response(); 505 } 506 }; 507 let key_insert = sqlx::query!( 508 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 509 user_id, 510 &encrypted_key_bytes[..], 511 crate::config::ENCRYPTION_VERSION 512 ) 513 .execute(&mut *tx) 514 .await; 515 if let Err(e) = key_insert { 516 error!("Error inserting user key: {:?}", e); 517 return ( 518 StatusCode::INTERNAL_SERVER_ERROR, 519 Json(json!({"error": "InternalError"})), 520 ) 521 .into_response(); 522 } 523 if let Some(key_id) = reserved_key_id { 524 let mark_used = sqlx::query!( 525 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1", 526 key_id 527 ) 528 .execute(&mut *tx) 529 .await; 530 if let Err(e) = mark_used { 531 error!("Error marking reserved key as used: {:?}", e); 532 return ( 533 StatusCode::INTERNAL_SERVER_ERROR, 534 Json(json!({"error": "InternalError"})), 535 ) 536 .into_response(); 537 } 538 } 539 let mst = Mst::new(Arc::new(state.block_store.clone())); 540 let mst_root = match mst.persist().await { 541 Ok(c) => c, 542 Err(e) => { 543 error!("Error persisting MST: {:?}", e); 544 return ( 545 StatusCode::INTERNAL_SERVER_ERROR, 546 Json(json!({"error": "InternalError"})), 547 ) 548 .into_response(); 549 } 550 }; 551 let did_obj = match Did::new(&did) { 552 Ok(d) => d, 553 Err(_) => { 554 return ( 555 StatusCode::INTERNAL_SERVER_ERROR, 556 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 557 ) 558 .into_response(); 559 } 560 }; 561 let rev = Tid::now(LimitedU32::MIN); 562 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None); 563 let signed_commit = match unsigned_commit.sign(&signing_key) { 564 Ok(c) => c, 565 Err(e) => { 566 error!("Error signing genesis commit: {:?}", e); 567 return ( 568 StatusCode::INTERNAL_SERVER_ERROR, 569 Json(json!({"error": "InternalError"})), 570 ) 571 .into_response(); 572 } 573 }; 574 let commit_bytes = match signed_commit.to_cbor() { 575 Ok(b) => b, 576 Err(e) => { 577 
error!("Error serializing genesis commit: {:?}", e); 578 return ( 579 StatusCode::INTERNAL_SERVER_ERROR, 580 Json(json!({"error": "InternalError"})), 581 ) 582 .into_response(); 583 } 584 }; 585 let commit_cid = match state.block_store.put(&commit_bytes).await { 586 Ok(c) => c, 587 Err(e) => { 588 error!("Error saving genesis commit: {:?}", e); 589 return ( 590 StatusCode::INTERNAL_SERVER_ERROR, 591 Json(json!({"error": "InternalError"})), 592 ) 593 .into_response(); 594 } 595 }; 596 let commit_cid_str = commit_cid.to_string(); 597 let repo_insert = sqlx::query!( 598 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 599 user_id, 600 commit_cid_str 601 ) 602 .execute(&mut *tx) 603 .await; 604 if let Err(e) = repo_insert { 605 error!("Error initializing repo: {:?}", e); 606 return ( 607 StatusCode::INTERNAL_SERVER_ERROR, 608 Json(json!({"error": "InternalError"})), 609 ) 610 .into_response(); 611 } 612 if let Some(code) = &input.invite_code { 613 if !code.trim().is_empty() { 614 let use_insert = sqlx::query!( 615 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 616 code, 617 user_id 618 ) 619 .execute(&mut *tx) 620 .await; 621 if let Err(e) = use_insert { 622 error!("Error recording invite usage: {:?}", e); 623 return ( 624 StatusCode::INTERNAL_SERVER_ERROR, 625 Json(json!({"error": "InternalError"})), 626 ) 627 .into_response(); 628 } 629 } 630 } 631 if let Err(e) = tx.commit().await { 632 error!("Error committing transaction: {:?}", e); 633 return ( 634 StatusCode::INTERNAL_SERVER_ERROR, 635 Json(json!({"error": "InternalError"})), 636 ) 637 .into_response(); 638 } 639 if let Err(e) = 640 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle)).await 641 { 642 warn!("Failed to sequence identity event for {}: {}", did, e); 643 } 644 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 645 { 646 warn!("Failed to sequence account event for {}: {}", did, e); 647 } 648 
let profile_record = json!({ 649 "$type": "app.bsky.actor.profile", 650 "displayName": input.handle 651 }); 652 if let Err(e) = crate::api::repo::record::create_record_internal( 653 &state, 654 &did, 655 "app.bsky.actor.profile", 656 "self", 657 &profile_record, 658 ) 659 .await 660 { 661 warn!("Failed to create default profile for {}: {}", did, e); 662 } 663 if let Err(e) = crate::comms::enqueue_signup_verification( 664 &state.db, 665 user_id, 666 verification_channel, 667 &verification_recipient, 668 &verification_code, 669 ) 670 .await 671 { 672 warn!( 673 "Failed to enqueue signup verification notification: {:?}", 674 e 675 ); 676 } 677 ( 678 StatusCode::OK, 679 Json(CreateAccountOutput { 680 handle: short_handle.to_string(), 681 did, 682 verification_required: true, 683 verification_channel: verification_channel.to_string(), 684 }), 685 ) 686 .into_response() 687}