Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1/// Query resolvers for admin GraphQL API
2import admin_session as session
3import backfill_state
4import database/executor.{type Executor}
5import database/repositories/actors
6import database/repositories/config as config_repo
7import database/repositories/jetstream_activity
8import database/repositories/label_definitions
9import database/repositories/label_preferences
10import database/repositories/labels
11import database/repositories/lexicons
12import database/repositories/oauth_clients
13import database/repositories/records
14import database/repositories/reports
15import gleam/erlang/process.{type Subject}
16import gleam/list
17import gleam/option.{None, Some}
18import gleam/otp/actor
19import gleam/string
20import graphql/admin/converters
21import graphql/admin/cursor
22import graphql/admin/types as admin_types
23import graphql/lexicon/converters as lexicon_converters
24import lib/oauth/did_cache
25import swell/connection
26import swell/schema
27import swell/value
28import wisp
29
/// Load the Jetstream activity buckets for `range` and wrap them in a
/// GraphQL list value.
///
/// Each `TimeRange` variant maps to its own repository query. Any error
/// returned by that query is replaced with a single generic message.
fn fetch_activity_buckets(
  conn: Executor,
  range: admin_types.TimeRange,
) -> Result(value.Value, String) {
  // Select the repository query that matches the requested window.
  let load_buckets = case range {
    admin_types.OneHour -> jetstream_activity.get_activity_1hr
    admin_types.ThreeHours -> jetstream_activity.get_activity_3hr
    admin_types.SixHours -> jetstream_activity.get_activity_6hr
    admin_types.OneDay -> jetstream_activity.get_activity_1day
    admin_types.SevenDays -> jetstream_activity.get_activity_7day
  }

  case load_buckets(conn) {
    Error(_) -> Error("Failed to fetch activity data")
    Ok(rows) ->
      rows
      |> list.map(converters.activity_bucket_to_value)
      |> value.List
      |> Ok
  }
}
48
/// Build the Query root type with all query resolvers.
///
/// The resolvers close over `conn`, `req`, `did_cache` and
/// `backfill_state_subject`, so the returned type is bound to one request.
///
/// Access rules implemented by the resolvers:
/// - `currentSession` resolves to null when no session can be loaded.
/// - `viewerLabelPreferences` fails with "Authentication required" when no
///   session can be loaded.
/// - `oauthClients`, `labels` and `reports` additionally fail with
///   "Admin privileges required" for non-admin sessions.
/// - Every other field is resolved without a session check.
pub fn query_type(
  conn: Executor,
  req: wisp.Request,
  did_cache: Subject(did_cache.Message),
  backfill_state_subject: Subject(backfill_state.Message),
) -> schema.Type {
  schema.object_type("Query", "Root query type", [
    // currentSession query
    schema.field(
      "currentSession",
      admin_types.current_session_type(),
      "Get current authenticated user session (null if not authenticated)",
      fn(_ctx) {
        case session.get_current_session(req, conn, did_cache) {
          Ok(sess) -> {
            let user_is_admin = config_repo.is_admin(conn, sess.did)
            Ok(converters.current_session_to_value(
              sess.did,
              sess.handle,
              user_is_admin,
            ))
          }
          // A missing session is not an error for this field.
          Error(_) -> Ok(value.Null)
        }
      },
    ),
    // statistics query
    schema.field(
      "statistics",
      schema.non_null(admin_types.statistics_type()),
      "Get system statistics",
      fn(_ctx) {
        case
          records.get_count(conn),
          actors.get_count(conn),
          lexicons.get_count(conn)
        {
          Ok(record_count), Ok(actor_count), Ok(lexicon_count) ->
            Ok(converters.statistics_to_value(
              record_count,
              actor_count,
              lexicon_count,
            ))
          // Any single failing count fails the whole field.
          _, _, _ -> Error("Failed to fetch statistics")
        }
      },
    ),
    // settings query
    schema.field(
      "settings",
      schema.non_null(admin_types.settings_type()),
      "Get system settings",
      fn(_ctx) {
        // An unreadable domain authority is reported as an empty string.
        let domain_authority = case config_repo.get(conn, "domain_authority") {
          Ok(authority) -> authority
          Error(_) -> ""
        }
        let admin_dids = config_repo.get_admin_dids(conn)
        let relay_url = config_repo.get_relay_url(conn)
        let plc_directory_url = config_repo.get_plc_directory_url(conn)
        let jetstream_url = config_repo.get_jetstream_url(conn)
        let oauth_supported_scopes =
          config_repo.get_oauth_supported_scopes(conn)

        Ok(converters.settings_to_value(
          domain_authority,
          admin_dids,
          relay_url,
          plc_directory_url,
          jetstream_url,
          oauth_supported_scopes,
        ))
      },
    ),
    // isBackfilling query
    schema.field(
      "isBackfilling",
      schema.non_null(schema.boolean_type()),
      "Check if a backfill operation is currently running",
      fn(_ctx) {
        let is_backfilling =
          actor.call(
            backfill_state_subject,
            waiting: 100,
            sending: backfill_state.IsBackfilling,
          )
        Ok(value.Boolean(is_backfilling))
      },
    ),
    // lexicons query
    schema.field(
      "lexicons",
      schema.non_null(
        schema.list_type(schema.non_null(admin_types.lexicon_type())),
      ),
      "Get all lexicons",
      fn(_ctx) {
        case lexicons.get_all(conn) {
          Ok(lexicon_list) ->
            Ok(value.List(list.map(lexicon_list, converters.lexicon_to_value)))
          Error(_) -> Error("Failed to fetch lexicons")
        }
      },
    ),
    // oauthClients query (admin only)
    schema.field(
      "oauthClients",
      schema.non_null(
        schema.list_type(schema.non_null(admin_types.oauth_client_type())),
      ),
      "Get all OAuth client registrations (admin only)",
      fn(_ctx) {
        use <- require_admin(conn, req, did_cache)
        case oauth_clients.get_all(conn) {
          Ok(clients) ->
            Ok(
              value.List(list.map(clients, converters.oauth_client_to_value)),
            )
          Error(_) -> Error("Failed to fetch OAuth clients")
        }
      },
    ),
    // activityBuckets query with TimeRange argument
    schema.field_with_args(
      "activityBuckets",
      schema.non_null(
        schema.list_type(schema.non_null(admin_types.activity_bucket_type())),
      ),
      "Get activity data bucketed by time range",
      [
        schema.argument(
          "range",
          schema.non_null(admin_types.time_range_enum()),
          "Time range for bucketing",
          None,
        ),
      ],
      fn(ctx) {
        case schema.get_argument(ctx, "range") {
          Some(value.String(range_str)) ->
            case admin_types.time_range_from_string(range_str) {
              Ok(range) -> fetch_activity_buckets(conn, range)
              Error(_) -> Error("Invalid time range argument")
            }
          _ -> Error("Missing time range argument")
        }
      },
    ),
    // recentActivity query with hours argument
    schema.field_with_args(
      "recentActivity",
      schema.non_null(
        schema.list_type(schema.non_null(admin_types.activity_entry_type())),
      ),
      "Get recent activity entries",
      [
        schema.argument(
          "hours",
          schema.non_null(schema.int_type()),
          "Number of hours to look back",
          None,
        ),
      ],
      fn(ctx) {
        case schema.get_argument(ctx, "hours") {
          Some(value.Int(hours)) ->
            case jetstream_activity.get_recent_activity(conn, hours) {
              Ok(entries) ->
                Ok(
                  value.List(list.map(
                    entries,
                    converters.activity_entry_to_value,
                  )),
                )
              Error(_) -> Error("Failed to fetch recent activity")
            }
          _ -> Error("Invalid or missing hours argument")
        }
      },
    ),
    // labelDefinitions query
    schema.field(
      "labelDefinitions",
      schema.non_null(
        schema.list_type(schema.non_null(admin_types.label_definition_type())),
      ),
      "Get all label definitions",
      fn(_ctx) {
        case label_definitions.get_all(conn) {
          Ok(defs) ->
            Ok(value.List(list.map(defs, converters.label_definition_to_value)))
          Error(_) -> Error("Failed to fetch label definitions")
        }
      },
    ),
    // viewerLabelPreferences query (authenticated users)
    schema.field(
      "viewerLabelPreferences",
      schema.non_null(
        schema.list_type(schema.non_null(admin_types.label_preference_type())),
      ),
      "Get label preferences for the current user (non-system labels only)",
      fn(_ctx) {
        case session.get_current_session(req, conn, did_cache) {
          Ok(sess) -> {
            // Get non-system label definitions
            case label_definitions.get_non_system(conn) {
              Ok(defs) -> {
                // Get user's preferences
                case label_preferences.get_by_did(conn, sess.did) {
                  Ok(prefs) -> {
                    // Build an association list of label_val -> visibility.
                    // The fold prepends, so the list ends up in reverse order
                    // of `prefs`.
                    let pref_map =
                      list.fold(prefs, [], fn(acc, pref) {
                        [#(pref.label_val, pref.visibility), ..acc]
                      })

                    // Map each definition to a preference, using the user's
                    // setting or the definition's default.
                    let result =
                      list.map(defs, fn(def) {
                        let visibility = case
                          list.key_find(pref_map, def.val)
                        {
                          Ok(v) -> v
                          Error(_) -> def.default_visibility
                        }
                        lexicon_converters.label_preference_to_value(
                          def,
                          visibility,
                        )
                      })

                    Ok(value.List(result))
                  }
                  Error(_) -> Error("Failed to fetch label preferences")
                }
              }
              Error(_) -> Error("Failed to fetch label definitions")
            }
          }
          Error(_) -> Error("Authentication required")
        }
      },
    ),
    // labels query (admin only) - Connection type
    schema.field_with_args(
      "labels",
      schema.non_null(admin_types.label_connection_type()),
      "Get labels with optional filters (admin only)",
      [
        schema.argument(
          "uri",
          schema.string_type(),
          "Filter by subject URI",
          None,
        ),
        schema.argument(
          "val",
          schema.string_type(),
          "Filter by label value",
          None,
        ),
        schema.argument(
          "first",
          schema.int_type(),
          "Number of items to fetch (default 50)",
          None,
        ),
        schema.argument(
          "after",
          schema.string_type(),
          "Cursor for pagination",
          None,
        ),
      ],
      fn(ctx) {
        use <- require_admin(conn, req, did_cache)
        let uri_filter = string_argument(ctx, "uri")
        let val_filter = string_argument(ctx, "val")
        let first = page_size_argument(ctx)
        let after_id = after_cursor_id(ctx, "Label")

        case
          labels.get_paginated(conn, uri_filter, val_filter, first, after_id)
        {
          Ok(paginated) ->
            Ok(connection_value(
              paginated.labels,
              converters.label_to_value,
              fn(label) { cursor.encode("Label", label.id) },
              paginated.has_next_page,
              option.is_some(after_id),
              paginated.total_count,
            ))
          Error(_) -> Error("Failed to fetch labels")
        }
      },
    ),
    // reports query (admin only) - Connection type
    schema.field_with_args(
      "reports",
      schema.non_null(admin_types.report_connection_type()),
      "Get moderation reports with optional status filter (admin only)",
      [
        schema.argument(
          "status",
          admin_types.report_status_enum(),
          "Filter by status",
          None,
        ),
        schema.argument(
          "first",
          schema.int_type(),
          "Number of items to fetch (default 50)",
          None,
        ),
        schema.argument(
          "after",
          schema.string_type(),
          "Cursor for pagination",
          None,
        ),
      ],
      fn(ctx) {
        use <- require_admin(conn, req, did_cache)
        // The enum name is lowercased before it is handed to the repository.
        let status_filter = case schema.get_argument(ctx, "status") {
          Some(value.Enum(s)) -> Some(string.lowercase(s))
          _ -> None
        }
        let first = page_size_argument(ctx)
        let after_id = after_cursor_id(ctx, "Report")

        case reports.get_paginated(conn, status_filter, first, after_id) {
          Ok(paginated) ->
            Ok(connection_value(
              paginated.reports,
              converters.report_to_value,
              fn(report) { cursor.encode("Report", report.id) },
              paginated.has_next_page,
              option.is_some(after_id),
              paginated.total_count,
            ))
          Error(_) -> Error("Failed to fetch reports")
        }
      },
    ),
  ])
}

/// Page size used by the paginated queries when `first` is not an Int.
const default_page_size = 50

/// Run `next` only when the request carries a session whose DID is an admin.
///
/// Fails with "Authentication required" when no session can be loaded and
/// with "Admin privileges required" when the session is not an admin.
fn require_admin(
  conn: Executor,
  req: wisp.Request,
  did_cache: Subject(did_cache.Message),
  next: fn() -> Result(value.Value, String),
) -> Result(value.Value, String) {
  case session.get_current_session(req, conn, did_cache) {
    Error(_) -> Error("Authentication required")
    Ok(sess) ->
      case config_repo.is_admin(conn, sess.did) {
        True -> next()
        False -> Error("Admin privileges required")
      }
  }
}

/// Read an optional String argument; anything else yields `None`.
fn string_argument(ctx, name: String) -> option.Option(String) {
  case schema.get_argument(ctx, name) {
    Some(value.String(text)) -> Some(text)
    _ -> None
  }
}

/// Read the `first` argument, falling back to `default_page_size`.
fn page_size_argument(ctx) -> Int {
  case schema.get_argument(ctx, "first") {
    Some(value.Int(size)) -> size
    _ -> default_page_size
  }
}

/// Decode the `after` cursor into a record id.
///
/// Yields `None` when the argument is absent, is not a String, cannot be
/// decoded, or was encoded for a kind other than `expected_kind`.
fn after_cursor_id(ctx, expected_kind: String) {
  case schema.get_argument(ctx, "after") {
    Some(value.String(encoded)) ->
      case cursor.decode(encoded) {
        Ok(#(kind, id)) if kind == expected_kind -> Some(id)
        _ -> None
      }
    _ -> None
  }
}

/// Assemble a connection value (edges, page info, total count) from one page
/// of `items`.
///
/// `to_node` converts an item to its GraphQL value and `to_cursor` produces
/// its cursor. The start and end cursors come from the first and last item
/// and are `None` for an empty page.
fn connection_value(
  items: List(item),
  to_node: fn(item) -> value.Value,
  to_cursor: fn(item) -> String,
  has_next_page: Bool,
  has_previous_page: Bool,
  total_count,
) -> value.Value {
  let edges =
    list.map(items, fn(item) {
      connection.Edge(node: to_node(item), cursor: to_cursor(item))
    })

  let start_cursor = case list.first(items) {
    Ok(first_item) -> Some(to_cursor(first_item))
    Error(_) -> None
  }
  let end_cursor = case list.last(items) {
    Ok(last_item) -> Some(to_cursor(last_item))
    Error(_) -> None
  }

  let page_info =
    connection.PageInfo(
      has_next_page: has_next_page,
      has_previous_page: has_previous_page,
      start_cursor: start_cursor,
      end_cursor: end_cursor,
    )

  connection.connection_to_value(connection.Connection(
    edges: edges,
    page_info: page_info,
    total_count: Some(total_count),
  ))
}