Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 1462 lines 60 kB view raw
//// Mutation resolvers for the admin GraphQL API.
////
//// `mutation_type` builds the `Mutation` root object. Every resolver returns
//// `Ok(value)` on success or `Error(message)` with a user-facing message.
//// The private helpers below hold the logic shared between resolvers
//// (authentication, argument extraction, validation, consumer restarts).

import admin_session as session
import backfill
import backfill_state
import database/executor.{type Executor}
import database/repositories/actors
import database/repositories/config as config_repo
import database/repositories/jetstream_activity
import database/repositories/label_definitions
import database/repositories/labels
import database/repositories/lexicons
import database/repositories/oauth_clients
import database/repositories/records
import database/repositories/reports
import database/types
import gleam/erlang/process.{type Subject}
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/result
import gleam/string
import graphql/admin/converters
import graphql/admin/types as admin_types
import importer
import jetstream_consumer
import lib/oauth/did_cache
import lib/oauth/scopes/validator as scope_validator
import lib/oauth/token_generator
import lib/oauth/validator
import logging
import swell/schema
import swell/value
import wisp

// ---------------------------------------------------------------------------
// Validation helpers
// ---------------------------------------------------------------------------

/// Validate that a string is a valid DID format.
/// Valid format: did:<method>:<identifier>, with a non-empty method and
/// identifier. Extra `:`-separated segments after the identifier are allowed.
fn is_valid_did(s: String) -> Bool {
  // Matching the first segment against "did" already implies the "did:"
  // prefix, so no separate prefix check is needed.
  case string.split(s, ":") {
    ["did", method, identifier, ..] -> method != "" && identifier != ""
    _ -> False
  }
}

/// Validate that all requested scopes are in the supported list.
///
/// `requested_scope` is a space-separated list; blank entries are ignored, so
/// an empty string is accepted. Returns `Error` naming every unsupported scope.
fn validate_scope_against_supported(
  requested_scope: String,
  supported_scopes: List(String),
) -> Result(Nil, String) {
  let unsupported =
    requested_scope
    |> string.split(" ")
    |> list.map(string.trim)
    |> list.filter(fn(scope) {
      !string.is_empty(scope) && !list.contains(supported_scopes, scope)
    })

  case unsupported {
    [] -> Ok(Nil)
    _ ->
      Error(
        "Unsupported scope(s): "
        <> string.join(unsupported, ", ")
        <> ". Supported: "
        <> string.join(supported_scopes, ", "),
      )
  }
}

/// Trim an optional string setting. Absent stays absent; a value that is blank
/// after trimming is rejected with `empty_error`.
fn trimmed_setting(
  raw: Option(String),
  empty_error: String,
) -> Result(Option(String), String) {
  case option.map(raw, string.trim) {
    None -> Ok(None)
    Some("") -> Error(empty_error)
    Some(trimmed) -> Ok(Some(trimmed))
  }
}

/// Validate an optional replacement admin list: it must be non-empty and every
/// entry must pass `is_valid_did`. The first invalid entry is reported.
fn validated_admin_dids(
  candidates: Option(List(String)),
) -> Result(Option(List(String)), String) {
  case candidates {
    None -> Ok(None)
    Some([]) -> Error("Cannot have zero admins")
    Some(dids) ->
      case list.find(dids, fn(did) { !is_valid_did(did) }) {
        Ok(first_invalid) ->
          Error(
            "Invalid DID format: "
            <> first_invalid
            <> ". DIDs must start with 'did:' followed by method and identifier (e.g., did:plc:abc123)",
          )
        Error(_) -> Ok(Some(dids))
      }
  }
}

/// Validate the optional `oauthSupportedScopes` setting: non-blank and accepted
/// by `scope_validator.validate_scope_format`. The validator's error detail is
/// logged; the caller only sees a generic message.
fn validated_oauth_scopes(raw: Option(String)) -> Result(Option(String), String) {
  use scopes <- result.try(trimmed_setting(
    raw,
    "OAuth supported scopes cannot be empty",
  ))
  case scopes {
    None -> Ok(None)
    Some(trimmed_scopes) ->
      case scope_validator.validate_scope_format(trimmed_scopes) {
        Ok(_) -> Ok(Some(trimmed_scopes))
        Error(err) -> {
          logging.log(
            logging.Error,
            "[updateSettings] Invalid OAuth scope: " <> string.inspect(err),
          )
          Error("Invalid OAuth scope")
        }
      }
  }
}

/// Trim an OAuth client name, rejecting blank names.
fn validate_client_name(name: String) -> Result(String, String) {
  case string.trim(name) {
    "" -> Error("Client name cannot be empty")
    trimmed_name -> Ok(trimmed_name)
  }
}

