Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 523 lines 18 kB view raw
1/// Query resolvers for admin GraphQL API 2import admin_session as session 3import backfill_state 4import database/executor.{type Executor} 5import database/repositories/actors 6import database/repositories/config as config_repo 7import database/repositories/jetstream_activity 8import database/repositories/label_definitions 9import database/repositories/label_preferences 10import database/repositories/labels 11import database/repositories/lexicons 12import database/repositories/oauth_clients 13import database/repositories/records 14import database/repositories/reports 15import gleam/erlang/process.{type Subject} 16import gleam/list 17import gleam/option.{None, Some} 18import gleam/otp/actor 19import gleam/string 20import graphql/admin/converters 21import graphql/admin/cursor 22import graphql/admin/types as admin_types 23import graphql/lexicon/converters as lexicon_converters 24import lib/oauth/did_cache 25import swell/connection 26import swell/schema 27import swell/value 28import wisp 29 30/// Fetch activity buckets for a given time range 31fn fetch_activity_buckets( 32 conn: Executor, 33 range: admin_types.TimeRange, 34) -> Result(value.Value, String) { 35 let fetch_result = case range { 36 admin_types.OneHour -> jetstream_activity.get_activity_1hr(conn) 37 admin_types.ThreeHours -> jetstream_activity.get_activity_3hr(conn) 38 admin_types.SixHours -> jetstream_activity.get_activity_6hr(conn) 39 admin_types.OneDay -> jetstream_activity.get_activity_1day(conn) 40 admin_types.SevenDays -> jetstream_activity.get_activity_7day(conn) 41 } 42 case fetch_result { 43 Ok(buckets) -> 44 Ok(value.List(list.map(buckets, converters.activity_bucket_to_value))) 45 Error(_) -> Error("Failed to fetch activity data") 46 } 47} 48 49/// Build the Query root type with all query resolvers 50pub fn query_type( 51 conn: Executor, 52 req: wisp.Request, 53 did_cache: Subject(did_cache.Message), 54 backfill_state_subject: Subject(backfill_state.Message), 55) -> schema.Type { 56 schema.object_type("Query", "Root query type", [ 57 // currentSession query 58 schema.field( 59 "currentSession", 60 admin_types.current_session_type(), 61 "Get current authenticated user session (null if not authenticated)", 62 fn(_ctx) { 63 case session.get_current_session(req, conn, did_cache) { 64 Ok(sess) -> { 65 let user_is_admin = config_repo.is_admin(conn, sess.did) 66 Ok(converters.current_session_to_value( 67 sess.did, 68 sess.handle, 69 user_is_admin, 70 )) 71 } 72 Error(_) -> Ok(value.Null) 73 } 74 }, 75 ), 76 // statistics query 77 schema.field( 78 "statistics", 79 schema.non_null(admin_types.statistics_type()), 80 "Get system statistics", 81 fn(_ctx) { 82 case 83 records.get_count(conn), 84 actors.get_count(conn), 85 lexicons.get_count(conn) 86 { 87 Ok(record_count), Ok(actor_count), Ok(lexicon_count) -> { 88 Ok(converters.statistics_to_value( 89 record_count, 90 actor_count, 91 lexicon_count, 92 )) 93 } 94 _, _, _ -> Error("Failed to fetch statistics") 95 } 96 }, 97 ), 98 // settings query 99 schema.field( 100 "settings", 101 schema.non_null(admin_types.settings_type()), 102 "Get system settings", 103 fn(_ctx) { 104 let domain_authority = case config_repo.get(conn, "domain_authority") { 105 Ok(authority) -> authority 106 Error(_) -> "" 107 } 108 let admin_dids = config_repo.get_admin_dids(conn) 109 let relay_url = config_repo.get_relay_url(conn) 110 let plc_directory_url = config_repo.get_plc_directory_url(conn) 111 let jetstream_url = config_repo.get_jetstream_url(conn) 112 let oauth_supported_scopes = 113 config_repo.get_oauth_supported_scopes(conn) 114 115 Ok(converters.settings_to_value( 116 domain_authority, 117 admin_dids, 118 relay_url, 119 plc_directory_url, 120 jetstream_url, 121 oauth_supported_scopes, 122 )) 123 }, 124 ), 125 // isBackfilling query 126 schema.field( 127 "isBackfilling", 128 schema.non_null(schema.boolean_type()), 129 "Check if a backfill operation is currently running", 130 fn(_ctx) { 131 let is_backfilling = 132 actor.call( 133 backfill_state_subject, 134 waiting: 100, 135 sending: backfill_state.IsBackfilling, 136 ) 137 Ok(value.Boolean(is_backfilling)) 138 }, 139 ), 140 // lexicons query 141 schema.field( 142 "lexicons", 143 schema.non_null( 144 schema.list_type(schema.non_null(admin_types.lexicon_type())), 145 ), 146 "Get all lexicons", 147 fn(_ctx) { 148 case lexicons.get_all(conn) { 149 Ok(lexicon_list) -> 150 Ok(value.List(list.map(lexicon_list, converters.lexicon_to_value))) 151 Error(_) -> Error("Failed to fetch lexicons") 152 } 153 }, 154 ), 155 // oauthClients query (admin only) 156 schema.field( 157 "oauthClients", 158 schema.non_null( 159 schema.list_type(schema.non_null(admin_types.oauth_client_type())), 160 ), 161 "Get all OAuth client registrations (admin only)", 162 fn(_ctx) { 163 case session.get_current_session(req, conn, did_cache) { 164 Ok(sess) -> { 165 case config_repo.is_admin(conn, sess.did) { 166 True -> { 167 case oauth_clients.get_all(conn) { 168 Ok(clients) -> 169 Ok( 170 value.List(list.map( 171 clients, 172 converters.oauth_client_to_value, 173 )), 174 ) 175 Error(_) -> Error("Failed to fetch OAuth clients") 176 } 177 } 178 False -> Error("Admin privileges required") 179 } 180 } 181 Error(_) -> Error("Authentication required") 182 } 183 }, 184 ), 185 // activityBuckets query with TimeRange argument 186 schema.field_with_args( 187 "activityBuckets", 188 schema.non_null( 189 schema.list_type(schema.non_null(admin_types.activity_bucket_type())), 190 ), 191 "Get activity data bucketed by time range", 192 [ 193 schema.argument( 194 "range", 195 schema.non_null(admin_types.time_range_enum()), 196 "Time range for bucketing", 197 None, 198 ), 199 ], 200 fn(ctx) { 201 case schema.get_argument(ctx, "range") { 202 Some(value.String(range_str)) -> 203 case admin_types.time_range_from_string(range_str) { 204 Ok(range) -> fetch_activity_buckets(conn, range) 205 Error(_) -> Error("Invalid time range argument") 206 } 207 _ -> Error("Missing time range argument") 208 } 209 }, 210 ), 211 // recentActivity query with hours argument 212 schema.field_with_args( 213 "recentActivity", 214 schema.non_null( 215 schema.list_type(schema.non_null(admin_types.activity_entry_type())), 216 ), 217 "Get recent activity entries", 218 [ 219 schema.argument( 220 "hours", 221 schema.non_null(schema.int_type()), 222 "Number of hours to look back", 223 None, 224 ), 225 ], 226 fn(ctx) { 227 case schema.get_argument(ctx, "hours") { 228 Some(value.Int(hours)) -> { 229 case jetstream_activity.get_recent_activity(conn, hours) { 230 Ok(entries) -> 231 Ok( 232 value.List(list.map( 233 entries, 234 converters.activity_entry_to_value, 235 )), 236 ) 237 Error(_) -> Error("Failed to fetch recent activity") 238 } 239 } 240 _ -> Error("Invalid or missing hours argument") 241 } 242 }, 243 ), 244 // labelDefinitions query 245 schema.field( 246 "labelDefinitions", 247 schema.non_null( 248 schema.list_type(schema.non_null(admin_types.label_definition_type())), 249 ), 250 "Get all label definitions", 251 fn(_ctx) { 252 case label_definitions.get_all(conn) { 253 Ok(defs) -> 254 Ok(value.List(list.map(defs, converters.label_definition_to_value))) 255 Error(_) -> Error("Failed to fetch label definitions") 256 } 257 }, 258 ), 259 // viewerLabelPreferences query (authenticated users) 260 schema.field( 261 "viewerLabelPreferences", 262 schema.non_null( 263 schema.list_type(schema.non_null(admin_types.label_preference_type())), 264 ), 265 "Get label preferences for the current user (non-system labels only)", 266 fn(_ctx) { 267 case session.get_current_session(req, conn, did_cache) { 268 Ok(sess) -> { 269 // Get non-system label definitions 270 case label_definitions.get_non_system(conn) { 271 Ok(defs) -> { 272 // Get user's preferences 273 case label_preferences.get_by_did(conn, sess.did) { 274 Ok(prefs) -> { 275 // Build a map of label_val -> visibility 276 let pref_map = 277 list.fold(prefs, [], fn(acc, pref) { 278 [#(pref.label_val, pref.visibility), ..acc] 279 }) 280 281 // Map each definition to a preference, using user's setting or default 282 let result = 283 list.map(defs, fn(def) { 284 let visibility = case list.key_find(pref_map, def.val) { 285 Ok(v) -> v 286 Error(_) -> def.default_visibility 287 } 288 lexicon_converters.label_preference_to_value( 289 def, 290 visibility, 291 ) 292 }) 293 294 Ok(value.List(result)) 295 } 296 Error(_) -> Error("Failed to fetch label preferences") 297 } 298 } 299 Error(_) -> Error("Failed to fetch label definitions") 300 } 301 } 302 Error(_) -> Error("Authentication required") 303 } 304 }, 305 ), 306 // labels query (admin only) - Connection type 307 schema.field_with_args( 308 "labels", 309 schema.non_null(admin_types.label_connection_type()), 310 "Get labels with optional filters (admin only)", 311 [ 312 schema.argument( 313 "uri", 314 schema.string_type(), 315 "Filter by subject URI", 316 None, 317 ), 318 schema.argument( 319 "val", 320 schema.string_type(), 321 "Filter by label value", 322 None, 323 ), 324 schema.argument( 325 "first", 326 schema.int_type(), 327 "Number of items to fetch (default 50)", 328 None, 329 ), 330 schema.argument( 331 "after", 332 schema.string_type(), 333 "Cursor for pagination", 334 None, 335 ), 336 ], 337 fn(ctx) { 338 case session.get_current_session(req, conn, did_cache) { 339 Ok(sess) -> { 340 case config_repo.is_admin(conn, sess.did) { 341 True -> { 342 let uri_filter = case schema.get_argument(ctx, "uri") { 343 Some(value.String(u)) -> Some(u) 344 _ -> None 345 } 346 let val_filter = case schema.get_argument(ctx, "val") { 347 Some(value.String(v)) -> Some(v) 348 _ -> None 349 } 350 let first = case schema.get_argument(ctx, "first") { 351 Some(value.Int(f)) -> f 352 _ -> 50 353 } 354 let after_id = case schema.get_argument(ctx, "after") { 355 Some(value.String(c)) -> { 356 case cursor.decode(c) { 357 Ok(#("Label", id)) -> Some(id) 358 _ -> None 359 } 360 } 361 _ -> None 362 } 363 364 case 365 labels.get_paginated( 366 conn, 367 uri_filter, 368 val_filter, 369 first, 370 after_id, 371 ) 372 { 373 Ok(paginated) -> { 374 // Build edges with cursors 375 let edges = 376 list.map(paginated.labels, fn(label) { 377 connection.Edge( 378 node: converters.label_to_value(label), 379 cursor: cursor.encode("Label", label.id), 380 ) 381 }) 382 383 // Build page info 384 let start_cursor = case list.first(paginated.labels) { 385 Ok(first_label) -> 386 Some(cursor.encode("Label", first_label.id)) 387 Error(_) -> None 388 } 389 let end_cursor = case list.last(paginated.labels) { 390 Ok(last_label) -> 391 Some(cursor.encode("Label", last_label.id)) 392 Error(_) -> None 393 } 394 395 let page_info = 396 connection.PageInfo( 397 has_next_page: paginated.has_next_page, 398 has_previous_page: option.is_some(after_id), 399 start_cursor: start_cursor, 400 end_cursor: end_cursor, 401 ) 402 403 let conn_value = 404 connection.Connection( 405 edges: edges, 406 page_info: page_info, 407 total_count: Some(paginated.total_count), 408 ) 409 410 Ok(connection.connection_to_value(conn_value)) 411 } 412 Error(_) -> Error("Failed to fetch labels") 413 } 414 } 415 False -> Error("Admin privileges required") 416 } 417 } 418 Error(_) -> Error("Authentication required") 419 } 420 }, 421 ), 422 // reports query (admin only) - Connection type 423 schema.field_with_args( 424 "reports", 425 schema.non_null(admin_types.report_connection_type()), 426 "Get moderation reports with optional status filter (admin only)", 427 [ 428 schema.argument( 429 "status", 430 admin_types.report_status_enum(), 431 "Filter by status", 432 None, 433 ), 434 schema.argument( 435 "first", 436 schema.int_type(), 437 "Number of items to fetch (default 50)", 438 None, 439 ), 440 schema.argument( 441 "after", 442 schema.string_type(), 443 "Cursor for pagination", 444 None, 445 ), 446 ], 447 fn(ctx) { 448 case session.get_current_session(req, conn, did_cache) { 449 Ok(sess) -> { 450 case config_repo.is_admin(conn, sess.did) { 451 True -> { 452 let status_filter = case schema.get_argument(ctx, "status") { 453 Some(value.Enum(s)) -> Some(string.lowercase(s)) 454 _ -> None 455 } 456 let first = case schema.get_argument(ctx, "first") { 457 Some(value.Int(f)) -> f 458 _ -> 50 459 } 460 let after_id = case schema.get_argument(ctx, "after") { 461 Some(value.String(c)) -> { 462 case cursor.decode(c) { 463 Ok(#("Report", id)) -> Some(id) 464 _ -> None 465 } 466 } 467 _ -> None 468 } 469 470 case 471 reports.get_paginated(conn, status_filter, first, after_id) 472 { 473 Ok(paginated) -> { 474 // Build edges with cursors 475 let edges = 476 list.map(paginated.reports, fn(report) { 477 connection.Edge( 478 node: converters.report_to_value(report), 479 cursor: cursor.encode("Report", report.id), 480 ) 481 }) 482 483 // Build page info 484 let start_cursor = case list.first(paginated.reports) { 485 Ok(first_report) -> 486 Some(cursor.encode("Report", first_report.id)) 487 Error(_) -> None 488 } 489 let end_cursor = case list.last(paginated.reports) { 490 Ok(last_report) -> 491 Some(cursor.encode("Report", last_report.id)) 492 Error(_) -> None 493 } 494 495 let page_info = 496 connection.PageInfo( 497 has_next_page: paginated.has_next_page, 498 has_previous_page: option.is_some(after_id), 499 start_cursor: start_cursor, 500 end_cursor: end_cursor, 501 ) 502 503 let conn_value = 504 connection.Connection( 505 edges: edges, 506 page_info: page_info, 507 total_count: Some(paginated.total_count), 508 ) 509 510 Ok(connection.connection_to_value(conn_value)) 511 } 512 Error(_) -> Error("Failed to fetch reports") 513 } 514 } 515 False -> Error("Admin privileges required") 516 } 517 } 518 Error(_) -> Error("Authentication required") 519 } 520 }, 521 ), 522 ]) 523}