Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1import actor_validator
2import backfill
3import database/executor.{type Executor}
4import database/repositories/actors
5import database/repositories/jetstream_activity
6import database/repositories/lexicons
7import database/repositories/records
8import database/types.{Inserted, Skipped}
9import gleam/dynamic.{type Dynamic}
10import gleam/dynamic/decode
11import gleam/json
12import gleam/list
13import gleam/option
14import gleam/result
15import gleam/string
16import goose
17import honk
18import honk/errors
19import logging
20import pubsub
21import stats_pubsub
22import timestamp
23
/// Serialize an Erlang term (carried as `Dynamic`) into a JSON `String`.
fn dynamic_to_json(value: Dynamic) -> String {
  // json:encode yields iodata rather than a flat binary, so flatten it
  // into a String before handing it back.
  value
  |> do_json_encode
  |> iolist_to_string
}

/// Binding to Erlang's `json:encode/1`.
///
/// The result is iodata (an iolist), not a flat binary, so it is typed as
/// `Dynamic` and must be flattened by the caller.
/// NOTE(review): the `json` module ships with OTP 27+; confirm the runtime.
@external(erlang, "json", "encode")
fn do_json_encode(term: Dynamic) -> Dynamic
34
/// Binding to `erlang:iolist_to_binary/1`, which flattens iodata into a
/// single binary.
@external(erlang, "erlang", "iolist_to_binary")
fn iolist_to_binary(data: Dynamic) -> Dynamic

/// Flatten iodata into a Gleam `String`.
///
/// Gleam strings are binaries on the Erlang target, so the flattened binary
/// is decoded with `decode.string`. If that decode fails, a warning is logged
/// and `string.inspect` of the original (unflattened) input is returned as a
/// best-effort fallback.
fn iolist_to_string(iolist: Dynamic) -> String {
  let decoded =
    iolist
    |> iolist_to_binary
    |> decode.run(decode.string)

  case decoded {
    Error(_) -> {
      logging.log(
        logging.Warning,
        "[jetstream] Failed to convert iolist to string",
      )
      string.inspect(iolist)
    }
    Ok(text) -> text
  }
}
54
/// Build the JSON document stored in the activity log for a commit event.
///
/// The shape is `{did, time_us, commit: {rev, operation, collection, rkey,
/// record, cid}}`. The record payload is itself JSON-encoded and embedded as a
/// *string* (not a nested object); an absent record or CID becomes `null`.
fn serialize_commit_event(
  did: String,
  time_us: Int,
  commit: goose.CommitData,
) -> String {
  let commit_fields = [
    #("rev", json.string(commit.rev)),
    #("operation", json.string(commit.operation)),
    #("collection", json.string(commit.collection)),
    #("rkey", json.string(commit.rkey)),
    #(
      "record",
      json.nullable(commit.record, fn(record_data) {
        record_data
        |> dynamic_to_json
        |> json.string
      }),
    ),
    #("cid", json.nullable(commit.cid, json.string)),
  ]

  let document =
    json.object([
      #("did", json.string(did)),
      #("time_us", json.int(time_us)),
      #("commit", json.object(commit_fields)),
    ])

  json.to_string(document)
}
88
/// Handle a Jetstream commit event (create, update, or delete).
///
/// The event is first recorded in the activity log. This is best effort: if
/// the insert fails, a warning is logged and processing continues without an
/// activity id. The commit is then dispatched on `commit.operation`:
///
/// - `"create"` / `"update"`: the record is validated against the lexicons
///   stored in the database. The author is ensured to exist, and a newly
///   created actor is synchronously backfilled for `collection_ids` and
///   `external_collection_ids`. The record is then inserted, and a
///   `pubsub.RecordEvent` is published for GraphQL subscriptions.
/// - `"delete"`: the record at the commit's AT URI is deleted, and a delete
///   `pubsub.RecordEvent` is published.
/// - anything else is logged and marked as an error.
///
/// Every terminal path updates the activity entry's status. When that update
/// succeeds, it publishes `stats_pubsub.ActivityLogged` so the UI sees the
/// outcome in real time.
pub fn handle_commit_event(
  db: Executor,
  did: String,
  time_us: Int,
  commit: goose.CommitData,
  plc_url: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
) -> Nil {
  let uri = "at://" <> did <> "/" <> commit.collection <> "/" <> commit.rkey

  // Log activity at the entry point. The timestamp local is deliberately not
  // named `timestamp`, which would shadow the `timestamp` module import.
  let event_json = serialize_commit_event(did, time_us, commit)
  let event_timestamp = timestamp.microseconds_to_iso8601(time_us)

  let activity_id = case
    jetstream_activity.log_activity(
      db,
      event_timestamp,
      commit.operation,
      commit.collection,
      did,
      event_json,
    )
  {
    Ok(id) -> option.Some(id)
    Error(err) -> {
      logging.log(
        logging.Warning,
        "[jetstream] Failed to log activity: " <> string.inspect(err),
      )
      option.None
    }
  }

  // Close out the activity entry with its final status. When the status
  // update succeeds, an ActivityLogged event is published for real-time UI
  // updates. This is a no-op if the initial activity insert failed.
  let finish = fn(status: String, error_message: option.Option(String)) -> Nil {
    case activity_id {
      option.Some(id) ->
        case jetstream_activity.update_status(db, id, status, error_message) {
          Ok(_) ->
            stats_pubsub.publish(stats_pubsub.ActivityLogged(
              id,
              event_timestamp,
              commit.operation,
              commit.collection,
              did,
              status,
              error_message,
              event_json,
            ))
          Error(_) -> Nil
        }
      option.None -> Nil
    }
  }

  let ctx =
    CommitContext(
      db: db,
      did: did,
      uri: uri,
      event_timestamp: event_timestamp,
      commit: commit,
      finish: finish,
    )

  case commit.operation {
    "create" | "update" ->
      handle_upsert(ctx, plc_url, collection_ids, external_collection_ids)
    "delete" -> handle_delete(ctx)
    _ -> {
      logging.log(
        logging.Warning,
        "[jetstream] Unknown operation: " <> commit.operation,
      )
      finish("error", option.Some("Unknown operation: " <> commit.operation))
    }
  }
}