/// Trim redirect URIs, drop blank entries, require at least one, and check each
/// with `validator.validate_redirect_uri`. Returns the cleaned list.
fn validate_redirect_uris(raw_uris: List(String)) -> Result(List(String), String) {
  let redirect_uris =
    raw_uris
    |> list.map(string.trim)
    |> list.filter(fn(uri) { !string.is_empty(uri) })

  case redirect_uris {
    [] -> Error("At least one redirect URI is required")
    _ -> {
      let first_invalid =
        list.find(redirect_uris, fn(uri) {
          result.is_error(validator.validate_redirect_uri(uri))
        })
      case first_invalid {
        Ok(uri) ->
          Error(
            "Invalid redirect URI: "
            <> uri
            <> ". URIs must use https://, or http:// only for localhost.",
          )
        Error(_) -> Ok(redirect_uris)
      }
    }
  }
}

/// Convert a raw scope argument into the stored form: `None` when blank,
/// otherwise the trimmed string.
fn normalized_scope(scope: String) -> Option(String) {
  case string.trim(scope) {
    "" -> None
    trimmed_scope -> Some(trimmed_scope)
  }
}

/// Check that a label subject URI is accepted by `labels.is_valid_subject_uri`.
fn ensure_valid_subject_uri(uri: String) -> Result(Nil, String) {
  case labels.is_valid_subject_uri(uri) {
    True -> Ok(Nil)
    False ->
      Error("Invalid URI format. Must be at://did/collection/rkey or a DID")
  }
}

/// Check that a label value has a definition in the database.
fn ensure_label_defined(conn: Executor, val: String) -> Result(Nil, String) {
  case label_definitions.exists(conn, val) {
    Ok(True) -> Ok(Nil)
    Ok(False) -> Error("Unknown label value: " <> val)
    Error(_) -> Error("Failed to validate label value")
  }
}

// ---------------------------------------------------------------------------
// Argument extraction helpers
//
// `ctx` is deliberately left without a type annotation in these helpers: it is
// whatever context type `schema.get_argument` accepts, and that type's name is
// not referenced anywhere else in this module.
// ---------------------------------------------------------------------------

/// Read an argument that must be a GraphQL string; anything else is `None`.
fn string_argument(ctx, name: String) -> Option(String) {
  case schema.get_argument(ctx, name) {
    Some(value.String(text)) -> Some(text)
    _ -> None
  }
}

/// Read an argument that may arrive as either an enum or a string value.
fn enum_or_string_argument(ctx, name: String) -> Option(String) {
  case schema.get_argument(ctx, name) {
    Some(value.Enum(text)) | Some(value.String(text)) -> Some(text)
    _ -> None
  }
}

/// Read a list argument, keeping only its string items (non-string items are
/// silently dropped). `None` when the argument is absent or not a list.
fn string_list_argument(ctx, name: String) -> Option(List(String)) {
  case schema.get_argument(ctx, name) {
    Some(value.List(items)) ->
      Some(
        list.filter_map(items, fn(item) {
          case item {
            value.String(text) -> Ok(text)
            _ -> Error(Nil)
          }
        }),
      )
    _ -> None
  }
}

// ---------------------------------------------------------------------------
// Authentication helpers
//
// Both return the session on success. The return annotation is omitted because
// the session type is declared in `admin_session` and is not named in this
// module. `purpose` is appended to the error message (e.g.
// " to trigger backfill"); pass "" for the plain message.
// ---------------------------------------------------------------------------

/// Require a logged-in session for the current request.
fn require_session(
  conn: Executor,
  req: wisp.Request,
  did_cache_subject: Subject(did_cache.Message),
  purpose: String,
) {
  session.get_current_session(req, conn, did_cache_subject)
  |> result.replace_error("Authentication required" <> purpose)
}

/// Require a logged-in session whose DID is in the configured admin list.
fn require_admin(
  conn: Executor,
  req: wisp.Request,
  did_cache_subject: Subject(did_cache.Message),
  purpose: String,
) {
  use sess <- result.try(require_session(
    conn,
    req,
    did_cache_subject,
    purpose,
  ))
  case config_repo.is_admin(conn, sess.did) {
    True -> Ok(sess)
    False -> Error("Admin privileges required" <> purpose)
  }
}

// ---------------------------------------------------------------------------
// Shared side-effect helpers
// ---------------------------------------------------------------------------

