this repo has no description
1use super::did::verify_did_web;
2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
4use crate::state::{AppState, RateLimitKind};
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use bcrypt::{DEFAULT_COST, hash};
12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use k256::{SecretKey, ecdsa::SigningKey};
15use rand::rngs::OsRng;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::sync::Arc;
19use tracing::{debug, error, info, warn};
20
21fn extract_client_ip(headers: &HeaderMap) -> String {
22 if let Some(forwarded) = headers.get("x-forwarded-for")
23 && let Ok(value) = forwarded.to_str()
24 && let Some(first_ip) = value.split(',').next()
25 {
26 return first_ip.trim().to_string();
27 }
28 if let Some(real_ip) = headers.get("x-real-ip")
29 && let Ok(value) = real_ip.to_str()
30 {
31 return value.trim().to_string();
32 }
33 "unknown".to_string()
34}
35
/// Request body accepted by [`create_account`].
///
/// Field names are mapped from camelCase JSON keys (for example `inviteCode`,
/// `signingKey`, `verificationChannel`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountInput {
    /// Requested handle; may be given with or without the `.{PDS_HOSTNAME}` suffix.
    pub handle: String,
    /// Contact email; surrounding whitespace is trimmed and a blank value is treated as absent.
    pub email: Option<String>,
    /// Plain-text password; it is bcrypt-hashed before being stored.
    pub password: String,
    /// Invite code to consume; mandatory when `INVITE_CODE_REQUIRED` is `true` or `1`.
    pub invite_code: Option<String>,
    /// Caller-supplied DID: a `did:web:` DID, or a `did:plc:` DID when migrating with
    /// service auth. Absent or blank means a new `did:plc` is registered.
    pub did: Option<String>,
    /// `did:key` of a previously reserved signing key to use instead of a fresh random key.
    pub signing_key: Option<String>,
    /// One of `email`, `discord`, `telegram` or `signal`; defaults to `email` when absent.
    pub verification_channel: Option<String>,
    /// Discord identifier; required when the verification channel is `discord`.
    pub discord_id: Option<String>,
    /// Telegram username; required when the verification channel is `telegram`.
    pub telegram_username: Option<String>,
    /// Signal phone number; required when the verification channel is `signal`.
    pub signal_number: Option<String>,
}
50
/// Successful response body of [`create_account`], serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
    /// Full handle, i.e. the short handle followed by `.{PDS_HOSTNAME}`.
    pub handle: String,
    /// DID of the created (or migrated) account.
    pub did: String,
    /// Access token; only populated for migrations and omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub access_jwt: Option<String>,
    /// Refresh token; only populated for migrations and omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh_jwt: Option<String>,
    /// `true` for regular sign-ups, `false` for migrations.
    pub verification_required: bool,
    /// Channel on which the verification code is delivered.
    pub verification_channel: String,
}
63
64pub async fn create_account(
65 State(state): State<AppState>,
66 headers: HeaderMap,
67 Json(input): Json<CreateAccountInput>,
68) -> Response {
69 info!("create_account called");
70 let client_ip = extract_client_ip(&headers);
71 if !state
72 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
73 .await
74 {
75 warn!(ip = %client_ip, "Account creation rate limit exceeded");
76 return (
77 StatusCode::TOO_MANY_REQUESTS,
78 Json(json!({
79 "error": "RateLimitExceeded",
80 "message": "Too many account creation attempts. Please try again later."
81 })),
82 )
83 .into_response();
84 }
85
86 let migration_auth = if let Some(token) =
87 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
88 {
89 if is_service_token(&token) {
90 let verifier = ServiceTokenVerifier::new();
91 match verifier
92 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
93 .await
94 {
95 Ok(claims) => {
96 debug!("Service token verified for migration: iss={}", claims.iss);
97 Some(claims.iss)
98 }
99 Err(e) => {
100 error!("Service token verification failed: {:?}", e);
101 return (
102 StatusCode::UNAUTHORIZED,
103 Json(json!({
104 "error": "AuthenticationFailed",
105 "message": format!("Service token verification failed: {}", e)
106 })),
107 )
108 .into_response();
109 }
110 }
111 } else {
112 None
113 }
114 } else {
115 None
116 };
117
118 let is_migration = migration_auth.is_some()
119 && input
120 .did
121 .as_ref()
122 .map(|d| d.starts_with("did:plc:"))
123 .unwrap_or(false);
124
125 if is_migration {
126 let migration_did = input.did.as_ref().unwrap();
127 let auth_did = migration_auth.as_ref().unwrap();
128 if migration_did != auth_did {
129 return (
130 StatusCode::FORBIDDEN,
131 Json(json!({
132 "error": "AuthorizationError",
133 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
134 })),
135 )
136 .into_response();
137 }
138 info!(did = %migration_did, "Processing account migration");
139 }
140
141 if input.handle.contains('!') || input.handle.contains('@') {
142 return (
143 StatusCode::BAD_REQUEST,
144 Json(
145 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}),
146 ),
147 )
148 .into_response();
149 }
150 let email: Option<String> = input
151 .email
152 .as_ref()
153 .map(|e| e.trim().to_string())
154 .filter(|e| !e.is_empty());
155 if let Some(ref email) = email
156 && !crate::api::validation::is_valid_email(email)
157 {
158 return (
159 StatusCode::BAD_REQUEST,
160 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
161 )
162 .into_response();
163 }
164 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
165 let valid_channels = ["email", "discord", "telegram", "signal"];
166 if !valid_channels.contains(&verification_channel) && !is_migration {
167 return (
168 StatusCode::BAD_REQUEST,
169 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
170 )
171 .into_response();
172 }
173 let verification_recipient = if is_migration {
174 None
175 } else {
176 Some(match verification_channel {
177 "email" => match &input.email {
178 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
179 _ => return (
180 StatusCode::BAD_REQUEST,
181 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
182 ).into_response(),
183 },
184 "discord" => match &input.discord_id {
185 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
186 _ => return (
187 StatusCode::BAD_REQUEST,
188 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
189 ).into_response(),
190 },
191 "telegram" => match &input.telegram_username {
192 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
193 _ => return (
194 StatusCode::BAD_REQUEST,
195 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
196 ).into_response(),
197 },
198 "signal" => match &input.signal_number {
199 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
200 _ => return (
201 StatusCode::BAD_REQUEST,
202 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
203 ).into_response(),
204 },
205 _ => return (
206 StatusCode::BAD_REQUEST,
207 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
208 ).into_response(),
209 })
210 };
211 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
212 let pds_endpoint = format!("https://{}", hostname);
213 let suffix = format!(".{}", hostname);
214 let short_handle = if input.handle.ends_with(&suffix) {
215 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle)
216 } else {
217 &input.handle
218 };
219 let full_handle = format!("{}.{}", short_handle, hostname);
220 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
221 if let Some(signing_key_did) = &input.signing_key {
222 let reserved = sqlx::query!(
223 r#"
224 SELECT id, private_key_bytes
225 FROM reserved_signing_keys
226 WHERE public_key_did_key = $1
227 AND used_at IS NULL
228 AND expires_at > NOW()
229 FOR UPDATE
230 "#,
231 signing_key_did
232 )
233 .fetch_optional(&state.db)
234 .await;
235 match reserved {
236 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
237 Ok(None) => {
238 return (
239 StatusCode::BAD_REQUEST,
240 Json(json!({
241 "error": "InvalidSigningKey",
242 "message": "Signing key not found, already used, or expired"
243 })),
244 )
245 .into_response();
246 }
247 Err(e) => {
248 error!("Error looking up reserved signing key: {:?}", e);
249 return (
250 StatusCode::INTERNAL_SERVER_ERROR,
251 Json(json!({"error": "InternalError"})),
252 )
253 .into_response();
254 }
255 }
256 } else {
257 let secret_key = SecretKey::random(&mut OsRng);
258 (secret_key.to_bytes().to_vec(), None)
259 };
260 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
261 Ok(k) => k,
262 Err(e) => {
263 error!("Error creating signing key: {:?}", e);
264 return (
265 StatusCode::INTERNAL_SERVER_ERROR,
266 Json(json!({"error": "InternalError"})),
267 )
268 .into_response();
269 }
270 };
271 let did = if let Some(d) = &input.did {
272 if d.trim().is_empty() {
273 let rotation_key = std::env::var("PLC_ROTATION_KEY")
274 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
275 let genesis_result = match create_genesis_operation(
276 &signing_key,
277 &rotation_key,
278 &full_handle,
279 &pds_endpoint,
280 ) {
281 Ok(r) => r,
282 Err(e) => {
283 error!("Error creating PLC genesis operation: {:?}", e);
284 return (
285 StatusCode::INTERNAL_SERVER_ERROR,
286 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
287 )
288 .into_response();
289 }
290 };
291 let plc_client = PlcClient::new(None);
292 if let Err(e) = plc_client
293 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
294 .await
295 {
296 error!("Failed to submit PLC genesis operation: {:?}", e);
297 return (
298 StatusCode::BAD_GATEWAY,
299 Json(json!({
300 "error": "UpstreamError",
301 "message": format!("Failed to register DID with PLC directory: {}", e)
302 })),
303 )
304 .into_response();
305 }
306 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
307 genesis_result.did
308 } else if d.starts_with("did:web:") {
309 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await {
310 return (
311 StatusCode::BAD_REQUEST,
312 Json(json!({"error": "InvalidDid", "message": e})),
313 )
314 .into_response();
315 }
316 d.clone()
317 } else if d.starts_with("did:plc:") && is_migration {
318 d.clone()
319 } else {
320 return (
321 StatusCode::BAD_REQUEST,
322 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
323 )
324 .into_response();
325 }
326 } else {
327 let rotation_key = std::env::var("PLC_ROTATION_KEY")
328 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
329 let genesis_result = match create_genesis_operation(
330 &signing_key,
331 &rotation_key,
332 &full_handle,
333 &pds_endpoint,
334 ) {
335 Ok(r) => r,
336 Err(e) => {
337 error!("Error creating PLC genesis operation: {:?}", e);
338 return (
339 StatusCode::INTERNAL_SERVER_ERROR,
340 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
341 )
342 .into_response();
343 }
344 };
345 let plc_client = PlcClient::new(None);
346 if let Err(e) = plc_client
347 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
348 .await
349 {
350 error!("Failed to submit PLC genesis operation: {:?}", e);
351 return (
352 StatusCode::BAD_GATEWAY,
353 Json(json!({
354 "error": "UpstreamError",
355 "message": format!("Failed to register DID with PLC directory: {}", e)
356 })),
357 )
358 .into_response();
359 }
360 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
361 genesis_result.did
362 };
363 let mut tx = match state.db.begin().await {
364 Ok(tx) => tx,
365 Err(e) => {
366 error!("Error starting transaction: {:?}", e);
367 return (
368 StatusCode::INTERNAL_SERVER_ERROR,
369 Json(json!({"error": "InternalError"})),
370 )
371 .into_response();
372 }
373 };
374 if is_migration {
375 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
376 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
377 .bind(&did)
378 .fetch_optional(&mut *tx)
379 .await
380 .unwrap_or(None);
381 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
382 if deactivated_at.is_some() {
383 info!(did = %did, old_handle = %old_handle, new_handle = %short_handle, "Preparing existing account for inbound migration");
384 let update_result: Result<_, sqlx::Error> =
385 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
386 .bind(short_handle)
387 .bind(account_id)
388 .execute(&mut *tx)
389 .await;
390 if let Err(e) = update_result {
391 if let Some(db_err) = e.as_database_error()
392 && db_err
393 .constraint()
394 .map(|c| c.contains("handle"))
395 .unwrap_or(false)
396 {
397 return (
398 StatusCode::BAD_REQUEST,
399 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
400 )
401 .into_response();
402 }
403 error!("Error reactivating account: {:?}", e);
404 return (
405 StatusCode::INTERNAL_SERVER_ERROR,
406 Json(json!({"error": "InternalError"})),
407 )
408 .into_response();
409 }
410 if let Err(e) = tx.commit().await {
411 error!("Error committing reactivation: {:?}", e);
412 return (
413 StatusCode::INTERNAL_SERVER_ERROR,
414 Json(json!({"error": "InternalError"})),
415 )
416 .into_response();
417 }
418 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
419 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
420 )
421 .bind(account_id)
422 .fetch_optional(&state.db)
423 .await
424 .unwrap_or(None);
425 let secret_key_bytes = match key_row {
426 Some((key_bytes, encryption_version)) => {
427 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
428 Ok(k) => k,
429 Err(e) => {
430 error!("Error decrypting key for reactivated account: {:?}", e);
431 return (
432 StatusCode::INTERNAL_SERVER_ERROR,
433 Json(json!({"error": "InternalError"})),
434 )
435 .into_response();
436 }
437 }
438 }
439 None => {
440 error!("No signing key found for reactivated account");
441 return (
442 StatusCode::INTERNAL_SERVER_ERROR,
443 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
444 )
445 .into_response();
446 }
447 };
448 let access_meta =
449 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
450 Ok(m) => m,
451 Err(e) => {
452 error!("Error creating access token: {:?}", e);
453 return (
454 StatusCode::INTERNAL_SERVER_ERROR,
455 Json(json!({"error": "InternalError"})),
456 )
457 .into_response();
458 }
459 };
460 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
461 &did,
462 &secret_key_bytes,
463 ) {
464 Ok(m) => m,
465 Err(e) => {
466 error!("Error creating refresh token: {:?}", e);
467 return (
468 StatusCode::INTERNAL_SERVER_ERROR,
469 Json(json!({"error": "InternalError"})),
470 )
471 .into_response();
472 }
473 };
474 let session_result: Result<_, sqlx::Error> = sqlx::query(
475 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
476 )
477 .bind(&did)
478 .bind(&access_meta.jti)
479 .bind(&refresh_meta.jti)
480 .bind(access_meta.expires_at)
481 .bind(refresh_meta.expires_at)
482 .execute(&state.db)
483 .await;
484 if let Err(e) = session_result {
485 error!("Error creating session: {:?}", e);
486 return (
487 StatusCode::INTERNAL_SERVER_ERROR,
488 Json(json!({"error": "InternalError"})),
489 )
490 .into_response();
491 }
492 return (
493 StatusCode::OK,
494 Json(CreateAccountOutput {
495 handle: full_handle.clone(),
496 did,
497 access_jwt: Some(access_meta.token),
498 refresh_jwt: Some(refresh_meta.token),
499 verification_required: false,
500 verification_channel: "email".to_string(),
501 }),
502 )
503 .into_response();
504 } else {
505 return (
506 StatusCode::BAD_REQUEST,
507 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
508 )
509 .into_response();
510 }
511 }
512 }
513 let exists_result: Option<(i32,)> =
514 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
515 .bind(short_handle)
516 .fetch_optional(&mut *tx)
517 .await
518 .unwrap_or(None);
519 if exists_result.is_some() {
520 return (
521 StatusCode::BAD_REQUEST,
522 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
523 )
524 .into_response();
525 }
526 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
527 .map(|v| v == "true" || v == "1")
528 .unwrap_or(false);
529 if invite_code_required
530 && input
531 .invite_code
532 .as_ref()
533 .map(|c| c.trim().is_empty())
534 .unwrap_or(true)
535 {
536 return (
537 StatusCode::BAD_REQUEST,
538 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
539 )
540 .into_response();
541 }
542 if let Some(code) = &input.invite_code
543 && !code.trim().is_empty()
544 {
545 let invite_query = sqlx::query!(
546 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
547 code
548 )
549 .fetch_optional(&mut *tx)
550 .await;
551 match invite_query {
552 Ok(Some(row)) => {
553 if row.available_uses <= 0 {
554 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
555 }
556 let update_invite = sqlx::query!(
557 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
558 code
559 )
560 .execute(&mut *tx)
561 .await;
562 if let Err(e) = update_invite {
563 error!("Error updating invite code: {:?}", e);
564 return (
565 StatusCode::INTERNAL_SERVER_ERROR,
566 Json(json!({"error": "InternalError"})),
567 )
568 .into_response();
569 }
570 }
571 Ok(None) => {
572 return (
573 StatusCode::BAD_REQUEST,
574 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
575 )
576 .into_response();
577 }
578 Err(e) => {
579 error!("Error checking invite code: {:?}", e);
580 return (
581 StatusCode::INTERNAL_SERVER_ERROR,
582 Json(json!({"error": "InternalError"})),
583 )
584 .into_response();
585 }
586 }
587 }
588 let password_hash = match hash(&input.password, DEFAULT_COST) {
589 Ok(h) => h,
590 Err(e) => {
591 error!("Error hashing password: {:?}", e);
592 return (
593 StatusCode::INTERNAL_SERVER_ERROR,
594 Json(json!({"error": "InternalError"})),
595 )
596 .into_response();
597 }
598 };
599 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
600 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
601 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
602 .fetch_one(&mut *tx)
603 .await
604 .map(|c| c.unwrap_or(0) == 0)
605 .unwrap_or(false);
606 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
607 Some(chrono::Utc::now())
608 } else {
609 None
610 };
611 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
612 r#"INSERT INTO users (
613 handle, email, did, password_hash,
614 preferred_comms_channel,
615 discord_id, telegram_username, signal_number,
616 is_admin, deactivated_at, email_verified
617 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
618 )
619 .bind(short_handle)
620 .bind(&email)
621 .bind(&did)
622 .bind(&password_hash)
623 .bind(verification_channel)
624 .bind(
625 input
626 .discord_id
627 .as_deref()
628 .map(|s| s.trim())
629 .filter(|s| !s.is_empty()),
630 )
631 .bind(
632 input
633 .telegram_username
634 .as_deref()
635 .map(|s| s.trim())
636 .filter(|s| !s.is_empty()),
637 )
638 .bind(
639 input
640 .signal_number
641 .as_deref()
642 .map(|s| s.trim())
643 .filter(|s| !s.is_empty()),
644 )
645 .bind(is_first_user)
646 .bind(deactivated_at)
647 .bind(is_migration)
648 .fetch_one(&mut *tx)
649 .await;
650 let user_id = match user_insert {
651 Ok((id,)) => id,
652 Err(e) => {
653 if let Some(db_err) = e.as_database_error()
654 && db_err.code().as_deref() == Some("23505")
655 {
656 let constraint = db_err.constraint().unwrap_or("");
657 if constraint.contains("handle") || constraint.contains("users_handle") {
658 return (
659 StatusCode::BAD_REQUEST,
660 Json(json!({
661 "error": "HandleNotAvailable",
662 "message": "Handle already taken"
663 })),
664 )
665 .into_response();
666 } else if constraint.contains("email") || constraint.contains("users_email") {
667 return (
668 StatusCode::BAD_REQUEST,
669 Json(json!({
670 "error": "InvalidEmail",
671 "message": "Email already registered"
672 })),
673 )
674 .into_response();
675 } else if constraint.contains("did") || constraint.contains("users_did") {
676 return (
677 StatusCode::BAD_REQUEST,
678 Json(json!({
679 "error": "AccountAlreadyExists",
680 "message": "An account with this DID already exists"
681 })),
682 )
683 .into_response();
684 }
685 }
686 error!("Error inserting user: {:?}", e);
687 return (
688 StatusCode::INTERNAL_SERVER_ERROR,
689 Json(json!({"error": "InternalError"})),
690 )
691 .into_response();
692 }
693 };
694
695 if !is_migration
696 && let Err(e) = sqlx::query!(
697 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)",
698 user_id,
699 verification_code,
700 email,
701 code_expires_at
702 )
703 .execute(&mut *tx)
704 .await {
705 error!("Error inserting verification code: {:?}", e);
706 return (
707 StatusCode::INTERNAL_SERVER_ERROR,
708 Json(json!({"error": "InternalError"})),
709 )
710 .into_response();
711 }
712 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
713 Ok(enc) => enc,
714 Err(e) => {
715 error!("Error encrypting user key: {:?}", e);
716 return (
717 StatusCode::INTERNAL_SERVER_ERROR,
718 Json(json!({"error": "InternalError"})),
719 )
720 .into_response();
721 }
722 };
723 let key_insert = sqlx::query!(
724 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
725 user_id,
726 &encrypted_key_bytes[..],
727 crate::config::ENCRYPTION_VERSION
728 )
729 .execute(&mut *tx)
730 .await;
731 if let Err(e) = key_insert {
732 error!("Error inserting user key: {:?}", e);
733 return (
734 StatusCode::INTERNAL_SERVER_ERROR,
735 Json(json!({"error": "InternalError"})),
736 )
737 .into_response();
738 }
739 if let Some(key_id) = reserved_key_id {
740 let mark_used = sqlx::query!(
741 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
742 key_id
743 )
744 .execute(&mut *tx)
745 .await;
746 if let Err(e) = mark_used {
747 error!("Error marking reserved key as used: {:?}", e);
748 return (
749 StatusCode::INTERNAL_SERVER_ERROR,
750 Json(json!({"error": "InternalError"})),
751 )
752 .into_response();
753 }
754 }
755 let mst = Mst::new(Arc::new(state.block_store.clone()));
756 let mst_root = match mst.persist().await {
757 Ok(c) => c,
758 Err(e) => {
759 error!("Error persisting MST: {:?}", e);
760 return (
761 StatusCode::INTERNAL_SERVER_ERROR,
762 Json(json!({"error": "InternalError"})),
763 )
764 .into_response();
765 }
766 };
767 let did_obj = match Did::new(&did) {
768 Ok(d) => d,
769 Err(_) => {
770 return (
771 StatusCode::INTERNAL_SERVER_ERROR,
772 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
773 )
774 .into_response();
775 }
776 };
777 let rev = Tid::now(LimitedU32::MIN);
778 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None);
779 let signed_commit = match unsigned_commit.sign(&signing_key) {
780 Ok(c) => c,
781 Err(e) => {
782 error!("Error signing genesis commit: {:?}", e);
783 return (
784 StatusCode::INTERNAL_SERVER_ERROR,
785 Json(json!({"error": "InternalError"})),
786 )
787 .into_response();
788 }
789 };
790 let commit_bytes = match signed_commit.to_cbor() {
791 Ok(b) => b,
792 Err(e) => {
793 error!("Error serializing genesis commit: {:?}", e);
794 return (
795 StatusCode::INTERNAL_SERVER_ERROR,
796 Json(json!({"error": "InternalError"})),
797 )
798 .into_response();
799 }
800 };
801 let commit_cid = match state.block_store.put(&commit_bytes).await {
802 Ok(c) => c,
803 Err(e) => {
804 error!("Error saving genesis commit: {:?}", e);
805 return (
806 StatusCode::INTERNAL_SERVER_ERROR,
807 Json(json!({"error": "InternalError"})),
808 )
809 .into_response();
810 }
811 };
812 let commit_cid_str = commit_cid.to_string();
813 let repo_insert = sqlx::query!(
814 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
815 user_id,
816 commit_cid_str
817 )
818 .execute(&mut *tx)
819 .await;
820 if let Err(e) = repo_insert {
821 error!("Error initializing repo: {:?}", e);
822 return (
823 StatusCode::INTERNAL_SERVER_ERROR,
824 Json(json!({"error": "InternalError"})),
825 )
826 .into_response();
827 }
828 if let Some(code) = &input.invite_code
829 && !code.trim().is_empty()
830 {
831 let use_insert = sqlx::query!(
832 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
833 code,
834 user_id
835 )
836 .execute(&mut *tx)
837 .await;
838 if let Err(e) = use_insert {
839 error!("Error recording invite usage: {:?}", e);
840 return (
841 StatusCode::INTERNAL_SERVER_ERROR,
842 Json(json!({"error": "InternalError"})),
843 )
844 .into_response();
845 }
846 }
847 if let Err(e) = tx.commit().await {
848 error!("Error committing transaction: {:?}", e);
849 return (
850 StatusCode::INTERNAL_SERVER_ERROR,
851 Json(json!({"error": "InternalError"})),
852 )
853 .into_response();
854 }
855 if !is_migration {
856 if let Err(e) =
857 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle))
858 .await
859 {
860 warn!("Failed to sequence identity event for {}: {}", did, e);
861 }
862 if let Err(e) =
863 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
864 {
865 warn!("Failed to sequence account event for {}: {}", did, e);
866 }
867 let profile_record = json!({
868 "$type": "app.bsky.actor.profile",
869 "displayName": input.handle
870 });
871 if let Err(e) = crate::api::repo::record::create_record_internal(
872 &state,
873 &did,
874 "app.bsky.actor.profile",
875 "self",
876 &profile_record,
877 )
878 .await
879 {
880 warn!("Failed to create default profile for {}: {}", did, e);
881 }
882 if let Some(ref recipient) = verification_recipient
883 && let Err(e) = crate::comms::enqueue_signup_verification(
884 &state.db,
885 user_id,
886 verification_channel,
887 recipient,
888 &verification_code,
889 )
890 .await
891 {
892 warn!(
893 "Failed to enqueue signup verification notification: {:?}",
894 e
895 );
896 }
897 }
898
899 let (access_jwt, refresh_jwt) = if is_migration {
900 let access_meta =
901 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
902 Ok(m) => m,
903 Err(e) => {
904 error!("Error creating access token for migration: {:?}", e);
905 return (
906 StatusCode::INTERNAL_SERVER_ERROR,
907 Json(json!({"error": "InternalError"})),
908 )
909 .into_response();
910 }
911 };
912 let refresh_meta =
913 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
914 Ok(m) => m,
915 Err(e) => {
916 error!("Error creating refresh token for migration: {:?}", e);
917 return (
918 StatusCode::INTERNAL_SERVER_ERROR,
919 Json(json!({"error": "InternalError"})),
920 )
921 .into_response();
922 }
923 };
924 if let Err(e) = sqlx::query!(
925 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
926 did,
927 access_meta.jti,
928 refresh_meta.jti,
929 access_meta.expires_at,
930 refresh_meta.expires_at
931 )
932 .execute(&state.db)
933 .await
934 {
935 error!("Error creating session for migration: {:?}", e);
936 return (
937 StatusCode::INTERNAL_SERVER_ERROR,
938 Json(json!({"error": "InternalError"})),
939 )
940 .into_response();
941 }
942 (Some(access_meta.token), Some(refresh_meta.token))
943 } else {
944 (None, None)
945 };
946
947 (
948 StatusCode::OK,
949 Json(CreateAccountOutput {
950 handle: full_handle.clone(),
951 did,
952 access_jwt,
953 refresh_jwt,
954 verification_required: !is_migration,
955 verification_channel: verification_channel.to_string(),
956 }),
957 )
958 .into_response()
959}