/// Per-event values shared by the commit-handling helpers.
///
/// `event_timestamp` is the event time as ISO-8601. It is used both for the
/// activity log and as `indexed_at` on published record events.
/// `finish(status, error_message)` records the final activity status and
/// publishes the matching `ActivityLogged` event.
type CommitContext {
  CommitContext(
    db: Executor,
    did: String,
    uri: String,
    event_timestamp: String,
    commit: goose.CommitData,
    finish: fn(String, option.Option(String)) -> Nil,
  )
}

/// Handle a create/update commit: require a record and CID, validate the
/// record, then store it. Every outcome is reported through `ctx.finish`.
fn handle_upsert(
  ctx: CommitContext,
  plc_url: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
) -> Nil {
  case ctx.commit.record, ctx.commit.cid {
    option.Some(record_data), option.Some(cid_value) -> {
      // Convert the dynamic record to a JSON string using Erlang's json:encode
      let json_string = dynamic_to_json(record_data)
      case validate_record_json(ctx, json_string) {
        False -> Nil
        True ->
          store_record(
            ctx,
            cid_value,
            json_string,
            plc_url,
            collection_ids,
            external_collection_ids,
          )
      }
    }
    _, _ -> {
      logging.log(
        logging.Warning,
        "[jetstream] "
          <> ctx.commit.operation
          <> " event missing record or cid for "
          <> ctx.uri,
      )
      ctx.finish("error", option.Some("Event missing record or cid"))
    }
  }
}