/// Restart the Jetstream consumer if one is running, logging the announcement
/// and the outcome under `tag` (e.g. "[updateSettings]"). A failed restart is
/// logged, never returned: callers treat the restart as best-effort.
fn restart_jetstream_logged(
  jetstream_subject: Option(Subject(jetstream_consumer.ManagerMessage)),
  tag: String,
  announcement: String,
) -> Nil {
  case jetstream_subject {
    None -> Nil
    Some(consumer) -> {
      logging.log(logging.Info, tag <> " " <> announcement)
      case jetstream_consumer.restart(consumer) {
        Ok(_) ->
          logging.log(logging.Info, tag <> " Jetstream consumer restarted")
        Error(err) ->
          logging.log(
            logging.Error,
            tag <> " Failed to restart Jetstream: " <> err,
          )
      }
    }
  }
}

/// The configured domain authority, or "" when it is unset or unreadable.
fn current_domain_authority(conn: Executor) -> String {
  result.unwrap(config_repo.get(conn, "domain_authority"), "")
}

/// Load the ids of all record-type lexicons and split them into
/// `#(primary, external)` according to
/// `backfill.nsid_matches_domain_authority`. A failed lexicon lookup yields two
/// empty lists.
fn partition_record_collections(conn: Executor) {
  let collections = case lexicons.get_record_types(conn) {
    Ok(lexicon_list) -> list.map(lexicon_list, fn(lex) { lex.id })
    Error(_) -> []
  }
  let domain_authority = current_domain_authority(conn)

  list.partition(collections, fn(collection) {
    backfill.nsid_matches_domain_authority(collection, domain_authority)
  })
}

/// Persist one optional setting. Returns `Ok(False)` when nothing was supplied,
/// `Ok(True)` when `save` succeeded, and `Error(failure)` when it failed.
fn persist_setting(
  setting: Option(a),
  save: fn(a) -> Result(b, c),
  failure: String,
) -> Result(Bool, String) {
  case setting {
    None -> Ok(False)
    Some(new_value) ->
      save(new_value)
      |> result.replace(True)
      |> result.replace_error(failure)
  }
}

/// Run persistence steps in order, stopping at the first failure.
///
/// Each step is `#(restarts_consumer, run)`. The returned flag is `True` when
/// at least one step that requires a Jetstream restart actually changed a
/// value, including steps that ran before a later failure, so the caller can
/// restart the consumer even after a partial write.
fn apply_setting_steps(
  steps: List(#(Bool, fn() -> Result(Bool, String))),
) -> #(Bool, Result(Nil, String)) {
  list.fold_until(steps, #(False, Ok(Nil)), fn(state, step) {
    let #(needs_restart, _) = state
    let #(restarts_consumer, run) = step
    case run() {
      Ok(changed) ->
        list.Continue(#(
          needs_restart || { changed && restarts_consumer },
          Ok(Nil),
        ))
      Error(message) -> list.Stop(#(needs_restart, Error(message)))
    }
  })
}

// ---------------------------------------------------------------------------
// Mutation root
// ---------------------------------------------------------------------------

