//// Auto-indexing service and GraphQL API for AT Protocol records.
//// quickslice.slices.network — atproto · gleam · graphql
1/// Mutation Resolvers for lexicon GraphQL API
2///
3/// Implements GraphQL mutation resolvers with AT Protocol integration.
4/// These resolvers handle authentication, validation, and database operations.
import actor_validator
import atproto_auth
import backfill
import database/executor.{type Executor}
import database/repositories/label_definitions
import database/repositories/label_preferences
import database/repositories/lexicons
import database/repositories/records
import database/repositories/reports
import dpop
import gleam/bit_array
import gleam/dict
import gleam/dynamic
import gleam/dynamic/decode
import gleam/erlang/process.{type Subject}
import gleam/int
import gleam/json
import gleam/list
import gleam/option
import gleam/result
import gleam/string
import honk
import honk/errors
import lexicon_graphql/input/union as union_input
import lib/oauth/did_cache
import pubsub
import swell/schema
import swell/value
import timestamp
33
/// Context for mutation execution.
///
/// Bundles everything the mutation resolvers need: database access, the
/// DID-cache actor, OAuth client settings, and the collection lists used
/// when backfilling a newly seen actor.
pub type MutationContext {
  MutationContext(
    // Database executor used for all repository calls
    db: Executor,
    // Actor that caches/resolves DID documents; forwarded to get_atp_session
    did_cache: Subject(did_cache.Message),
    // Optional signing key forwarded to atproto_auth.get_atp_session
    signing_key: option.Option(String),
    // OAuth client id forwarded to atproto_auth.get_atp_session
    atp_client_id: String,
    // PLC directory URL used for actor validation and backfill
    plc_url: String,
    // Collections this service indexes; backfilled for new actors
    collection_ids: List(String),
    // Additional external collections also backfilled for new actors
    external_collection_ids: List(String),
  )
}
46
47// ─── Private Auth Helpers ───────────────────────────────────────────
48
/// Authenticated session info returned by the auth helper: the verified
/// user identity plus the AT Protocol session used to call the user's PDS.
type AuthenticatedSession {
  AuthenticatedSession(
    // Identity extracted from the verified OAuth token
    user_info: atproto_auth.UserInfo,
    // ATP session (includes pds_endpoint) used for signed PDS requests
    session: atproto_auth.AtprotoSession,
  )
}
56
/// Lightweight authentication that only verifies the bearer token.
/// Suitable for mutations that never need an ATP session (e.g. label
/// preferences).
fn get_viewer_auth(
  resolver_ctx: schema.Context,
  db: executor.Executor,
) -> Result(atproto_auth.UserInfo, String) {
  // Pull the bearer token out of the resolver context data.
  let missing_auth =
    "Authentication required. Please provide Authorization header."
  let token_result = case resolver_ctx.data {
    option.Some(value.Object(fields)) ->
      case list.key_find(fields, "auth_token") {
        Ok(value.String(token)) -> Ok(token)
        Ok(_) -> Error("auth_token must be a string")
        Error(_) -> Error(missing_auth)
      }
    _ -> Error(missing_auth)
  }
  use bearer <- result.try(token_result)

  // Verify the OAuth token, translating auth errors to user-facing text.
  case atproto_auth.verify_token(db, bearer) {
    Ok(user_info) -> Ok(user_info)
    Error(atproto_auth.UnauthorizedToken) -> Error("Unauthorized")
    Error(atproto_auth.TokenExpired) -> Error("Token expired")
    Error(atproto_auth.MissingAuthHeader) -> Error("Missing authentication")
    Error(atproto_auth.InvalidAuthHeader) ->
      Error("Invalid authentication header")
    Error(_) -> Error("Authentication error")
  }
}
90
/// Extract the bearer token carried in the resolver context data.
fn extract_bearer_token(resolver_ctx: schema.Context) -> Result(String, String) {
  case resolver_ctx.data {
    option.Some(value.Object(fields)) ->
      case list.key_find(fields, "auth_token") {
        Ok(value.String(t)) -> Ok(t)
        Ok(_) -> Error("auth_token must be a string")
        Error(_) ->
          Error("Authentication required. Please provide Authorization header.")
      }
    _ -> Error("Authentication required. Please provide Authorization header.")
  }
}

/// Map a token-verification error to a user-facing message.
fn verify_error_message(err) -> String {
  case err {
    atproto_auth.UnauthorizedToken -> "Unauthorized"
    atproto_auth.TokenExpired -> "Token expired"
    atproto_auth.MissingAuthHeader -> "Missing authentication"
    atproto_auth.InvalidAuthHeader -> "Invalid authentication header"
    _ -> "Authentication error"
  }
}

/// Map an ATP-session error to a user-facing message.
fn session_error_message(err) -> String {
  case err {
    atproto_auth.SessionNotFound -> "Session not found"
    atproto_auth.SessionNotReady -> "Session not ready"
    atproto_auth.RefreshFailed(msg) -> "Token refresh failed: " <> msg
    atproto_auth.DIDResolutionFailed(msg) -> "DID resolution failed: " <> msg
    _ -> "Failed to get ATP session"
  }
}

