Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1/// Mutation resolvers for admin GraphQL API
import admin_session as session
import backfill
import backfill_state
import database/executor.{type Executor}
import database/repositories/actors
import database/repositories/config as config_repo
import database/repositories/jetstream_activity
import database/repositories/label_definitions
import database/repositories/labels
import database/repositories/lexicons
import database/repositories/oauth_clients
import database/repositories/records
import database/repositories/reports
import database/types
import gleam/bool
import gleam/erlang/process.{type Subject}
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/result
import gleam/string
import graphql/admin/converters
import graphql/admin/types as admin_types
import importer
import jetstream_consumer
import lib/oauth/did_cache
import lib/oauth/scopes/validator as scope_validator
import lib/oauth/token_generator
import lib/oauth/validator
import logging
import swell/schema
import swell/value
import wisp
32
/// Check whether `s` has the shape of a DID: `did:<method>:<identifier>`.
///
/// The string is split on ":" and must yield the literal segment "did"
/// followed by a non-empty method and a non-empty first identifier segment.
/// Any further ":"-separated segments are accepted as-is
/// (e.g. `did:web:example.com:user`).
fn is_valid_did(s: String) -> Bool {
  case string.split(s, on: ":") {
    // A first segment of exactly "did" with more segments after it already
    // implies the "did:" prefix, so no separate prefix test is needed.
    ["did", method, identifier, ..] ->
      !string.is_empty(method) && !string.is_empty(identifier)
    _ -> False
  }
}
47
/// Ensure every scope named in `requested_scope` appears in
/// `supported_scopes`.
///
/// `requested_scope` is a space-separated list; each token is trimmed and
/// empty tokens (from repeated spaces) are ignored. Returns `Ok(Nil)` when
/// nothing unsupported was requested, otherwise an error message listing the
/// offending scopes followed by the supported ones.
fn validate_scope_against_supported(
  requested_scope: String,
  supported_scopes: List(String),
) -> Result(Nil, String) {
  // Keep only the tokens that are non-empty and not supported, in input order.
  let unsupported =
    string.split(requested_scope, on: " ")
    |> list.filter_map(fn(raw) {
      let scope = string.trim(raw)
      case string.is_empty(scope) || list.contains(supported_scopes, scope) {
        True -> Error(Nil)
        False -> Ok(scope)
      }
    })

  case list.is_empty(unsupported) {
    True -> Ok(Nil)
    False ->
      Error(
        "Unsupported scope(s): "
        <> string.join(unsupported, ", ")
        <> ". Supported: "
        <> string.join(supported_scopes, ", "),
      )
  }
}
73
/// Run `next` with the caller's DID, but only when the request carries a
/// valid session whose DID is an admin. Otherwise return `unauthenticated`
/// (no session) or `forbidden` (session but not admin) as the error.
fn require_admin(
  conn: Executor,
  req: wisp.Request,
  did_cache: Subject(did_cache.Message),
  unauthenticated: String,
  forbidden: String,
  next,
) {
  case session.get_current_session(req, conn, did_cache) {
    Error(_) -> Error(unauthenticated)
    Ok(sess) ->
      case config_repo.is_admin(conn, sess.did) {
        False -> Error(forbidden)
        True -> next(sess.did)
      }
  }
}

/// Shorthand for a GraphQL argument without a default value.
fn arg(name: String, arg_type, description: String) {
  schema.argument(name, arg_type, description, None)
}

/// Read an optional string argument; anything that is not a string is `None`.
fn optional_string_argument(ctx, name: String) -> Option(String) {
  case schema.get_argument(ctx, name) {
    Some(value.String(s)) -> Some(s)
    _ -> None
  }
}

/// Read an argument that may arrive either as an enum or as a plain string.
fn enum_or_string_argument(ctx, name: String) -> Option(String) {
  case schema.get_argument(ctx, name) {
    Some(value.Enum(s)) | Some(value.String(s)) -> Some(s)
    _ -> None
  }
}

/// Apply one optional string setting.
///
/// When argument `name` is present as a string it is trimmed, rejected with
/// `empty_error` if blank, and handed to `save`; a failing `save` yields
/// `save_error`. Returns `Ok(True)` when a value was written and `Ok(False)`
/// when the argument was absent.
fn update_string_setting(
  ctx,
  name: String,
  empty_error: String,
  save_error: String,
  save,
) -> Result(Bool, String) {
  case schema.get_argument(ctx, name) {
    Some(value.String(raw)) ->
      case string.trim(raw) {
        "" -> Error(empty_error)
        trimmed ->
          case save(trimmed) {
            Ok(_) -> Ok(True)
            Error(_) -> Error(save_error)
          }
      }
    _ -> Ok(False)
  }
}