/// Build the Mutation root type with all mutation resolvers.
///
/// - `conn`: database executor used by every resolver.
/// - `req`: the incoming request, used to look up the current session.
/// - `jetstream_subject`: the Jetstream consumer manager, or `None` when the
///   consumer is not running (restarts are then skipped).
/// - `did_cache`: DID cache subject passed to the session lookup.
/// - `oauth_supported_scopes`: scopes OAuth clients are allowed to request.
/// - `backfill_state_subject`: receives `StartBackfill` / `StopBackfill`
///   around a full backfill.
pub fn mutation_type(
  conn: Executor,
  req: wisp.Request,
  jetstream_subject: Option(Subject(jetstream_consumer.ManagerMessage)),
  did_cache: Subject(did_cache.Message),
  oauth_supported_scopes: List(String),
  backfill_state_subject: Subject(backfill_state.Message),
) -> schema.Type {
  schema.object_type("Mutation", "Root mutation type", [
    // updateSettings mutation - consolidated settings update
    schema.field_with_args(
      "updateSettings",
      schema.non_null(admin_types.settings_type()),
      "Update system settings (domain authority and/or admin DIDs)",
      [
        schema.argument(
          "domainAuthority",
          schema.string_type(),
          "New domain authority value (optional)",
          None,
        ),
        schema.argument(
          "adminDids",
          schema.list_type(schema.non_null(schema.string_type())),
          "New admin DIDs list (optional)",
          None,
        ),
        schema.argument(
          "relayUrl",
          schema.string_type(),
          "New relay URL (optional)",
          None,
        ),
        schema.argument(
          "plcDirectoryUrl",
          schema.string_type(),
          "New PLC directory URL (optional)",
          None,
        ),
        schema.argument(
          "jetstreamUrl",
          schema.string_type(),
          "New Jetstream URL (optional)",
          None,
        ),
        schema.argument(
          "oauthSupportedScopes",
          schema.string_type(),
          "New OAuth supported scopes space-separated (optional)",
          None,
        ),
      ],
      fn(ctx) {
        use _sess <- result.try(require_admin(conn, req, did_cache, ""))

        // Phase 1: validate every supplied argument before writing anything,
        // so an invalid later argument cannot leave earlier ones half-applied.
        use authority <- result.try(trimmed_setting(
          string_argument(ctx, "domainAuthority"),
          "Domain authority cannot be empty",
        ))
        use admin_dids <- result.try(
          validated_admin_dids(string_list_argument(ctx, "adminDids")),
        )
        use relay_url <- result.try(trimmed_setting(
          string_argument(ctx, "relayUrl"),
          "Relay URL cannot be empty",
        ))
        use plc_url <- result.try(trimmed_setting(
          string_argument(ctx, "plcDirectoryUrl"),
          "PLC directory URL cannot be empty",
        ))
        use jetstream_url <- result.try(trimmed_setting(
          string_argument(ctx, "jetstreamUrl"),
          "Jetstream URL cannot be empty",
        ))
        use oauth_scopes <- result.try(
          validated_oauth_scopes(string_argument(ctx, "oauthSupportedScopes")),
        )

        // Phase 2: persist in a fixed order. The boolean marks settings the
        // Jetstream consumer must be restarted for.
        let #(needs_restart, outcome) =
          apply_setting_steps([
            #(True, fn() {
              persist_setting(
                authority,
                config_repo.set(conn, "domain_authority", _),
                "Failed to update domain authority",
              )
            }),
            #(False, fn() {
              persist_setting(
                admin_dids,
                config_repo.set_admin_dids(conn, _),
                "Failed to update admin DIDs",
              )
            }),
            #(False, fn() {
              persist_setting(
                relay_url,
                config_repo.set_relay_url(conn, _),
                "Failed to update relay URL",
              )
            }),
            #(True, fn() {
              persist_setting(
                plc_url,
                config_repo.set_plc_directory_url(conn, _),
                "Failed to update PLC directory URL",
              )
            }),
            #(True, fn() {
              persist_setting(
                jetstream_url,
                config_repo.set_jetstream_url(conn, _),
                "Failed to update Jetstream URL",
              )
            }),
            #(False, fn() {
              persist_setting(
                oauth_scopes,
                config_repo.set_oauth_supported_scopes(conn, _),
                "Failed to save OAuth scopes",
              )
            }),
          ])

        // Phase 3: restart the consumer once, even if a later write failed,
        // so it never keeps running with stale configuration.
        case needs_restart {
          True ->
            restart_jetstream_logged(
              jetstream_subject,
              "[updateSettings]",
              "Restarting Jetstream consumer...",
            )
          False -> Nil
        }

        use _ <- result.try(outcome)

        // Return the settings as they are now stored
        Ok(converters.settings_to_value(
          current_domain_authority(conn),
          config_repo.get_admin_dids(conn),
          config_repo.get_relay_url(conn),
          config_repo.get_plc_directory_url(conn),
          config_repo.get_jetstream_url(conn),
          config_repo.get_oauth_supported_scopes(conn),
        ))
      },
    ),
    // uploadLexicons mutation
    schema.field_with_args(
      "uploadLexicons",
      schema.non_null(schema.boolean_type()),
      "Upload and import lexicons from base64-encoded ZIP",
      [
        schema.argument(
          "zipBase64",
          schema.non_null(schema.string_type()),
          "Base64-encoded ZIP file containing lexicon JSON files",
          None,
        ),
      ],
      fn(ctx) {
        // NOTE(review): unlike every other state-changing mutation in this
        // file, this resolver performs no session or admin check. Confirm
        // whether that is intentional (e.g. first-run setup); if not, gate it
        // with the same admin check the other resolvers use.
        case string_argument(ctx, "zipBase64") {
          None -> Error("Invalid zipBase64 argument")
          Some(zip_base64) -> {
            use _stats <- result.try(
              importer.import_lexicons_from_base64_zip(zip_base64, conn)
              |> result.map_error(fn(err) {
                "Failed to import lexicons: " <> err
              }),
            )

            // Restart Jetstream consumer to pick up newly imported collections
            case jetstream_subject {
              None -> {
                logging.log(
                  logging.Info,
                  "[uploadLexicons] Jetstream consumer not running, skipping restart",
                )
                Ok(value.Boolean(True))
              }
              Some(consumer) -> {
                logging.log(
                  logging.Info,
                  "[uploadLexicons] Restarting Jetstream consumer with new lexicons...",
                )
                case jetstream_consumer.restart(consumer) {
                  Ok(_) -> {
                    logging.log(
                      logging.Info,
                      "[uploadLexicons] Jetstream consumer restarted successfully",
                    )
                    Ok(value.Boolean(True))
                  }
                  Error(err) -> {
                    logging.log(
                      logging.Error,
                      "[uploadLexicons] Failed to restart Jetstream consumer: "
                        <> err,
                    )
                    Error(
                      "Lexicons imported but failed to restart Jetstream consumer: "
                      <> err,
                    )
                  }
                }
              }
            }
          }
        }
      },
    ),
    // resetAll mutation
    schema.field_with_args(
      "resetAll",
      schema.non_null(schema.boolean_type()),
      "Reset all data (requires RESET confirmation and admin privileges)",
      [
        schema.argument(
          "confirm",
          schema.non_null(schema.string_type()),
          "Must be the string 'RESET' to confirm",
          None,
        ),
      ],
      fn(ctx) {
        use _sess <- result.try(require_admin(
          conn,
          req,
          did_cache,
          " to reset all data",
        ))
        case schema.get_argument(ctx, "confirm") {
          Some(value.String("RESET")) -> {
            // Best-effort wipe: each delete's result is deliberately ignored
            // so one failing table does not stop the rest of the reset.
            let _ = records.delete_all(conn)
            let _ = actors.delete_all(conn)
            let _ = lexicons.delete_all(conn)
            let _ = config_repo.delete_domain_authority(conn)
            let _ = jetstream_activity.delete_all(conn)

            // Restart Jetstream consumer after reset
            restart_jetstream_logged(
              jetstream_subject,
              "[resetAll]",
              "Restarting Jetstream consumer after reset...",
            )

            Ok(value.Boolean(True))
          }
          Some(value.String(_)) -> Error("Confirmation must be 'RESET'")
          _ -> Error("Invalid confirm argument")
        }
      },
    ),
    // triggerBackfill mutation
    schema.field(
      "triggerBackfill",
      schema.non_null(schema.boolean_type()),
      "Trigger a background backfill operation for all collections (admin only)",
      fn(_ctx) {
        use _sess <- result.try(require_admin(
          conn,
          req,
          did_cache,
          " to trigger backfill",
        ))

        // Mark backfill as started
        process.send(backfill_state_subject, backfill_state.StartBackfill)

        // Spawn background process to run backfill
        process.spawn_unlinked(fn() {
          logging.log(
            logging.Info,
            "[triggerBackfill] Starting background backfill...",
          )

          // Run under `backfill.rescue` (as backfillActor does) so a crash in
          // the worker cannot skip the StopBackfill message below and leave
          // the backfill state stuck on "running".
          let outcome =
            backfill.rescue(fn() {
              // Only record-type collections are backfilled (not
              // queries/procedures), split into primary and external.
              let #(primary_collections, external_collections) =
                partition_record_collections(conn)

              // Default config and empty repo list (fetches from relay)
              let config = backfill.default_config(conn)
              backfill.backfill_collections(
                [],
                primary_collections,
                external_collections,
                config,
                conn,
              )
            })

          case outcome {
            Ok(_) ->
              logging.log(
                logging.Info,
                "[triggerBackfill] Background backfill completed",
              )
            Error(err) ->
              logging.log(
                logging.Error,
                "[triggerBackfill] Background backfill FAILED: "
                  <> string.inspect(err),
              )
          }

          // Mark backfill as stopped, whether it succeeded or failed
          process.send(backfill_state_subject, backfill_state.StopBackfill)
        })

        // Return immediately
        Ok(value.Boolean(True))
      },
    ),
    // backfillActor mutation - sync a specific actor's collections
    schema.field_with_args(
      "backfillActor",
      schema.non_null(schema.boolean_type()),
      "Trigger a background backfill for a specific actor's collections",
      [
        schema.argument(
          "did",
          schema.non_null(schema.string_type()),
          "The DID of the actor to backfill",
          None,
        ),
      ],
      fn(ctx) {
        // Any logged-in user can trigger this (no admin check)
        use _sess <- result.try(require_session(
          conn,
          req,
          did_cache,
          " to trigger backfill",
        ))
        case string_argument(ctx, "did") {
          None -> Error("DID argument is required")
          Some(did) ->
            // Reject malformed input up front instead of spawning a job that
            // can only fail in the background.
            case is_valid_did(did) {
              False -> Error("Invalid DID format: " <> did)
              True -> {
                let #(primary_collections, external_collections) =
                  partition_record_collections(conn)

                // Get PLC URL from database config
                let plc_url = config_repo.get_plc_directory_url(conn)

                // Spawn background process to run backfill for this actor
                process.spawn_unlinked(fn() {
                  logging.log(
                    logging.Info,
                    "[backfillActor] Starting background backfill for " <> did,
                  )

                  let outcome =
                    backfill.rescue(fn() {
                      backfill.backfill_collections_for_actor(
                        conn,
                        did,
                        primary_collections,
                        external_collections,
                        plc_url,
                      )
                    })

                  case outcome {
                    Ok(_) ->
                      logging.log(
                        logging.Info,
                        "[backfillActor] Background backfill completed for "
                          <> did,
                      )
                    Error(err) ->
                      logging.log(
                        logging.Error,
                        "[backfillActor] Background backfill FAILED for "
                          <> did
                          <> ": "
                          <> string.inspect(err),
                      )
                  }
                })

                // Return immediately
                Ok(value.Boolean(True))
              }
            }
        }
      },
    ),
    // createOAuthClient mutation
    schema.field_with_args(
      "createOAuthClient",
      schema.non_null(admin_types.oauth_client_type()),
      "Create a new OAuth client (admin only)",
      [
        schema.argument(
          "clientName",
          schema.non_null(schema.string_type()),
          "Client display name",
          None,
        ),
        schema.argument(
          "clientType",
          schema.non_null(schema.string_type()),
          "PUBLIC or CONFIDENTIAL",
          None,
        ),
        schema.argument(
          "redirectUris",
          schema.non_null(
            schema.list_type(schema.non_null(schema.string_type())),
          ),
          "Allowed redirect URIs",
          None,
        ),
        schema.argument(
          "scope",
          schema.non_null(schema.string_type()),
          "OAuth scopes (space-separated)",
          None,
        ),
      ],
      fn(ctx) {
        use _sess <- result.try(require_admin(conn, req, did_cache, ""))
        case
          string_argument(ctx, "clientName"),
          string_argument(ctx, "clientType"),
          string_list_argument(ctx, "redirectUris"),
          string_argument(ctx, "scope")
        {
          Some(name), Some(type_str), Some(uris), Some(scope) -> {
            use trimmed_name <- result.try(validate_client_name(name))

            // Reject unknown types instead of silently creating a public
            // client (e.g. for a misspelled "CONFIDENTIAL").
            let parsed_type = case string.uppercase(string.trim(type_str)) {
              "CONFIDENTIAL" -> Ok(types.Confidential)
              "PUBLIC" -> Ok(types.Public)
              _ ->
                Error(
                  "Invalid client type: "
                  <> type_str
                  <> ". Must be PUBLIC or CONFIDENTIAL",
                )
            }
            use client_type <- result.try(parsed_type)
            use redirect_uris <- result.try(validate_redirect_uris(uris))
            use _ <- result.try(validate_scope_against_supported(
              scope,
              oauth_supported_scopes,
            ))

            let now = token_generator.current_timestamp()
            let client_id = token_generator.generate_client_id()
            // Only confidential clients get a secret and secret-based auth
            let #(client_secret, auth_method) = case client_type {
              types.Confidential -> #(
                Some(token_generator.generate_client_secret()),
                types.ClientSecretPost,
              )
              types.Public -> #(None, types.AuthNone)
            }
            let client =
              types.OAuthClient(
                client_id: client_id,
                client_secret: client_secret,
                client_name: trimmed_name,
                redirect_uris: redirect_uris,
                grant_types: [types.AuthorizationCode, types.RefreshToken],
                response_types: [types.Code],
                scope: normalized_scope(scope),
                token_endpoint_auth_method: auth_method,
                client_type: client_type,
                created_at: now,
                updated_at: now,
                metadata: "{}",
                access_token_expiration: 3600,
                refresh_token_expiration: 86_400 * 30,
                require_redirect_exact: True,
                registration_access_token: None,
                jwks: None,
              )
            case oauth_clients.insert(conn, client) {
              Ok(_) -> Ok(converters.oauth_client_to_value(client))
              Error(_) -> Error("Failed to create OAuth client")
            }
          }
          _, _, _, _ -> Error("Invalid arguments")
        }
      },
    ),
    // updateOAuthClient mutation
    schema.field_with_args(
      "updateOAuthClient",
      schema.non_null(admin_types.oauth_client_type()),
      "Update an existing OAuth client (admin only)",
      [
        schema.argument(
          "clientId",
          schema.non_null(schema.string_type()),
          "Client ID to update",
          None,
        ),
        schema.argument(
          "clientName",
          schema.non_null(schema.string_type()),
          "New client display name",
          None,
        ),
        schema.argument(
          "redirectUris",
          schema.non_null(
            schema.list_type(schema.non_null(schema.string_type())),
          ),
          "New redirect URIs",
          None,
        ),
        schema.argument(
          "scope",
          schema.non_null(schema.string_type()),
          "OAuth scopes (space-separated)",
          None,
        ),
      ],
      fn(ctx) {
        use _sess <- result.try(require_admin(conn, req, did_cache, ""))
        case
          string_argument(ctx, "clientId"),
          string_argument(ctx, "clientName"),
          string_list_argument(ctx, "redirectUris"),
          string_argument(ctx, "scope")
        {
          Some(client_id), Some(name), Some(uris), Some(scope) -> {
            use trimmed_name <- result.try(validate_client_name(name))

            let lookup = case oauth_clients.get(conn, client_id) {
              Ok(Some(existing)) -> Ok(existing)
              Ok(None) -> Error("OAuth client not found")
              Error(_) -> Error("Failed to fetch OAuth client")
            }
            use existing <- result.try(lookup)
            use redirect_uris <- result.try(validate_redirect_uris(uris))
            use _ <- result.try(validate_scope_against_supported(
              scope,
              oauth_supported_scopes,
            ))

            // Only name, redirect URIs, scope and the timestamp change; every
            // other field is carried over from the stored client.
            let updated =
              types.OAuthClient(
                ..existing,
                client_name: trimmed_name,
                redirect_uris: redirect_uris,
                scope: normalized_scope(scope),
                updated_at: token_generator.current_timestamp(),
              )
            case oauth_clients.update(conn, updated) {
              Ok(_) -> Ok(converters.oauth_client_to_value(updated))
              Error(_) -> Error("Failed to update OAuth client")
            }
          }
          _, _, _, _ -> Error("Invalid arguments")
        }
      },
    ),
    // deleteOAuthClient mutation
    schema.field_with_args(
      "deleteOAuthClient",
      schema.non_null(schema.boolean_type()),
      "Delete an OAuth client (admin only)",
      [
        schema.argument(
          "clientId",
          schema.non_null(schema.string_type()),
          "Client ID to delete",
          None,
        ),
      ],
      fn(ctx) {
        use _sess <- result.try(require_admin(conn, req, did_cache, ""))
        case string_argument(ctx, "clientId") {
          None -> Error("Invalid clientId argument")
          // The built-in "admin" client must never be removed
          Some("admin") -> Error("Cannot delete internal admin client")
          Some(client_id) ->
            case oauth_clients.delete(conn, client_id) {
              Ok(_) -> Ok(value.Boolean(True))
              Error(_) -> Error("Failed to delete OAuth client")
            }
        }
      },
    ),
    // createLabel mutation (admin only)
    schema.field_with_args(
      "createLabel",
      schema.non_null(admin_types.label_type()),
      "Create a label on a record or account (admin only)",
      [
        schema.argument(
          "uri",
          schema.non_null(schema.string_type()),
          "Subject URI (at:// or did:)",
          None,
        ),
        schema.argument(
          "val",
          schema.non_null(schema.string_type()),
          "Label value",
          None,
        ),
        schema.argument(
          "cid",
          schema.string_type(),
          "Optional CID for version-specific label",
          None,
        ),
        schema.argument(
          "exp",
          schema.string_type(),
          "Optional expiration datetime",
          None,
        ),
      ],
      fn(ctx) {
        use sess <- result.try(require_admin(conn, req, did_cache, ""))
        case string_argument(ctx, "uri"), string_argument(ctx, "val") {
          Some(uri), Some(val) -> {
            use _ <- result.try(ensure_valid_subject_uri(uri))
            use _ <- result.try(ensure_label_defined(conn, val))

            let cid = string_argument(ctx, "cid")
            let exp = string_argument(ctx, "exp")
            // The acting admin's DID is recorded with the label
            case labels.insert(conn, sess.did, uri, cid, val, exp) {
              Ok(label) -> Ok(converters.label_to_value(label))
              Error(_) -> Error("Failed to create label")
            }
          }
          _, _ -> Error("uri and val are required")
        }
      },
    ),
    // negateLabel mutation (admin only)
    schema.field_with_args(
      "negateLabel",
      schema.non_null(admin_types.label_type()),
      "Negate (retract) a label on a record or account (admin only)",
      [
        schema.argument(
          "uri",
          schema.non_null(schema.string_type()),
          "Subject URI",
          None,
        ),
        schema.argument(
          "val",
          schema.non_null(schema.string_type()),
          "Label value to negate",
          None,
        ),
      ],
      fn(ctx) {
        use sess <- result.try(require_admin(conn, req, did_cache, ""))
        case string_argument(ctx, "uri"), string_argument(ctx, "val") {
          Some(uri), Some(val) -> {
            use _ <- result.try(ensure_valid_subject_uri(uri))
            case labels.insert_negation(conn, sess.did, uri, val) {
              Ok(label) -> Ok(converters.label_to_value(label))
              Error(_) -> Error("Failed to negate label")
            }
          }
          _, _ -> Error("uri and val are required")
        }
      },
    ),
    // createLabelDefinition mutation (admin only)
    schema.field_with_args(
      "createLabelDefinition",
      schema.non_null(admin_types.label_definition_type()),
      "Create a custom label definition (admin only)",
      [
        schema.argument(
          "val",
          schema.non_null(schema.string_type()),
          "Label value",
          None,
        ),
        schema.argument(
          "description",
          schema.non_null(schema.string_type()),
          "Description",
          None,
        ),
        schema.argument(
          "severity",
          schema.non_null(admin_types.label_severity_enum()),
          "Severity level",
          None,
        ),
        schema.argument(
          "defaultVisibility",
          schema.string_type(),
          "Default visibility setting (ignore, show, warn, hide). Defaults to warn.",
          None,
        ),
      ],
      fn(ctx) {
        use _sess <- result.try(require_admin(conn, req, did_cache, ""))

        // Severity and visibility may arrive as enum or string values; both
        // are handled in lowercase from here on.
        let severity_opt =
          enum_or_string_argument(ctx, "severity")
          |> option.map(string.lowercase)
        let default_visibility =
          enum_or_string_argument(ctx, "defaultVisibility")
          |> option.map(string.lowercase)
          |> option.unwrap("warn")

        use _ <- result.try(
          label_definitions.validate_visibility(default_visibility),
        )

        case
          string_argument(ctx, "val"),
          string_argument(ctx, "description"),
          severity_opt
        {
          Some(val), Some(desc), Some(severity) -> {
            use _ <- result.try(
              label_definitions.insert(
                conn,
                val,
                desc,
                severity,
                default_visibility,
              )
              |> result.replace_error("Failed to create label definition"),
            )
            // Read the row back so the response reflects what was stored
            case label_definitions.get(conn, val) {
              Ok(Some(definition)) ->
                Ok(converters.label_definition_to_value(definition))
              _ -> Error("Failed to fetch created definition")
            }
          }
          _, _, _ -> Error("val, description, and severity are required")
        }
      },
    ),
    // resolveReport mutation (admin only)
    schema.field_with_args(
      "resolveReport",
      schema.non_null(admin_types.report_type()),
      "Resolve a moderation report (admin only)",
      [
        schema.argument(
          "id",
          schema.non_null(schema.int_type()),
          "Report ID",
          None,
        ),
        schema.argument(
          "action",
          schema.non_null(admin_types.report_action_enum()),
          "Action to take",
          None,
        ),
        schema.argument(
          "labelVal",
          schema.string_type(),
          "Label value to apply (required if action is APPLY_LABEL)",
          None,
        ),
      ],
      fn(ctx) {
        use sess <- result.try(require_admin(conn, req, did_cache, ""))
        case
          schema.get_argument(ctx, "id"),
          enum_or_string_argument(ctx, "action")
        {
          Some(value.Int(id)), Some(action) -> {
            // Get the report first
            let lookup = case reports.get(conn, id) {
              Ok(Some(report)) -> Ok(report)
              Ok(None) -> Error("Report not found")
              Error(_) -> Error("Failed to fetch report")
            }
            use report <- result.try(lookup)

            case action {
              "APPLY_LABEL" -> {
                use label_val <- result.try(option.to_result(
                  string_argument(ctx, "labelVal"),
                  "labelVal is required when action is APPLY_LABEL",
                ))
                use _ <- result.try(ensure_label_defined(conn, label_val))

                // Create the label on the reported subject
                use _ <- result.try(
                  labels.insert(
                    conn,
                    sess.did,
                    report.subject_uri,
                    None,
                    label_val,
                    None,
                  )
                  |> result.replace_error("Failed to apply label"),
                )

                // Mark report as resolved
                case reports.resolve(conn, id, "resolved", sess.did) {
                  Ok(resolved) -> Ok(converters.report_to_value(resolved))
                  Error(_) -> Error("Failed to resolve report")
                }
              }
              "DISMISS" ->
                case reports.resolve(conn, id, "dismissed", sess.did) {
                  Ok(resolved) -> Ok(converters.report_to_value(resolved))
                  Error(_) -> Error("Failed to dismiss report")
                }
              _ -> Error("Invalid action")
            }
          }
          _, _ -> Error("id and action are required")
        }
      },
    ),
  ])
}