/// Validate `json_string` for the commit's collection against all lexicons
/// stored in the database. Returns `True` when valid.
///
/// Validation can fail at three points: fetching lexicons, parsing JSON, or
/// schema validation. On failure, the reason is logged, the activity entry is
/// closed via `ctx.finish`, and `False` is returned.
fn validate_record_json(ctx: CommitContext, json_string: String) -> Bool {
  case lexicons.get_all(ctx.db) {
    Error(db_err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to fetch lexicons for validation: "
          <> string.inspect(db_err),
      )
      ctx.finish(
        "error",
        option.Some("Failed to fetch lexicons: " <> string.inspect(db_err)),
      )
      False
    }
    Ok(lexicon_rows) -> {
      // Parse the stored lexicon JSON strings and the record into Json values
      let lexicon_jsons_result =
        lexicon_rows
        |> list.try_map(fn(lex) {
          honk.parse_json_string(lex.json)
          |> result.map_error(errors.to_string)
        })
      let record_json_result =
        honk.parse_json_string(json_string)
        |> result.map_error(errors.to_string)

      case lexicon_jsons_result, record_json_result {
        Ok(lexicon_jsons), Ok(record_json) ->
          case
            honk.validate_record(
              lexicon_jsons,
              ctx.commit.collection,
              record_json,
            )
          {
            Ok(_) -> True
            Error(validation_error) -> {
              let reason = errors.to_string(validation_error)
              logging.log(
                logging.Warning,
                "[jetstream] Validation failed for " <> ctx.uri <> ": " <> reason,
              )
              ctx.finish("validation_error", option.Some(reason))
              False
            }
          }
        Error(parse_err), _ | _, Error(parse_err) -> {
          logging.log(
            logging.Error,
            "[jetstream] Failed to parse JSON for validation: "
              <> ctx.uri
              <> ": "
              <> parse_err,
          )
          ctx.finish("error", option.Some("Failed to parse JSON"))
          False
        }
      }
    }
  }
}

/// Store a validated record.
///
/// The create-vs-update decision is made BEFORE anything is written, including
/// any backfill. The author is then ensured to exist; a newly created actor is
/// backfilled synchronously. Finally the record is inserted.
fn store_record(
  ctx: CommitContext,
  cid_value: String,
  json_string: String,
  plc_url: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
) -> Nil {
  let is_create = case records.get(ctx.db, ctx.uri) {
    // Empty list means the record doesn't exist yet
    Ok([]) -> True
    Ok(_) -> False
    Error(err) -> {
      // Database error - log it and treat as update to be safe
      logging.log(
        logging.Warning,
        "[jetstream] Error checking existing record for "
          <> ctx.uri
          <> ": "
          <> string.inspect(err),
      )
      False
    }
  }

  case actor_validator.ensure_actor_exists(ctx.db, ctx.did, plc_url) {
    Error(actor_err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to validate/create actor for "
          <> ctx.uri
          <> ": "
          <> actor_err,
      )
      ctx.finish("error", option.Some("Actor validation failed: " <> actor_err))
    }
    Ok(is_new_actor) -> {
      // A new actor is backfilled synchronously so subscription joins have
      // complete data immediately. We're already in a spawned process per
      // event, so blocking is fine.
      case is_new_actor {
        True -> {
          stats_pubsub.publish(stats_pubsub.ActorCreated)
          backfill.backfill_collections_for_actor(
            ctx.db,
            ctx.did,
            collection_ids,
            external_collection_ids,
            plc_url,
          )
        }
        False -> Nil
      }

      insert_record(ctx, cid_value, json_string, is_create)
    }
  }
}

