this repo has no description
1use super::did::verify_did_web;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::create_signed_commit;
4use crate::auth::{ServiceTokenVerifier, is_service_token};
5use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6use crate::state::{AppState, RateLimitKind};
7use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey};
8use crate::validation::validate_password;
9use axum::{
10 Json,
11 extract::State,
12 http::{HeaderMap, StatusCode},
13 response::{IntoResponse, Response},
14};
15use bcrypt::{DEFAULT_COST, hash};
16use jacquard::types::{integer::LimitedU32, string::Tid};
17use jacquard_repo::{mst::Mst, storage::BlockStore};
18use k256::{SecretKey, ecdsa::SigningKey};
19use rand::rngs::OsRng;
20use serde::{Deserialize, Serialize};
21use serde_json::json;
22use std::sync::Arc;
23use tracing::{debug, error, info, warn};
24
25fn extract_client_ip(headers: &HeaderMap) -> String {
26 if let Some(forwarded) = headers.get("x-forwarded-for")
27 && let Ok(value) = forwarded.to_str()
28 && let Some(first_ip) = value.split(',').next()
29 {
30 return first_ip.trim().to_string();
31 }
32 if let Some(real_ip) = headers.get("x-real-ip")
33 && let Ok(value) = real_ip.to_str()
34 {
35 return value.trim().to_string();
36 }
37 "unknown".to_string()
38}
39
40#[derive(Deserialize)]
41#[serde(rename_all = "camelCase")]
42pub struct CreateAccountInput {
43 pub handle: String,
44 pub email: Option<String>,
45 pub password: PlainPassword,
46 pub invite_code: Option<String>,
47 pub did: Option<String>,
48 pub did_type: Option<String>,
49 pub signing_key: Option<String>,
50 pub verification_channel: Option<String>,
51 pub discord_id: Option<String>,
52 pub telegram_username: Option<String>,
53 pub signal_number: Option<String>,
54}
55
56#[derive(Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct CreateAccountOutput {
59 pub handle: Handle,
60 pub did: Did,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub did_doc: Option<serde_json::Value>,
63 pub access_jwt: String,
64 pub refresh_jwt: String,
65 pub verification_required: bool,
66 pub verification_channel: String,
67}
68
69pub async fn create_account(
70 State(state): State<AppState>,
71 headers: HeaderMap,
72 Json(input): Json<CreateAccountInput>,
73) -> Response {
74 let is_potential_migration = input
75 .did
76 .as_ref()
77 .map(|d| d.starts_with("did:plc:"))
78 .unwrap_or(false);
79 if is_potential_migration {
80 info!(
81 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
82 input.did, input.handle
83 );
84 } else {
85 info!("create_account called");
86 }
87 let client_ip = extract_client_ip(&headers);
88 if !state
89 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
90 .await
91 {
92 warn!(ip = %client_ip, "Account creation rate limit exceeded");
93 return ApiError::RateLimitExceeded(Some(
94 "Too many account creation attempts. Please try again later.".into(),
95 ))
96 .into_response();
97 }
98
99 let migration_auth = if let Some(extracted) = crate::auth::extract_auth_token_from_header(
100 headers.get("Authorization").and_then(|h| h.to_str().ok()),
101 ) {
102 let token = extracted.token;
103 if is_service_token(&token) {
104 let verifier = ServiceTokenVerifier::new();
105 match verifier
106 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
107 .await
108 {
109 Ok(claims) => {
110 debug!("Service token verified for migration: iss={}", claims.iss);
111 Some(claims.iss)
112 }
113 Err(e) => {
114 error!("Service token verification failed: {:?}", e);
115 return ApiError::AuthenticationFailed(Some(format!(
116 "Service token verification failed: {}",
117 e
118 )))
119 .into_response();
120 }
121 }
122 } else {
123 None
124 }
125 } else {
126 None
127 };
128
129 let is_did_web_byod = migration_auth.is_some()
130 && input
131 .did
132 .as_ref()
133 .map(|d| d.starts_with("did:web:"))
134 .unwrap_or(false);
135
136 let is_migration = migration_auth.is_some()
137 && input
138 .did
139 .as_ref()
140 .map(|d| d.starts_with("did:plc:"))
141 .unwrap_or(false);
142
143 if (is_migration || is_did_web_byod)
144 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
145 {
146 if provided_did != auth_did {
147 info!(
148 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
149 auth_did, provided_did
150 );
151 return ApiError::AuthorizationError(format!(
152 "Service token issuer {} does not match DID {}",
153 auth_did, provided_did
154 ))
155 .into_response();
156 }
157 if is_did_web_byod {
158 info!(did = %provided_did, "Processing did:web BYOD account creation");
159 } else {
160 info!(
161 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
162 provided_did
163 );
164 }
165 }
166
167 let hostname_for_validation =
168 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
169 let pds_suffix = format!(".{}", hostname_for_validation);
170
171 let validated_short_handle = if !input.handle.contains('.')
172 || input.handle.ends_with(&pds_suffix)
173 {
174 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
175 input
176 .handle
177 .strip_suffix(&pds_suffix)
178 .unwrap_or(&input.handle)
179 } else {
180 &input.handle
181 };
182 match crate::api::validation::validate_short_handle(handle_to_validate) {
183 Ok(h) => h,
184 Err(e) => {
185 return ApiError::from(e).into_response();
186 }
187 }
188 } else {
189 if input.handle.contains(' ') || input.handle.contains('\t') {
190 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response();
191 }
192 for c in input.handle.chars() {
193 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
194 return ApiError::InvalidRequest(format!(
195 "Handle contains invalid character: {}",
196 c
197 ))
198 .into_response();
199 }
200 }
201 let handle_lower = input.handle.to_lowercase();
202 if crate::moderation::has_explicit_slur(&handle_lower) {
203 return ApiError::InvalidRequest("Inappropriate language in handle".into())
204 .into_response();
205 }
206 handle_lower
207 };
208 let email: Option<String> = input
209 .email
210 .as_ref()
211 .map(|e| e.trim().to_string())
212 .filter(|e| !e.is_empty());
213 if let Some(ref email) = email
214 && !crate::api::validation::is_valid_email(email)
215 {
216 return ApiError::InvalidEmail.into_response();
217 }
218 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
219 let valid_channels = ["email", "discord", "telegram", "signal"];
220 if !valid_channels.contains(&verification_channel) && !is_migration {
221 return ApiError::InvalidVerificationChannel.into_response();
222 }
223 let verification_recipient = if is_migration {
224 None
225 } else {
226 Some(match verification_channel {
227 "email" => match &input.email {
228 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
229 _ => return ApiError::MissingEmail.into_response(),
230 },
231 "discord" => match &input.discord_id {
232 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
233 _ => return ApiError::MissingDiscordId.into_response(),
234 },
235 "telegram" => match &input.telegram_username {
236 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
237 _ => return ApiError::MissingTelegramUsername.into_response(),
238 },
239 "signal" => match &input.signal_number {
240 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
241 _ => return ApiError::MissingSignalNumber.into_response(),
242 },
243 _ => return ApiError::InvalidVerificationChannel.into_response(),
244 })
245 };
246 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
247 let pds_endpoint = format!("https://{}", hostname);
248 let suffix = format!(".{}", hostname);
249 let handle = if input.handle.ends_with(&suffix) {
250 format!("{}.{}", validated_short_handle, hostname)
251 } else if input.handle.contains('.') {
252 validated_short_handle.clone()
253 } else {
254 format!("{}.{}", validated_short_handle, hostname)
255 };
256 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
257 if let Some(signing_key_did) = &input.signing_key {
258 let reserved = sqlx::query!(
259 r#"
260 SELECT id, private_key_bytes
261 FROM reserved_signing_keys
262 WHERE public_key_did_key = $1
263 AND used_at IS NULL
264 AND expires_at > NOW()
265 FOR UPDATE
266 "#,
267 signing_key_did
268 )
269 .fetch_optional(&state.db)
270 .await;
271 match reserved {
272 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
273 Ok(None) => {
274 return ApiError::InvalidSigningKey.into_response();
275 }
276 Err(e) => {
277 error!("Error looking up reserved signing key: {:?}", e);
278 return ApiError::InternalError(None).into_response();
279 }
280 }
281 } else {
282 let secret_key = SecretKey::random(&mut OsRng);
283 (secret_key.to_bytes().to_vec(), None)
284 };
285 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
286 Ok(k) => k,
287 Err(e) => {
288 error!("Error creating signing key: {:?}", e);
289 return ApiError::InternalError(None).into_response();
290 }
291 };
292 let did_type = input.did_type.as_deref().unwrap_or("plc");
293 let did = match did_type {
294 "web" => {
295 if !crate::api::server::meta::is_self_hosted_did_web_enabled() {
296 return ApiError::SelfHostedDidWebDisabled.into_response();
297 }
298 let subdomain_host = format!("{}.{}", input.handle, hostname);
299 let encoded_subdomain = subdomain_host.replace(':', "%3A");
300 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
301 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
302 self_hosted_did
303 }
304 "web-external" => {
305 let d = match &input.did {
306 Some(d) if !d.trim().is_empty() => d,
307 _ => {
308 return ApiError::InvalidRequest(
309 "External did:web requires the 'did' field to be provided".into(),
310 )
311 .into_response();
312 }
313 };
314 if !d.starts_with("did:web:") {
315 return ApiError::InvalidDid("External DID must be a did:web".into())
316 .into_response();
317 }
318 if !is_did_web_byod
319 && let Err(e) =
320 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
321 {
322 return ApiError::InvalidDid(e).into_response();
323 }
324 info!(did = %d, "Creating external did:web account");
325 d.clone()
326 }
327 _ => {
328 if let Some(d) = &input.did {
329 if d.starts_with("did:plc:") && is_migration {
330 info!(did = %d, "Migration with existing did:plc");
331 d.clone()
332 } else if d.starts_with("did:web:") {
333 if !is_did_web_byod
334 && let Err(e) = verify_did_web(
335 d,
336 &hostname,
337 &input.handle,
338 input.signing_key.as_deref(),
339 )
340 .await
341 {
342 return ApiError::InvalidDid(e).into_response();
343 }
344 d.clone()
345 } else if !d.trim().is_empty() {
346 return ApiError::InvalidDid(
347 "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth.".into()
348 )
349 .into_response();
350 } else {
351 let rotation_key = std::env::var("PLC_ROTATION_KEY")
352 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
353 let genesis_result = match create_genesis_operation(
354 &signing_key,
355 &rotation_key,
356 &handle,
357 &pds_endpoint,
358 ) {
359 Ok(r) => r,
360 Err(e) => {
361 error!("Error creating PLC genesis operation: {:?}", e);
362 return ApiError::InternalError(Some(
363 "Failed to create PLC operation".into(),
364 ))
365 .into_response();
366 }
367 };
368 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
369 if let Err(e) = plc_client
370 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
371 .await
372 {
373 error!("Failed to submit PLC genesis operation: {:?}", e);
374 return ApiError::UpstreamErrorMsg(format!(
375 "Failed to register DID with PLC directory: {}",
376 e
377 ))
378 .into_response();
379 }
380 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
381 genesis_result.did
382 }
383 } else {
384 let rotation_key = std::env::var("PLC_ROTATION_KEY")
385 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
386 let genesis_result = match create_genesis_operation(
387 &signing_key,
388 &rotation_key,
389 &handle,
390 &pds_endpoint,
391 ) {
392 Ok(r) => r,
393 Err(e) => {
394 error!("Error creating PLC genesis operation: {:?}", e);
395 return ApiError::InternalError(Some(
396 "Failed to create PLC operation".into(),
397 ))
398 .into_response();
399 }
400 };
401 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
402 if let Err(e) = plc_client
403 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
404 .await
405 {
406 error!("Failed to submit PLC genesis operation: {:?}", e);
407 return ApiError::UpstreamErrorMsg(format!(
408 "Failed to register DID with PLC directory: {}",
409 e
410 ))
411 .into_response();
412 }
413 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
414 genesis_result.did
415 }
416 }
417 };
418 let mut tx = match state.db.begin().await {
419 Ok(tx) => tx,
420 Err(e) => {
421 error!("Error starting transaction: {:?}", e);
422 return ApiError::InternalError(None).into_response();
423 }
424 };
425 if is_migration {
426 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
427 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
428 .bind(&did)
429 .fetch_optional(&mut *tx)
430 .await
431 .unwrap_or(None);
432 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
433 if deactivated_at.is_some() {
434 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
435 let update_result: Result<_, sqlx::Error> =
436 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
437 .bind(&handle)
438 .bind(account_id)
439 .execute(&mut *tx)
440 .await;
441 if let Err(e) = update_result {
442 if let Some(db_err) = e.as_database_error()
443 && db_err
444 .constraint()
445 .map(|c| c.contains("handle"))
446 .unwrap_or(false)
447 {
448 return ApiError::HandleTaken.into_response();
449 }
450 error!("Error reactivating account: {:?}", e);
451 return ApiError::InternalError(None).into_response();
452 }
453 if let Err(e) = tx.commit().await {
454 error!("Error committing reactivation: {:?}", e);
455 return ApiError::InternalError(None).into_response();
456 }
457 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
458 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
459 )
460 .bind(account_id)
461 .fetch_optional(&state.db)
462 .await
463 .unwrap_or(None);
464 let secret_key_bytes = match key_row {
465 Some((key_bytes, encryption_version)) => {
466 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
467 Ok(k) => k,
468 Err(e) => {
469 error!("Error decrypting key for reactivated account: {:?}", e);
470 return ApiError::InternalError(None).into_response();
471 }
472 }
473 }
474 None => {
475 error!("No signing key found for reactivated account");
476 return ApiError::InternalError(Some(
477 "Account signing key not found".into(),
478 ))
479 .into_response();
480 }
481 };
482 let access_meta =
483 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
484 Ok(m) => m,
485 Err(e) => {
486 error!("Error creating access token: {:?}", e);
487 return ApiError::InternalError(None).into_response();
488 }
489 };
490 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
491 &did,
492 &secret_key_bytes,
493 ) {
494 Ok(m) => m,
495 Err(e) => {
496 error!("Error creating refresh token: {:?}", e);
497 return ApiError::InternalError(None).into_response();
498 }
499 };
500 let session_result: Result<_, sqlx::Error> = sqlx::query(
501 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
502 )
503 .bind(&did)
504 .bind(&access_meta.jti)
505 .bind(&refresh_meta.jti)
506 .bind(access_meta.expires_at)
507 .bind(refresh_meta.expires_at)
508 .execute(&state.db)
509 .await;
510 if let Err(e) = session_result {
511 error!("Error creating session: {:?}", e);
512 return ApiError::InternalError(None).into_response();
513 }
514 return (
515 axum::http::StatusCode::OK,
516 Json(CreateAccountOutput {
517 handle: handle.clone().into(),
518 did: did.clone().into(),
519 did_doc: state.did_resolver.resolve_did_document(&did).await,
520 access_jwt: access_meta.token,
521 refresh_jwt: refresh_meta.token,
522 verification_required: false,
523 verification_channel: "email".to_string(),
524 }),
525 )
526 .into_response();
527 } else {
528 return ApiError::AccountAlreadyExists.into_response();
529 }
530 }
531 }
532 let exists_result: Option<(i32,)> =
533 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
534 .bind(&handle)
535 .fetch_optional(&mut *tx)
536 .await
537 .unwrap_or(None);
538 if exists_result.is_some() {
539 return ApiError::HandleTaken.into_response();
540 }
541 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
542 .map(|v| v == "true" || v == "1")
543 .unwrap_or(false);
544 if invite_code_required
545 && input
546 .invite_code
547 .as_ref()
548 .map(|c| c.trim().is_empty())
549 .unwrap_or(true)
550 {
551 return ApiError::InviteCodeRequired.into_response();
552 }
553 if let Some(code) = &input.invite_code
554 && !code.trim().is_empty()
555 {
556 let invite_query = sqlx::query!(
557 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
558 code
559 )
560 .fetch_optional(&mut *tx)
561 .await;
562 match invite_query {
563 Ok(Some(row)) => {
564 if row.available_uses <= 0 {
565 return ApiError::InvalidInviteCode.into_response();
566 }
567 let update_invite = sqlx::query!(
568 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
569 code
570 )
571 .execute(&mut *tx)
572 .await;
573 if let Err(e) = update_invite {
574 error!("Error updating invite code: {:?}", e);
575 return ApiError::InternalError(None).into_response();
576 }
577 }
578 Ok(None) => {
579 return ApiError::InvalidInviteCode.into_response();
580 }
581 Err(e) => {
582 error!("Error checking invite code: {:?}", e);
583 return ApiError::InternalError(None).into_response();
584 }
585 }
586 }
587 if let Err(e) = validate_password(&input.password) {
588 return ApiError::InvalidRequest(e.to_string()).into_response();
589 }
590
591 let password_clone = input.password.clone();
592 let password_hash =
593 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await {
594 Ok(Ok(h)) => h,
595 Ok(Err(e)) => {
596 error!("Error hashing password: {:?}", e);
597 return ApiError::InternalError(None).into_response();
598 }
599 Err(e) => {
600 error!("Failed to spawn blocking task: {:?}", e);
601 return ApiError::InternalError(None).into_response();
602 }
603 };
604 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
605 .fetch_one(&mut *tx)
606 .await
607 .map(|c| c.unwrap_or(0) == 0)
608 .unwrap_or(false);
609 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
610 Some(chrono::Utc::now())
611 } else {
612 None
613 };
614 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
615 r#"INSERT INTO users (
616 handle, email, did, password_hash,
617 preferred_comms_channel,
618 discord_id, telegram_username, signal_number,
619 is_admin, deactivated_at, email_verified
620 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
621 )
622 .bind(&handle)
623 .bind(&email)
624 .bind(&did)
625 .bind(&password_hash)
626 .bind(verification_channel)
627 .bind(
628 input
629 .discord_id
630 .as_deref()
631 .map(|s| s.trim())
632 .filter(|s| !s.is_empty()),
633 )
634 .bind(
635 input
636 .telegram_username
637 .as_deref()
638 .map(|s| s.trim())
639 .filter(|s| !s.is_empty()),
640 )
641 .bind(
642 input
643 .signal_number
644 .as_deref()
645 .map(|s| s.trim())
646 .filter(|s| !s.is_empty()),
647 )
648 .bind(is_first_user)
649 .bind(deactivated_at)
650 .bind(false)
651 .fetch_one(&mut *tx)
652 .await;
653 let user_id = match user_insert {
654 Ok((id,)) => id,
655 Err(e) => {
656 if let Some(db_err) = e.as_database_error()
657 && db_err.code().as_deref() == Some("23505")
658 {
659 let constraint = db_err.constraint().unwrap_or("");
660 if constraint.contains("handle") || constraint.contains("users_handle") {
661 return ApiError::HandleNotAvailable(None).into_response();
662 } else if constraint.contains("email") || constraint.contains("users_email") {
663 return ApiError::EmailTaken.into_response();
664 } else if constraint.contains("did") || constraint.contains("users_did") {
665 return ApiError::AccountAlreadyExists.into_response();
666 }
667 }
668 error!("Error inserting user: {:?}", e);
669 return ApiError::InternalError(None).into_response();
670 }
671 };
672
673 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
674 Ok(enc) => enc,
675 Err(e) => {
676 error!("Error encrypting user key: {:?}", e);
677 return ApiError::InternalError(None).into_response();
678 }
679 };
680 let key_insert = sqlx::query!(
681 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
682 user_id,
683 &encrypted_key_bytes[..],
684 crate::config::ENCRYPTION_VERSION
685 )
686 .execute(&mut *tx)
687 .await;
688 if let Err(e) = key_insert {
689 error!("Error inserting user key: {:?}", e);
690 return ApiError::InternalError(None).into_response();
691 }
692 if let Some(key_id) = reserved_key_id {
693 let mark_used = sqlx::query!(
694 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
695 key_id
696 )
697 .execute(&mut *tx)
698 .await;
699 if let Err(e) = mark_used {
700 error!("Error marking reserved key as used: {:?}", e);
701 return ApiError::InternalError(None).into_response();
702 }
703 }
704 let mst = Mst::new(Arc::new(state.block_store.clone()));
705 let mst_root = match mst.persist().await {
706 Ok(c) => c,
707 Err(e) => {
708 error!("Error persisting MST: {:?}", e);
709 return ApiError::InternalError(None).into_response();
710 }
711 };
712 let rev = Tid::now(LimitedU32::MIN);
713 let did_for_commit = Did::new_unchecked(&did);
714 let (commit_bytes, _sig) =
715 match create_signed_commit(&did_for_commit, mst_root, rev.as_ref(), None, &signing_key) {
716 Ok(result) => result,
717 Err(e) => {
718 error!("Error creating genesis commit: {:?}", e);
719 return ApiError::InternalError(None).into_response();
720 }
721 };
722 let commit_cid = match state.block_store.put(&commit_bytes).await {
723 Ok(c) => c,
724 Err(e) => {
725 error!("Error saving genesis commit: {:?}", e);
726 return ApiError::InternalError(None).into_response();
727 }
728 };
729 let commit_cid_str = commit_cid.to_string();
730 let rev_str = rev.as_ref().to_string();
731 let repo_insert = sqlx::query!(
732 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
733 user_id,
734 commit_cid_str,
735 rev_str
736 )
737 .execute(&mut *tx)
738 .await;
739 if let Err(e) = repo_insert {
740 error!("Error initializing repo: {:?}", e);
741 return ApiError::InternalError(None).into_response();
742 }
743 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
744 if let Err(e) = sqlx::query!(
745 r#"
746 INSERT INTO user_blocks (user_id, block_cid)
747 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
748 ON CONFLICT (user_id, block_cid) DO NOTHING
749 "#,
750 user_id,
751 &genesis_block_cids
752 )
753 .execute(&mut *tx)
754 .await
755 {
756 error!("Error inserting user_blocks: {:?}", e);
757 return ApiError::InternalError(None).into_response();
758 }
759 if let Some(code) = &input.invite_code
760 && !code.trim().is_empty()
761 {
762 let use_insert = sqlx::query!(
763 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
764 code,
765 user_id
766 )
767 .execute(&mut *tx)
768 .await;
769 if let Err(e) = use_insert {
770 error!("Error recording invite usage: {:?}", e);
771 return ApiError::InternalError(None).into_response();
772 }
773 }
774 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
775 let birthdate_pref = json!({
776 "$type": "app.bsky.actor.defs#personalDetailsPref",
777 "birthDate": "1998-05-06T00:00:00.000Z"
778 });
779 if let Err(e) = sqlx::query!(
780 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
781 ON CONFLICT (user_id, name) DO NOTHING",
782 user_id,
783 "app.bsky.actor.defs#personalDetailsPref",
784 birthdate_pref
785 )
786 .execute(&mut *tx)
787 .await
788 {
789 warn!("Failed to set default birthdate preference: {:?}", e);
790 }
791 }
792 if let Err(e) = tx.commit().await {
793 error!("Error committing transaction: {:?}", e);
794 return ApiError::InternalError(None).into_response();
795 }
796 if !is_migration && !is_did_web_byod {
797 let did_typed = Did::new_unchecked(&did);
798 let handle_typed = Handle::new_unchecked(&handle);
799 if let Err(e) =
800 crate::api::repo::record::sequence_identity_event(&state, &did_typed, Some(&handle_typed)).await
801 {
802 warn!("Failed to sequence identity event for {}: {}", did, e);
803 }
804 if let Err(e) =
805 crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await
806 {
807 warn!("Failed to sequence account event for {}: {}", did, e);
808 }
809 if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
810 &state,
811 &did_typed,
812 &commit_cid,
813 &mst_root,
814 &rev_str,
815 )
816 .await
817 {
818 warn!("Failed to sequence commit event for {}: {}", did, e);
819 }
820 if let Err(e) = crate::api::repo::record::sequence_sync_event(
821 &state,
822 &did_typed,
823 &commit_cid_str,
824 Some(rev.as_ref()),
825 )
826 .await
827 {
828 warn!("Failed to sequence sync event for {}: {}", did, e);
829 }
830 let profile_record = json!({
831 "$type": "app.bsky.actor.profile",
832 "displayName": input.handle
833 });
834 let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
835 let profile_rkey = Rkey::new_unchecked("self");
836 if let Err(e) = crate::api::repo::record::create_record_internal(
837 &state,
838 &did_typed,
839 &profile_collection,
840 &profile_rkey,
841 &profile_record,
842 )
843 .await
844 {
845 warn!("Failed to create default profile for {}: {}", did, e);
846 }
847 }
848 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
849 if !is_migration {
850 if let Some(ref recipient) = verification_recipient {
851 let verification_token = crate::auth::verification_token::generate_signup_token(
852 &did,
853 verification_channel,
854 recipient,
855 );
856 let formatted_token =
857 crate::auth::verification_token::format_token_for_display(&verification_token);
858 if let Err(e) = crate::comms::enqueue_signup_verification(
859 &state.db,
860 user_id,
861 verification_channel,
862 recipient,
863 &formatted_token,
864 None,
865 )
866 .await
867 {
868 warn!(
869 "Failed to enqueue signup verification notification: {:?}",
870 e
871 );
872 }
873 }
874 } else if let Some(ref user_email) = email {
875 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
876 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
877 if let Err(e) = crate::comms::enqueue_migration_verification(
878 &state.db,
879 user_id,
880 user_email,
881 &formatted_token,
882 &hostname,
883 )
884 .await
885 {
886 warn!("Failed to enqueue migration verification email: {:?}", e);
887 }
888 }
889
890 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
891 {
892 Ok(m) => m,
893 Err(e) => {
894 error!("createAccount: Error creating access token: {:?}", e);
895 return ApiError::InternalError(None).into_response();
896 }
897 };
898 let refresh_meta =
899 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
900 Ok(m) => m,
901 Err(e) => {
902 error!("createAccount: Error creating refresh token: {:?}", e);
903 return ApiError::InternalError(None).into_response();
904 }
905 };
906 if let Err(e) = sqlx::query!(
907 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
908 did,
909 access_meta.jti,
910 refresh_meta.jti,
911 access_meta.expires_at,
912 refresh_meta.expires_at
913 )
914 .execute(&state.db)
915 .await
916 {
917 error!("createAccount: Error creating session: {:?}", e);
918 return ApiError::InternalError(None).into_response();
919 }
920
921 let did_doc = state.did_resolver.resolve_did_document(&did).await;
922
923 if is_migration {
924 info!(
925 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
926 did, handle
927 );
928 }
929
930 (
931 StatusCode::OK,
932 Json(CreateAccountOutput {
933 handle: handle.clone().into(),
934 did: did.into(),
935 did_doc,
936 access_jwt: access_meta.token,
937 refresh_jwt: refresh_meta.token,
938 verification_required: !is_migration,
939 verification_channel: verification_channel.to_string(),
940 }),
941 )
942 .into_response()
943}