this repo has no description
1use super::did::verify_did_web;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::create_signed_commit;
4use crate::auth::{ServiceTokenVerifier, is_service_token};
5use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6use crate::state::{AppState, RateLimitKind};
7use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey};
8use crate::validation::validate_password;
9use axum::{
10 Json,
11 extract::State,
12 http::{HeaderMap, StatusCode},
13 response::{IntoResponse, Response},
14};
15use bcrypt::{DEFAULT_COST, hash};
16use jacquard::types::{integer::LimitedU32, string::Tid};
17use jacquard_repo::{mst::Mst, storage::BlockStore};
18use k256::{SecretKey, ecdsa::SigningKey};
19use rand::rngs::OsRng;
20use serde::{Deserialize, Serialize};
21use serde_json::json;
22use std::sync::Arc;
23use tracing::{debug, error, info, warn};
24
25fn extract_client_ip(headers: &HeaderMap) -> String {
26 if let Some(forwarded) = headers.get("x-forwarded-for")
27 && let Ok(value) = forwarded.to_str()
28 && let Some(first_ip) = value.split(',').next()
29 {
30 return first_ip.trim().to_string();
31 }
32 if let Some(real_ip) = headers.get("x-real-ip")
33 && let Ok(value) = real_ip.to_str()
34 {
35 return value.trim().to_string();
36 }
37 "unknown".to_string()
38}
39
40#[derive(Deserialize)]
41#[serde(rename_all = "camelCase")]
42pub struct CreateAccountInput {
43 pub handle: String,
44 pub email: Option<String>,
45 pub password: PlainPassword,
46 pub invite_code: Option<String>,
47 pub did: Option<String>,
48 pub did_type: Option<String>,
49 pub signing_key: Option<String>,
50 pub verification_channel: Option<String>,
51 pub discord_id: Option<String>,
52 pub telegram_username: Option<String>,
53 pub signal_number: Option<String>,
54}
55
56#[derive(Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct CreateAccountOutput {
59 pub handle: Handle,
60 pub did: Did,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub did_doc: Option<serde_json::Value>,
63 pub access_jwt: String,
64 pub refresh_jwt: String,
65 pub verification_required: bool,
66 pub verification_channel: String,
67}
68
69pub async fn create_account(
70 State(state): State<AppState>,
71 headers: HeaderMap,
72 Json(input): Json<CreateAccountInput>,
73) -> Response {
74 let is_potential_migration = input
75 .did
76 .as_ref()
77 .map(|d| d.starts_with("did:plc:"))
78 .unwrap_or(false);
79 if is_potential_migration {
80 info!(
81 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
82 input.did, input.handle
83 );
84 } else {
85 info!("create_account called");
86 }
87 let client_ip = extract_client_ip(&headers);
88 if !state
89 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
90 .await
91 {
92 warn!(ip = %client_ip, "Account creation rate limit exceeded");
93 return ApiError::RateLimitExceeded(Some(
94 "Too many account creation attempts. Please try again later.".into(),
95 ))
96 .into_response();
97 }
98
99 let migration_auth = if let Some(extracted) = crate::auth::extract_auth_token_from_header(
100 headers.get("Authorization").and_then(|h| h.to_str().ok()),
101 ) {
102 let token = extracted.token;
103 if is_service_token(&token) {
104 let verifier = ServiceTokenVerifier::new();
105 match verifier
106 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
107 .await
108 {
109 Ok(claims) => {
110 debug!("Service token verified for migration: iss={}", claims.iss);
111 Some(claims.iss)
112 }
113 Err(e) => {
114 error!("Service token verification failed: {:?}", e);
115 return ApiError::AuthenticationFailed(Some(format!(
116 "Service token verification failed: {}",
117 e
118 )))
119 .into_response();
120 }
121 }
122 } else {
123 None
124 }
125 } else {
126 None
127 };
128
129 let is_did_web_byod = migration_auth.is_some()
130 && input
131 .did
132 .as_ref()
133 .map(|d| d.starts_with("did:web:"))
134 .unwrap_or(false);
135
136 let is_migration = migration_auth.is_some()
137 && input
138 .did
139 .as_ref()
140 .map(|d| d.starts_with("did:plc:"))
141 .unwrap_or(false);
142
143 if (is_migration || is_did_web_byod)
144 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
145 {
146 if provided_did != auth_did {
147 info!(
148 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
149 auth_did, provided_did
150 );
151 return ApiError::AuthorizationError(format!(
152 "Service token issuer {} does not match DID {}",
153 auth_did, provided_did
154 ))
155 .into_response();
156 }
157 if is_did_web_byod {
158 info!(did = %provided_did, "Processing did:web BYOD account creation");
159 } else {
160 info!(
161 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
162 provided_did
163 );
164 }
165 }
166
167 let hostname_for_validation =
168 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
169 let pds_suffix = format!(".{}", hostname_for_validation);
170
171 let validated_short_handle = if !input.handle.contains('.')
172 || input.handle.ends_with(&pds_suffix)
173 {
174 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
175 input
176 .handle
177 .strip_suffix(&pds_suffix)
178 .unwrap_or(&input.handle)
179 } else {
180 &input.handle
181 };
182 match crate::api::validation::validate_short_handle(handle_to_validate) {
183 Ok(h) => h,
184 Err(e) => {
185 return ApiError::from(e).into_response();
186 }
187 }
188 } else {
189 if input.handle.contains(' ') || input.handle.contains('\t') {
190 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response();
191 }
192 if let Some(c) = input
193 .handle
194 .chars()
195 .find(|c| !c.is_ascii_alphanumeric() && *c != '.' && *c != '-')
196 {
197 return ApiError::InvalidRequest(format!("Handle contains invalid character: {}", c))
198 .into_response();
199 }
200 let handle_lower = input.handle.to_lowercase();
201 if crate::moderation::has_explicit_slur(&handle_lower) {
202 return ApiError::InvalidRequest("Inappropriate language in handle".into())
203 .into_response();
204 }
205 handle_lower
206 };
207 let email: Option<String> = input
208 .email
209 .as_ref()
210 .map(|e| e.trim().to_string())
211 .filter(|e| !e.is_empty());
212 if let Some(ref email) = email
213 && !crate::api::validation::is_valid_email(email)
214 {
215 return ApiError::InvalidEmail.into_response();
216 }
217 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
218 let valid_channels = ["email", "discord", "telegram", "signal"];
219 if !valid_channels.contains(&verification_channel) && !is_migration {
220 return ApiError::InvalidVerificationChannel.into_response();
221 }
222 let verification_recipient = if is_migration {
223 None
224 } else {
225 Some(match verification_channel {
226 "email" => match &input.email {
227 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
228 _ => return ApiError::MissingEmail.into_response(),
229 },
230 "discord" => match &input.discord_id {
231 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
232 _ => return ApiError::MissingDiscordId.into_response(),
233 },
234 "telegram" => match &input.telegram_username {
235 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
236 _ => return ApiError::MissingTelegramUsername.into_response(),
237 },
238 "signal" => match &input.signal_number {
239 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
240 _ => return ApiError::MissingSignalNumber.into_response(),
241 },
242 _ => return ApiError::InvalidVerificationChannel.into_response(),
243 })
244 };
245 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
246 let pds_endpoint = format!("https://{}", hostname);
247 let suffix = format!(".{}", hostname);
248 let handle = if input.handle.ends_with(&suffix) {
249 format!("{}.{}", validated_short_handle, hostname)
250 } else if input.handle.contains('.') {
251 validated_short_handle.clone()
252 } else {
253 format!("{}.{}", validated_short_handle, hostname)
254 };
255 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
256 if let Some(signing_key_did) = &input.signing_key {
257 let reserved = sqlx::query!(
258 r#"
259 SELECT id, private_key_bytes
260 FROM reserved_signing_keys
261 WHERE public_key_did_key = $1
262 AND used_at IS NULL
263 AND expires_at > NOW()
264 FOR UPDATE
265 "#,
266 signing_key_did
267 )
268 .fetch_optional(&state.db)
269 .await;
270 match reserved {
271 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
272 Ok(None) => {
273 return ApiError::InvalidSigningKey.into_response();
274 }
275 Err(e) => {
276 error!("Error looking up reserved signing key: {:?}", e);
277 return ApiError::InternalError(None).into_response();
278 }
279 }
280 } else {
281 let secret_key = SecretKey::random(&mut OsRng);
282 (secret_key.to_bytes().to_vec(), None)
283 };
284 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
285 Ok(k) => k,
286 Err(e) => {
287 error!("Error creating signing key: {:?}", e);
288 return ApiError::InternalError(None).into_response();
289 }
290 };
291 let did_type = input.did_type.as_deref().unwrap_or("plc");
292 let did = match did_type {
293 "web" => {
294 if !crate::api::server::meta::is_self_hosted_did_web_enabled() {
295 return ApiError::SelfHostedDidWebDisabled.into_response();
296 }
297 let subdomain_host = format!("{}.{}", input.handle, hostname);
298 let encoded_subdomain = subdomain_host.replace(':', "%3A");
299 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
300 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
301 self_hosted_did
302 }
303 "web-external" => {
304 let d = match &input.did {
305 Some(d) if !d.trim().is_empty() => d,
306 _ => {
307 return ApiError::InvalidRequest(
308 "External did:web requires the 'did' field to be provided".into(),
309 )
310 .into_response();
311 }
312 };
313 if !d.starts_with("did:web:") {
314 return ApiError::InvalidDid("External DID must be a did:web".into())
315 .into_response();
316 }
317 if !is_did_web_byod
318 && let Err(e) =
319 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
320 {
321 return ApiError::InvalidDid(e).into_response();
322 }
323 info!(did = %d, "Creating external did:web account");
324 d.clone()
325 }
326 _ => {
327 if let Some(d) = &input.did {
328 if d.starts_with("did:plc:") && is_migration {
329 info!(did = %d, "Migration with existing did:plc");
330 d.clone()
331 } else if d.starts_with("did:web:") {
332 if !is_did_web_byod
333 && let Err(e) = verify_did_web(
334 d,
335 &hostname,
336 &input.handle,
337 input.signing_key.as_deref(),
338 )
339 .await
340 {
341 return ApiError::InvalidDid(e).into_response();
342 }
343 d.clone()
344 } else if !d.trim().is_empty() {
345 return ApiError::InvalidDid(
346 "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth.".into()
347 )
348 .into_response();
349 } else {
350 let rotation_key = std::env::var("PLC_ROTATION_KEY")
351 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
352 let genesis_result = match create_genesis_operation(
353 &signing_key,
354 &rotation_key,
355 &handle,
356 &pds_endpoint,
357 ) {
358 Ok(r) => r,
359 Err(e) => {
360 error!("Error creating PLC genesis operation: {:?}", e);
361 return ApiError::InternalError(Some(
362 "Failed to create PLC operation".into(),
363 ))
364 .into_response();
365 }
366 };
367 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
368 if let Err(e) = plc_client
369 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
370 .await
371 {
372 error!("Failed to submit PLC genesis operation: {:?}", e);
373 return ApiError::UpstreamErrorMsg(format!(
374 "Failed to register DID with PLC directory: {}",
375 e
376 ))
377 .into_response();
378 }
379 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
380 genesis_result.did
381 }
382 } else {
383 let rotation_key = std::env::var("PLC_ROTATION_KEY")
384 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
385 let genesis_result = match create_genesis_operation(
386 &signing_key,
387 &rotation_key,
388 &handle,
389 &pds_endpoint,
390 ) {
391 Ok(r) => r,
392 Err(e) => {
393 error!("Error creating PLC genesis operation: {:?}", e);
394 return ApiError::InternalError(Some(
395 "Failed to create PLC operation".into(),
396 ))
397 .into_response();
398 }
399 };
400 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
401 if let Err(e) = plc_client
402 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
403 .await
404 {
405 error!("Failed to submit PLC genesis operation: {:?}", e);
406 return ApiError::UpstreamErrorMsg(format!(
407 "Failed to register DID with PLC directory: {}",
408 e
409 ))
410 .into_response();
411 }
412 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
413 genesis_result.did
414 }
415 }
416 };
417 let mut tx = match state.db.begin().await {
418 Ok(tx) => tx,
419 Err(e) => {
420 error!("Error starting transaction: {:?}", e);
421 return ApiError::InternalError(None).into_response();
422 }
423 };
424 if is_migration {
425 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
426 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
427 .bind(&did)
428 .fetch_optional(&mut *tx)
429 .await
430 .unwrap_or(None);
431 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
432 if deactivated_at.is_some() {
433 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
434 let update_result: Result<_, sqlx::Error> =
435 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
436 .bind(&handle)
437 .bind(account_id)
438 .execute(&mut *tx)
439 .await;
440 if let Err(e) = update_result {
441 if let Some(db_err) = e.as_database_error()
442 && db_err
443 .constraint()
444 .map(|c| c.contains("handle"))
445 .unwrap_or(false)
446 {
447 return ApiError::HandleTaken.into_response();
448 }
449 error!("Error reactivating account: {:?}", e);
450 return ApiError::InternalError(None).into_response();
451 }
452 if let Err(e) = tx.commit().await {
453 error!("Error committing reactivation: {:?}", e);
454 return ApiError::InternalError(None).into_response();
455 }
456 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
457 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
458 )
459 .bind(account_id)
460 .fetch_optional(&state.db)
461 .await
462 .unwrap_or(None);
463 let secret_key_bytes = match key_row {
464 Some((key_bytes, encryption_version)) => {
465 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
466 Ok(k) => k,
467 Err(e) => {
468 error!("Error decrypting key for reactivated account: {:?}", e);
469 return ApiError::InternalError(None).into_response();
470 }
471 }
472 }
473 None => {
474 error!("No signing key found for reactivated account");
475 return ApiError::InternalError(Some(
476 "Account signing key not found".into(),
477 ))
478 .into_response();
479 }
480 };
481 let access_meta =
482 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
483 Ok(m) => m,
484 Err(e) => {
485 error!("Error creating access token: {:?}", e);
486 return ApiError::InternalError(None).into_response();
487 }
488 };
489 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
490 &did,
491 &secret_key_bytes,
492 ) {
493 Ok(m) => m,
494 Err(e) => {
495 error!("Error creating refresh token: {:?}", e);
496 return ApiError::InternalError(None).into_response();
497 }
498 };
499 let session_result: Result<_, sqlx::Error> = sqlx::query(
500 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
501 )
502 .bind(&did)
503 .bind(&access_meta.jti)
504 .bind(&refresh_meta.jti)
505 .bind(access_meta.expires_at)
506 .bind(refresh_meta.expires_at)
507 .execute(&state.db)
508 .await;
509 if let Err(e) = session_result {
510 error!("Error creating session: {:?}", e);
511 return ApiError::InternalError(None).into_response();
512 }
513 return (
514 axum::http::StatusCode::OK,
515 Json(CreateAccountOutput {
516 handle: handle.clone().into(),
517 did: did.clone().into(),
518 did_doc: state.did_resolver.resolve_did_document(&did).await,
519 access_jwt: access_meta.token,
520 refresh_jwt: refresh_meta.token,
521 verification_required: false,
522 verification_channel: "email".to_string(),
523 }),
524 )
525 .into_response();
526 } else {
527 return ApiError::AccountAlreadyExists.into_response();
528 }
529 }
530 }
531 let exists_result: Option<(i32,)> =
532 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
533 .bind(&handle)
534 .fetch_optional(&mut *tx)
535 .await
536 .unwrap_or(None);
537 if exists_result.is_some() {
538 return ApiError::HandleTaken.into_response();
539 }
540 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
541 .map(|v| v == "true" || v == "1")
542 .unwrap_or(false);
543 if invite_code_required
544 && input
545 .invite_code
546 .as_ref()
547 .map(|c| c.trim().is_empty())
548 .unwrap_or(true)
549 {
550 return ApiError::InviteCodeRequired.into_response();
551 }
552 if let Some(code) = &input.invite_code
553 && !code.trim().is_empty()
554 {
555 let invite_query = sqlx::query!(
556 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
557 code
558 )
559 .fetch_optional(&mut *tx)
560 .await;
561 match invite_query {
562 Ok(Some(row)) => {
563 if row.available_uses <= 0 {
564 return ApiError::InvalidInviteCode.into_response();
565 }
566 let update_invite = sqlx::query!(
567 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
568 code
569 )
570 .execute(&mut *tx)
571 .await;
572 if let Err(e) = update_invite {
573 error!("Error updating invite code: {:?}", e);
574 return ApiError::InternalError(None).into_response();
575 }
576 }
577 Ok(None) => {
578 return ApiError::InvalidInviteCode.into_response();
579 }
580 Err(e) => {
581 error!("Error checking invite code: {:?}", e);
582 return ApiError::InternalError(None).into_response();
583 }
584 }
585 }
586 if let Err(e) = validate_password(&input.password) {
587 return ApiError::InvalidRequest(e.to_string()).into_response();
588 }
589
590 let password_clone = input.password.clone();
591 let password_hash =
592 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await {
593 Ok(Ok(h)) => h,
594 Ok(Err(e)) => {
595 error!("Error hashing password: {:?}", e);
596 return ApiError::InternalError(None).into_response();
597 }
598 Err(e) => {
599 error!("Failed to spawn blocking task: {:?}", e);
600 return ApiError::InternalError(None).into_response();
601 }
602 };
603 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
604 .fetch_one(&mut *tx)
605 .await
606 .map(|c| c.unwrap_or(0) == 0)
607 .unwrap_or(false);
608 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
609 Some(chrono::Utc::now())
610 } else {
611 None
612 };
613 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
614 r#"INSERT INTO users (
615 handle, email, did, password_hash,
616 preferred_comms_channel,
617 discord_id, telegram_username, signal_number,
618 is_admin, deactivated_at, email_verified
619 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
620 )
621 .bind(&handle)
622 .bind(&email)
623 .bind(&did)
624 .bind(&password_hash)
625 .bind(verification_channel)
626 .bind(
627 input
628 .discord_id
629 .as_deref()
630 .map(|s| s.trim())
631 .filter(|s| !s.is_empty()),
632 )
633 .bind(
634 input
635 .telegram_username
636 .as_deref()
637 .map(|s| s.trim())
638 .filter(|s| !s.is_empty()),
639 )
640 .bind(
641 input
642 .signal_number
643 .as_deref()
644 .map(|s| s.trim())
645 .filter(|s| !s.is_empty()),
646 )
647 .bind(is_first_user)
648 .bind(deactivated_at)
649 .bind(false)
650 .fetch_one(&mut *tx)
651 .await;
652 let user_id = match user_insert {
653 Ok((id,)) => id,
654 Err(e) => {
655 if let Some(db_err) = e.as_database_error()
656 && db_err.code().as_deref() == Some("23505")
657 {
658 let constraint = db_err.constraint().unwrap_or("");
659 if constraint.contains("handle") || constraint.contains("users_handle") {
660 return ApiError::HandleNotAvailable(None).into_response();
661 } else if constraint.contains("email") || constraint.contains("users_email") {
662 return ApiError::EmailTaken.into_response();
663 } else if constraint.contains("did") || constraint.contains("users_did") {
664 return ApiError::AccountAlreadyExists.into_response();
665 }
666 }
667 error!("Error inserting user: {:?}", e);
668 return ApiError::InternalError(None).into_response();
669 }
670 };
671
672 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
673 Ok(enc) => enc,
674 Err(e) => {
675 error!("Error encrypting user key: {:?}", e);
676 return ApiError::InternalError(None).into_response();
677 }
678 };
679 let key_insert = sqlx::query!(
680 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
681 user_id,
682 &encrypted_key_bytes[..],
683 crate::config::ENCRYPTION_VERSION
684 )
685 .execute(&mut *tx)
686 .await;
687 if let Err(e) = key_insert {
688 error!("Error inserting user key: {:?}", e);
689 return ApiError::InternalError(None).into_response();
690 }
691 if let Some(key_id) = reserved_key_id {
692 let mark_used = sqlx::query!(
693 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
694 key_id
695 )
696 .execute(&mut *tx)
697 .await;
698 if let Err(e) = mark_used {
699 error!("Error marking reserved key as used: {:?}", e);
700 return ApiError::InternalError(None).into_response();
701 }
702 }
703 let mst = Mst::new(Arc::new(state.block_store.clone()));
704 let mst_root = match mst.persist().await {
705 Ok(c) => c,
706 Err(e) => {
707 error!("Error persisting MST: {:?}", e);
708 return ApiError::InternalError(None).into_response();
709 }
710 };
711 let rev = Tid::now(LimitedU32::MIN);
712 let did_for_commit = Did::new_unchecked(&did);
713 let (commit_bytes, _sig) =
714 match create_signed_commit(&did_for_commit, mst_root, rev.as_ref(), None, &signing_key) {
715 Ok(result) => result,
716 Err(e) => {
717 error!("Error creating genesis commit: {:?}", e);
718 return ApiError::InternalError(None).into_response();
719 }
720 };
721 let commit_cid = match state.block_store.put(&commit_bytes).await {
722 Ok(c) => c,
723 Err(e) => {
724 error!("Error saving genesis commit: {:?}", e);
725 return ApiError::InternalError(None).into_response();
726 }
727 };
728 let commit_cid_str = commit_cid.to_string();
729 let rev_str = rev.as_ref().to_string();
730 let repo_insert = sqlx::query!(
731 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
732 user_id,
733 commit_cid_str,
734 rev_str
735 )
736 .execute(&mut *tx)
737 .await;
738 if let Err(e) = repo_insert {
739 error!("Error initializing repo: {:?}", e);
740 return ApiError::InternalError(None).into_response();
741 }
742 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
743 if let Err(e) = sqlx::query!(
744 r#"
745 INSERT INTO user_blocks (user_id, block_cid)
746 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
747 ON CONFLICT (user_id, block_cid) DO NOTHING
748 "#,
749 user_id,
750 &genesis_block_cids
751 )
752 .execute(&mut *tx)
753 .await
754 {
755 error!("Error inserting user_blocks: {:?}", e);
756 return ApiError::InternalError(None).into_response();
757 }
758 if let Some(code) = &input.invite_code
759 && !code.trim().is_empty()
760 {
761 let use_insert = sqlx::query!(
762 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
763 code,
764 user_id
765 )
766 .execute(&mut *tx)
767 .await;
768 if let Err(e) = use_insert {
769 error!("Error recording invite usage: {:?}", e);
770 return ApiError::InternalError(None).into_response();
771 }
772 }
773 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
774 let birthdate_pref = json!({
775 "$type": "app.bsky.actor.defs#personalDetailsPref",
776 "birthDate": "1998-05-06T00:00:00.000Z"
777 });
778 if let Err(e) = sqlx::query!(
779 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
780 ON CONFLICT (user_id, name) DO NOTHING",
781 user_id,
782 "app.bsky.actor.defs#personalDetailsPref",
783 birthdate_pref
784 )
785 .execute(&mut *tx)
786 .await
787 {
788 warn!("Failed to set default birthdate preference: {:?}", e);
789 }
790 }
791 if let Err(e) = tx.commit().await {
792 error!("Error committing transaction: {:?}", e);
793 return ApiError::InternalError(None).into_response();
794 }
795 if !is_migration && !is_did_web_byod {
796 let did_typed = Did::new_unchecked(&did);
797 let handle_typed = Handle::new_unchecked(&handle);
798 if let Err(e) = crate::api::repo::record::sequence_identity_event(
799 &state,
800 &did_typed,
801 Some(&handle_typed),
802 )
803 .await
804 {
805 warn!("Failed to sequence identity event for {}: {}", did, e);
806 }
807 if let Err(e) =
808 crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await
809 {
810 warn!("Failed to sequence account event for {}: {}", did, e);
811 }
812 if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
813 &state,
814 &did_typed,
815 &commit_cid,
816 &mst_root,
817 &rev_str,
818 )
819 .await
820 {
821 warn!("Failed to sequence commit event for {}: {}", did, e);
822 }
823 if let Err(e) = crate::api::repo::record::sequence_sync_event(
824 &state,
825 &did_typed,
826 &commit_cid_str,
827 Some(rev.as_ref()),
828 )
829 .await
830 {
831 warn!("Failed to sequence sync event for {}: {}", did, e);
832 }
833 let profile_record = json!({
834 "$type": "app.bsky.actor.profile",
835 "displayName": input.handle
836 });
837 let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
838 let profile_rkey = Rkey::new_unchecked("self");
839 if let Err(e) = crate::api::repo::record::create_record_internal(
840 &state,
841 &did_typed,
842 &profile_collection,
843 &profile_rkey,
844 &profile_record,
845 )
846 .await
847 {
848 warn!("Failed to create default profile for {}: {}", did, e);
849 }
850 }
851 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
852 if !is_migration {
853 if let Some(ref recipient) = verification_recipient {
854 let verification_token = crate::auth::verification_token::generate_signup_token(
855 &did,
856 verification_channel,
857 recipient,
858 );
859 let formatted_token =
860 crate::auth::verification_token::format_token_for_display(&verification_token);
861 if let Err(e) = crate::comms::enqueue_signup_verification(
862 &state.db,
863 user_id,
864 verification_channel,
865 recipient,
866 &formatted_token,
867 None,
868 )
869 .await
870 {
871 warn!(
872 "Failed to enqueue signup verification notification: {:?}",
873 e
874 );
875 }
876 }
877 } else if let Some(ref user_email) = email {
878 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
879 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
880 if let Err(e) = crate::comms::enqueue_migration_verification(
881 &state.db,
882 user_id,
883 user_email,
884 &formatted_token,
885 &hostname,
886 )
887 .await
888 {
889 warn!("Failed to enqueue migration verification email: {:?}", e);
890 }
891 }
892
893 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
894 {
895 Ok(m) => m,
896 Err(e) => {
897 error!("createAccount: Error creating access token: {:?}", e);
898 return ApiError::InternalError(None).into_response();
899 }
900 };
901 let refresh_meta =
902 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
903 Ok(m) => m,
904 Err(e) => {
905 error!("createAccount: Error creating refresh token: {:?}", e);
906 return ApiError::InternalError(None).into_response();
907 }
908 };
909 if let Err(e) = sqlx::query!(
910 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
911 did,
912 access_meta.jti,
913 refresh_meta.jti,
914 access_meta.expires_at,
915 refresh_meta.expires_at
916 )
917 .execute(&state.db)
918 .await
919 {
920 error!("createAccount: Error creating session: {:?}", e);
921 return ApiError::InternalError(None).into_response();
922 }
923
924 let did_doc = state.did_resolver.resolve_did_document(&did).await;
925
926 if is_migration {
927 info!(
928 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
929 did, handle
930 );
931 }
932
933 (
934 StatusCode::OK,
935 Json(CreateAccountOutput {
936 handle: handle.clone().into(),
937 did: did.into(),
938 did_doc,
939 access_jwt: access_meta.token,
940 refresh_jwt: refresh_meta.token,
941 verification_required: !is_migration,
942 verification_channel: verification_channel.to_string(),
943 }),
944 )
945 .into_response()
946}