/// Insert the record and publish the resulting subscription and stats events.
///
/// A duplicate CID (`Skipped`) is recorded as success but publishes no record
/// events, because nothing was written.
fn insert_record(
  ctx: CommitContext,
  cid_value: String,
  json_string: String,
  is_create: Bool,
) -> Nil {
  let commit = ctx.commit
  case
    records.insert(
      ctx.db,
      ctx.uri,
      cid_value,
      ctx.did,
      commit.collection,
      json_string,
    )
  {
    Ok(Inserted) -> {
      let #(verb, operation) = case is_create {
        True -> #("create", pubsub.Create)
        False -> #("update", pubsub.Update)
      }
      logging.log(
        logging.Info,
        "[jetstream] "
          <> verb
          <> " "
          <> commit.collection
          <> " ("
          <> commit.rkey
          <> ") "
          <> ctx.did,
      )
      ctx.finish("success", option.None)

      // Publish event to PubSub for GraphQL subscriptions
      pubsub.publish(pubsub.RecordEvent(
        uri: ctx.uri,
        cid: cid_value,
        did: ctx.did,
        collection: commit.collection,
        value: json_string,
        indexed_at: ctx.event_timestamp,
        operation: operation,
      ))

      // Only genuinely new records bump the created-records stat
      case is_create {
        True -> stats_pubsub.publish(stats_pubsub.RecordCreated)
        False -> Nil
      }
    }
    Ok(Skipped) -> {
      logging.log(
        logging.Info,
        "[jetstream] skipped (duplicate CID) "
          <> commit.collection
          <> " ("
          <> commit.rkey
          <> ") "
          <> ctx.did,
      )
      // Success, but don't increment counters or publish record events
      ctx.finish("success", option.Some("Skipped: duplicate CID"))
    }
    Error(err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to insert record "
          <> ctx.uri
          <> ": "
          <> string.inspect(err),
      )
      ctx.finish(
        "error",
        option.Some("Database insert failed: " <> string.inspect(err)),
      )
    }
  }
}

/// Handle a delete commit.
///
/// Removes the record at `ctx.uri`. On success it publishes a delete
/// `RecordEvent` (empty cid/value) and a `RecordDeleted` stats event.
fn handle_delete(ctx: CommitContext) -> Nil {
  let commit = ctx.commit
  logging.log(
    logging.Info,
    "[jetstream] delete "
      <> commit.collection
      <> " ("
      <> commit.rkey
      <> ") "
      <> ctx.did,
  )

  case records.delete(ctx.db, ctx.uri) {
    Ok(_) -> {
      ctx.finish("success", option.None)

      // Publish delete event to PubSub for GraphQL subscriptions, stamped
      // with the Jetstream event time
      pubsub.publish(pubsub.RecordEvent(
        uri: ctx.uri,
        cid: "",
        did: ctx.did,
        collection: commit.collection,
        value: "",
        indexed_at: ctx.event_timestamp,
        operation: pubsub.Delete,
      ))

      stats_pubsub.publish(stats_pubsub.RecordDeleted)
    }
    Error(err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to delete "
          <> ctx.uri
          <> ": "
          <> string.inspect(err),
      )
      ctx.finish("error", option.Some("Delete failed: " <> string.inspect(err)))
    }
  }
}
651
/// Handle an identity event by upserting the actor's DID → handle mapping.
///
/// Success is logged at Info level. A failed upsert is logged at Error level
/// and otherwise ignored.
pub fn handle_identity_event(db: Executor, identity: goose.IdentityData) -> Nil {
  let did = identity.did
  let handle = identity.handle

  case actors.upsert(db, did, handle) {
    Error(err) ->
      logging.log(
        logging.Error,
        "[jetstream] Failed to upsert actor " <> did <> ": " <> string.inspect(err),
      )
    Ok(_) ->
      logging.log(
        logging.Info,
        "[jetstream] identity update: " <> handle <> " (" <> did <> ")",
      )
  }
}
676
/// Handle an account event by logging whether the account is active.
///
/// Nothing is persisted yet, so the database handle is accepted but unused;
/// this is a natural place to extend account-status tracking later.
pub fn handle_account_event(_db: Executor, account: goose.AccountData) -> Nil {
  let message =
    "[jetstream] account "
    <> case account.active {
      False -> "inactive"
      True -> "active"
    }
    <> ": "
    <> account.did

  logging.log(logging.Info, message)
}