Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 1247 lines 39 kB view raw
1/// Mutation Resolvers for lexicon GraphQL API 2/// 3/// Implements GraphQL mutation resolvers with AT Protocol integration. 4/// These resolvers handle authentication, validation, and database operations. 5import actor_validator 6import atproto_auth 7import backfill 8import database/executor.{type Executor} 9import database/repositories/label_definitions 10import database/repositories/label_preferences 11import database/repositories/lexicons 12import database/repositories/records 13import database/repositories/reports 14import dpop 15import gleam/dict 16import gleam/dynamic 17import gleam/dynamic/decode 18import gleam/erlang/process.{type Subject} 19import gleam/int 20import gleam/json 21import gleam/list 22import gleam/option 23import gleam/result 24import gleam/string 25import honk 26import honk/errors 27import lexicon_graphql/input/union as union_input 28import lib/oauth/did_cache 29import pubsub 30import swell/schema 31import swell/value 32import timestamp 33 34/// Context for mutation execution 35pub type MutationContext { 36 MutationContext( 37 db: Executor, 38 did_cache: Subject(did_cache.Message), 39 signing_key: option.Option(String), 40 atp_client_id: String, 41 plc_url: String, 42 collection_ids: List(String), 43 external_collection_ids: List(String), 44 ) 45} 46 47// ─── Private Auth Helpers ─────────────────────────────────────────── 48 49/// Authenticated session info returned by auth helper 50type AuthenticatedSession { 51 AuthenticatedSession( 52 user_info: atproto_auth.UserInfo, 53 session: atproto_auth.AtprotoSession, 54 ) 55} 56 57/// Lightweight auth that only verifies the token 58/// Use this for mutations that don't need ATP session (e.g., label preferences) 59fn get_viewer_auth( 60 resolver_ctx: schema.Context, 61 db: executor.Executor, 62) -> Result(atproto_auth.UserInfo, String) { 63 // Extract auth token from context data 64 let token = case resolver_ctx.data { 65 option.Some(value.Object(fields)) -> { 66 case list.key_find(fields, "auth_token") { 67 Ok(value.String(t)) -> Ok(t) 68 Ok(_) -> Error("auth_token must be a string") 69 Error(_) -> 70 Error("Authentication required. Please provide Authorization header.") 71 } 72 } 73 _ -> Error("Authentication required. Please provide Authorization header.") 74 } 75 76 use token <- result.try(token) 77 78 // Verify OAuth token 79 atproto_auth.verify_token(db, token) 80 |> result.map_error(fn(err) { 81 case err { 82 atproto_auth.UnauthorizedToken -> "Unauthorized" 83 atproto_auth.TokenExpired -> "Token expired" 84 atproto_auth.MissingAuthHeader -> "Missing authentication" 85 atproto_auth.InvalidAuthHeader -> "Invalid authentication header" 86 _ -> "Authentication error" 87 } 88 }) 89} 90 91/// Extract token, verify auth, ensure actor exists, get ATP session 92fn get_authenticated_session( 93 resolver_ctx: schema.Context, 94 ctx: MutationContext, 95) -> Result(AuthenticatedSession, String) { 96 // Step 1: Extract auth token from context data 97 let token = case resolver_ctx.data { 98 option.Some(value.Object(fields)) -> { 99 case list.key_find(fields, "auth_token") { 100 Ok(value.String(t)) -> Ok(t) 101 Ok(_) -> Error("auth_token must be a string") 102 Error(_) -> 103 Error("Authentication required. Please provide Authorization header.") 104 } 105 } 106 _ -> Error("Authentication required. Please provide Authorization header.") 107 } 108 109 use token <- result.try(token) 110 111 // Step 2: Verify OAuth token 112 use user_info <- result.try( 113 atproto_auth.verify_token(ctx.db, token) 114 |> result.map_error(fn(err) { 115 case err { 116 atproto_auth.UnauthorizedToken -> "Unauthorized" 117 atproto_auth.TokenExpired -> "Token expired" 118 atproto_auth.MissingAuthHeader -> "Missing authentication" 119 atproto_auth.InvalidAuthHeader -> "Invalid authentication header" 120 _ -> "Authentication error" 121 } 122 }), 123 ) 124 125 // Step 3: Ensure actor exists in database 126 use is_new_actor <- result.try(actor_validator.ensure_actor_exists( 127 ctx.db, 128 user_info.did, 129 ctx.plc_url, 130 )) 131 132 // If new actor, spawn backfill for all collections 133 case is_new_actor { 134 True -> { 135 process.spawn_unlinked(fn() { 136 backfill.backfill_collections_for_actor( 137 ctx.db, 138 user_info.did, 139 ctx.collection_ids, 140 ctx.external_collection_ids, 141 ctx.plc_url, 142 ) 143 }) 144 Nil 145 } 146 False -> Nil 147 } 148 149 // Step 4: Get AT Protocol session 150 use session <- result.try( 151 atproto_auth.get_atp_session( 152 ctx.db, 153 ctx.did_cache, 154 token, 155 ctx.signing_key, 156 ctx.atp_client_id, 157 ) 158 |> result.map_error(fn(err) { 159 case err { 160 atproto_auth.SessionNotFound -> "Session not found" 161 atproto_auth.SessionNotReady -> "Session not ready" 162 atproto_auth.RefreshFailed(msg) -> "Token refresh failed: " <> msg 163 atproto_auth.DIDResolutionFailed(msg) -> 164 "DID resolution failed: " <> msg 165 _ -> "Failed to get ATP session" 166 } 167 }), 168 ) 169 170 Ok(AuthenticatedSession(user_info: user_info, session: session)) 171} 172 173// ─── Private Blob Helpers ────────────────────────────────────────── 174 175/// Convert GraphQL value to JSON value (not string) 176fn graphql_value_to_json_value(val: value.Value) -> json.Json { 177 case val { 178 value.String(s) -> json.string(s) 179 value.Int(i) -> json.int(i) 180 value.Float(f) -> json.float(f) 181 value.Boolean(b) -> json.bool(b) 182 value.Null -> json.null() 183 value.Enum(e) -> json.string(e) 184 value.List(items) -> json.array(items, graphql_value_to_json_value) 185 value.Object(fields) -> { 186 json.object( 187 fields 188 |> list.map(fn(field) { 189 let #(key, val) = field 190 #(key, graphql_value_to_json_value(val)) 191 }), 192 ) 193 } 194 } 195} 196 197/// Get blob field paths from a lexicon for a given collection 198fn get_blob_paths( 199 collection: String, 200 lexicons: List(json.Json), 201) -> List(List(String)) { 202 let lexicon = 203 list.find(lexicons, fn(lex) { 204 case json.parse(json.to_string(lex), decode.at(["id"], decode.string)) { 205 Ok(id) -> id == collection 206 Error(_) -> False 207 } 208 }) 209 210 case lexicon { 211 Ok(lex) -> { 212 let properties_decoder = 213 decode.at( 214 ["defs", "main", "record", "properties"], 215 decode.dict(decode.string, decode.dynamic), 216 ) 217 case json.parse(json.to_string(lex), properties_decoder) { 218 Ok(properties) -> extract_blob_paths_from_properties(properties, []) 219 Error(_) -> [] 220 } 221 } 222 Error(_) -> [] 223 } 224} 225 226/// Recursively extract blob paths from lexicon properties 227fn extract_blob_paths_from_properties( 228 properties: dict.Dict(String, dynamic.Dynamic), 229 current_path: List(String), 230) -> List(List(String)) { 231 dict.fold(properties, [], fn(acc, field_name, field_def) { 232 let field_path = list.append(current_path, [field_name]) 233 let type_result = decode.run(field_def, decode.at(["type"], decode.string)) 234 235 case type_result { 236 Ok("blob") -> [field_path, ..acc] 237 Ok("object") -> { 238 let nested_props_result = 239 decode.run( 240 field_def, 241 decode.at( 242 ["properties"], 243 decode.dict(decode.string, decode.dynamic), 244 ), 245 ) 246 case nested_props_result { 247 Ok(nested_props) -> { 248 let nested_paths = 249 extract_blob_paths_from_properties(nested_props, field_path) 250 list.append(nested_paths, acc) 251 } 252 Error(_) -> acc 253 } 254 } 255 Ok("array") -> { 256 let items_type_result = 257 decode.run(field_def, decode.at(["items", "type"], decode.string)) 258 case items_type_result { 259 Ok("blob") -> [field_path, ..acc] 260 Ok("object") -> { 261 let item_props_result = 262 decode.run( 263 field_def, 264 decode.at( 265 ["items", "properties"], 266 decode.dict(decode.string, decode.dynamic), 267 ), 268 ) 269 case item_props_result { 270 Ok(item_props) -> { 271 let nested_paths = 272 extract_blob_paths_from_properties(item_props, field_path) 273 list.append(nested_paths, acc) 274 } 275 Error(_) -> acc 276 } 277 } 278 _ -> acc 279 } 280 } 281 _ -> acc 282 } 283 }) 284} 285 286/// Transform blob inputs in a value from GraphQL format to AT Protocol format 287fn transform_blob_inputs( 288 input: value.Value, 289 blob_paths: List(List(String)), 290) -> value.Value { 291 transform_value_at_paths(input, blob_paths, []) 292} 293 294/// Recursively transform values at blob paths 295fn transform_value_at_paths( 296 val: value.Value, 297 blob_paths: List(List(String)), 298 current_path: List(String), 299) -> value.Value { 300 case val { 301 value.Object(fields) -> { 302 let is_blob_path = 303 list.any(blob_paths, fn(path) { 304 path == current_path && current_path != [] 305 }) 306 307 case is_blob_path { 308 True -> transform_blob_object(fields) 309 False -> { 310 value.Object( 311 list.map(fields, fn(field) { 312 let #(key, field_val) = field 313 let new_path = list.append(current_path, [key]) 314 #(key, transform_value_at_paths(field_val, blob_paths, new_path)) 315 }), 316 ) 317 } 318 } 319 } 320 value.List(items) -> { 321 let is_blob_array_path = 322 list.any(blob_paths, fn(path) { 323 path == current_path && current_path != [] 324 }) 325 326 case is_blob_array_path { 327 True -> { 328 value.List( 329 list.map(items, fn(item) { 330 case item { 331 value.Object(item_fields) -> transform_blob_object(item_fields) 332 _ -> item 333 } 334 }), 335 ) 336 } 337 False -> { 338 let paths_through_here = 339 list.filter(blob_paths, fn(path) { 340 list.length(path) > list.length(current_path) 341 && list.take(path, list.length(current_path)) == current_path 342 }) 343 344 case list.is_empty(paths_through_here) { 345 True -> val 346 False -> { 347 value.List( 348 list.map(items, fn(item) { 349 transform_value_at_paths(item, blob_paths, current_path) 350 }), 351 ) 352 } 353 } 354 } 355 } 356 } 357 _ -> val 358 } 359} 360 361/// Transform a BlobInput object to AT Protocol blob format 362fn transform_blob_object(fields: List(#(String, value.Value))) -> value.Value { 363 let ref = case list.key_find(fields, "ref") { 364 Ok(value.String(r)) -> r 365 _ -> "" 366 } 367 let mime_type = case list.key_find(fields, "mimeType") { 368 Ok(value.String(m)) -> m 369 _ -> "" 370 } 371 let size = case list.key_find(fields, "size") { 372 Ok(value.Int(s)) -> s 373 _ -> 0 374 } 375 376 case ref != "" && mime_type != "" { 377 True -> 378 value.Object([ 379 #("$type", value.String("blob")), 380 #("ref", value.Object([#("$link", value.String(ref))])), 381 #("mimeType", value.String(mime_type)), 382 #("size", value.Int(size)), 383 ]) 384 False -> value.Object(fields) 385 } 386} 387 388// ─── Private Union Helpers ──────────────────────────────────────── 389 390/// Union field info: path to field and list of possible type refs 391type UnionFieldInfo { 392 UnionFieldInfo(path: List(String), refs: List(String)) 393} 394 395/// Get union field info from a lexicon for a given collection 396fn get_union_fields( 397 collection: String, 398 lexicons: List(json.Json), 399) -> List(UnionFieldInfo) { 400 let lexicon = 401 list.find(lexicons, fn(lex) { 402 case json.parse(json.to_string(lex), decode.at(["id"], decode.string)) { 403 Ok(id) -> id == collection 404 Error(_) -> False 405 } 406 }) 407 408 case lexicon { 409 Ok(lex) -> { 410 let properties_decoder = 411 decode.at( 412 ["defs", "main", "record", "properties"], 413 decode.dict(decode.string, decode.dynamic), 414 ) 415 case json.parse(json.to_string(lex), properties_decoder) { 416 Ok(properties) -> extract_union_fields_from_properties(properties, []) 417 Error(_) -> [] 418 } 419 } 420 Error(_) -> [] 421 } 422} 423 424/// Recursively extract union fields from lexicon properties 425fn extract_union_fields_from_properties( 426 properties: dict.Dict(String, dynamic.Dynamic), 427 current_path: List(String), 428) -> List(UnionFieldInfo) { 429 dict.fold(properties, [], fn(acc, field_name, field_def) { 430 let field_path = list.append(current_path, [field_name]) 431 let type_result = decode.run(field_def, decode.at(["type"], decode.string)) 432 433 case type_result { 434 Ok("union") -> { 435 // Extract refs from the union definition 436 let refs_result = 437 decode.run(field_def, decode.at(["refs"], decode.list(decode.string))) 438 case refs_result { 439 Ok(refs) -> [UnionFieldInfo(path: field_path, refs: refs), ..acc] 440 Error(_) -> acc 441 } 442 } 443 Ok("object") -> { 444 let nested_props_result = 445 decode.run( 446 field_def, 447 decode.at( 448 ["properties"], 449 decode.dict(decode.string, decode.dynamic), 450 ), 451 ) 452 case nested_props_result { 453 Ok(nested_props) -> { 454 let nested_fields = 455 extract_union_fields_from_properties(nested_props, field_path) 456 list.append(nested_fields, acc) 457 } 458 Error(_) -> acc 459 } 460 } 461 Ok("array") -> { 462 let items_type_result = 463 decode.run(field_def, decode.at(["items", "type"], decode.string)) 464 case items_type_result { 465 Ok("union") -> { 466 let refs_result = 467 decode.run( 468 field_def, 469 decode.at(["items", "refs"], decode.list(decode.string)), 470 ) 471 case refs_result { 472 Ok(refs) -> [UnionFieldInfo(path: field_path, refs: refs), ..acc] 473 Error(_) -> acc 474 } 475 } 476 Ok("object") -> { 477 let item_props_result = 478 decode.run( 479 field_def, 480 decode.at( 481 ["items", "properties"], 482 decode.dict(decode.string, decode.dynamic), 483 ), 484 ) 485 case item_props_result { 486 Ok(item_props) -> { 487 let nested_fields = 488 extract_union_fields_from_properties(item_props, field_path) 489 list.append(nested_fields, acc) 490 } 491 Error(_) -> acc 492 } 493 } 494 _ -> acc 495 } 496 } 497 _ -> acc 498 } 499 }) 500} 501 502/// Transform union inputs by adding $type based on the discriminator 503fn transform_union_inputs( 504 input: value.Value, 505 union_fields: List(UnionFieldInfo), 506) -> value.Value { 507 transform_unions_at_paths(input, union_fields, []) 508} 509 510/// Recursively transform union values at specified paths 511fn transform_unions_at_paths( 512 val: value.Value, 513 union_fields: List(UnionFieldInfo), 514 current_path: List(String), 515) -> value.Value { 516 case val { 517 value.Object(fields) -> { 518 // Check if current path matches a union field 519 let matching_union = 520 list.find(union_fields, fn(uf) { uf.path == current_path }) 521 522 case matching_union { 523 Ok(union_info) -> transform_union_object(fields, union_info.refs) 524 Error(_) -> { 525 // Recurse into object fields 526 value.Object( 527 list.map(fields, fn(field) { 528 let #(key, field_val) = field 529 let new_path = list.append(current_path, [key]) 530 #( 531 key, 532 transform_unions_at_paths(field_val, union_fields, new_path), 533 ) 534 }), 535 ) 536 } 537 } 538 } 539 value.List(items) -> { 540 // Check if current path is a union array 541 let matching_union = 542 list.find(union_fields, fn(uf) { uf.path == current_path }) 543 544 case matching_union { 545 Ok(union_info) -> { 546 // Transform each item in the array 547 value.List( 548 list.map(items, fn(item) { 549 case item { 550 value.Object(item_fields) -> 551 transform_union_object(item_fields, union_info.refs) 552 _ -> item 553 } 554 }), 555 ) 556 } 557 Error(_) -> { 558 // Recurse into list items 559 value.List( 560 list.map(items, fn(item) { 561 transform_unions_at_paths(item, union_fields, current_path) 562 }), 563 ) 564 } 565 } 566 } 567 _ -> val 568 } 569} 570 571/// Transform a union object from GraphQL discriminated format to AT Protocol format 572/// GraphQL input: { type: "SELF_LABELS", selfLabels: { values: [...] } } 573/// AT Protocol output: { $type: "com.atproto.label.defs#selfLabels", values: [...] } 574fn transform_union_object( 575 fields: List(#(String, value.Value)), 576 refs: List(String), 577) -> value.Value { 578 // Find the "type" discriminator field 579 let type_field = list.key_find(fields, "type") 580 581 case type_field { 582 Ok(value.Enum(enum_value)) -> { 583 // Convert enum value back to ref 584 let matching_ref = find_ref_for_enum_value(enum_value, refs) 585 case matching_ref { 586 Ok(ref) -> { 587 // Find the variant field (same name as the short ref name) 588 let short_name = enum_value_to_short_name(enum_value) 589 case list.key_find(fields, short_name) { 590 Ok(value.Object(variant_fields)) -> { 591 // Build AT Protocol format: variant fields + $type 592 value.Object([#("$type", value.String(ref)), ..variant_fields]) 593 } 594 _ -> { 595 // No variant data, just return $type 596 value.Object([#("$type", value.String(ref))]) 597 } 598 } 599 } 600 Error(_) -> value.Object(fields) 601 } 602 } 603 Ok(value.String(str_value)) -> { 604 // Handle string type discriminator (fallback) 605 let matching_ref = find_ref_for_enum_value(str_value, refs) 606 case matching_ref { 607 Ok(ref) -> { 608 let short_name = enum_value_to_short_name(str_value) 609 case list.key_find(fields, short_name) { 610 Ok(value.Object(variant_fields)) -> { 611 value.Object([#("$type", value.String(ref)), ..variant_fields]) 612 } 613 _ -> value.Object([#("$type", value.String(ref))]) 614 } 615 } 616 Error(_) -> value.Object(fields) 617 } 618 } 619 _ -> value.Object(fields) 620 } 621} 622 623/// Find the ref that matches an enum value 624/// "SELF_LABELS" matches "com.atproto.label.defs#selfLabels" 625fn find_ref_for_enum_value( 626 enum_value: String, 627 refs: List(String), 628) -> Result(String, Nil) { 629 list.find(refs, fn(ref) { union_input.ref_to_enum_value(ref) == enum_value }) 630} 631 632/// Convert SCREAMING_SNAKE_CASE to camelCase for field lookup 633/// "SELF_LABELS" -> "selfLabels" 634fn enum_value_to_short_name(enum_value: String) -> String { 635 union_input.screaming_snake_to_camel(enum_value) 636} 637 638/// Decode base64 string to bit array 639fn decode_base64(base64_str: String) -> Result(BitArray, Nil) { 640 Ok(do_erlang_base64_decode(base64_str)) 641} 642 643/// Extract blob fields from dynamic PDS response 644fn extract_blob_from_dynamic( 645 blob_dynamic: dynamic.Dynamic, 646 did: String, 647) -> Result(value.Value, String) { 648 let ref_link_decoder = { 649 use link <- decode.field("$link", decode.string) 650 decode.success(link) 651 } 652 653 let full_decoder = { 654 use mime_type <- decode.field("mimeType", decode.string) 655 use size <- decode.field("size", decode.int) 656 use ref <- decode.field("ref", ref_link_decoder) 657 decode.success(#(ref, mime_type, size)) 658 } 659 660 use #(ref, mime_type, size) <- result.try( 661 decode.run(blob_dynamic, full_decoder) 662 |> result.map_error(fn(_) { "Failed to decode blob fields" }), 663 ) 664 665 Ok( 666 value.Object([ 667 #("ref", value.String(ref)), 668 #("mime_type", value.String(mime_type)), 669 #("size", value.Int(size)), 670 #("did", value.String(did)), 671 ]), 672 ) 673} 674 675/// Erlang FFI: base64:decode/1 returns BitArray directly (not Result) 676@external(erlang, "base64", "decode") 677fn do_erlang_base64_decode(a: String) -> BitArray 678 679// ─── Public Resolver Factories ───────────────────────────────────── 680 681/// Create a resolver factory for create mutations 682pub fn create_resolver_factory( 683 collection: String, 684 ctx: MutationContext, 685) -> schema.Resolver { 686 fn(resolver_ctx: schema.Context) -> Result(value.Value, String) { 687 // Get authenticated session using helper 688 use auth <- result.try(get_authenticated_session(resolver_ctx, ctx)) 689 690 // Get input and rkey from arguments 691 let input_result = case schema.get_argument(resolver_ctx, "input") { 692 option.Some(val) -> Ok(val) 693 option.None -> Error("Missing required argument: input") 694 } 695 696 use input <- result.try(input_result) 697 698 let rkey = case schema.get_argument(resolver_ctx, "rkey") { 699 option.Some(value.String(r)) -> option.Some(r) 700 _ -> option.None 701 } 702 703 // Fetch lexicons for validation and blob path extraction 704 use all_lexicon_records <- result.try( 705 lexicons.get_all(ctx.db) 706 |> result.map_error(fn(_) { "Failed to fetch lexicons" }), 707 ) 708 709 use all_lex_jsons <- result.try( 710 all_lexicon_records 711 |> list.try_map(fn(lex) { 712 honk.parse_json_string(lex.json) 713 |> result.map_error(fn(e) { 714 "Failed to parse lexicon JSON: " <> errors.to_string(e) 715 }) 716 }), 717 ) 718 719 // Transform blob inputs from GraphQL format to AT Protocol format 720 let blob_paths = get_blob_paths(collection, all_lex_jsons) 721 let blob_transformed = transform_blob_inputs(input, blob_paths) 722 723 // Transform union inputs from GraphQL discriminated format to AT Protocol format 724 let union_fields = get_union_fields(collection, all_lex_jsons) 725 let transformed_input = 726 transform_union_inputs(blob_transformed, union_fields) 727 728 let record_json_value = graphql_value_to_json_value(transformed_input) 729 let record_json_string = json.to_string(record_json_value) 730 731 // Validate against lexicon 732 use _ <- result.try( 733 honk.validate_record(all_lex_jsons, collection, record_json_value) 734 |> result.map_error(fn(err) { 735 "Validation failed: " <> errors.to_string(err) 736 }), 737 ) 738 739 // Call createRecord via AT Protocol 740 let create_body = 741 case rkey { 742 option.Some(r) -> 743 json.object([ 744 #("repo", json.string(auth.user_info.did)), 745 #("collection", json.string(collection)), 746 #("rkey", json.string(r)), 747 #("record", record_json_value), 748 ]) 749 option.None -> 750 json.object([ 751 #("repo", json.string(auth.user_info.did)), 752 #("collection", json.string(collection)), 753 #("record", record_json_value), 754 ]) 755 } 756 |> json.to_string 757 758 let pds_url = 759 auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.createRecord" 760 761 use response <- result.try( 762 dpop.make_dpop_request("POST", pds_url, auth.session, create_body) 763 |> result.map_error(fn(_) { "Failed to create record on PDS" }), 764 ) 765 766 use #(uri, cid) <- result.try(case response.status { 767 200 | 201 -> { 768 let response_decoder = { 769 use uri <- decode.field("uri", decode.string) 770 use cid <- decode.field("cid", decode.string) 771 decode.success(#(uri, cid)) 772 } 773 json.parse(response.body, response_decoder) 774 |> result.map_error(fn(_) { 775 "Failed to parse PDS success response. Body: " <> response.body 776 }) 777 } 778 _ -> 779 Error( 780 "PDS request failed with status " 781 <> int.to_string(response.status) 782 <> ": " 783 <> response.body, 784 ) 785 }) 786 787 // Index the created record in the database 788 use _ <- result.try( 789 records.insert( 790 ctx.db, 791 uri, 792 cid, 793 auth.user_info.did, 794 collection, 795 record_json_string, 796 ) 797 |> result.map_error(fn(_) { "Failed to index record in database" }), 798 ) 799 800 // Publish event for GraphQL subscriptions 801 pubsub.publish(pubsub.RecordEvent( 802 uri: uri, 803 cid: cid, 804 did: auth.user_info.did, 805 collection: collection, 806 value: record_json_string, 807 indexed_at: timestamp.current_iso8601(), 808 operation: pubsub.Create, 809 )) 810 811 Ok( 812 value.Object([ 813 #("uri", value.String(uri)), 814 #("cid", value.String(cid)), 815 #("did", value.String(auth.user_info.did)), 816 #("collection", value.String(collection)), 817 #("indexedAt", value.String("")), 818 #("value", input), 819 ]), 820 ) 821 } 822} 823 824/// Create a resolver factory for update mutations 825pub fn update_resolver_factory( 826 collection: String, 827 ctx: MutationContext, 828) -> schema.Resolver { 829 fn(resolver_ctx: schema.Context) -> Result(value.Value, String) { 830 // Get authenticated session using helper 831 use auth <- result.try(get_authenticated_session(resolver_ctx, ctx)) 832 833 // Get rkey (required) and input from arguments 834 let rkey_result = case schema.get_argument(resolver_ctx, "rkey") { 835 option.Some(value.String(r)) -> Ok(r) 836 option.Some(_) -> Error("rkey must be a string") 837 option.None -> Error("Missing required argument: rkey") 838 } 839 840 use rkey <- result.try(rkey_result) 841 842 let input_result = case schema.get_argument(resolver_ctx, "input") { 843 option.Some(val) -> Ok(val) 844 option.None -> Error("Missing required argument: input") 845 } 846 847 use input <- result.try(input_result) 848 849 // Fetch lexicons for validation and blob path extraction 850 use all_lexicon_records <- result.try( 851 lexicons.get_all(ctx.db) 852 |> result.map_error(fn(_) { "Failed to fetch lexicons" }), 853 ) 854 855 use all_lex_jsons <- result.try( 856 all_lexicon_records 857 |> list.try_map(fn(lex) { 858 honk.parse_json_string(lex.json) 859 |> result.map_error(fn(e) { 860 "Failed to parse lexicon JSON: " <> errors.to_string(e) 861 }) 862 }), 863 ) 864 865 // Transform blob inputs from GraphQL format to AT Protocol format 866 let blob_paths = get_blob_paths(collection, all_lex_jsons) 867 let blob_transformed = transform_blob_inputs(input, blob_paths) 868 869 // Transform union inputs from GraphQL discriminated format to AT Protocol format 870 let union_fields = get_union_fields(collection, all_lex_jsons) 871 let transformed_input = 872 transform_union_inputs(blob_transformed, union_fields) 873 874 let record_json_value = graphql_value_to_json_value(transformed_input) 875 let record_json_string = json.to_string(record_json_value) 876 877 // Validate against lexicon 878 use _ <- result.try( 879 honk.validate_record(all_lex_jsons, collection, record_json_value) 880 |> result.map_error(fn(err) { 881 "Validation failed: " <> errors.to_string(err) 882 }), 883 ) 884 885 // Call putRecord via AT Protocol 886 let update_body = 887 json.object([ 888 #("repo", json.string(auth.user_info.did)), 889 #("collection", json.string(collection)), 890 #("rkey", json.string(rkey)), 891 #("record", record_json_value), 892 ]) 893 |> json.to_string 894 895 let pds_url = 896 auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.putRecord" 897 898 use response <- result.try( 899 dpop.make_dpop_request("POST", pds_url, auth.session, update_body) 900 |> result.map_error(fn(_) { "Failed to update record on PDS" }), 901 ) 902 903 use #(uri, cid) <- result.try(case response.status { 904 200 | 201 -> { 905 let response_decoder = { 906 use uri <- decode.field("uri", decode.string) 907 use cid <- decode.field("cid", decode.string) 908 decode.success(#(uri, cid)) 909 } 910 json.parse(response.body, response_decoder) 911 |> result.map_error(fn(_) { 912 "Failed to parse PDS success response. Body: " <> response.body 913 }) 914 } 915 _ -> 916 Error( 917 "PDS request failed with status " 918 <> int.to_string(response.status) 919 <> ": " 920 <> response.body, 921 ) 922 }) 923 924 // Update the record in the database 925 use _ <- result.try( 926 records.update(ctx.db, uri, cid, record_json_string) 927 |> result.map_error(fn(_) { "Failed to update record in database" }), 928 ) 929 930 // Publish event for GraphQL subscriptions 931 pubsub.publish(pubsub.RecordEvent( 932 uri: uri, 933 cid: cid, 934 did: auth.user_info.did, 935 collection: collection, 936 value: record_json_string, 937 indexed_at: timestamp.current_iso8601(), 938 operation: pubsub.Update, 939 )) 940 941 Ok( 942 value.Object([ 943 #("uri", value.String(uri)), 944 #("cid", value.String(cid)), 945 #("did", value.String(auth.user_info.did)), 946 #("collection", value.String(collection)), 947 #("indexedAt", value.String("")), 948 #("value", input), 949 ]), 950 ) 951 } 952} 953 954/// Create a resolver factory for delete mutations 955pub fn delete_resolver_factory( 956 collection: String, 957 ctx: MutationContext, 958) -> schema.Resolver { 959 fn(resolver_ctx: schema.Context) -> Result(value.Value, String) { 960 // Get authenticated session using helper 961 use auth <- result.try(get_authenticated_session(resolver_ctx, ctx)) 962 963 // Get rkey (required) from arguments 964 let rkey_result = case schema.get_argument(resolver_ctx, "rkey") { 965 option.Some(value.String(r)) -> Ok(r) 966 option.Some(_) -> Error("rkey must be a string") 967 option.None -> Error("Missing required argument: rkey") 968 } 969 970 use rkey <- result.try(rkey_result) 971 972 // Build the record URI to be deleted 973 let uri = "at://" <> auth.user_info.did <> "/" <> collection <> "/" <> rkey 974 975 // Call deleteRecord via AT Protocol 976 let delete_body = 977 json.object([ 978 #("repo", json.string(auth.user_info.did)), 979 #("collection", json.string(collection)), 980 #("rkey", json.string(rkey)), 981 ]) 982 |> json.to_string 983 984 let pds_url = 985 auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.deleteRecord" 986 987 use response <- result.try( 988 dpop.make_dpop_request("POST", pds_url, auth.session, delete_body) 989 |> result.map_error(fn(_) { "Failed to delete record on PDS" }), 990 ) 991 992 use _ <- result.try(case response.status { 993 200 | 201 | 204 -> Ok(Nil) 994 _ -> 995 Error( 996 "PDS delete request failed with status " 997 <> int.to_string(response.status) 998 <> ": " 999 <> response.body, 1000 ) 1001 }) 1002 1003 // Delete the record from the database 1004 use _ <- result.try( 1005 records.delete(ctx.db, uri) 1006 |> result.map_error(fn(_) { "Failed to delete record from database" }), 1007 ) 1008 1009 // Publish event for GraphQL subscriptions 1010 pubsub.publish(pubsub.RecordEvent( 1011 uri: uri, 1012 cid: "", 1013 did: auth.user_info.did, 1014 collection: collection, 1015 value: "", 1016 indexed_at: timestamp.current_iso8601(), 1017 operation: pubsub.Delete, 1018 )) 1019 1020 Ok(value.Object([#("uri", value.String(uri))])) 1021 } 1022} 1023 1024/// Create a resolver for uploadBlob mutation 1025pub fn upload_blob_resolver_factory(ctx: MutationContext) -> schema.Resolver { 1026 fn(resolver_ctx: schema.Context) -> Result(value.Value, String) { 1027 // Get authenticated session using helper 1028 use auth <- result.try(get_authenticated_session(resolver_ctx, ctx)) 1029 1030 // Get data and mimeType from arguments 1031 let data_result = case schema.get_argument(resolver_ctx, "data") { 1032 option.Some(value.String(d)) -> Ok(d) 1033 option.Some(_) -> Error("data must be a string") 1034 option.None -> Error("Missing required argument: data") 1035 } 1036 1037 use data_base64 <- result.try(data_result) 1038 1039 let mime_type_result = case schema.get_argument(resolver_ctx, "mimeType") { 1040 option.Some(value.String(m)) -> Ok(m) 1041 option.Some(_) -> Error("mimeType must be a string") 1042 option.None -> Error("Missing required argument: mimeType") 1043 } 1044 1045 use mime_type <- result.try(mime_type_result) 1046 1047 // Decode base64 data to binary 1048 use binary_data <- result.try( 1049 decode_base64(data_base64) 1050 |> result.map_error(fn(_) { "Failed to decode base64 data" }), 1051 ) 1052 1053 // Upload blob to PDS 1054 let pds_url = 1055 auth.session.pds_endpoint <> "/xrpc/com.atproto.repo.uploadBlob" 1056 1057 use response <- result.try( 1058 dpop.make_dpop_request_with_binary( 1059 "POST", 1060 pds_url, 1061 auth.session, 1062 binary_data, 1063 mime_type, 1064 ) 1065 |> result.map_error(fn(_) { "Failed to upload blob to PDS" }), 1066 ) 1067 1068 use blob_ref <- result.try(case response.status { 1069 200 | 201 -> { 1070 let response_decoder = { 1071 use blob <- decode.field("blob", decode.dynamic) 1072 decode.success(blob) 1073 } 1074 1075 case json.parse(response.body, response_decoder) { 1076 Ok(blob_dynamic) -> 1077 extract_blob_from_dynamic(blob_dynamic, auth.user_info.did) 1078 Error(_) -> 1079 Error("Failed to parse PDS response. Body: " <> response.body) 1080 } 1081 } 1082 _ -> 1083 Error( 1084 "PDS request failed with status " 1085 <> int.to_string(response.status) 1086 <> ": " 1087 <> response.body, 1088 ) 1089 }) 1090 1091 Ok(blob_ref) 1092 } 1093} 1094 1095/// Create a resolver for createReport mutation 1096/// Allows authenticated users to submit moderation reports 1097pub fn create_report_resolver_factory(ctx: MutationContext) -> schema.Resolver { 1098 fn(resolver_ctx: schema.Context) -> Result(value.Value, String) { 1099 // Get authenticated session using helper 1100 use auth <- result.try(get_authenticated_session(resolver_ctx, ctx)) 1101 1102 // Get subjectUri (required) and reasonType (required) from arguments 1103 let subject_uri_result = case 1104 schema.get_argument(resolver_ctx, "subjectUri") 1105 { 1106 option.Some(value.String(u)) -> Ok(u) 1107 option.Some(_) -> Error("subjectUri must be a string") 1108 option.None -> Error("Missing required argument: subjectUri") 1109 } 1110 1111 use subject_uri <- result.try(subject_uri_result) 1112 1113 let reason_type_result = case 1114 schema.get_argument(resolver_ctx, "reasonType") 1115 { 1116 option.Some(value.Enum(r)) -> Ok(string.lowercase(r)) 1117 option.Some(value.String(r)) -> Ok(string.lowercase(r)) 1118 option.Some(_) -> Error("reasonType must be a string") 1119 option.None -> Error("Missing required argument: reasonType") 1120 } 1121 1122 use reason_type <- result.try(reason_type_result) 1123 1124 // Validate reason_type 1125 let valid_reasons = [ 1126 "spam", 1127 "violation", 1128 "misleading", 1129 "sexual", 1130 "rude", 1131 "other", 1132 ] 1133 use _ <- result.try(case list.contains(valid_reasons, reason_type) { 1134 True -> Ok(Nil) 1135 False -> 1136 Error( 1137 "Invalid reasonType. Must be one of: " 1138 <> string.join(valid_reasons, ", "), 1139 ) 1140 }) 1141 1142 // Get optional reason text 1143 let reason = case schema.get_argument(resolver_ctx, "reason") { 1144 option.Some(value.String(r)) -> option.Some(r) 1145 _ -> option.None 1146 } 1147 1148 // Insert the report 1149 use report <- result.try( 1150 reports.insert( 1151 ctx.db, 1152 auth.user_info.did, 1153 subject_uri, 1154 reason_type, 1155 reason, 1156 ) 1157 |> result.map_error(fn(_) { "Failed to create report" }), 1158 ) 1159 1160 // Return the created report 1161 let reason_value = case report.reason { 1162 option.Some(r) -> value.String(r) 1163 option.None -> value.Null 1164 } 1165 1166 Ok( 1167 value.Object([ 1168 #("id", value.Int(report.id)), 1169 #("reporterDid", value.String(report.reporter_did)), 1170 #("subjectUri", value.String(report.subject_uri)), 1171 #("reasonType", value.Enum(string.uppercase(report.reason_type))), 1172 #("reason", reason_value), 1173 #("status", value.Enum("PENDING")), 1174 #("createdAt", value.String(report.created_at)), 1175 ]), 1176 ) 1177 } 1178} 1179 1180// ─── Label Preference Mutation ──────────────────────────────────────────── 1181 1182/// Resolver factory for setLabelPreference mutation 1183pub fn set_label_preference_resolver_factory( 1184 ctx: MutationContext, 1185) -> schema.Resolver { 1186 fn(resolver_ctx: schema.Context) -> Result(value.Value, String) { 1187 // Get viewer auth (lightweight - no ATP session needed) 1188 use user_info <- result.try(get_viewer_auth(resolver_ctx, ctx.db)) 1189 1190 // Get val (required) argument 1191 let val_result = case schema.get_argument(resolver_ctx, "val") { 1192 option.Some(value.String(v)) -> Ok(v) 1193 option.Some(_) -> Error("val must be a string") 1194 option.None -> Error("Missing required argument: val") 1195 } 1196 1197 use val <- result.try(val_result) 1198 1199 // Get visibility (required) argument 1200 let visibility_result = case 1201 schema.get_argument(resolver_ctx, "visibility") 1202 { 1203 option.Some(value.Enum(v)) -> Ok(string.lowercase(v)) 1204 option.Some(value.String(v)) -> Ok(string.lowercase(v)) 1205 option.Some(_) -> Error("visibility must be a valid enum value") 1206 option.None -> Error("Missing required argument: visibility") 1207 } 1208 1209 use visibility <- result.try(visibility_result) 1210 1211 // Validate not a system label (starts with !) 1212 use _ <- result.try(case string.starts_with(val, "!") { 1213 True -> Error("Cannot set preference for system labels") 1214 False -> Ok(Nil) 1215 }) 1216 1217 // Validate visibility is a valid value 1218 use _ <- result.try(label_definitions.validate_visibility(visibility)) 1219 1220 // Validate label exists 1221 use def <- result.try(case label_definitions.get(ctx.db, val) { 1222 Ok(option.None) -> Error("Unknown label: " <> val) 1223 Error(_) -> Error("Failed to validate label") 1224 Ok(option.Some(d)) -> Ok(d) 1225 }) 1226 1227 // Set the preference 1228 use _ <- result.try( 1229 label_preferences.set(ctx.db, user_info.did, val, visibility) 1230 |> result.map_error(fn(_) { "Failed to set label preference" }), 1231 ) 1232 1233 // Return the updated preference 1234 Ok( 1235 value.Object([ 1236 #("val", value.String(def.val)), 1237 #("description", value.String(def.description)), 1238 #("severity", value.Enum(string.uppercase(def.severity))), 1239 #( 1240 "defaultVisibility", 1241 value.Enum(string.uppercase(def.default_visibility)), 1242 ), 1243 #("visibility", value.Enum(string.uppercase(visibility))), 1244 ]), 1245 ) 1246 } 1247}