this repo has no description
1use super::did::verify_did_web;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::create_signed_commit;
4use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
5use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6use crate::state::{AppState, RateLimitKind};
7use crate::types::{Did, Handle, PlainPassword};
8use crate::validation::validate_password;
9use axum::{
10 Json,
11 extract::State,
12 http::{HeaderMap, StatusCode},
13 response::{IntoResponse, Response},
14};
15use serde_json::json;
16use bcrypt::{DEFAULT_COST, hash};
17use jacquard::types::{integer::LimitedU32, string::Tid};
18use jacquard_repo::{mst::Mst, storage::BlockStore};
19use k256::{SecretKey, ecdsa::SigningKey};
20use rand::rngs::OsRng;
21use serde::{Deserialize, Serialize};
22use std::sync::Arc;
23use tracing::{debug, error, info, warn};
24
25fn extract_client_ip(headers: &HeaderMap) -> String {
26 if let Some(forwarded) = headers.get("x-forwarded-for")
27 && let Ok(value) = forwarded.to_str()
28 && let Some(first_ip) = value.split(',').next()
29 {
30 return first_ip.trim().to_string();
31 }
32 if let Some(real_ip) = headers.get("x-real-ip")
33 && let Ok(value) = real_ip.to_str()
34 {
35 return value.trim().to_string();
36 }
37 "unknown".to_string()
38}
39
40#[derive(Deserialize)]
41#[serde(rename_all = "camelCase")]
42pub struct CreateAccountInput {
43 pub handle: String,
44 pub email: Option<String>,
45 pub password: PlainPassword,
46 pub invite_code: Option<String>,
47 pub did: Option<String>,
48 pub did_type: Option<String>,
49 pub signing_key: Option<String>,
50 pub verification_channel: Option<String>,
51 pub discord_id: Option<String>,
52 pub telegram_username: Option<String>,
53 pub signal_number: Option<String>,
54}
55
56#[derive(Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct CreateAccountOutput {
59 pub handle: Handle,
60 pub did: Did,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub did_doc: Option<serde_json::Value>,
63 pub access_jwt: String,
64 pub refresh_jwt: String,
65 pub verification_required: bool,
66 pub verification_channel: String,
67}
68
69pub async fn create_account(
70 State(state): State<AppState>,
71 headers: HeaderMap,
72 Json(input): Json<CreateAccountInput>,
73) -> Response {
74 let is_potential_migration = input
75 .did
76 .as_ref()
77 .map(|d| d.starts_with("did:plc:"))
78 .unwrap_or(false);
79 if is_potential_migration {
80 info!(
81 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
82 input.did, input.handle
83 );
84 } else {
85 info!("create_account called");
86 }
87 let client_ip = extract_client_ip(&headers);
88 if !state
89 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
90 .await
91 {
92 warn!(ip = %client_ip, "Account creation rate limit exceeded");
93 return ApiError::RateLimitExceeded(Some("Too many account creation attempts. Please try again later.".into(),))
94 .into_response();
95 }
96
97 let migration_auth = if let Some(token) =
98 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
99 {
100 if is_service_token(&token) {
101 let verifier = ServiceTokenVerifier::new();
102 match verifier
103 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
104 .await
105 {
106 Ok(claims) => {
107 debug!("Service token verified for migration: iss={}", claims.iss);
108 Some(claims.iss)
109 }
110 Err(e) => {
111 error!("Service token verification failed: {:?}", e);
112 return ApiError::AuthenticationFailed(Some(format!(
113 "Service token verification failed: {}",
114 e
115 )))
116 .into_response();
117 }
118 }
119 } else {
120 None
121 }
122 } else {
123 None
124 };
125
126 let is_did_web_byod = migration_auth.is_some()
127 && input
128 .did
129 .as_ref()
130 .map(|d| d.starts_with("did:web:"))
131 .unwrap_or(false);
132
133 let is_migration = migration_auth.is_some()
134 && input
135 .did
136 .as_ref()
137 .map(|d| d.starts_with("did:plc:"))
138 .unwrap_or(false);
139
140 if (is_migration || is_did_web_byod)
141 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
142 {
143 if provided_did != auth_did {
144 info!(
145 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
146 auth_did, provided_did
147 );
148 return ApiError::AuthorizationError(format!(
149 "Service token issuer {} does not match DID {}",
150 auth_did, provided_did
151 ))
152 .into_response();
153 }
154 if is_did_web_byod {
155 info!(did = %provided_did, "Processing did:web BYOD account creation");
156 } else {
157 info!(
158 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
159 provided_did
160 );
161 }
162 }
163
164 let hostname_for_validation =
165 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
166 let pds_suffix = format!(".{}", hostname_for_validation);
167
168 let validated_short_handle = if !input.handle.contains('.')
169 || input.handle.ends_with(&pds_suffix)
170 {
171 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
172 input
173 .handle
174 .strip_suffix(&pds_suffix)
175 .unwrap_or(&input.handle)
176 } else {
177 &input.handle
178 };
179 match crate::api::validation::validate_short_handle(handle_to_validate) {
180 Ok(h) => h,
181 Err(e) => {
182 return ApiError::from(e).into_response();
183 }
184 }
185 } else {
186 if input.handle.contains(' ') || input.handle.contains('\t') {
187 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response();
188 }
189 for c in input.handle.chars() {
190 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
191 return ApiError::InvalidRequest(format!(
192 "Handle contains invalid character: {}",
193 c
194 ))
195 .into_response();
196 }
197 }
198 let handle_lower = input.handle.to_lowercase();
199 if crate::moderation::has_explicit_slur(&handle_lower) {
200 return ApiError::InvalidRequest("Inappropriate language in handle".into())
201 .into_response();
202 }
203 handle_lower
204 };
205 let email: Option<String> = input
206 .email
207 .as_ref()
208 .map(|e| e.trim().to_string())
209 .filter(|e| !e.is_empty());
210 if let Some(ref email) = email
211 && !crate::api::validation::is_valid_email(email)
212 {
213 return ApiError::InvalidEmail.into_response();
214 }
215 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
216 let valid_channels = ["email", "discord", "telegram", "signal"];
217 if !valid_channels.contains(&verification_channel) && !is_migration {
218 return ApiError::InvalidVerificationChannel.into_response();
219 }
220 let verification_recipient = if is_migration {
221 None
222 } else {
223 Some(match verification_channel {
224 "email" => match &input.email {
225 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
226 _ => return ApiError::MissingEmail.into_response(),
227 },
228 "discord" => match &input.discord_id {
229 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
230 _ => return ApiError::MissingDiscordId.into_response(),
231 },
232 "telegram" => match &input.telegram_username {
233 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
234 _ => return ApiError::MissingTelegramUsername.into_response(),
235 },
236 "signal" => match &input.signal_number {
237 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
238 _ => return ApiError::MissingSignalNumber.into_response(),
239 },
240 _ => return ApiError::InvalidVerificationChannel.into_response(),
241 })
242 };
243 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
244 let pds_endpoint = format!("https://{}", hostname);
245 let suffix = format!(".{}", hostname);
246 let handle = if input.handle.ends_with(&suffix) {
247 format!("{}.{}", validated_short_handle, hostname)
248 } else if input.handle.contains('.') {
249 validated_short_handle.clone()
250 } else {
251 format!("{}.{}", validated_short_handle, hostname)
252 };
253 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
254 if let Some(signing_key_did) = &input.signing_key {
255 let reserved = sqlx::query!(
256 r#"
257 SELECT id, private_key_bytes
258 FROM reserved_signing_keys
259 WHERE public_key_did_key = $1
260 AND used_at IS NULL
261 AND expires_at > NOW()
262 FOR UPDATE
263 "#,
264 signing_key_did
265 )
266 .fetch_optional(&state.db)
267 .await;
268 match reserved {
269 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
270 Ok(None) => {
271 return ApiError::InvalidSigningKey.into_response();
272 }
273 Err(e) => {
274 error!("Error looking up reserved signing key: {:?}", e);
275 return ApiError::InternalError(None).into_response();
276 }
277 }
278 } else {
279 let secret_key = SecretKey::random(&mut OsRng);
280 (secret_key.to_bytes().to_vec(), None)
281 };
282 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
283 Ok(k) => k,
284 Err(e) => {
285 error!("Error creating signing key: {:?}", e);
286 return ApiError::InternalError(None).into_response();
287 }
288 };
289 let did_type = input.did_type.as_deref().unwrap_or("plc");
290 let did = match did_type {
291 "web" => {
292 if !crate::api::server::meta::is_self_hosted_did_web_enabled() {
293 return ApiError::SelfHostedDidWebDisabled.into_response();
294 }
295 let subdomain_host = format!("{}.{}", input.handle, hostname);
296 let encoded_subdomain = subdomain_host.replace(':', "%3A");
297 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
298 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
299 self_hosted_did
300 }
301 "web-external" => {
302 let d = match &input.did {
303 Some(d) if !d.trim().is_empty() => d,
304 _ => {
305 return ApiError::InvalidRequest(
306 "External did:web requires the 'did' field to be provided".into(),
307 )
308 .into_response();
309 }
310 };
311 if !d.starts_with("did:web:") {
312 return ApiError::InvalidDid("External DID must be a did:web".into())
313 .into_response();
314 }
315 if !is_did_web_byod
316 && let Err(e) =
317 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
318 {
319 return ApiError::InvalidDid(e).into_response();
320 }
321 info!(did = %d, "Creating external did:web account");
322 d.clone()
323 }
324 _ => {
325 if let Some(d) = &input.did {
326 if d.starts_with("did:plc:") && is_migration {
327 info!(did = %d, "Migration with existing did:plc");
328 d.clone()
329 } else if d.starts_with("did:web:") {
330 if !is_did_web_byod
331 && let Err(e) = verify_did_web(
332 d,
333 &hostname,
334 &input.handle,
335 input.signing_key.as_deref(),
336 )
337 .await
338 {
339 return ApiError::InvalidDid(e).into_response();
340 }
341 d.clone()
342 } else if !d.trim().is_empty() {
343 return ApiError::InvalidDid(
344 "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth.".into()
345 )
346 .into_response();
347 } else {
348 let rotation_key = std::env::var("PLC_ROTATION_KEY")
349 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
350 let genesis_result = match create_genesis_operation(
351 &signing_key,
352 &rotation_key,
353 &handle,
354 &pds_endpoint,
355 ) {
356 Ok(r) => r,
357 Err(e) => {
358 error!("Error creating PLC genesis operation: {:?}", e);
359 return ApiError::InternalError(Some(
360 "Failed to create PLC operation".into(),
361 ))
362 .into_response();
363 }
364 };
365 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
366 if let Err(e) = plc_client
367 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
368 .await
369 {
370 error!("Failed to submit PLC genesis operation: {:?}", e);
371 return ApiError::UpstreamErrorMsg(format!(
372 "Failed to register DID with PLC directory: {}",
373 e
374 ))
375 .into_response();
376 }
377 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
378 genesis_result.did
379 }
380 } else {
381 let rotation_key = std::env::var("PLC_ROTATION_KEY")
382 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
383 let genesis_result = match create_genesis_operation(
384 &signing_key,
385 &rotation_key,
386 &handle,
387 &pds_endpoint,
388 ) {
389 Ok(r) => r,
390 Err(e) => {
391 error!("Error creating PLC genesis operation: {:?}", e);
392 return ApiError::InternalError(Some(
393 "Failed to create PLC operation".into(),
394 ))
395 .into_response();
396 }
397 };
398 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
399 if let Err(e) = plc_client
400 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
401 .await
402 {
403 error!("Failed to submit PLC genesis operation: {:?}", e);
404 return ApiError::UpstreamErrorMsg(format!(
405 "Failed to register DID with PLC directory: {}",
406 e
407 ))
408 .into_response();
409 }
410 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
411 genesis_result.did
412 }
413 }
414 };
415 let mut tx = match state.db.begin().await {
416 Ok(tx) => tx,
417 Err(e) => {
418 error!("Error starting transaction: {:?}", e);
419 return ApiError::InternalError(None).into_response();
420 }
421 };
422 if is_migration {
423 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
424 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
425 .bind(&did)
426 .fetch_optional(&mut *tx)
427 .await
428 .unwrap_or(None);
429 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
430 if deactivated_at.is_some() {
431 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
432 let update_result: Result<_, sqlx::Error> =
433 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
434 .bind(&handle)
435 .bind(account_id)
436 .execute(&mut *tx)
437 .await;
438 if let Err(e) = update_result {
439 if let Some(db_err) = e.as_database_error()
440 && db_err
441 .constraint()
442 .map(|c| c.contains("handle"))
443 .unwrap_or(false)
444 {
445 return ApiError::HandleTaken.into_response();
446 }
447 error!("Error reactivating account: {:?}", e);
448 return ApiError::InternalError(None).into_response();
449 }
450 if let Err(e) = tx.commit().await {
451 error!("Error committing reactivation: {:?}", e);
452 return ApiError::InternalError(None).into_response();
453 }
454 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
455 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
456 )
457 .bind(account_id)
458 .fetch_optional(&state.db)
459 .await
460 .unwrap_or(None);
461 let secret_key_bytes = match key_row {
462 Some((key_bytes, encryption_version)) => {
463 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
464 Ok(k) => k,
465 Err(e) => {
466 error!("Error decrypting key for reactivated account: {:?}", e);
467 return ApiError::InternalError(None).into_response();
468 }
469 }
470 }
471 None => {
472 error!("No signing key found for reactivated account");
473 return ApiError::InternalError(Some(
474 "Account signing key not found".into(),
475 ))
476 .into_response();
477 }
478 };
479 let access_meta =
480 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
481 Ok(m) => m,
482 Err(e) => {
483 error!("Error creating access token: {:?}", e);
484 return ApiError::InternalError(None).into_response();
485 }
486 };
487 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
488 &did,
489 &secret_key_bytes,
490 ) {
491 Ok(m) => m,
492 Err(e) => {
493 error!("Error creating refresh token: {:?}", e);
494 return ApiError::InternalError(None).into_response();
495 }
496 };
497 let session_result: Result<_, sqlx::Error> = sqlx::query(
498 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
499 )
500 .bind(&did)
501 .bind(&access_meta.jti)
502 .bind(&refresh_meta.jti)
503 .bind(access_meta.expires_at)
504 .bind(refresh_meta.expires_at)
505 .execute(&state.db)
506 .await;
507 if let Err(e) = session_result {
508 error!("Error creating session: {:?}", e);
509 return ApiError::InternalError(None).into_response();
510 }
511 return (
512 axum::http::StatusCode::OK,
513 Json(CreateAccountOutput {
514 handle: handle.clone().into(),
515 did: did.clone().into(),
516 did_doc: state.did_resolver.resolve_did_document(&did).await,
517 access_jwt: access_meta.token,
518 refresh_jwt: refresh_meta.token,
519 verification_required: false,
520 verification_channel: "email".to_string(),
521 }),
522 )
523 .into_response();
524 } else {
525 return ApiError::AccountAlreadyExists.into_response();
526 }
527 }
528 }
529 let exists_result: Option<(i32,)> =
530 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
531 .bind(&handle)
532 .fetch_optional(&mut *tx)
533 .await
534 .unwrap_or(None);
535 if exists_result.is_some() {
536 return ApiError::HandleTaken.into_response();
537 }
538 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
539 .map(|v| v == "true" || v == "1")
540 .unwrap_or(false);
541 if invite_code_required
542 && input
543 .invite_code
544 .as_ref()
545 .map(|c| c.trim().is_empty())
546 .unwrap_or(true)
547 {
548 return ApiError::InviteCodeRequired.into_response();
549 }
550 if let Some(code) = &input.invite_code
551 && !code.trim().is_empty()
552 {
553 let invite_query = sqlx::query!(
554 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
555 code
556 )
557 .fetch_optional(&mut *tx)
558 .await;
559 match invite_query {
560 Ok(Some(row)) => {
561 if row.available_uses <= 0 {
562 return ApiError::InvalidInviteCode.into_response();
563 }
564 let update_invite = sqlx::query!(
565 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
566 code
567 )
568 .execute(&mut *tx)
569 .await;
570 if let Err(e) = update_invite {
571 error!("Error updating invite code: {:?}", e);
572 return ApiError::InternalError(None).into_response();
573 }
574 }
575 Ok(None) => {
576 return ApiError::InvalidInviteCode.into_response();
577 }
578 Err(e) => {
579 error!("Error checking invite code: {:?}", e);
580 return ApiError::InternalError(None).into_response();
581 }
582 }
583 }
584 if let Err(e) = validate_password(&input.password) {
585 return ApiError::InvalidRequest(e.to_string()).into_response();
586 }
587
588 let password_clone = input.password.clone();
589 let password_hash =
590 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await {
591 Ok(Ok(h)) => h,
592 Ok(Err(e)) => {
593 error!("Error hashing password: {:?}", e);
594 return ApiError::InternalError(None).into_response();
595 }
596 Err(e) => {
597 error!("Failed to spawn blocking task: {:?}", e);
598 return ApiError::InternalError(None).into_response();
599 }
600 };
601 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
602 .fetch_one(&mut *tx)
603 .await
604 .map(|c| c.unwrap_or(0) == 0)
605 .unwrap_or(false);
606 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
607 Some(chrono::Utc::now())
608 } else {
609 None
610 };
611 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
612 r#"INSERT INTO users (
613 handle, email, did, password_hash,
614 preferred_comms_channel,
615 discord_id, telegram_username, signal_number,
616 is_admin, deactivated_at, email_verified
617 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
618 )
619 .bind(&handle)
620 .bind(&email)
621 .bind(&did)
622 .bind(&password_hash)
623 .bind(verification_channel)
624 .bind(
625 input
626 .discord_id
627 .as_deref()
628 .map(|s| s.trim())
629 .filter(|s| !s.is_empty()),
630 )
631 .bind(
632 input
633 .telegram_username
634 .as_deref()
635 .map(|s| s.trim())
636 .filter(|s| !s.is_empty()),
637 )
638 .bind(
639 input
640 .signal_number
641 .as_deref()
642 .map(|s| s.trim())
643 .filter(|s| !s.is_empty()),
644 )
645 .bind(is_first_user)
646 .bind(deactivated_at)
647 .bind(false)
648 .fetch_one(&mut *tx)
649 .await;
650 let user_id = match user_insert {
651 Ok((id,)) => id,
652 Err(e) => {
653 if let Some(db_err) = e.as_database_error()
654 && db_err.code().as_deref() == Some("23505")
655 {
656 let constraint = db_err.constraint().unwrap_or("");
657 if constraint.contains("handle") || constraint.contains("users_handle") {
658 return ApiError::HandleNotAvailable(None).into_response();
659 } else if constraint.contains("email") || constraint.contains("users_email") {
660 return ApiError::EmailTaken.into_response();
661 } else if constraint.contains("did") || constraint.contains("users_did") {
662 return ApiError::AccountAlreadyExists.into_response();
663 }
664 }
665 error!("Error inserting user: {:?}", e);
666 return ApiError::InternalError(None).into_response();
667 }
668 };
669
670 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
671 Ok(enc) => enc,
672 Err(e) => {
673 error!("Error encrypting user key: {:?}", e);
674 return ApiError::InternalError(None).into_response();
675 }
676 };
677 let key_insert = sqlx::query!(
678 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
679 user_id,
680 &encrypted_key_bytes[..],
681 crate::config::ENCRYPTION_VERSION
682 )
683 .execute(&mut *tx)
684 .await;
685 if let Err(e) = key_insert {
686 error!("Error inserting user key: {:?}", e);
687 return ApiError::InternalError(None).into_response();
688 }
689 if let Some(key_id) = reserved_key_id {
690 let mark_used = sqlx::query!(
691 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
692 key_id
693 )
694 .execute(&mut *tx)
695 .await;
696 if let Err(e) = mark_used {
697 error!("Error marking reserved key as used: {:?}", e);
698 return ApiError::InternalError(None).into_response();
699 }
700 }
701 let mst = Mst::new(Arc::new(state.block_store.clone()));
702 let mst_root = match mst.persist().await {
703 Ok(c) => c,
704 Err(e) => {
705 error!("Error persisting MST: {:?}", e);
706 return ApiError::InternalError(None).into_response();
707 }
708 };
709 let rev = Tid::now(LimitedU32::MIN);
710 let (commit_bytes, _sig) =
711 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
712 Ok(result) => result,
713 Err(e) => {
714 error!("Error creating genesis commit: {:?}", e);
715 return ApiError::InternalError(None).into_response();
716 }
717 };
718 let commit_cid = match state.block_store.put(&commit_bytes).await {
719 Ok(c) => c,
720 Err(e) => {
721 error!("Error saving genesis commit: {:?}", e);
722 return ApiError::InternalError(None).into_response();
723 }
724 };
725 let commit_cid_str = commit_cid.to_string();
726 let rev_str = rev.as_ref().to_string();
727 let repo_insert = sqlx::query!(
728 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
729 user_id,
730 commit_cid_str,
731 rev_str
732 )
733 .execute(&mut *tx)
734 .await;
735 if let Err(e) = repo_insert {
736 error!("Error initializing repo: {:?}", e);
737 return ApiError::InternalError(None).into_response();
738 }
739 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
740 if let Err(e) = sqlx::query!(
741 r#"
742 INSERT INTO user_blocks (user_id, block_cid)
743 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
744 ON CONFLICT (user_id, block_cid) DO NOTHING
745 "#,
746 user_id,
747 &genesis_block_cids
748 )
749 .execute(&mut *tx)
750 .await
751 {
752 error!("Error inserting user_blocks: {:?}", e);
753 return ApiError::InternalError(None).into_response();
754 }
755 if let Some(code) = &input.invite_code
756 && !code.trim().is_empty()
757 {
758 let use_insert = sqlx::query!(
759 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
760 code,
761 user_id
762 )
763 .execute(&mut *tx)
764 .await;
765 if let Err(e) = use_insert {
766 error!("Error recording invite usage: {:?}", e);
767 return ApiError::InternalError(None).into_response();
768 }
769 }
770 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
771 let birthdate_pref = json!({
772 "$type": "app.bsky.actor.defs#personalDetailsPref",
773 "birthDate": "1998-05-06T00:00:00.000Z"
774 });
775 if let Err(e) = sqlx::query!(
776 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
777 ON CONFLICT (user_id, name) DO NOTHING",
778 user_id,
779 "app.bsky.actor.defs#personalDetailsPref",
780 birthdate_pref
781 )
782 .execute(&mut *tx)
783 .await
784 {
785 warn!("Failed to set default birthdate preference: {:?}", e);
786 }
787 }
788 if let Err(e) = tx.commit().await {
789 error!("Error committing transaction: {:?}", e);
790 return ApiError::InternalError(None).into_response();
791 }
792 if !is_migration && !is_did_web_byod {
793 if let Err(e) =
794 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
795 {
796 warn!("Failed to sequence identity event for {}: {}", did, e);
797 }
798 if let Err(e) =
799 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
800 {
801 warn!("Failed to sequence account event for {}: {}", did, e);
802 }
803 if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
804 &state,
805 &did,
806 &commit_cid,
807 &mst_root,
808 &rev_str,
809 )
810 .await
811 {
812 warn!("Failed to sequence commit event for {}: {}", did, e);
813 }
814 if let Err(e) = crate::api::repo::record::sequence_sync_event(
815 &state,
816 &did,
817 &commit_cid_str,
818 Some(rev.as_ref()),
819 )
820 .await
821 {
822 warn!("Failed to sequence sync event for {}: {}", did, e);
823 }
824 let profile_record = json!({
825 "$type": "app.bsky.actor.profile",
826 "displayName": input.handle
827 });
828 if let Err(e) = crate::api::repo::record::create_record_internal(
829 &state,
830 &did,
831 "app.bsky.actor.profile",
832 "self",
833 &profile_record,
834 )
835 .await
836 {
837 warn!("Failed to create default profile for {}: {}", did, e);
838 }
839 }
840 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
841 if !is_migration {
842 if let Some(ref recipient) = verification_recipient {
843 let verification_token = crate::auth::verification_token::generate_signup_token(
844 &did,
845 verification_channel,
846 recipient,
847 );
848 let formatted_token =
849 crate::auth::verification_token::format_token_for_display(&verification_token);
850 if let Err(e) = crate::comms::enqueue_signup_verification(
851 &state.db,
852 user_id,
853 verification_channel,
854 recipient,
855 &formatted_token,
856 None,
857 )
858 .await
859 {
860 warn!(
861 "Failed to enqueue signup verification notification: {:?}",
862 e
863 );
864 }
865 }
866 } else if let Some(ref user_email) = email {
867 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
868 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
869 if let Err(e) = crate::comms::enqueue_migration_verification(
870 &state.db,
871 user_id,
872 user_email,
873 &formatted_token,
874 &hostname,
875 )
876 .await
877 {
878 warn!("Failed to enqueue migration verification email: {:?}", e);
879 }
880 }
881
882 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
883 {
884 Ok(m) => m,
885 Err(e) => {
886 error!("createAccount: Error creating access token: {:?}", e);
887 return ApiError::InternalError(None).into_response();
888 }
889 };
890 let refresh_meta =
891 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
892 Ok(m) => m,
893 Err(e) => {
894 error!("createAccount: Error creating refresh token: {:?}", e);
895 return ApiError::InternalError(None).into_response();
896 }
897 };
898 if let Err(e) = sqlx::query!(
899 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
900 did,
901 access_meta.jti,
902 refresh_meta.jti,
903 access_meta.expires_at,
904 refresh_meta.expires_at
905 )
906 .execute(&state.db)
907 .await
908 {
909 error!("createAccount: Error creating session: {:?}", e);
910 return ApiError::InternalError(None).into_response();
911 }
912
913 let did_doc = state.did_resolver.resolve_did_document(&did).await;
914
915 if is_migration {
916 info!(
917 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
918 did, handle
919 );
920 }
921
922 (
923 StatusCode::OK,
924 Json(CreateAccountOutput {
925 handle: handle.clone().into(),
926 did: did.into(),
927 did_doc,
928 access_jwt: access_meta.token,
929 refresh_jwt: refresh_meta.token,
930 verification_required: !is_migration,
931 verification_channel: verification_channel.to_string(),
932 }),
933 )
934 .into_response()
935}