/// Replace the admin DID list when `adminDids` is supplied. The list must be
/// non-empty and every entry must pass `is_valid_did`.
fn update_admin_dids(conn: Executor, ctx) -> Result(Nil, String) {
  case schema.get_argument(ctx, "adminDids") {
    Some(value.List(dids)) -> {
      // Non-string entries are dropped rather than rejected.
      let did_strings =
        list.filter_map(dids, fn(d) {
          case d {
            value.String(s) -> Ok(s)
            _ -> Error(Nil)
          }
        })
      use <- bool.guard(
        when: list.is_empty(did_strings),
        return: Error("Cannot have zero admins"),
      )
      case list.find(did_strings, fn(d) { !is_valid_did(d) }) {
        Ok(first_invalid) ->
          Error(
            "Invalid DID format: "
            <> first_invalid
            <> ". DIDs must start with 'did:' followed by method and identifier (e.g., did:plc:abc123)",
          )
        Error(_) ->
          config_repo.set_admin_dids(conn, did_strings)
          |> result.replace(Nil)
          |> result.replace_error("Failed to update admin DIDs")
      }
    }
    _ -> Ok(Nil)
  }
}

/// Validate and store `oauthSupportedScopes` when supplied.
fn update_oauth_scopes(conn: Executor, ctx) -> Result(Nil, String) {
  case schema.get_argument(ctx, "oauthSupportedScopes") {
    Some(value.String(raw)) -> {
      let scopes = string.trim(raw)
      use <- bool.guard(
        when: scopes == "",
        return: Error("OAuth supported scopes cannot be empty"),
      )
      // Accepts any syntactically valid ATProto scope.
      case scope_validator.validate_scope_format(scopes) {
        Error(err) -> {
          logging.log(
            logging.Error,
            "[updateSettings] Invalid OAuth scope: " <> string.inspect(err),
          )
          Error("Invalid OAuth scope")
        }
        Ok(_) ->
          config_repo.set_oauth_supported_scopes(conn, scopes)
          |> result.replace(Nil)
          |> result.replace_error("Failed to save OAuth scopes")
      }
    }
    _ -> Ok(Nil)
  }
}

/// Restart the Jetstream consumer if one is running and log the outcome.
/// A failed restart is logged but never fails the surrounding mutation.
fn restart_jetstream_logged(
  jetstream_subject: Option(Subject(jetstream_consumer.ManagerMessage)),
  start_message: String,
) -> Nil {
  case jetstream_subject {
    None -> Nil
    Some(consumer) -> {
      logging.log(logging.Info, start_message)
      case jetstream_consumer.restart(consumer) {
        Ok(_) ->
          logging.log(
            logging.Info,
            "[updateSettings] Jetstream consumer restarted",
          )
        Error(err) ->
          logging.log(
            logging.Error,
            "[updateSettings] Failed to restart Jetstream: " <> err,
          )
      }
    }
  }
}

