this repo has no description
1use crate::api::error::ApiError; 2use crate::api::repo::record::utils::create_signed_commit; 3use crate::auth::BearerAuth; 4use crate::delegation::{self, DelegationActionType}; 5use crate::oauth::db as oauth_db; 6use crate::state::{AppState, RateLimitKind}; 7use crate::types::{Did, Handle, Nsid, Rkey}; 8use crate::util::extract_client_ip; 9use axum::{ 10 Json, 11 extract::{Query, State}, 12 http::{HeaderMap, StatusCode}, 13 response::{IntoResponse, Response}, 14}; 15use jacquard::types::{integer::LimitedU32, string::Tid}; 16use jacquard_repo::{mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::sync::Arc; 20use tracing::{error, info, warn}; 21 22#[derive(Debug, Serialize)] 23#[serde(rename_all = "camelCase")] 24pub struct ControllerInfo { 25 pub did: Did, 26 pub handle: Handle, 27 pub granted_scopes: String, 28 pub granted_at: chrono::DateTime<chrono::Utc>, 29 pub is_active: bool, 30} 31 32#[derive(Debug, Serialize)] 33pub struct ListControllersResponse { 34 pub controllers: Vec<ControllerInfo>, 35} 36 37pub async fn list_controllers(State(state): State<AppState>, auth: BearerAuth) -> Response { 38 let controllers = match delegation::get_delegations_for_account(&state.db, &auth.0.did).await { 39 Ok(c) => c, 40 Err(e) => { 41 tracing::error!("Failed to list controllers: {:?}", e); 42 return ApiError::InternalError(Some("Failed to list controllers".into())) 43 .into_response(); 44 } 45 }; 46 47 Json(ListControllersResponse { 48 controllers: controllers 49 .into_iter() 50 .map(|c| ControllerInfo { 51 did: c.did.into(), 52 handle: c.handle, 53 granted_scopes: c.granted_scopes, 54 granted_at: c.granted_at, 55 is_active: c.is_active, 56 }) 57 .collect(), 58 }) 59 .into_response() 60} 61 62#[derive(Debug, Deserialize)] 63pub struct AddControllerInput { 64 pub controller_did: Did, 65 pub granted_scopes: String, 66} 67 68pub async fn add_controller( 69 State(state): State<AppState>, 70 auth: BearerAuth, 71 Json(input): Json<AddControllerInput>, 72) -> Response { 73 if let Err(e) = delegation::scopes::validate_delegation_scopes(&input.granted_scopes) { 74 return ApiError::InvalidScopes(e).into_response(); 75 } 76 77 let controller_exists: bool = sqlx::query_scalar!( 78 r#"SELECT EXISTS(SELECT 1 FROM users WHERE did = $1) as "exists!""#, 79 input.controller_did.as_str() 80 ) 81 .fetch_one(&state.db) 82 .await 83 .unwrap_or(false); 84 85 if !controller_exists { 86 return ApiError::ControllerNotFound.into_response(); 87 } 88 89 match delegation::controls_any_accounts(&state.db, &auth.0.did).await { 90 Ok(true) => { 91 return ApiError::InvalidDelegation( 92 "Cannot add controllers to an account that controls other accounts".into(), 93 ) 94 .into_response(); 95 } 96 Err(e) => { 97 tracing::error!("Failed to check delegation status: {:?}", e); 98 return ApiError::InternalError(Some("Failed to verify delegation status".into())) 99 .into_response(); 100 } 101 Ok(false) => {} 102 } 103 104 match delegation::has_any_controllers(&state.db, &input.controller_did).await { 105 Ok(true) => { 106 return ApiError::InvalidDelegation( 107 "Cannot add a controlled account as a controller".into(), 108 ) 109 .into_response(); 110 } 111 Err(e) => { 112 tracing::error!("Failed to check controller status: {:?}", e); 113 return ApiError::InternalError(Some("Failed to verify controller status".into())) 114 .into_response(); 115 } 116 Ok(false) => {} 117 } 118 119 match delegation::create_delegation( 120 &state.db, 121 &auth.0.did, 122 &input.controller_did, 123 &input.granted_scopes, 124 &auth.0.did, 125 ) 126 .await 127 { 128 Ok(_) => { 129 let _ = delegation::log_delegation_action( 130 &state.db, 131 &auth.0.did, 132 &auth.0.did, 133 Some(&input.controller_did), 134 DelegationActionType::GrantCreated, 135 Some(serde_json::json!({ 136 "granted_scopes": input.granted_scopes 137 })), 138 None, 139 None, 140 ) 141 .await; 142 143 ( 144 StatusCode::OK, 145 Json(serde_json::json!({ 146 "success": true 147 })), 148 ) 149 .into_response() 150 } 151 Err(e) => { 152 tracing::error!("Failed to add controller: {:?}", e); 153 ApiError::InternalError(Some("Failed to add controller".into())).into_response() 154 } 155 } 156} 157 158#[derive(Debug, Deserialize)] 159pub struct RemoveControllerInput { 160 pub controller_did: Did, 161} 162 163pub async fn remove_controller( 164 State(state): State<AppState>, 165 auth: BearerAuth, 166 Json(input): Json<RemoveControllerInput>, 167) -> Response { 168 match delegation::revoke_delegation(&state.db, &auth.0.did, &input.controller_did, &auth.0.did) 169 .await 170 { 171 Ok(true) => { 172 let revoked_app_passwords = sqlx::query_scalar!( 173 r#"DELETE FROM app_passwords 174 WHERE user_id = (SELECT id FROM users WHERE did = $1) 175 AND created_by_controller_did = $2 176 RETURNING id"#, 177 &auth.0.did, 178 input.controller_did.as_str() 179 ) 180 .fetch_all(&state.db) 181 .await 182 .map(|r| r.len()) 183 .unwrap_or(0); 184 185 let revoked_oauth_tokens = oauth_db::revoke_tokens_for_controller( 186 &state.db, 187 &auth.0.did, 188 &input.controller_did, 189 ) 190 .await 191 .unwrap_or(0); 192 193 let _ = delegation::log_delegation_action( 194 &state.db, 195 &auth.0.did, 196 &auth.0.did, 197 Some(&input.controller_did), 198 DelegationActionType::GrantRevoked, 199 Some(serde_json::json!({ 200 "revoked_app_passwords": revoked_app_passwords, 201 "revoked_oauth_tokens": revoked_oauth_tokens 202 })), 203 None, 204 None, 205 ) 206 .await; 207 208 ( 209 StatusCode::OK, 210 Json(serde_json::json!({ 211 "success": true 212 })), 213 ) 214 .into_response() 215 } 216 Ok(false) => ApiError::DelegationNotFound.into_response(), 217 Err(e) => { 218 tracing::error!("Failed to remove controller: {:?}", e); 219 ApiError::InternalError(Some("Failed to remove controller".into())).into_response() 220 } 221 } 222} 223 224#[derive(Debug, Deserialize)] 225pub struct UpdateControllerScopesInput { 226 pub controller_did: Did, 227 pub granted_scopes: String, 228} 229 230pub async fn update_controller_scopes( 231 State(state): State<AppState>, 232 auth: BearerAuth, 233 Json(input): Json<UpdateControllerScopesInput>, 234) -> Response { 235 if let Err(e) = delegation::scopes::validate_delegation_scopes(&input.granted_scopes) { 236 return ApiError::InvalidScopes(e).into_response(); 237 } 238 239 match delegation::update_delegation_scopes( 240 &state.db, 241 &auth.0.did, 242 &input.controller_did, 243 &input.granted_scopes, 244 ) 245 .await 246 { 247 Ok(true) => { 248 let _ = delegation::log_delegation_action( 249 &state.db, 250 &auth.0.did, 251 &auth.0.did, 252 Some(&input.controller_did), 253 DelegationActionType::ScopesModified, 254 Some(serde_json::json!({ 255 "new_scopes": input.granted_scopes 256 })), 257 None, 258 None, 259 ) 260 .await; 261 262 ( 263 StatusCode::OK, 264 Json(serde_json::json!({ 265 "success": true 266 })), 267 ) 268 .into_response() 269 } 270 Ok(false) => ApiError::DelegationNotFound.into_response(), 271 Err(e) => { 272 tracing::error!("Failed to update controller scopes: {:?}", e); 273 ApiError::InternalError(Some("Failed to update controller scopes".into())) 274 .into_response() 275 } 276 } 277} 278 279#[derive(Debug, Serialize)] 280#[serde(rename_all = "camelCase")] 281pub struct DelegatedAccountInfo { 282 pub did: Did, 283 pub handle: Handle, 284 pub granted_scopes: String, 285 pub granted_at: chrono::DateTime<chrono::Utc>, 286} 287 288#[derive(Debug, Serialize)] 289pub struct ListControlledAccountsResponse { 290 pub accounts: Vec<DelegatedAccountInfo>, 291} 292 293pub async fn list_controlled_accounts(State(state): State<AppState>, auth: BearerAuth) -> Response { 294 let accounts = match delegation::get_accounts_controlled_by(&state.db, &auth.0.did).await { 295 Ok(a) => a, 296 Err(e) => { 297 tracing::error!("Failed to list controlled accounts: {:?}", e); 298 return ApiError::InternalError(Some("Failed to list controlled accounts".into())) 299 .into_response(); 300 } 301 }; 302 303 Json(ListControlledAccountsResponse { 304 accounts: accounts 305 .into_iter() 306 .map(|a| DelegatedAccountInfo { 307 did: a.did.into(), 308 handle: a.handle, 309 granted_scopes: a.granted_scopes, 310 granted_at: a.granted_at, 311 }) 312 .collect(), 313 }) 314 .into_response() 315} 316 317#[derive(Debug, Deserialize)] 318pub struct AuditLogParams { 319 #[serde(default = "default_limit")] 320 pub limit: i64, 321 #[serde(default)] 322 pub offset: i64, 323} 324 325fn default_limit() -> i64 { 326 50 327} 328 329#[derive(Debug, Serialize)] 330#[serde(rename_all = "camelCase")] 331pub struct AuditLogEntry { 332 pub id: String, 333 pub delegated_did: Did, 334 pub actor_did: Did, 335 pub controller_did: Option<Did>, 336 pub action_type: String, 337 pub action_details: Option<serde_json::Value>, 338 pub created_at: chrono::DateTime<chrono::Utc>, 339} 340 341#[derive(Debug, Serialize)] 342pub struct GetAuditLogResponse { 343 pub entries: Vec<AuditLogEntry>, 344 pub total: i64, 345} 346 347pub async fn get_audit_log( 348 State(state): State<AppState>, 349 auth: BearerAuth, 350 Query(params): Query<AuditLogParams>, 351) -> Response { 352 let limit = params.limit.clamp(1, 100); 353 let offset = params.offset.max(0); 354 355 let entries = 356 match delegation::audit::get_audit_log_for_account(&state.db, &auth.0.did, limit, offset) 357 .await 358 { 359 Ok(e) => e, 360 Err(e) => { 361 tracing::error!("Failed to get audit log: {:?}", e); 362 return ApiError::InternalError(Some("Failed to get audit log".into())) 363 .into_response(); 364 } 365 }; 366 367 let total = delegation::audit::count_audit_log_entries(&state.db, &auth.0.did) 368 .await 369 .unwrap_or_default(); 370 371 Json(GetAuditLogResponse { 372 entries: entries 373 .into_iter() 374 .map(|e| AuditLogEntry { 375 id: e.id.to_string(), 376 delegated_did: e.delegated_did.into(), 377 actor_did: e.actor_did.into(), 378 controller_did: e.controller_did.map(Into::into), 379 action_type: format!("{:?}", e.action_type), 380 action_details: e.action_details, 381 created_at: e.created_at, 382 }) 383 .collect(), 384 total, 385 }) 386 .into_response() 387} 388 389#[derive(Debug, Serialize)] 390pub struct ScopePresetInfo { 391 pub name: &'static str, 392 pub label: &'static str, 393 pub description: &'static str, 394 pub scopes: &'static str, 395} 396 397#[derive(Debug, Serialize)] 398pub struct GetScopePresetsResponse { 399 pub presets: Vec<ScopePresetInfo>, 400} 401 402pub async fn get_scope_presets() -> Response { 403 Json(GetScopePresetsResponse { 404 presets: delegation::SCOPE_PRESETS 405 .iter() 406 .map(|p| ScopePresetInfo { 407 name: p.name, 408 label: p.label, 409 description: p.description, 410 scopes: p.scopes, 411 }) 412 .collect(), 413 }) 414 .into_response() 415} 416 417#[derive(Debug, Deserialize)] 418#[serde(rename_all = "camelCase")] 419pub struct CreateDelegatedAccountInput { 420 pub handle: String, 421 pub email: Option<String>, 422 pub controller_scopes: String, 423 pub invite_code: Option<String>, 424} 425 426#[derive(Debug, Serialize)] 427#[serde(rename_all = "camelCase")] 428pub struct CreateDelegatedAccountResponse { 429 pub did: Did, 430 pub handle: Handle, 431} 432 433pub async fn create_delegated_account( 434 State(state): State<AppState>, 435 headers: HeaderMap, 436 auth: BearerAuth, 437 Json(input): Json<CreateDelegatedAccountInput>, 438) -> Response { 439 let client_ip = extract_client_ip(&headers); 440 if !state 441 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip) 442 .await 443 { 444 warn!(ip = %client_ip, "Delegated account creation rate limit exceeded"); 445 return ApiError::RateLimitExceeded(Some( 446 "Too many account creation attempts. Please try again later.".into(), 447 )) 448 .into_response(); 449 } 450 451 if let Err(e) = delegation::scopes::validate_delegation_scopes(&input.controller_scopes) { 452 return ApiError::InvalidScopes(e).into_response(); 453 } 454 455 match delegation::has_any_controllers(&state.db, &auth.0.did).await { 456 Ok(true) => { 457 return ApiError::InvalidDelegation( 458 "Cannot create delegated accounts from a controlled account".into(), 459 ) 460 .into_response(); 461 } 462 Err(e) => { 463 tracing::error!("Failed to check controller status: {:?}", e); 464 return ApiError::InternalError(Some("Failed to verify controller status".into())) 465 .into_response(); 466 } 467 Ok(false) => {} 468 } 469 470 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 471 let pds_suffix = format!(".{}", hostname); 472 473 let handle = if !input.handle.contains('.') || input.handle.ends_with(&pds_suffix) { 474 let handle_to_validate = if input.handle.ends_with(&pds_suffix) { 475 input 476 .handle 477 .strip_suffix(&pds_suffix) 478 .unwrap_or(&input.handle) 479 } else { 480 &input.handle 481 }; 482 match crate::api::validation::validate_short_handle(handle_to_validate) { 483 Ok(h) => format!("{}.{}", h, hostname), 484 Err(e) => { 485 return ApiError::InvalidRequest(e.to_string()).into_response(); 486 } 487 } 488 } else { 489 input.handle.to_lowercase() 490 }; 491 492 let email = input 493 .email 494 .as_ref() 495 .map(|e| e.trim().to_string()) 496 .filter(|e| !e.is_empty()); 497 if let Some(ref email) = email 498 && !crate::api::validation::is_valid_email(email) 499 { 500 return ApiError::InvalidEmail.into_response(); 501 } 502 503 if let Some(ref code) = input.invite_code { 504 let valid = sqlx::query_scalar!( 505 "SELECT available_uses > 0 AND NOT disabled FROM invite_codes WHERE code = $1", 506 code 507 ) 508 .fetch_optional(&state.db) 509 .await 510 .ok() 511 .flatten() 512 .unwrap_or(Some(false)); 513 514 if valid != Some(true) { 515 return ApiError::InvalidInviteCode.into_response(); 516 } 517 } else { 518 let invite_required = std::env::var("INVITE_CODE_REQUIRED") 519 .map(|v| v == "true" || v == "1") 520 .unwrap_or(false); 521 if invite_required { 522 return ApiError::InviteCodeRequired.into_response(); 523 } 524 } 525 526 use k256::ecdsa::SigningKey; 527 use rand::rngs::OsRng; 528 529 let pds_endpoint = format!("https://{}", hostname); 530 let secret_key = k256::SecretKey::random(&mut OsRng); 531 let secret_key_bytes = secret_key.to_bytes().to_vec(); 532 533 let signing_key = match SigningKey::from_slice(&secret_key_bytes) { 534 Ok(k) => k, 535 Err(e) => { 536 error!("Error creating signing key: {:?}", e); 537 return ApiError::InternalError(None).into_response(); 538 } 539 }; 540 541 let rotation_key = std::env::var("PLC_ROTATION_KEY") 542 .unwrap_or_else(|_| crate::plc::signing_key_to_did_key(&signing_key)); 543 544 let genesis_result = match crate::plc::create_genesis_operation( 545 &signing_key, 546 &rotation_key, 547 &handle, 548 &pds_endpoint, 549 ) { 550 Ok(r) => r, 551 Err(e) => { 552 error!("Error creating PLC genesis operation: {:?}", e); 553 return ApiError::InternalError(Some("Failed to create PLC operation".into())) 554 .into_response(); 555 } 556 }; 557 558 let plc_client = crate::plc::PlcClient::with_cache(None, Some(state.cache.clone())); 559 if let Err(e) = plc_client 560 .send_operation(&genesis_result.did, &genesis_result.signed_operation) 561 .await 562 { 563 error!("Failed to submit PLC genesis operation: {:?}", e); 564 return ApiError::UpstreamErrorMsg(format!( 565 "Failed to register DID with PLC directory: {}", 566 e 567 )) 568 .into_response(); 569 } 570 571 let did = Did::new_unchecked(&genesis_result.did); 572 let handle = Handle::new_unchecked(&handle); 573 info!(did = %did, handle = %handle, controller = %&auth.0.did, "Created DID for delegated account"); 574 575 let mut tx = match state.db.begin().await { 576 Ok(tx) => tx, 577 Err(e) => { 578 error!("Error starting transaction: {:?}", e); 579 return ApiError::InternalError(None).into_response(); 580 } 581 }; 582 583 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as( 584 r#"INSERT INTO users ( 585 handle, email, did, password_hash, password_required, 586 account_type, preferred_comms_channel 587 ) VALUES ($1, $2, $3, NULL, FALSE, 'delegated'::account_type, 'email'::comms_channel) RETURNING id"#, 588 ) 589 .bind(handle.as_str()) 590 .bind(&email) 591 .bind(did.as_str()) 592 .fetch_one(&mut *tx) 593 .await; 594 595 let user_id = match user_insert { 596 Ok((id,)) => id, 597 Err(e) => { 598 if let Some(db_err) = e.as_database_error() 599 && db_err.code().as_deref() == Some("23505") 600 { 601 let constraint = db_err.constraint().unwrap_or(""); 602 if constraint.contains("handle") { 603 return ApiError::HandleNotAvailable(None).into_response(); 604 } else if constraint.contains("email") { 605 return ApiError::EmailTaken.into_response(); 606 } 607 } 608 error!("Error inserting user: {:?}", e); 609 return ApiError::InternalError(None).into_response(); 610 } 611 }; 612 613 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) { 614 Ok(bytes) => bytes, 615 Err(e) => { 616 error!("Error encrypting signing key: {:?}", e); 617 return ApiError::InternalError(None).into_response(); 618 } 619 }; 620 621 if let Err(e) = sqlx::query!( 622 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())", 623 user_id, 624 &encrypted_key_bytes[..], 625 crate::config::ENCRYPTION_VERSION 626 ) 627 .execute(&mut *tx) 628 .await 629 { 630 error!("Error inserting user key: {:?}", e); 631 return ApiError::InternalError(None).into_response(); 632 } 633 634 if let Err(e) = sqlx::query!( 635 r#"INSERT INTO account_delegations (delegated_did, controller_did, granted_scopes, granted_by) 636 VALUES ($1, $2, $3, $4)"#, 637 did.as_str(), 638 auth.0.did.as_str(), 639 input.controller_scopes, 640 auth.0.did.as_str() 641 ) 642 .execute(&mut *tx) 643 .await 644 { 645 error!("Error creating initial delegation: {:?}", e); 646 return ApiError::InternalError(None).into_response(); 647 } 648 649 let mst = Mst::new(Arc::new(state.block_store.clone())); 650 let mst_root = match mst.persist().await { 651 Ok(c) => c, 652 Err(e) => { 653 error!("Error persisting MST: {:?}", e); 654 return ApiError::InternalError(None).into_response(); 655 } 656 }; 657 let rev = Tid::now(LimitedU32::MIN); 658 let (commit_bytes, _sig) = 659 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) { 660 Ok(result) => result, 661 Err(e) => { 662 error!("Error creating genesis commit: {:?}", e); 663 return ApiError::InternalError(None).into_response(); 664 } 665 }; 666 let commit_cid: cid::Cid = match state.block_store.put(&commit_bytes).await { 667 Ok(c) => c, 668 Err(e) => { 669 error!("Error saving genesis commit: {:?}", e); 670 return ApiError::InternalError(None).into_response(); 671 } 672 }; 673 let commit_cid_str = commit_cid.to_string(); 674 let rev_str = rev.as_ref().to_string(); 675 if let Err(e) = sqlx::query!( 676 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 677 user_id, 678 commit_cid_str, 679 rev_str 680 ) 681 .execute(&mut *tx) 682 .await 683 { 684 error!("Error inserting repo: {:?}", e); 685 return ApiError::InternalError(None).into_response(); 686 } 687 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 688 if let Err(e) = sqlx::query!( 689 r#" 690 INSERT INTO user_blocks (user_id, block_cid) 691 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 692 ON CONFLICT (user_id, block_cid) DO NOTHING 693 "#, 694 user_id, 695 &genesis_block_cids 696 ) 697 .execute(&mut *tx) 698 .await 699 { 700 error!("Error inserting user_blocks: {:?}", e); 701 return ApiError::InternalError(None).into_response(); 702 } 703 704 if let Some(ref code) = input.invite_code { 705 let _ = sqlx::query!( 706 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1", 707 code 708 ) 709 .execute(&mut *tx) 710 .await; 711 712 let _ = sqlx::query!( 713 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)", 714 code, 715 user_id 716 ) 717 .execute(&mut *tx) 718 .await; 719 } 720 721 if let Err(e) = tx.commit().await { 722 error!("Error committing transaction: {:?}", e); 723 return ApiError::InternalError(None).into_response(); 724 } 725 726 if let Err(e) = 727 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await 728 { 729 warn!("Failed to sequence identity event for {}: {}", did, e); 730 } 731 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, &did, true, None).await 732 { 733 warn!("Failed to sequence account event for {}: {}", did, e); 734 } 735 736 let profile_record = json!({ 737 "$type": "app.bsky.actor.profile", 738 "displayName": handle 739 }); 740 let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile"); 741 let profile_rkey = Rkey::new_unchecked("self"); 742 if let Err(e) = crate::api::repo::record::create_record_internal( 743 &state, 744 &did, 745 &profile_collection, 746 &profile_rkey, 747 &profile_record, 748 ) 749 .await 750 { 751 warn!("Failed to create default profile for {}: {}", did, e); 752 } 753 754 let _ = delegation::log_delegation_action( 755 &state.db, 756 &did, 757 &auth.0.did, 758 Some(&auth.0.did), 759 DelegationActionType::GrantCreated, 760 Some(json!({ 761 "account_created": true, 762 "granted_scopes": input.controller_scopes 763 })), 764 None, 765 None, 766 ) 767 .await; 768 769 info!(did = %did, handle = %handle, controller = %&auth.0.did, "Delegated account created"); 770 771 Json(CreateDelegatedAccountResponse { 772 did: did.into(), 773 handle: handle.into(), 774 }) 775 .into_response() 776}