/// Extract the bearer token, verify it, ensure the actor exists in the
/// database (spawning a collection backfill for first-time actors), then
/// fetch the AT Protocol session needed to write to the user's PDS.
fn get_authenticated_session(
  resolver_ctx: schema.Context,
  ctx: MutationContext,
) -> Result(AuthenticatedSession, String) {
  // Step 1: Extract auth token from context data
  use token <- result.try(extract_bearer_token(resolver_ctx))

  // Step 2: Verify OAuth token
  use user_info <- result.try(
    atproto_auth.verify_token(ctx.db, token)
    |> result.map_error(verify_error_message),
  )

  // Step 3: Ensure actor exists in database
  use is_new_actor <- result.try(actor_validator.ensure_actor_exists(
    ctx.db,
    user_info.did,
    ctx.plc_url,
  ))

  // First-time actors get a background backfill of every configured
  // collection; the process is unlinked so a backfill crash cannot take
  // down this request.
  case is_new_actor {
    True -> {
      process.spawn_unlinked(fn() {
        backfill.backfill_collections_for_actor(
          ctx.db,
          user_info.did,
          ctx.collection_ids,
          ctx.external_collection_ids,
          ctx.plc_url,
        )
      })
      Nil
    }
    False -> Nil
  }

  // Step 4: Get AT Protocol session
  use session <- result.try(
    atproto_auth.get_atp_session(
      ctx.db,
      ctx.did_cache,
      token,
      ctx.signing_key,
      ctx.atp_client_id,
    )
    |> result.map_error(session_error_message),
  )

  Ok(AuthenticatedSession(user_info: user_info, session: session))
}
172
173// ─── Private Blob Helpers ──────────────────────────────────────────
174
/// Convert a GraphQL runtime value into a gleam_json value, preserving
/// structure (objects stay objects rather than being stringified).
fn graphql_value_to_json_value(val: value.Value) -> json.Json {
  case val {
    value.String(text) -> json.string(text)
    value.Int(number) -> json.int(number)
    value.Float(number) -> json.float(number)
    value.Boolean(flag) -> json.bool(flag)
    value.Null -> json.null()
    // GraphQL enums serialise as their bare name string
    value.Enum(name) -> json.string(name)
    value.List(elements) -> json.array(elements, graphql_value_to_json_value)
    value.Object(pairs) ->
      pairs
      |> list.map(fn(pair) { #(pair.0, graphql_value_to_json_value(pair.1)) })
      |> json.object
  }
}
196
/// Find the lexicon whose id matches `collection` and return the paths
/// of every blob-typed field in its main record definition. Returns []
/// when the lexicon is missing or its properties cannot be decoded.
fn get_blob_paths(
  collection: String,
  lexicons: List(json.Json),
) -> List(List(String)) {
  // json.Json is opaque, so round-trip through a string to run decoders.
  let id_matches = fn(lex) {
    case json.parse(json.to_string(lex), decode.at(["id"], decode.string)) {
      Ok(id) -> id == collection
      Error(_) -> False
    }
  }

  case list.find(lexicons, id_matches) {
    Error(_) -> []
    Ok(lex) -> {
      let properties_decoder =
        decode.at(
          ["defs", "main", "record", "properties"],
          decode.dict(decode.string, decode.dynamic),
        )
      case json.parse(json.to_string(lex), properties_decoder) {
        Ok(properties) -> extract_blob_paths_from_properties(properties, [])
        Error(_) -> []
      }
    }
  }
}
225
/// Recursively extract blob paths from lexicon properties.
///
/// Walks the property map and collects the path (list of field names) of
/// every field whose lexicon type is "blob", including blobs nested
/// inside "object" properties and inside "array" items. Array element
/// paths use the array field's own path (no index segment is added).
fn extract_blob_paths_from_properties(
  properties: dict.Dict(String, dynamic.Dynamic),
  current_path: List(String),
) -> List(List(String)) {
  dict.fold(properties, [], fn(acc, field_name, field_def) {
    let field_path = list.append(current_path, [field_name])
    let type_result = decode.run(field_def, decode.at(["type"], decode.string))

    case type_result {
      // Direct blob field: record its path
      Ok("blob") -> [field_path, ..acc]
      // Nested object: recurse into its properties
      Ok("object") -> {
        let nested_props_result =
          decode.run(
            field_def,
            decode.at(
              ["properties"],
              decode.dict(decode.string, decode.dynamic),
            ),
          )
        case nested_props_result {
          Ok(nested_props) -> {
            let nested_paths =
              extract_blob_paths_from_properties(nested_props, field_path)
            list.append(nested_paths, acc)
          }
          // Undecodable object properties are skipped, not an error
          Error(_) -> acc
        }
      }
      // Array: blobs may be the item type or live inside item objects
      Ok("array") -> {
        let items_type_result =
          decode.run(field_def, decode.at(["items", "type"], decode.string))
        case items_type_result {
          Ok("blob") -> [field_path, ..acc]
          Ok("object") -> {
            let item_props_result =
              decode.run(
                field_def,
                decode.at(
                  ["items", "properties"],
                  decode.dict(decode.string, decode.dynamic),
                ),
              )
            case item_props_result {
              Ok(item_props) -> {
                let nested_paths =
                  extract_blob_paths_from_properties(item_props, field_path)
                list.append(nested_paths, acc)
              }
              Error(_) -> acc
            }
          }
          _ -> acc
        }
      }
      // Scalars and unknown types carry no blobs
      _ -> acc
    }
  })
}
285
/// Rewrite every BlobInput found at one of `blob_paths` into the AT
/// Protocol blob wire format, leaving all other values untouched.
fn transform_blob_inputs(
  input: value.Value,
  blob_paths: List(List(String)),
) -> value.Value {
  // Start the walk at the root path.
  input |> transform_value_at_paths(blob_paths, [])
}
293
/// Recursively transform values at blob paths.
///
/// `current_path` tracks the position in the value tree; a value is
/// rewritten only when its path exactly matches one of `blob_paths`
/// (the root path [] is never treated as a blob).
fn transform_value_at_paths(
  val: value.Value,
  blob_paths: List(List(String)),
  current_path: List(String),
) -> value.Value {
  case val {
    value.Object(fields) -> {
      let is_blob_path =
        list.any(blob_paths, fn(path) {
          path == current_path && current_path != []
        })

      case is_blob_path {
        True -> transform_blob_object(fields)
        False -> {
          // Not a blob itself: recurse into each field with an extended path
          value.Object(
            list.map(fields, fn(field) {
              let #(key, field_val) = field
              let new_path = list.append(current_path, [key])
              #(key, transform_value_at_paths(field_val, blob_paths, new_path))
            }),
          )
        }
      }
    }
    value.List(items) -> {
      let is_blob_array_path =
        list.any(blob_paths, fn(path) {
          path == current_path && current_path != []
        })

      case is_blob_array_path {
        True -> {
          // Array of blobs: rewrite each object element in place
          value.List(
            list.map(items, fn(item) {
              case item {
                value.Object(item_fields) -> transform_blob_object(item_fields)
                _ -> item
              }
            }),
          )
        }
        False -> {
          // Only descend if some blob path continues through this array.
          // Elements keep the array's own path (paths carry no index
          // segment), matching how paths are extracted from the lexicon.
          let paths_through_here =
            list.filter(blob_paths, fn(path) {
              list.length(path) > list.length(current_path)
              && list.take(path, list.length(current_path)) == current_path
            })

          case list.is_empty(paths_through_here) {
            True -> val
            False -> {
              value.List(
                list.map(items, fn(item) {
                  transform_value_at_paths(item, blob_paths, current_path)
                }),
              )
            }
          }
        }
      }
    }
    // Scalars are never blob containers
    _ -> val
  }
}
360
/// Convert a GraphQL BlobInput object into the AT Protocol blob shape:
/// { $type: "blob", ref: { $link: cid }, mimeType, size }.
/// Objects missing a ref or mimeType are passed through unchanged.
fn transform_blob_object(fields: List(#(String, value.Value))) -> value.Value {
  let string_field = fn(name) {
    case list.key_find(fields, name) {
      Ok(value.String(s)) -> s
      _ -> ""
    }
  }
  let cid = string_field("ref")
  let mime = string_field("mimeType")
  // size defaults to 0 when absent or not an Int
  let byte_count = case list.key_find(fields, "size") {
    Ok(value.Int(n)) -> n
    _ -> 0
  }

  case cid == "" || mime == "" {
    True -> value.Object(fields)
    False ->
      value.Object([
        #("$type", value.String("blob")),
        #("ref", value.Object([#("$link", value.String(cid))])),
        #("mimeType", value.String(mime)),
        #("size", value.Int(byte_count)),
      ])
  }
}
387
388// ─── Private Union Helpers ────────────────────────────────────────
389
/// Union field info: the path to a union-typed field in a record and the
/// fully-qualified lexicon refs it may resolve to.
type UnionFieldInfo {
  UnionFieldInfo(path: List(String), refs: List(String))
}
394
/// Collect every union-typed field (path + candidate refs) from the
/// lexicon whose id equals `collection`. Returns [] when the lexicon is
/// missing or its properties cannot be decoded.
fn get_union_fields(
  collection: String,
  lexicons: List(json.Json),
) -> List(UnionFieldInfo) {
  // json.Json is opaque: serialise and re-parse to run decoders on it.
  let found =
    list.find(lexicons, fn(candidate) {
      json.parse(json.to_string(candidate), decode.at(["id"], decode.string))
      == Ok(collection)
    })

  case found {
    Error(_) -> []
    Ok(lexicon) ->
      case
        json.parse(
          json.to_string(lexicon),
          decode.at(
            ["defs", "main", "record", "properties"],
            decode.dict(decode.string, decode.dynamic),
          ),
        )
      {
        Ok(properties) -> extract_union_fields_from_properties(properties, [])
        Error(_) -> []
      }
  }
}
423
/// Recursively extract union fields from lexicon properties.
///
/// Mirrors extract_blob_paths_from_properties: walks the property map,
/// collecting a UnionFieldInfo for every field whose lexicon type is
/// "union" — directly, inside nested "object" properties, or as "array"
/// items. Array element paths use the array field's own path.
fn extract_union_fields_from_properties(
  properties: dict.Dict(String, dynamic.Dynamic),
  current_path: List(String),
) -> List(UnionFieldInfo) {
  dict.fold(properties, [], fn(acc, field_name, field_def) {
    let field_path = list.append(current_path, [field_name])
    let type_result = decode.run(field_def, decode.at(["type"], decode.string))

    case type_result {
      Ok("union") -> {
        // Extract refs from the union definition
        let refs_result =
          decode.run(field_def, decode.at(["refs"], decode.list(decode.string)))
        case refs_result {
          Ok(refs) -> [UnionFieldInfo(path: field_path, refs: refs), ..acc]
          // A union with undecodable refs is skipped rather than failing
          Error(_) -> acc
        }
      }
      // Nested object: recurse into its properties
      Ok("object") -> {
        let nested_props_result =
          decode.run(
            field_def,
            decode.at(
              ["properties"],
              decode.dict(decode.string, decode.dynamic),
            ),
          )
        case nested_props_result {
          Ok(nested_props) -> {
            let nested_fields =
              extract_union_fields_from_properties(nested_props, field_path)
            list.append(nested_fields, acc)
          }
          Error(_) -> acc
        }
      }
      // Array: unions may be the item type or live inside item objects
      Ok("array") -> {
        let items_type_result =
          decode.run(field_def, decode.at(["items", "type"], decode.string))
        case items_type_result {
          Ok("union") -> {
            let refs_result =
              decode.run(
                field_def,
                decode.at(["items", "refs"], decode.list(decode.string)),
              )
            case refs_result {
              Ok(refs) -> [UnionFieldInfo(path: field_path, refs: refs), ..acc]
              Error(_) -> acc
            }
          }
          Ok("object") -> {
            let item_props_result =
              decode.run(
                field_def,
                decode.at(
                  ["items", "properties"],
                  decode.dict(decode.string, decode.dynamic),
                ),
              )
            case item_props_result {
              Ok(item_props) -> {
                let nested_fields =
                  extract_union_fields_from_properties(item_props, field_path)
                list.append(nested_fields, acc)
              }
              Error(_) -> acc
            }
          }
          _ -> acc
        }
      }
      // Scalars and unknown types contain no unions
      _ -> acc
    }
  })
}
501
/// Rewrite every discriminated union input found at one of the union
/// field paths into AT Protocol `$type` form.
fn transform_union_inputs(
  input: value.Value,
  union_fields: List(UnionFieldInfo),
) -> value.Value {
  // Start the walk at the root path.
  input |> transform_unions_at_paths(union_fields, [])
}
509
/// Walk a value tree, rewriting any object or list found at a union
/// field path into AT Protocol `$type` form. Other values are recursed
/// into (objects extend the path; list elements keep the list's path so
/// unions nested inside array items are still found).
fn transform_unions_at_paths(
  val: value.Value,
  union_fields: List(UnionFieldInfo),
  current_path: List(String),
) -> value.Value {
  // Does a union field sit exactly at this path?
  let union_here = list.find(union_fields, fn(uf) { uf.path == current_path })

  case val {
    value.Object(fields) ->
      case union_here {
        Ok(info) -> transform_union_object(fields, info.refs)
        Error(_) ->
          value.Object(
            list.map(fields, fn(entry) {
              let #(name, inner) = entry
              #(
                name,
                transform_unions_at_paths(
                  inner,
                  union_fields,
                  list.append(current_path, [name]),
                ),
              )
            }),
          )
      }
    value.List(items) ->
      case union_here {
        // A union array: rewrite each object element in place
        Ok(info) ->
          value.List(
            list.map(items, fn(element) {
              case element {
                value.Object(element_fields) ->
                  transform_union_object(element_fields, info.refs)
                _ -> element
              }
            }),
          )
        // Not a union array: recurse into elements at the same path
        Error(_) ->
          value.List(
            list.map(items, fn(element) {
              transform_unions_at_paths(element, union_fields, current_path)
            }),
          )
      }
    // Scalars are never union containers
    _ -> val
  }
}
570
/// Transform a union object from GraphQL discriminated format to AT Protocol format.
/// GraphQL input: { type: "SELF_LABELS", selfLabels: { values: [...] } }
/// AT Protocol output: { $type: "com.atproto.label.defs#selfLabels", values: [...] }
///
/// The discriminator may arrive as an enum (normal case) or a plain
/// string (fallback). If it is missing, not a recognised ref, or some
/// other value kind, the object is returned unchanged.
fn transform_union_object(
  fields: List(#(String, value.Value)),
  refs: List(String),
) -> value.Value {
  // Accept the "type" discriminator as either an enum or a string so the
  // two cases share one transformation path (previously duplicated).
  let discriminator = case list.key_find(fields, "type") {
    Ok(value.Enum(v)) -> Ok(v)
    Ok(value.String(v)) -> Ok(v)
    _ -> Error(Nil)
  }

  case discriminator {
    Error(_) -> value.Object(fields)
    Ok(enum_value) ->
      // Convert enum value back to its fully-qualified ref
      case find_ref_for_enum_value(enum_value, refs) {
        Error(_) -> value.Object(fields)
        Ok(ref) -> {
          // The variant payload lives under the camelCase field that
          // mirrors the ref's short name, e.g. SELF_LABELS -> selfLabels
          let short_name = enum_value_to_short_name(enum_value)
          case list.key_find(fields, short_name) {
            Ok(value.Object(variant_fields)) ->
              // Build AT Protocol format: $type + variant fields
              value.Object([#("$type", value.String(ref)), ..variant_fields])
            // No variant data: emit just the $type marker
            _ -> value.Object([#("$type", value.String(ref))])
          }
        }
      }
  }
}
622
/// Find the fully-qualified ref whose derived enum name equals
/// `enum_value`, e.g. "SELF_LABELS" ↔ "com.atproto.label.defs#selfLabels".
fn find_ref_for_enum_value(
  enum_value: String,
  refs: List(String),
) -> Result(String, Nil) {
  refs
  |> list.find(fn(candidate) {
    union_input.ref_to_enum_value(candidate) == enum_value
  })
}
631
/// Convert SCREAMING_SNAKE_CASE to camelCase for variant field lookup,
/// e.g. "SELF_LABELS" -> "selfLabels". Delegates to the shared helper in
/// lexicon_graphql/input/union so naming stays consistent with how the
/// input schema was generated.
fn enum_value_to_short_name(enum_value: String) -> String {
  union_input.screaming_snake_to_camel(enum_value)
}
637
/// Decode a base64 string to a bit array.
///
/// Uses the stdlib decoder so malformed input yields `Error(Nil)` instead
/// of crashing: the raw Erlang `base64:decode/1` raises on invalid input,
/// so the previous `Ok(do_erlang_base64_decode(...))` could never return
/// an Error and brought the process down on bad client data.
fn decode_base64(base64_str: String) -> Result(BitArray, Nil) {
  bit_array.base64_decode(base64_str)
}
642
/// Extract blob fields from a dynamic PDS uploadBlob response.
///
/// Expects the AT Protocol blob shape `{ ref: { $link }, mimeType, size }`
/// and returns a flat object tagged with the uploader's DID.
/// NOTE(review): output keys are "ref"/"mime_type"/"size"/"did" —
/// presumably what the Blob GraphQL type resolver reads; confirm the
/// snake_case keys against that resolver.
fn extract_blob_from_dynamic(
  blob_dynamic: dynamic.Dynamic,
  did: String,
) -> Result(value.Value, String) {
  // Decoder for the nested { "$link": cid } ref object
  let ref_link_decoder = {
    use link <- decode.field("$link", decode.string)
    decode.success(link)
  }

  // Decoder for the full blob descriptor
  let full_decoder = {
    use mime_type <- decode.field("mimeType", decode.string)
    use size <- decode.field("size", decode.int)
    use ref <- decode.field("ref", ref_link_decoder)
    decode.success(#(ref, mime_type, size))
  }

  use #(ref, mime_type, size) <- result.try(
    decode.run(blob_dynamic, full_decoder)
    |> result.map_error(fn(_) { "Failed to decode blob fields" }),
  )

  Ok(
    value.Object([
      #("ref", value.String(ref)),
      #("mime_type", value.String(mime_type)),
      #("size", value.Int(size)),
      #("did", value.String(did)),
    ]),
  )
}
674
/// Erlang FFI: `base64:decode/1` returns the decoded binary directly (not
/// a Result) and raises when the input is not valid base64 — callers must
/// only pass input that is already known to be well-formed.
@external(erlang, "base64", "decode")
fn do_erlang_base64_decode(a: String) -> BitArray
678
679// ─── Public Resolver Factories ─────────────────────────────────────
680
/// Create a resolver factory for create mutations.
///
/// The returned resolver: authenticates the caller and obtains an ATP
/// session; transforms blob and union inputs to AT Protocol wire shape;
/// validates the record against its lexicon; writes it via
/// com.atproto.repo.createRecord on the user's own PDS; indexes the
/// record locally; and publishes a Create event for subscriptions.
pub fn create_resolver_factory(
  collection: String,
  ctx: MutationContext,
) -> schema.Resolver {
  fn(resolver_ctx: schema.Context) -> Result(value.Value, String) {
    // Get authenticated session using helper
    use auth <- result.try(get_authenticated_session(resolver_ctx, ctx))

    // `input` is required; `rkey` is optional (PDS chooses one if absent)
    let input_result = case schema.get_argument(resolver_ctx, "input") {
      option.Some(val) -> Ok(val)
      option.None -> Error("Missing required argument: input")
    }

    use input <- result.try(input_result)

    let rkey = case schema.get_argument(resolver_ctx, "rkey") {
      option.Some(value.String(r)) -> option.Some(r)
      _ -> option.None
    }

    // Fetch lexicons for validation and blob/union path extraction
    use all_lexicon_records <- result.try(
      lexicons.get_all(ctx.db)
      |> result.map_error(fn(_) { "Failed to fetch lexicons" }),
    )

    use all_lex_jsons <- result.try(
      all_lexicon_records
      |> list.try_map(fn(lex) {
        honk.parse_json_string(lex.json)
        |> result.map_error(fn(e) {
          "Failed to parse lexicon JSON: " <> errors.to_string(e)
        })
      }),
    )

    // Transform blob inputs from GraphQL format to AT Protocol format
    let blob_paths = get_blob_paths(collection, all_lex_jsons)
    let blob_transformed = transform_blob_inputs(input, blob_paths)

    // Transform union inputs from GraphQL discriminated format to AT
    // Protocol format
    let union_fields = get_union_fields(collection, all_lex_jsons)
    let transformed_input =
      transform_union_inputs(blob_transformed, union_fields)

    let record_json_value = graphql_value_to_json_value(transformed_input)
    let record_json_string = json.to_string(record_json_value)

    // Validate against lexicon before touching the PDS
    use _ <- result.try(
      honk.validate_record(all_lex_jsons, collection, record_json_value)
      |> result.map_error(fn(err) {
        "Validation failed: " <> errors.to_string(err)
      }),
    )

    // Call createRecord via AT Protocol
    let create_body =
      case rkey {
        option.Some(r) ->
          json.object([
            #("repo", json.string(auth.user_info.did)),
            #("collection", json.string(collection)),
            #("rkey", json.string(r)),
            #("record", record_json_value),
          ])
        option.None ->
          json.object([
            #("repo", json.string(auth.user_info.did)),
            #("collection", json.string(collection)),
            #("record", record_json_value),
          ])
      }
      |> json.to_string

    let pds_url =
      auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.createRecord"

    use response <- result.try(
      dpop.make_dpop_request("POST", pds_url, auth.session, create_body)
      |> result.map_error(fn(_) { "Failed to create record on PDS" }),
    )

    use #(uri, cid) <- result.try(case response.status {
      200 | 201 -> {
        let response_decoder = {
          use uri <- decode.field("uri", decode.string)
          use cid <- decode.field("cid", decode.string)
          decode.success(#(uri, cid))
        }
        json.parse(response.body, response_decoder)
        |> result.map_error(fn(_) {
          "Failed to parse PDS success response. Body: " <> response.body
        })
      }
      _ ->
        Error(
          "PDS request failed with status "
          <> int.to_string(response.status)
          <> ": "
          <> response.body,
        )
    })

    // Index the created record in the database
    use _ <- result.try(
      records.insert(
        ctx.db,
        uri,
        cid,
        auth.user_info.did,
        collection,
        record_json_string,
      )
      |> result.map_error(fn(_) { "Failed to index record in database" }),
    )

    // Use one timestamp for both the subscription event and the response,
    // instead of returning an empty indexedAt placeholder to clients.
    let indexed_at = timestamp.current_iso8601()

    // Publish event for GraphQL subscriptions
    pubsub.publish(pubsub.RecordEvent(
      uri: uri,
      cid: cid,
      did: auth.user_info.did,
      collection: collection,
      value: record_json_string,
      indexed_at: indexed_at,
      operation: pubsub.Create,
    ))

    Ok(
      value.Object([
        #("uri", value.String(uri)),
        #("cid", value.String(cid)),
        #("did", value.String(auth.user_info.did)),
        #("collection", value.String(collection)),
        #("indexedAt", value.String(indexed_at)),
        #("value", input),
      ]),
    )
  }
}
823
/// Create a resolver factory for update mutations.
///
/// The returned resolver authenticates the caller, transforms blob and
/// union inputs, validates the record against its lexicon, overwrites the
/// record via com.atproto.repo.putRecord on the user's PDS, updates the
/// local index, and publishes an Update event for subscriptions.
pub fn update_resolver_factory(
  collection: String,
  ctx: MutationContext,
) -> schema.Resolver {
  fn(resolver_ctx: schema.Context) -> Result(value.Value, String) {
    // Get authenticated session using helper
    use auth <- result.try(get_authenticated_session(resolver_ctx, ctx))

    // Get rkey (required) and input from arguments
    let rkey_result = case schema.get_argument(resolver_ctx, "rkey") {
      option.Some(value.String(r)) -> Ok(r)
      option.Some(_) -> Error("rkey must be a string")
      option.None -> Error("Missing required argument: rkey")
    }

    use rkey <- result.try(rkey_result)

    let input_result = case schema.get_argument(resolver_ctx, "input") {
      option.Some(val) -> Ok(val)
      option.None -> Error("Missing required argument: input")
    }

    use input <- result.try(input_result)

    // Fetch lexicons for validation and blob/union path extraction
    use all_lexicon_records <- result.try(
      lexicons.get_all(ctx.db)
      |> result.map_error(fn(_) { "Failed to fetch lexicons" }),
    )

    use all_lex_jsons <- result.try(
      all_lexicon_records
      |> list.try_map(fn(lex) {
        honk.parse_json_string(lex.json)
        |> result.map_error(fn(e) {
          "Failed to parse lexicon JSON: " <> errors.to_string(e)
        })
      }),
    )

    // Transform blob inputs from GraphQL format to AT Protocol format
    let blob_paths = get_blob_paths(collection, all_lex_jsons)
    let blob_transformed = transform_blob_inputs(input, blob_paths)

    // Transform union inputs from GraphQL discriminated format to AT
    // Protocol format
    let union_fields = get_union_fields(collection, all_lex_jsons)
    let transformed_input =
      transform_union_inputs(blob_transformed, union_fields)

    let record_json_value = graphql_value_to_json_value(transformed_input)
    let record_json_string = json.to_string(record_json_value)

    // Validate against lexicon before touching the PDS
    use _ <- result.try(
      honk.validate_record(all_lex_jsons, collection, record_json_value)
      |> result.map_error(fn(err) {
        "Validation failed: " <> errors.to_string(err)
      }),
    )

    // Call putRecord via AT Protocol
    let update_body =
      json.object([
        #("repo", json.string(auth.user_info.did)),
        #("collection", json.string(collection)),
        #("rkey", json.string(rkey)),
        #("record", record_json_value),
      ])
      |> json.to_string

    let pds_url =
      auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.putRecord"

    use response <- result.try(
      dpop.make_dpop_request("POST", pds_url, auth.session, update_body)
      |> result.map_error(fn(_) { "Failed to update record on PDS" }),
    )

    use #(uri, cid) <- result.try(case response.status {
      200 | 201 -> {
        let response_decoder = {
          use uri <- decode.field("uri", decode.string)
          use cid <- decode.field("cid", decode.string)
          decode.success(#(uri, cid))
        }
        json.parse(response.body, response_decoder)
        |> result.map_error(fn(_) {
          "Failed to parse PDS success response. Body: " <> response.body
        })
      }
      _ ->
        Error(
          "PDS request failed with status "
          <> int.to_string(response.status)
          <> ": "
          <> response.body,
        )
    })

    // Update the record in the database
    use _ <- result.try(
      records.update(ctx.db, uri, cid, record_json_string)
      |> result.map_error(fn(_) { "Failed to update record in database" }),
    )

    // Use one timestamp for both the subscription event and the response,
    // instead of returning an empty indexedAt placeholder to clients.
    let indexed_at = timestamp.current_iso8601()

    // Publish event for GraphQL subscriptions
    pubsub.publish(pubsub.RecordEvent(
      uri: uri,
      cid: cid,
      did: auth.user_info.did,
      collection: collection,
      value: record_json_string,
      indexed_at: indexed_at,
      operation: pubsub.Update,
    ))

    Ok(
      value.Object([
        #("uri", value.String(uri)),
        #("cid", value.String(cid)),
        #("did", value.String(auth.user_info.did)),
        #("collection", value.String(collection)),
        #("indexedAt", value.String(indexed_at)),
        #("value", input),
      ]),
    )
  }
}
953
/// Create a resolver factory for delete mutations.
///
/// Deletes the record on the caller's PDS via com.atproto.repo.deleteRecord,
/// removes the local index entry, and publishes a Delete event for
/// subscriptions.
pub fn delete_resolver_factory(
  collection: String,
  ctx: MutationContext,
) -> schema.Resolver {
  fn(resolver_ctx: schema.Context) -> Result(value.Value, String) {
    use auth <- result.try(get_authenticated_session(resolver_ctx, ctx))

    // rkey is the only argument and it is mandatory
    use rkey <- result.try(case schema.get_argument(resolver_ctx, "rkey") {
      option.Some(value.String(key)) -> Ok(key)
      option.Some(_) -> Error("rkey must be a string")
      option.None -> Error("Missing required argument: rkey")
    })

    let did = auth.user_info.did
    // at:// URI of the record being removed
    let uri = "at://" <> did <> "/" <> collection <> "/" <> rkey

    let delete_body =
      [
        #("repo", json.string(did)),
        #("collection", json.string(collection)),
        #("rkey", json.string(rkey)),
      ]
      |> json.object
      |> json.to_string

    let pds_url =
      auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.deleteRecord"

    use response <- result.try(
      dpop.make_dpop_request("POST", pds_url, auth.session, delete_body)
      |> result.map_error(fn(_) { "Failed to delete record on PDS" }),
    )

    use _ <- result.try(case response.status {
      200 | 201 | 204 -> Ok(Nil)
      status ->
        Error(
          "PDS delete request failed with status "
          <> int.to_string(status)
          <> ": "
          <> response.body,
        )
    })

    // Drop the local index entry as well
    use _ <- result.try(
      records.delete(ctx.db, uri)
      |> result.map_error(fn(_) { "Failed to delete record from database" }),
    )

    // Deletes carry no cid or value; subscribers key off uri + operation
    pubsub.publish(pubsub.RecordEvent(
      uri: uri,
      cid: "",
      did: did,
      collection: collection,
      value: "",
      indexed_at: timestamp.current_iso8601(),
      operation: pubsub.Delete,
    ))

    Ok(value.Object([#("uri", value.String(uri))]))
  }
}
1023
/// Create a resolver for the uploadBlob mutation.
///
/// Decodes base64 `data`, uploads the raw bytes to the caller's PDS via
/// com.atproto.repo.uploadBlob with the caller-supplied MIME type, and
/// returns the blob ref fields parsed from the PDS response.
pub fn upload_blob_resolver_factory(ctx: MutationContext) -> schema.Resolver {
  fn(resolver_ctx: schema.Context) -> Result(value.Value, String) {
    // Get authenticated session using helper
    use auth <- result.try(get_authenticated_session(resolver_ctx, ctx))

    // Get data and mimeType from arguments (both required strings)
    let data_result = case schema.get_argument(resolver_ctx, "data") {
      option.Some(value.String(d)) -> Ok(d)
      option.Some(_) -> Error("data must be a string")
      option.None -> Error("Missing required argument: data")
    }

    use data_base64 <- result.try(data_result)

    let mime_type_result = case schema.get_argument(resolver_ctx, "mimeType") {
      option.Some(value.String(m)) -> Ok(m)
      option.Some(_) -> Error("mimeType must be a string")
      option.None -> Error("Missing required argument: mimeType")
    }

    use mime_type <- result.try(mime_type_result)

    // Decode base64 data to binary before uploading
    use binary_data <- result.try(
      decode_base64(data_base64)
      |> result.map_error(fn(_) { "Failed to decode base64 data" }),
    )

    // Upload blob to PDS
    let pds_url =
      auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.uploadBlob"

    use response <- result.try(
      dpop.make_dpop_request_with_binary(
        "POST",
        pds_url,
        auth.session,
        binary_data,
        mime_type,
      )
      |> result.map_error(fn(_) { "Failed to upload blob to PDS" }),
    )

    // The PDS nests the blob descriptor under a "blob" key
    use blob_ref <- result.try(case response.status {
      200 | 201 -> {
        let response_decoder = {
          use blob <- decode.field("blob", decode.dynamic)
          decode.success(blob)
        }

        case json.parse(response.body, response_decoder) {
          Ok(blob_dynamic) ->
            extract_blob_from_dynamic(blob_dynamic, auth.user_info.did)
          Error(_) ->
            Error("Failed to parse PDS response. Body: " <> response.body)
        }
      }
      _ ->
        Error(
          "PDS request failed with status "
          <> int.to_string(response.status)
          <> ": "
          <> response.body,
        )
    })

    Ok(blob_ref)
  }
}
1094
/// Create a resolver for createReport mutation
/// Allows authenticated users to submit moderation reports
pub fn create_report_resolver_factory(ctx: MutationContext) -> schema.Resolver {
  fn(resolver_ctx: schema.Context) -> Result(value.Value, String) {
    // A full ATP session is required to attribute the report to a DID
    use auth <- result.try(get_authenticated_session(resolver_ctx, ctx))

    // Required argument: subjectUri — the AT-URI being reported
    use subject_uri <- result.try(
      case schema.get_argument(resolver_ctx, "subjectUri") {
        option.Some(value.String(uri)) -> Ok(uri)
        option.Some(_) -> Error("subjectUri must be a string")
        option.None -> Error("Missing required argument: subjectUri")
      },
    )

    // Required argument: reasonType — accepted as enum or string and
    // normalized to lowercase for storage
    use reason_type <- result.try(
      case schema.get_argument(resolver_ctx, "reasonType") {
        option.Some(value.Enum(kind)) -> Ok(string.lowercase(kind))
        option.Some(value.String(kind)) -> Ok(string.lowercase(kind))
        option.Some(_) -> Error("reasonType must be a string")
        option.None -> Error("Missing required argument: reasonType")
      },
    )

    // Only this closed set of moderation reason kinds is accepted
    let valid_reasons = [
      "spam", "violation", "misleading", "sexual", "rude", "other",
    ]
    use _ <- result.try(case list.contains(valid_reasons, reason_type) {
      True -> Ok(Nil)
      False ->
        Error(
          "Invalid reasonType. Must be one of: "
          <> string.join(valid_reasons, ", "),
        )
    })

    // Optional free-text elaboration; any non-string value is ignored
    let reason = case schema.get_argument(resolver_ctx, "reason") {
      option.Some(value.String(text)) -> option.Some(text)
      _ -> option.None
    }

    // Persist the report, attributed to the authenticated viewer's DID
    use report <- result.try(
      reports.insert(
        ctx.db,
        auth.user_info.did,
        subject_uri,
        reason_type,
        reason,
      )
      |> result.map_error(fn(_) { "Failed to create report" }),
    )

    // Shape the stored row into the GraphQL Report object.
    // reasonType is stored lowercase but exposed as an uppercase enum;
    // new reports always start in the PENDING status.
    Ok(
      value.Object([
        #("id", value.Int(report.id)),
        #("reporterDid", value.String(report.reporter_did)),
        #("subjectUri", value.String(report.subject_uri)),
        #("reasonType", value.Enum(string.uppercase(report.reason_type))),
        #("reason", case report.reason {
          option.Some(text) -> value.String(text)
          option.None -> value.Null
        }),
        #("status", value.Enum("PENDING")),
        #("createdAt", value.String(report.created_at)),
      ]),
    )
  }
}
1179
1180// ─── Label Preference Mutation ────────────────────────────────────────────
1181
/// Resolver factory for setLabelPreference mutation
pub fn set_label_preference_resolver_factory(
  ctx: MutationContext,
) -> schema.Resolver {
  fn(resolver_ctx: schema.Context) -> Result(value.Value, String) {
    // Token-only auth is sufficient here — no ATP session is needed to
    // store a viewer-local preference
    use user_info <- result.try(get_viewer_auth(resolver_ctx, ctx.db))

    // Required argument: val — the label value to configure
    use val <- result.try(case schema.get_argument(resolver_ctx, "val") {
      option.Some(value.String(label)) -> Ok(label)
      option.Some(_) -> Error("val must be a string")
      option.None -> Error("Missing required argument: val")
    })

    // Required argument: visibility — enum or string, normalized to
    // lowercase for storage and validation
    use visibility <- result.try(
      case schema.get_argument(resolver_ctx, "visibility") {
        option.Some(value.Enum(v)) -> Ok(string.lowercase(v))
        option.Some(value.String(v)) -> Ok(string.lowercase(v))
        option.Some(_) -> Error("visibility must be a valid enum value")
        option.None -> Error("Missing required argument: visibility")
      },
    )

    // System labels (prefixed with "!") are never user-configurable
    use _ <- result.try(case string.starts_with(val, "!") {
      False -> Ok(Nil)
      True -> Error("Cannot set preference for system labels")
    })

    // Reject unknown visibility values before touching the database
    use _ <- result.try(label_definitions.validate_visibility(visibility))

    // The label must have a known definition; its row is also needed
    // for the response payload below
    use def <- result.try(case label_definitions.get(ctx.db, val) {
      Ok(option.Some(found)) -> Ok(found)
      Ok(option.None) -> Error("Unknown label: " <> val)
      Error(_) -> Error("Failed to validate label")
    })

    // Upsert the viewer's preference for this label
    use _ <- result.try(
      label_preferences.set(ctx.db, user_info.did, val, visibility)
      |> result.map_error(fn(_) { "Failed to set label preference" }),
    )

    // Echo the label definition back with the freshly chosen visibility;
    // enum-valued fields are exposed uppercase
    Ok(
      value.Object([
        #("val", value.String(def.val)),
        #("description", value.String(def.description)),
        #("severity", value.Enum(string.uppercase(def.severity))),
        #(
          "defaultVisibility",
          value.Enum(string.uppercase(def.default_visibility)),
        ),
        #("visibility", value.Enum(string.uppercase(visibility))),
      ]),
    )
  }
}