/// Body of `updateSettings`: apply each supplied setting in a fixed order,
/// stopping at the first failure, then return the settings as now stored.
///
/// Settings are written one by one, so an error in a later step leaves the
/// earlier steps applied.
fn update_settings(
  conn: Executor,
  jetstream_subject: Option(Subject(jetstream_consumer.ManagerMessage)),
  ctx,
) {
  use authority_changed <- result.try(update_string_setting(
    ctx,
    "domainAuthority",
    "Domain authority cannot be empty",
    "Failed to update domain authority",
    config_repo.set(conn, "domain_authority", _),
  ))
  // Best-effort restart; the outcome is deliberately ignored here.
  case authority_changed, jetstream_subject {
    True, Some(consumer) -> {
      logging.log(
        logging.Info,
        "[updateSettings] Restarting Jetstream consumer...",
      )
      let _ = jetstream_consumer.restart(consumer)
      Nil
    }
    _, _ -> Nil
  }

  use _ <- result.try(update_admin_dids(conn, ctx))

  use _ <- result.try(update_string_setting(
    ctx,
    "relayUrl",
    "Relay URL cannot be empty",
    "Failed to update relay URL",
    config_repo.set_relay_url(conn, _),
  ))

  use plc_changed <- result.try(update_string_setting(
    ctx,
    "plcDirectoryUrl",
    "PLC directory URL cannot be empty",
    "Failed to update PLC directory URL",
    config_repo.set_plc_directory_url(conn, _),
  ))
  case plc_changed {
    True ->
      restart_jetstream_logged(
        jetstream_subject,
        "[updateSettings] Restarting Jetstream consumer due to PLC URL change",
      )
    False -> Nil
  }

  use jetstream_url_changed <- result.try(update_string_setting(
    ctx,
    "jetstreamUrl",
    "Jetstream URL cannot be empty",
    "Failed to update Jetstream URL",
    config_repo.set_jetstream_url(conn, _),
  ))
  case jetstream_url_changed {
    True ->
      restart_jetstream_logged(
        jetstream_subject,
        "[updateSettings] Restarting Jetstream consumer due to URL change",
      )
    False -> Nil
  }

  use _ <- result.try(update_oauth_scopes(conn, ctx))

  // Re-read everything so the response reflects what is actually stored.
  Ok(converters.settings_to_value(
    result.unwrap(config_repo.get(conn, "domain_authority"), ""),
    config_repo.get_admin_dids(conn),
    config_repo.get_relay_url(conn),
    config_repo.get_plc_directory_url(conn),
    config_repo.get_jetstream_url(conn),
    config_repo.get_oauth_supported_scopes(conn),
  ))
}

/// Body of `uploadLexicons`: import the ZIP, then restart the Jetstream
/// consumer so it picks up newly imported collections. Unlike the settings
/// mutations, a failed restart is reported to the caller.
fn upload_lexicons(
  conn: Executor,
  jetstream_subject: Option(Subject(jetstream_consumer.ManagerMessage)),
  zip_base64: String,
) {
  case importer.import_lexicons_from_base64_zip(zip_base64, conn) {
    Error(err) -> Error("Failed to import lexicons: " <> err)
    Ok(_stats) ->
      case jetstream_subject {
        None -> {
          logging.log(
            logging.Info,
            "[uploadLexicons] Jetstream consumer not running, skipping restart",
          )
          Ok(value.Boolean(True))
        }
        Some(consumer) -> {
          logging.log(
            logging.Info,
            "[uploadLexicons] Restarting Jetstream consumer with new lexicons...",
          )
          case jetstream_consumer.restart(consumer) {
            Ok(_) -> {
              logging.log(
                logging.Info,
                "[uploadLexicons] Jetstream consumer restarted successfully",
              )
              Ok(value.Boolean(True))
            }
            Error(err) -> {
              logging.log(
                logging.Error,
                "[uploadLexicons] Failed to restart Jetstream consumer: "
                  <> err,
              )
              Error(
                "Lexicons imported but failed to restart Jetstream consumer: "
                <> err,
              )
            }
          }
        }
      }
  }
}

/// Load all record-type collection ids and split them into
/// `#(primary, external)` according to the configured domain authority.
/// Lookup failures degrade to an empty list / empty authority.
fn partition_collections(conn: Executor) {
  // Only record lexicons are backfilled, not queries/procedures.
  let collections = case lexicons.get_record_types(conn) {
    Ok(lexicon_list) -> list.map(lexicon_list, fn(lex) { lex.id })
    Error(_) -> []
  }
  let domain_authority =
    result.unwrap(config_repo.get(conn, "domain_authority"), "")

  list.partition(collections, fn(collection) {
    backfill.nsid_matches_domain_authority(collection, domain_authority)
  })
}

/// Background job behind `triggerBackfill`.
///
/// The work runs inside `backfill.rescue` so that a crash is logged and
/// `StopBackfill` is still sent; without it a crash would leave the backfill
/// state marked as running.
fn run_full_backfill(
  conn: Executor,
  backfill_state_subject: Subject(backfill_state.Message),
) {
  logging.log(logging.Info, "[triggerBackfill] Starting background backfill...")

  let outcome =
    backfill.rescue(fn() {
      let #(primary_collections, external_collections) =
        partition_collections(conn)
      // Default config and an empty repo list (repos are fetched from relay).
      let config = backfill.default_config(conn)
      backfill.backfill_collections(
        [],
        primary_collections,
        external_collections,
        config,
        conn,
      )
    })

  case outcome {
    Ok(_) ->
      logging.log(
        logging.Info,
        "[triggerBackfill] Background backfill completed",
      )
    Error(err) ->
      logging.log(
        logging.Error,
        "[triggerBackfill] Background backfill FAILED: " <> string.inspect(err),
      )
  }

  // Always clear the running flag, whether the backfill succeeded or not.
  process.send(backfill_state_subject, backfill_state.StopBackfill)
}

