this repo has no description
1use super::did::verify_did_web;
2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
4use crate::state::{AppState, RateLimitKind};
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use bcrypt::{DEFAULT_COST, hash};
12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use k256::{SecretKey, ecdsa::SigningKey};
15use rand::rngs::OsRng;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::sync::Arc;
19use tracing::{debug, error, info, warn};
20
21fn extract_client_ip(headers: &HeaderMap) -> String {
22 if let Some(forwarded) = headers.get("x-forwarded-for")
23 && let Ok(value) = forwarded.to_str()
24 && let Some(first_ip) = value.split(',').next()
25 {
26 return first_ip.trim().to_string();
27 }
28 if let Some(real_ip) = headers.get("x-real-ip")
29 && let Ok(value) = real_ip.to_str()
30 {
31 return value.trim().to_string();
32 }
33 "unknown".to_string()
34}
35
36#[derive(Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct CreateAccountInput {
39 pub handle: String,
40 pub email: Option<String>,
41 pub password: String,
42 pub invite_code: Option<String>,
43 pub did: Option<String>,
44 pub did_type: Option<String>,
45 pub signing_key: Option<String>,
46 pub verification_channel: Option<String>,
47 pub discord_id: Option<String>,
48 pub telegram_username: Option<String>,
49 pub signal_number: Option<String>,
50}
51
52#[derive(Serialize)]
53#[serde(rename_all = "camelCase")]
54pub struct CreateAccountOutput {
55 pub handle: String,
56 pub did: String,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub access_jwt: Option<String>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub refresh_jwt: Option<String>,
61 pub verification_required: bool,
62 pub verification_channel: String,
63}
64
65pub async fn create_account(
66 State(state): State<AppState>,
67 headers: HeaderMap,
68 Json(input): Json<CreateAccountInput>,
69) -> Response {
70 info!("create_account called");
71 let client_ip = extract_client_ip(&headers);
72 if !state
73 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
74 .await
75 {
76 warn!(ip = %client_ip, "Account creation rate limit exceeded");
77 return (
78 StatusCode::TOO_MANY_REQUESTS,
79 Json(json!({
80 "error": "RateLimitExceeded",
81 "message": "Too many account creation attempts. Please try again later."
82 })),
83 )
84 .into_response();
85 }
86
87 let migration_auth = if let Some(token) =
88 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
89 {
90 if is_service_token(&token) {
91 let verifier = ServiceTokenVerifier::new();
92 match verifier
93 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
94 .await
95 {
96 Ok(claims) => {
97 debug!("Service token verified for migration: iss={}", claims.iss);
98 Some(claims.iss)
99 }
100 Err(e) => {
101 error!("Service token verification failed: {:?}", e);
102 return (
103 StatusCode::UNAUTHORIZED,
104 Json(json!({
105 "error": "AuthenticationFailed",
106 "message": format!("Service token verification failed: {}", e)
107 })),
108 )
109 .into_response();
110 }
111 }
112 } else {
113 None
114 }
115 } else {
116 None
117 };
118
119 let is_migration = migration_auth.is_some()
120 && input
121 .did
122 .as_ref()
123 .map(|d| d.starts_with("did:plc:"))
124 .unwrap_or(false);
125
126 if is_migration {
127 let migration_did = input.did.as_ref().unwrap();
128 let auth_did = migration_auth.as_ref().unwrap();
129 if migration_did != auth_did {
130 return (
131 StatusCode::FORBIDDEN,
132 Json(json!({
133 "error": "AuthorizationError",
134 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
135 })),
136 )
137 .into_response();
138 }
139 info!(did = %migration_did, "Processing account migration");
140 }
141
142 if input.handle.contains('!') || input.handle.contains('@') {
143 return (
144 StatusCode::BAD_REQUEST,
145 Json(
146 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}),
147 ),
148 )
149 .into_response();
150 }
151 let email: Option<String> = input
152 .email
153 .as_ref()
154 .map(|e| e.trim().to_string())
155 .filter(|e| !e.is_empty());
156 if let Some(ref email) = email
157 && !crate::api::validation::is_valid_email(email)
158 {
159 return (
160 StatusCode::BAD_REQUEST,
161 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
162 )
163 .into_response();
164 }
165 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
166 let valid_channels = ["email", "discord", "telegram", "signal"];
167 if !valid_channels.contains(&verification_channel) && !is_migration {
168 return (
169 StatusCode::BAD_REQUEST,
170 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
171 )
172 .into_response();
173 }
174 let verification_recipient = if is_migration {
175 None
176 } else {
177 Some(match verification_channel {
178 "email" => match &input.email {
179 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
180 _ => return (
181 StatusCode::BAD_REQUEST,
182 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
183 ).into_response(),
184 },
185 "discord" => match &input.discord_id {
186 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
187 _ => return (
188 StatusCode::BAD_REQUEST,
189 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
190 ).into_response(),
191 },
192 "telegram" => match &input.telegram_username {
193 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
194 _ => return (
195 StatusCode::BAD_REQUEST,
196 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
197 ).into_response(),
198 },
199 "signal" => match &input.signal_number {
200 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
201 _ => return (
202 StatusCode::BAD_REQUEST,
203 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
204 ).into_response(),
205 },
206 _ => return (
207 StatusCode::BAD_REQUEST,
208 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
209 ).into_response(),
210 })
211 };
212 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
213 let pds_endpoint = format!("https://{}", hostname);
214 let suffix = format!(".{}", hostname);
215 let short_handle = if input.handle.ends_with(&suffix) {
216 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle)
217 } else {
218 &input.handle
219 };
220 let full_handle = format!("{}.{}", short_handle, hostname);
221 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
222 if let Some(signing_key_did) = &input.signing_key {
223 let reserved = sqlx::query!(
224 r#"
225 SELECT id, private_key_bytes
226 FROM reserved_signing_keys
227 WHERE public_key_did_key = $1
228 AND used_at IS NULL
229 AND expires_at > NOW()
230 FOR UPDATE
231 "#,
232 signing_key_did
233 )
234 .fetch_optional(&state.db)
235 .await;
236 match reserved {
237 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
238 Ok(None) => {
239 return (
240 StatusCode::BAD_REQUEST,
241 Json(json!({
242 "error": "InvalidSigningKey",
243 "message": "Signing key not found, already used, or expired"
244 })),
245 )
246 .into_response();
247 }
248 Err(e) => {
249 error!("Error looking up reserved signing key: {:?}", e);
250 return (
251 StatusCode::INTERNAL_SERVER_ERROR,
252 Json(json!({"error": "InternalError"})),
253 )
254 .into_response();
255 }
256 }
257 } else {
258 let secret_key = SecretKey::random(&mut OsRng);
259 (secret_key.to_bytes().to_vec(), None)
260 };
261 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
262 Ok(k) => k,
263 Err(e) => {
264 error!("Error creating signing key: {:?}", e);
265 return (
266 StatusCode::INTERNAL_SERVER_ERROR,
267 Json(json!({"error": "InternalError"})),
268 )
269 .into_response();
270 }
271 };
272 let did_type = input.did_type.as_deref().unwrap_or("plc");
273 let did = match did_type {
274 "web" => {
275 let subdomain_host = format!("{}.{}", input.handle, hostname);
276 let encoded_subdomain = subdomain_host.replace(':', "%3A");
277 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
278 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
279 self_hosted_did
280 }
281 "web-external" => {
282 let d = match &input.did {
283 Some(d) if !d.trim().is_empty() => d,
284 _ => {
285 return (
286 StatusCode::BAD_REQUEST,
287 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
288 )
289 .into_response();
290 }
291 };
292 if !d.starts_with("did:web:") {
293 return (
294 StatusCode::BAD_REQUEST,
295 Json(
296 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
297 ),
298 )
299 .into_response();
300 }
301 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await {
302 return (
303 StatusCode::BAD_REQUEST,
304 Json(json!({"error": "InvalidDid", "message": e})),
305 )
306 .into_response();
307 }
308 info!(did = %d, "Creating external did:web account");
309 d.clone()
310 }
311 _ => {
312 if let Some(d) = &input.did {
313 if d.starts_with("did:plc:") && is_migration {
314 info!(did = %d, "Migration with existing did:plc");
315 d.clone()
316 } else if d.starts_with("did:web:") {
317 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await {
318 return (
319 StatusCode::BAD_REQUEST,
320 Json(json!({"error": "InvalidDid", "message": e})),
321 )
322 .into_response();
323 }
324 d.clone()
325 } else if !d.trim().is_empty() {
326 return (
327 StatusCode::BAD_REQUEST,
328 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
329 )
330 .into_response();
331 } else {
332 let rotation_key = std::env::var("PLC_ROTATION_KEY")
333 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
334 let genesis_result = match create_genesis_operation(
335 &signing_key,
336 &rotation_key,
337 &full_handle,
338 &pds_endpoint,
339 ) {
340 Ok(r) => r,
341 Err(e) => {
342 error!("Error creating PLC genesis operation: {:?}", e);
343 return (
344 StatusCode::INTERNAL_SERVER_ERROR,
345 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
346 )
347 .into_response();
348 }
349 };
350 let plc_client = PlcClient::new(None);
351 if let Err(e) = plc_client
352 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
353 .await
354 {
355 error!("Failed to submit PLC genesis operation: {:?}", e);
356 return (
357 StatusCode::BAD_GATEWAY,
358 Json(json!({
359 "error": "UpstreamError",
360 "message": format!("Failed to register DID with PLC directory: {}", e)
361 })),
362 )
363 .into_response();
364 }
365 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
366 genesis_result.did
367 }
368 } else {
369 let rotation_key = std::env::var("PLC_ROTATION_KEY")
370 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
371 let genesis_result = match create_genesis_operation(
372 &signing_key,
373 &rotation_key,
374 &full_handle,
375 &pds_endpoint,
376 ) {
377 Ok(r) => r,
378 Err(e) => {
379 error!("Error creating PLC genesis operation: {:?}", e);
380 return (
381 StatusCode::INTERNAL_SERVER_ERROR,
382 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
383 )
384 .into_response();
385 }
386 };
387 let plc_client = PlcClient::new(None);
388 if let Err(e) = plc_client
389 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
390 .await
391 {
392 error!("Failed to submit PLC genesis operation: {:?}", e);
393 return (
394 StatusCode::BAD_GATEWAY,
395 Json(json!({
396 "error": "UpstreamError",
397 "message": format!("Failed to register DID with PLC directory: {}", e)
398 })),
399 )
400 .into_response();
401 }
402 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
403 genesis_result.did
404 }
405 }
406 };
407 let mut tx = match state.db.begin().await {
408 Ok(tx) => tx,
409 Err(e) => {
410 error!("Error starting transaction: {:?}", e);
411 return (
412 StatusCode::INTERNAL_SERVER_ERROR,
413 Json(json!({"error": "InternalError"})),
414 )
415 .into_response();
416 }
417 };
418 if is_migration {
419 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
420 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
421 .bind(&did)
422 .fetch_optional(&mut *tx)
423 .await
424 .unwrap_or(None);
425 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
426 if deactivated_at.is_some() {
427 info!(did = %did, old_handle = %old_handle, new_handle = %short_handle, "Preparing existing account for inbound migration");
428 let update_result: Result<_, sqlx::Error> =
429 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
430 .bind(short_handle)
431 .bind(account_id)
432 .execute(&mut *tx)
433 .await;
434 if let Err(e) = update_result {
435 if let Some(db_err) = e.as_database_error()
436 && db_err
437 .constraint()
438 .map(|c| c.contains("handle"))
439 .unwrap_or(false)
440 {
441 return (
442 StatusCode::BAD_REQUEST,
443 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
444 )
445 .into_response();
446 }
447 error!("Error reactivating account: {:?}", e);
448 return (
449 StatusCode::INTERNAL_SERVER_ERROR,
450 Json(json!({"error": "InternalError"})),
451 )
452 .into_response();
453 }
454 if let Err(e) = tx.commit().await {
455 error!("Error committing reactivation: {:?}", e);
456 return (
457 StatusCode::INTERNAL_SERVER_ERROR,
458 Json(json!({"error": "InternalError"})),
459 )
460 .into_response();
461 }
462 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
463 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
464 )
465 .bind(account_id)
466 .fetch_optional(&state.db)
467 .await
468 .unwrap_or(None);
469 let secret_key_bytes = match key_row {
470 Some((key_bytes, encryption_version)) => {
471 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
472 Ok(k) => k,
473 Err(e) => {
474 error!("Error decrypting key for reactivated account: {:?}", e);
475 return (
476 StatusCode::INTERNAL_SERVER_ERROR,
477 Json(json!({"error": "InternalError"})),
478 )
479 .into_response();
480 }
481 }
482 }
483 None => {
484 error!("No signing key found for reactivated account");
485 return (
486 StatusCode::INTERNAL_SERVER_ERROR,
487 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
488 )
489 .into_response();
490 }
491 };
492 let access_meta =
493 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
494 Ok(m) => m,
495 Err(e) => {
496 error!("Error creating access token: {:?}", e);
497 return (
498 StatusCode::INTERNAL_SERVER_ERROR,
499 Json(json!({"error": "InternalError"})),
500 )
501 .into_response();
502 }
503 };
504 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
505 &did,
506 &secret_key_bytes,
507 ) {
508 Ok(m) => m,
509 Err(e) => {
510 error!("Error creating refresh token: {:?}", e);
511 return (
512 StatusCode::INTERNAL_SERVER_ERROR,
513 Json(json!({"error": "InternalError"})),
514 )
515 .into_response();
516 }
517 };
518 let session_result: Result<_, sqlx::Error> = sqlx::query(
519 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
520 )
521 .bind(&did)
522 .bind(&access_meta.jti)
523 .bind(&refresh_meta.jti)
524 .bind(access_meta.expires_at)
525 .bind(refresh_meta.expires_at)
526 .execute(&state.db)
527 .await;
528 if let Err(e) = session_result {
529 error!("Error creating session: {:?}", e);
530 return (
531 StatusCode::INTERNAL_SERVER_ERROR,
532 Json(json!({"error": "InternalError"})),
533 )
534 .into_response();
535 }
536 return (
537 StatusCode::OK,
538 Json(CreateAccountOutput {
539 handle: full_handle.clone(),
540 did,
541 access_jwt: Some(access_meta.token),
542 refresh_jwt: Some(refresh_meta.token),
543 verification_required: false,
544 verification_channel: "email".to_string(),
545 }),
546 )
547 .into_response();
548 } else {
549 return (
550 StatusCode::BAD_REQUEST,
551 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
552 )
553 .into_response();
554 }
555 }
556 }
557 let exists_result: Option<(i32,)> =
558 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
559 .bind(short_handle)
560 .fetch_optional(&mut *tx)
561 .await
562 .unwrap_or(None);
563 if exists_result.is_some() {
564 return (
565 StatusCode::BAD_REQUEST,
566 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
567 )
568 .into_response();
569 }
570 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
571 .map(|v| v == "true" || v == "1")
572 .unwrap_or(false);
573 if invite_code_required
574 && input
575 .invite_code
576 .as_ref()
577 .map(|c| c.trim().is_empty())
578 .unwrap_or(true)
579 {
580 return (
581 StatusCode::BAD_REQUEST,
582 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
583 )
584 .into_response();
585 }
586 if let Some(code) = &input.invite_code
587 && !code.trim().is_empty()
588 {
589 let invite_query = sqlx::query!(
590 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
591 code
592 )
593 .fetch_optional(&mut *tx)
594 .await;
595 match invite_query {
596 Ok(Some(row)) => {
597 if row.available_uses <= 0 {
598 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
599 }
600 let update_invite = sqlx::query!(
601 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
602 code
603 )
604 .execute(&mut *tx)
605 .await;
606 if let Err(e) = update_invite {
607 error!("Error updating invite code: {:?}", e);
608 return (
609 StatusCode::INTERNAL_SERVER_ERROR,
610 Json(json!({"error": "InternalError"})),
611 )
612 .into_response();
613 }
614 }
615 Ok(None) => {
616 return (
617 StatusCode::BAD_REQUEST,
618 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
619 )
620 .into_response();
621 }
622 Err(e) => {
623 error!("Error checking invite code: {:?}", e);
624 return (
625 StatusCode::INTERNAL_SERVER_ERROR,
626 Json(json!({"error": "InternalError"})),
627 )
628 .into_response();
629 }
630 }
631 }
632 let password_hash = match hash(&input.password, DEFAULT_COST) {
633 Ok(h) => h,
634 Err(e) => {
635 error!("Error hashing password: {:?}", e);
636 return (
637 StatusCode::INTERNAL_SERVER_ERROR,
638 Json(json!({"error": "InternalError"})),
639 )
640 .into_response();
641 }
642 };
643 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
644 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
645 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
646 .fetch_one(&mut *tx)
647 .await
648 .map(|c| c.unwrap_or(0) == 0)
649 .unwrap_or(false);
650 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
651 Some(chrono::Utc::now())
652 } else {
653 None
654 };
655 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
656 r#"INSERT INTO users (
657 handle, email, did, password_hash,
658 preferred_comms_channel,
659 discord_id, telegram_username, signal_number,
660 is_admin, deactivated_at, email_verified
661 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
662 )
663 .bind(short_handle)
664 .bind(&email)
665 .bind(&did)
666 .bind(&password_hash)
667 .bind(verification_channel)
668 .bind(
669 input
670 .discord_id
671 .as_deref()
672 .map(|s| s.trim())
673 .filter(|s| !s.is_empty()),
674 )
675 .bind(
676 input
677 .telegram_username
678 .as_deref()
679 .map(|s| s.trim())
680 .filter(|s| !s.is_empty()),
681 )
682 .bind(
683 input
684 .signal_number
685 .as_deref()
686 .map(|s| s.trim())
687 .filter(|s| !s.is_empty()),
688 )
689 .bind(is_first_user)
690 .bind(deactivated_at)
691 .bind(is_migration)
692 .fetch_one(&mut *tx)
693 .await;
694 let user_id = match user_insert {
695 Ok((id,)) => id,
696 Err(e) => {
697 if let Some(db_err) = e.as_database_error()
698 && db_err.code().as_deref() == Some("23505")
699 {
700 let constraint = db_err.constraint().unwrap_or("");
701 if constraint.contains("handle") || constraint.contains("users_handle") {
702 return (
703 StatusCode::BAD_REQUEST,
704 Json(json!({
705 "error": "HandleNotAvailable",
706 "message": "Handle already taken"
707 })),
708 )
709 .into_response();
710 } else if constraint.contains("email") || constraint.contains("users_email") {
711 return (
712 StatusCode::BAD_REQUEST,
713 Json(json!({
714 "error": "InvalidEmail",
715 "message": "Email already registered"
716 })),
717 )
718 .into_response();
719 } else if constraint.contains("did") || constraint.contains("users_did") {
720 return (
721 StatusCode::BAD_REQUEST,
722 Json(json!({
723 "error": "AccountAlreadyExists",
724 "message": "An account with this DID already exists"
725 })),
726 )
727 .into_response();
728 }
729 }
730 error!("Error inserting user: {:?}", e);
731 return (
732 StatusCode::INTERNAL_SERVER_ERROR,
733 Json(json!({"error": "InternalError"})),
734 )
735 .into_response();
736 }
737 };
738
739 if !is_migration
740 && let Err(e) = sqlx::query!(
741 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)",
742 user_id,
743 verification_code,
744 email,
745 code_expires_at
746 )
747 .execute(&mut *tx)
748 .await {
749 error!("Error inserting verification code: {:?}", e);
750 return (
751 StatusCode::INTERNAL_SERVER_ERROR,
752 Json(json!({"error": "InternalError"})),
753 )
754 .into_response();
755 }
756 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
757 Ok(enc) => enc,
758 Err(e) => {
759 error!("Error encrypting user key: {:?}", e);
760 return (
761 StatusCode::INTERNAL_SERVER_ERROR,
762 Json(json!({"error": "InternalError"})),
763 )
764 .into_response();
765 }
766 };
767 let key_insert = sqlx::query!(
768 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
769 user_id,
770 &encrypted_key_bytes[..],
771 crate::config::ENCRYPTION_VERSION
772 )
773 .execute(&mut *tx)
774 .await;
775 if let Err(e) = key_insert {
776 error!("Error inserting user key: {:?}", e);
777 return (
778 StatusCode::INTERNAL_SERVER_ERROR,
779 Json(json!({"error": "InternalError"})),
780 )
781 .into_response();
782 }
783 if let Some(key_id) = reserved_key_id {
784 let mark_used = sqlx::query!(
785 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
786 key_id
787 )
788 .execute(&mut *tx)
789 .await;
790 if let Err(e) = mark_used {
791 error!("Error marking reserved key as used: {:?}", e);
792 return (
793 StatusCode::INTERNAL_SERVER_ERROR,
794 Json(json!({"error": "InternalError"})),
795 )
796 .into_response();
797 }
798 }
799 let mst = Mst::new(Arc::new(state.block_store.clone()));
800 let mst_root = match mst.persist().await {
801 Ok(c) => c,
802 Err(e) => {
803 error!("Error persisting MST: {:?}", e);
804 return (
805 StatusCode::INTERNAL_SERVER_ERROR,
806 Json(json!({"error": "InternalError"})),
807 )
808 .into_response();
809 }
810 };
811 let did_obj = match Did::new(&did) {
812 Ok(d) => d,
813 Err(_) => {
814 return (
815 StatusCode::INTERNAL_SERVER_ERROR,
816 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
817 )
818 .into_response();
819 }
820 };
821 let rev = Tid::now(LimitedU32::MIN);
822 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None);
823 let signed_commit = match unsigned_commit.sign(&signing_key) {
824 Ok(c) => c,
825 Err(e) => {
826 error!("Error signing genesis commit: {:?}", e);
827 return (
828 StatusCode::INTERNAL_SERVER_ERROR,
829 Json(json!({"error": "InternalError"})),
830 )
831 .into_response();
832 }
833 };
834 let commit_bytes = match signed_commit.to_cbor() {
835 Ok(b) => b,
836 Err(e) => {
837 error!("Error serializing genesis commit: {:?}", e);
838 return (
839 StatusCode::INTERNAL_SERVER_ERROR,
840 Json(json!({"error": "InternalError"})),
841 )
842 .into_response();
843 }
844 };
845 let commit_cid = match state.block_store.put(&commit_bytes).await {
846 Ok(c) => c,
847 Err(e) => {
848 error!("Error saving genesis commit: {:?}", e);
849 return (
850 StatusCode::INTERNAL_SERVER_ERROR,
851 Json(json!({"error": "InternalError"})),
852 )
853 .into_response();
854 }
855 };
856 let commit_cid_str = commit_cid.to_string();
857 let repo_insert = sqlx::query!(
858 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
859 user_id,
860 commit_cid_str
861 )
862 .execute(&mut *tx)
863 .await;
864 if let Err(e) = repo_insert {
865 error!("Error initializing repo: {:?}", e);
866 return (
867 StatusCode::INTERNAL_SERVER_ERROR,
868 Json(json!({"error": "InternalError"})),
869 )
870 .into_response();
871 }
872 if let Some(code) = &input.invite_code
873 && !code.trim().is_empty()
874 {
875 let use_insert = sqlx::query!(
876 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
877 code,
878 user_id
879 )
880 .execute(&mut *tx)
881 .await;
882 if let Err(e) = use_insert {
883 error!("Error recording invite usage: {:?}", e);
884 return (
885 StatusCode::INTERNAL_SERVER_ERROR,
886 Json(json!({"error": "InternalError"})),
887 )
888 .into_response();
889 }
890 }
891 if let Err(e) = tx.commit().await {
892 error!("Error committing transaction: {:?}", e);
893 return (
894 StatusCode::INTERNAL_SERVER_ERROR,
895 Json(json!({"error": "InternalError"})),
896 )
897 .into_response();
898 }
899 if !is_migration {
900 if let Err(e) =
901 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle))
902 .await
903 {
904 warn!("Failed to sequence identity event for {}: {}", did, e);
905 }
906 if let Err(e) =
907 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
908 {
909 warn!("Failed to sequence account event for {}: {}", did, e);
910 }
911 let profile_record = json!({
912 "$type": "app.bsky.actor.profile",
913 "displayName": input.handle
914 });
915 if let Err(e) = crate::api::repo::record::create_record_internal(
916 &state,
917 &did,
918 "app.bsky.actor.profile",
919 "self",
920 &profile_record,
921 )
922 .await
923 {
924 warn!("Failed to create default profile for {}: {}", did, e);
925 }
926 if let Some(ref recipient) = verification_recipient
927 && let Err(e) = crate::comms::enqueue_signup_verification(
928 &state.db,
929 user_id,
930 verification_channel,
931 recipient,
932 &verification_code,
933 )
934 .await
935 {
936 warn!(
937 "Failed to enqueue signup verification notification: {:?}",
938 e
939 );
940 }
941 }
942
943 let (access_jwt, refresh_jwt) = if is_migration {
944 let access_meta =
945 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
946 Ok(m) => m,
947 Err(e) => {
948 error!("Error creating access token for migration: {:?}", e);
949 return (
950 StatusCode::INTERNAL_SERVER_ERROR,
951 Json(json!({"error": "InternalError"})),
952 )
953 .into_response();
954 }
955 };
956 let refresh_meta =
957 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
958 Ok(m) => m,
959 Err(e) => {
960 error!("Error creating refresh token for migration: {:?}", e);
961 return (
962 StatusCode::INTERNAL_SERVER_ERROR,
963 Json(json!({"error": "InternalError"})),
964 )
965 .into_response();
966 }
967 };
968 if let Err(e) = sqlx::query!(
969 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
970 did,
971 access_meta.jti,
972 refresh_meta.jti,
973 access_meta.expires_at,
974 refresh_meta.expires_at
975 )
976 .execute(&state.db)
977 .await
978 {
979 error!("Error creating session for migration: {:?}", e);
980 return (
981 StatusCode::INTERNAL_SERVER_ERROR,
982 Json(json!({"error": "InternalError"})),
983 )
984 .into_response();
985 }
986 (Some(access_meta.token), Some(refresh_meta.token))
987 } else {
988 (None, None)
989 };
990
991 (
992 StatusCode::OK,
993 Json(CreateAccountOutput {
994 handle: full_handle.clone(),
995 did,
996 access_jwt,
997 refresh_jwt,
998 verification_required: !is_migration,
999 verification_channel: verification_channel.to_string(),
1000 }),
1001 )
1002 .into_response()
1003}