/// Body of `backfillActor`: validate the DID, then backfill that actor's
/// collections in an unlinked background process and return immediately.
fn start_actor_backfill(conn: Executor, did: String) {
  // The DID comes straight from the client; reject malformed values before
  // starting any background work.
  use <- bool.guard(
    when: !is_valid_did(did),
    return: Error("Invalid DID format: " <> did),
  )

  let #(primary_collections, external_collections) =
    partition_collections(conn)
  let plc_url = config_repo.get_plc_directory_url(conn)

  process.spawn_unlinked(fn() {
    logging.log(
      logging.Info,
      "[backfillActor] Starting background backfill for " <> did,
    )
    case
      backfill.rescue(fn() {
        backfill.backfill_collections_for_actor(
          conn,
          did,
          primary_collections,
          external_collections,
          plc_url,
        )
      })
    {
      Ok(_) ->
        logging.log(
          logging.Info,
          "[backfillActor] Background backfill completed for " <> did,
        )
      Error(err) ->
        logging.log(
          logging.Error,
          "[backfillActor] Background backfill FAILED for "
            <> did
            <> ": "
            <> string.inspect(err),
        )
    }
  })

  Ok(value.Boolean(True))
}

/// Parse the `clientType` argument case-insensitively. Unknown values are
/// rejected instead of silently becoming a public client.
fn parse_client_type(type_str: String) {
  case string.uppercase(type_str) {
    "CONFIDENTIAL" -> Ok(types.Confidential)
    "PUBLIC" -> Ok(types.Public)
    _ ->
      Error(
        "Invalid clientType: "
        <> type_str
        <> ". Must be PUBLIC or CONFIDENTIAL",
      )
  }
}

/// Extract redirect URIs from a GraphQL list: keep trimmed, non-blank string
/// entries, require at least one, and require each to pass
/// `validator.validate_redirect_uri`.
fn parse_redirect_uris(uris) -> Result(List(String), String) {
  let redirect_uris =
    list.filter_map(uris, fn(u) {
      case u {
        value.String(s) ->
          case string.trim(s) {
            "" -> Error(Nil)
            trimmed -> Ok(trimmed)
          }
        _ -> Error(Nil)
      }
    })
  use <- bool.guard(
    when: list.is_empty(redirect_uris),
    return: Error("At least one redirect URI is required"),
  )
  let first_invalid =
    list.find(redirect_uris, fn(uri) {
      result.is_error(validator.validate_redirect_uri(uri))
    })
  case first_invalid {
    Ok(uri) ->
      Error(
        "Invalid redirect URI: "
        <> uri
        <> ". URIs must use https://, or http:// only for localhost.",
      )
    Error(_) -> Ok(redirect_uris)
  }
}

/// Store a scope string as `None` when blank, otherwise trimmed in `Some`.
fn optional_scope(scope: String) -> Option(String) {
  case string.trim(scope) {
    "" -> None
    s -> Some(s)
  }
}

/// Body of `createOAuthClient`: validate the inputs, build a client with
/// generated id (and secret for confidential clients) and insert it.
fn create_oauth_client(
  conn: Executor,
  supported_scopes: List(String),
  name: String,
  type_str: String,
  uris,
  scope: String,
) {
  let trimmed_name = string.trim(name)
  use <- bool.guard(
    when: trimmed_name == "",
    return: Error("Client name cannot be empty"),
  )
  use client_type <- result.try(parse_client_type(type_str))
  use redirect_uris <- result.try(parse_redirect_uris(uris))
  use _ <- result.try(validate_scope_against_supported(scope, supported_scopes))

  let now = token_generator.current_timestamp()
  let client_id = token_generator.generate_client_id()
  // Only confidential clients get a secret and authenticate with it.
  let #(client_secret, auth_method) = case client_type {
    types.Confidential -> #(
      Some(token_generator.generate_client_secret()),
      types.ClientSecretPost,
    )
    types.Public -> #(None, types.AuthNone)
  }
  let client =
    types.OAuthClient(
      client_id: client_id,
      client_secret: client_secret,
      client_name: trimmed_name,
      redirect_uris: redirect_uris,
      grant_types: [types.AuthorizationCode, types.RefreshToken],
      response_types: [types.Code],
      scope: optional_scope(scope),
      token_endpoint_auth_method: auth_method,
      client_type: client_type,
      created_at: now,
      updated_at: now,
      metadata: "{}",
      access_token_expiration: 3600,
      refresh_token_expiration: 86_400 * 30,
      require_redirect_exact: True,
      registration_access_token: None,
      jwks: None,
    )
  case oauth_clients.insert(conn, client) {
    Ok(_) -> Ok(converters.oauth_client_to_value(client))
    Error(_) -> Error("Failed to create OAuth client")
  }
}

/// Body of `updateOAuthClient`: load the existing client and overwrite its
/// name, redirect URIs, scope and `updated_at`; other fields are kept.
fn update_oauth_client(
  conn: Executor,
  supported_scopes: List(String),
  client_id: String,
  name: String,
  uris,
  scope: String,
) {
  let trimmed_name = string.trim(name)
  use <- bool.guard(
    when: trimmed_name == "",
    return: Error("Client name cannot be empty"),
  )
  case oauth_clients.get(conn, client_id) {
    Error(_) -> Error("Failed to fetch OAuth client")
    Ok(None) -> Error("OAuth client not found")
    Ok(Some(existing)) -> {
      use redirect_uris <- result.try(parse_redirect_uris(uris))
      use _ <- result.try(validate_scope_against_supported(
        scope,
        supported_scopes,
      ))
      let updated =
        types.OAuthClient(
          ..existing,
          client_name: trimmed_name,
          redirect_uris: redirect_uris,
          scope: optional_scope(scope),
          updated_at: token_generator.current_timestamp(),
        )
      case oauth_clients.update(conn, updated) {
        Ok(_) -> Ok(converters.oauth_client_to_value(updated))
        Error(_) -> Error("Failed to update OAuth client")
      }
    }
  }
}

/// Mark report `id` with `status` on behalf of `admin_did` and return the
/// updated report; `failure` is the error message if the update fails.
fn close_report(
  conn: Executor,
  id: Int,
  status: String,
  admin_did,
  failure: String,
) {
  case reports.resolve(conn, id, status, admin_did) {
    Ok(resolved) -> Ok(converters.report_to_value(resolved))
    Error(_) -> Error(failure)
  }
}

/// Body of `resolveReport`: fetch the report, then either apply a label to
/// its subject and mark it resolved (`APPLY_LABEL`, needs `labelVal`) or
/// mark it dismissed (`DISMISS`).
fn resolve_report(conn: Executor, ctx, admin_did, id: Int, action: String) {
  case reports.get(conn, id) {
    Error(_) -> Error("Failed to fetch report")
    Ok(None) -> Error("Report not found")
    Ok(Some(report)) ->
      case action {
        "APPLY_LABEL" ->
          case schema.get_argument(ctx, "labelVal") {
            Some(value.String(label_val)) ->
              // The label value must be a known definition.
              case label_definitions.exists(conn, label_val) {
                Error(_) -> Error("Failed to validate label value")
                Ok(False) -> Error("Unknown label value: " <> label_val)
                Ok(True) ->
                  case
                    labels.insert(
                      conn,
                      admin_did,
                      report.subject_uri,
                      None,
                      label_val,
                      None,
                    )
                  {
                    Error(_) -> Error("Failed to apply label")
                    Ok(_) ->
                      close_report(
                        conn,
                        id,
                        "resolved",
                        admin_did,
                        "Failed to resolve report",
                      )
                  }
              }
            _ -> Error("labelVal is required when action is APPLY_LABEL")
          }
        "DISMISS" ->
          close_report(
            conn,
            id,
            "dismissed",
            admin_did,
            "Failed to dismiss report",
          )
        _ -> Error("Invalid action")
      }
  }
}

/// Build the Mutation root type with all mutation resolvers.
///
/// - `conn`: database executor used by every resolver.
/// - `req`: the incoming request, used to look up the caller's session.
/// - `jetstream_subject`: the Jetstream consumer manager, if one is running;
///   it is restarted after changes that affect what it consumes.
/// - `did_cache`: DID cache handed to the session lookup.
/// - `oauth_supported_scopes`: scopes that OAuth clients may request.
/// - `backfill_state_subject`: receives Start/Stop notifications for the
///   full backfill.
///
/// Every mutation except `backfillActor` requires an admin session;
/// `backfillActor` only requires an authenticated session.
pub fn mutation_type(
  conn: Executor,
  req: wisp.Request,
  jetstream_subject: Option(Subject(jetstream_consumer.ManagerMessage)),
  did_cache: Subject(did_cache.Message),
  oauth_supported_scopes: List(String),
  backfill_state_subject: Subject(backfill_state.Message),
) -> schema.Type {
  // Admin gate with the default error messages shared by most mutations.
  let admin_only = fn(next) {
    require_admin(
      conn,
      req,
      did_cache,
      "Authentication required",
      "Admin privileges required",
      next,
    )
  }
  let string_arg = schema.string_type()
  let required_string = schema.non_null(schema.string_type())
  let required_string_list =
    schema.non_null(schema.list_type(schema.non_null(schema.string_type())))

  schema.object_type("Mutation", "Root mutation type", [
    // updateSettings mutation - consolidated settings update
    schema.field_with_args(
      "updateSettings",
      schema.non_null(admin_types.settings_type()),
      "Update system settings (domain authority and/or admin DIDs)",
      [
        arg(
          "domainAuthority",
          string_arg,
          "New domain authority value (optional)",
        ),
        arg(
          "adminDids",
          schema.list_type(schema.non_null(schema.string_type())),
          "New admin DIDs list (optional)",
        ),
        arg("relayUrl", string_arg, "New relay URL (optional)"),
        arg("plcDirectoryUrl", string_arg, "New PLC directory URL (optional)"),
        arg("jetstreamUrl", string_arg, "New Jetstream URL (optional)"),
        arg(
          "oauthSupportedScopes",
          string_arg,
          "New OAuth supported scopes space-separated (optional)",
        ),
      ],
      fn(ctx) {
        use _did <- admin_only()
        update_settings(conn, jetstream_subject, ctx)
      },
    ),
    // uploadLexicons mutation
    schema.field_with_args(
      "uploadLexicons",
      schema.non_null(schema.boolean_type()),
      "Upload and import lexicons from base64-encoded ZIP",
      [
        arg(
          "zipBase64",
          required_string,
          "Base64-encoded ZIP file containing lexicon JSON files",
        ),
      ],
      fn(ctx) {
        // Importing lexicons rewrites the schema and restarts the consumer,
        // so it is gated like the other admin mutations.
        use _did <- admin_only()
        case schema.get_argument(ctx, "zipBase64") {
          Some(value.String(zip_base64)) ->
            upload_lexicons(conn, jetstream_subject, zip_base64)
          _ -> Error("Invalid zipBase64 argument")
        }
      },
    ),
    // resetAll mutation
    schema.field_with_args(
      "resetAll",
      schema.non_null(schema.boolean_type()),
      "Reset all data (requires RESET confirmation and admin privileges)",
      [arg("confirm", required_string, "Must be the string 'RESET' to confirm")],
      fn(ctx) {
        use _did <- require_admin(
          conn,
          req,
          did_cache,
          "Authentication required to reset all data",
          "Admin privileges required to reset all data",
        )
        case schema.get_argument(ctx, "confirm") {
          Some(value.String("RESET")) -> {
            // Best-effort wipe: each delete's result is ignored, so the
            // mutation reports success even if one of them failed.
            let _ = records.delete_all(conn)
            let _ = actors.delete_all(conn)
            let _ = lexicons.delete_all(conn)
            let _ = config_repo.delete_domain_authority(conn)
            let _ = jetstream_activity.delete_all(conn)

            case jetstream_subject {
              Some(consumer) -> {
                logging.log(
                  logging.Info,
                  "[resetAll] Restarting Jetstream consumer after reset...",
                )
                let _ = jetstream_consumer.restart(consumer)
                Nil
              }
              None -> Nil
            }

            Ok(value.Boolean(True))
          }
          Some(value.String(_)) -> Error("Confirmation must be 'RESET'")
          _ -> Error("Invalid confirm argument")
        }
      },
    ),
    // triggerBackfill mutation
    schema.field(
      "triggerBackfill",
      schema.non_null(schema.boolean_type()),
      "Trigger a background backfill operation for all collections (admin only)",
      fn(_ctx) {
        use _did <- require_admin(
          conn,
          req,
          did_cache,
          "Authentication required to trigger backfill",
          "Admin privileges required to trigger backfill",
        )
        // Mark the backfill as started, run it in the background and return
        // immediately; the job itself sends StopBackfill when it ends.
        process.send(backfill_state_subject, backfill_state.StartBackfill)
        process.spawn_unlinked(fn() {
          run_full_backfill(conn, backfill_state_subject)
        })
        Ok(value.Boolean(True))
      },
    ),
    // backfillActor mutation - sync a specific actor's collections
    schema.field_with_args(
      "backfillActor",
      schema.non_null(schema.boolean_type()),
      "Trigger a background backfill for a specific actor's collections",
      [arg("did", required_string, "The DID of the actor to backfill")],
      fn(ctx) {
        // Any logged-in user may trigger this; admin is not required.
        case session.get_current_session(req, conn, did_cache) {
          Error(_) -> Error("Authentication required to trigger backfill")
          Ok(_sess) ->
            case schema.get_argument(ctx, "did") {
              Some(value.String(did)) -> start_actor_backfill(conn, did)
              _ -> Error("DID argument is required")
            }
        }
      },
    ),
    // createOAuthClient mutation
    schema.field_with_args(
      "createOAuthClient",
      schema.non_null(admin_types.oauth_client_type()),
      "Create a new OAuth client (admin only)",
      [
        arg("clientName", required_string, "Client display name"),
        arg("clientType", required_string, "PUBLIC or CONFIDENTIAL"),
        arg("redirectUris", required_string_list, "Allowed redirect URIs"),
        arg("scope", required_string, "OAuth scopes (space-separated)"),
      ],
      fn(ctx) {
        use _did <- admin_only()
        case
          schema.get_argument(ctx, "clientName"),
          schema.get_argument(ctx, "clientType"),
          schema.get_argument(ctx, "redirectUris"),
          schema.get_argument(ctx, "scope")
        {
          Some(value.String(name)),
            Some(value.String(type_str)),
            Some(value.List(uris)),
            Some(value.String(scope))
          ->
            create_oauth_client(
              conn,
              oauth_supported_scopes,
              name,
              type_str,
              uris,
              scope,
            )
          _, _, _, _ -> Error("Invalid arguments")
        }
      },
    ),
    // updateOAuthClient mutation
    schema.field_with_args(
      "updateOAuthClient",
      schema.non_null(admin_types.oauth_client_type()),
      "Update an existing OAuth client (admin only)",
      [
        arg("clientId", required_string, "Client ID to update"),
        arg("clientName", required_string, "New client display name"),
        arg("redirectUris", required_string_list, "New redirect URIs"),
        arg("scope", required_string, "OAuth scopes (space-separated)"),
      ],
      fn(ctx) {
        use _did <- admin_only()
        case
          schema.get_argument(ctx, "clientId"),
          schema.get_argument(ctx, "clientName"),
          schema.get_argument(ctx, "redirectUris"),
          schema.get_argument(ctx, "scope")
        {
          Some(value.String(client_id)),
            Some(value.String(name)),
            Some(value.List(uris)),
            Some(value.String(scope))
          ->
            update_oauth_client(
              conn,
              oauth_supported_scopes,
              client_id,
              name,
              uris,
              scope,
            )
          _, _, _, _ -> Error("Invalid arguments")
        }
      },
    ),
    // deleteOAuthClient mutation
    schema.field_with_args(
      "deleteOAuthClient",
      schema.non_null(schema.boolean_type()),
      "Delete an OAuth client (admin only)",
      [arg("clientId", required_string, "Client ID to delete")],
      fn(ctx) {
        use _did <- admin_only()
        case schema.get_argument(ctx, "clientId") {
          // The built-in admin client must never be removed.
          Some(value.String("admin")) ->
            Error("Cannot delete internal admin client")
          Some(value.String(client_id)) ->
            case oauth_clients.delete(conn, client_id) {
              Ok(_) -> Ok(value.Boolean(True))
              Error(_) -> Error("Failed to delete OAuth client")
            }
          _ -> Error("Invalid clientId argument")
        }
      },
    ),
    // createLabel mutation (admin only)
    schema.field_with_args(
      "createLabel",
      schema.non_null(admin_types.label_type()),
      "Create a label on a record or account (admin only)",
      [
        arg("uri", required_string, "Subject URI (at:// or did:)"),
        arg("val", required_string, "Label value"),
        arg("cid", string_arg, "Optional CID for version-specific label"),
        arg("exp", string_arg, "Optional expiration datetime"),
      ],
      fn(ctx) {
        use did <- admin_only()
        case schema.get_argument(ctx, "uri"), schema.get_argument(ctx, "val") {
          Some(value.String(uri)), Some(value.String(val)) -> {
            use <- bool.guard(
              when: !labels.is_valid_subject_uri(uri),
              return: Error(
                "Invalid URI format. Must be at://did/collection/rkey or a DID",
              ),
            )
            // The label value must be a known definition.
            case label_definitions.exists(conn, val) {
              Error(_) -> Error("Failed to validate label value")
              Ok(False) -> Error("Unknown label value: " <> val)
              Ok(True) -> {
                let cid = optional_string_argument(ctx, "cid")
                let exp = optional_string_argument(ctx, "exp")
                case labels.insert(conn, did, uri, cid, val, exp) {
                  Ok(label) -> Ok(converters.label_to_value(label))
                  Error(_) -> Error("Failed to create label")
                }
              }
            }
          }
          _, _ -> Error("uri and val are required")
        }
      },
    ),
    // negateLabel mutation (admin only)
    schema.field_with_args(
      "negateLabel",
      schema.non_null(admin_types.label_type()),
      "Negate (retract) a label on a record or account (admin only)",
      [
        arg("uri", required_string, "Subject URI"),
        arg("val", required_string, "Label value to negate"),
      ],
      fn(ctx) {
        use did <- admin_only()
        case schema.get_argument(ctx, "uri"), schema.get_argument(ctx, "val") {
          Some(value.String(uri)), Some(value.String(val)) -> {
            use <- bool.guard(
              when: !labels.is_valid_subject_uri(uri),
              return: Error(
                "Invalid URI format. Must be at://did/collection/rkey or a DID",
              ),
            )
            case labels.insert_negation(conn, did, uri, val) {
              Ok(label) -> Ok(converters.label_to_value(label))
              Error(_) -> Error("Failed to negate label")
            }
          }
          _, _ -> Error("uri and val are required")
        }
      },
    ),
    // createLabelDefinition mutation (admin only)
    schema.field_with_args(
      "createLabelDefinition",
      schema.non_null(admin_types.label_definition_type()),
      "Create a custom label definition (admin only)",
      [
        arg("val", required_string, "Label value"),
        arg("description", required_string, "Description"),
        arg(
          "severity",
          schema.non_null(admin_types.label_severity_enum()),
          "Severity level",
        ),
        arg(
          "defaultVisibility",
          string_arg,
          "Default visibility setting (ignore, show, warn, hide). Defaults to warn.",
        ),
      ],
      fn(ctx) {
        use _did <- admin_only()
        // Both arguments may arrive as enum or string; they are lowercased
        // before use.
        let severity_opt =
          enum_or_string_argument(ctx, "severity")
          |> option.map(string.lowercase)
        let default_visibility = case
          enum_or_string_argument(ctx, "defaultVisibility")
        {
          Some(v) -> string.lowercase(v)
          None -> "warn"
        }
        use _ <- result.try(label_definitions.validate_visibility(
          default_visibility,
        ))
        case
          schema.get_argument(ctx, "val"),
          schema.get_argument(ctx, "description"),
          severity_opt
        {
          Some(value.String(val)), Some(value.String(desc)), Some(severity) ->
            case
              label_definitions.insert(
                conn,
                val,
                desc,
                severity,
                default_visibility,
              )
            {
              Error(_) -> Error("Failed to create label definition")
              // Read the row back so the response reflects what was stored.
              Ok(_) ->
                case label_definitions.get(conn, val) {
                  Ok(Some(def)) ->
                    Ok(converters.label_definition_to_value(def))
                  _ -> Error("Failed to fetch created definition")
                }
            }
          _, _, _ -> Error("val, description, and severity are required")
        }
      },
    ),
    // resolveReport mutation (admin only)
    schema.field_with_args(
      "resolveReport",
      schema.non_null(admin_types.report_type()),
      "Resolve a moderation report (admin only)",
      [
        arg("id", schema.non_null(schema.int_type()), "Report ID"),
        arg(
          "action",
          schema.non_null(admin_types.report_action_enum()),
          "Action to take",
        ),
        arg(
          "labelVal",
          string_arg,
          "Label value to apply (required if action is APPLY_LABEL)",
        ),
      ],
      fn(ctx) {
        use did <- admin_only()
        case
          schema.get_argument(ctx, "id"),
          enum_or_string_argument(ctx, "action")
        {
          Some(value.Int(id)), Some(action) ->
            resolve_report(conn, ctx, did, id, action)
          _, _ -> Error("id and action are required")
        }
      },
    ),
  ])
}