Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql

feat: publish pubsub events after GraphQL mutations

Adds pubsub.publish() calls after create, update, and delete mutation
resolvers so WebSocket subscriptions receive updates immediately
without waiting for Jetstream round-trip.

- Create timestamp utility module for ISO8601 formatting
- Rename event_handler_ffi to timestamp_ffi
- Add pubsub events after records.insert/update/delete in mutations

+55 -9
+5 -8
server/src/event_handler.gleam
··· 19 19 import logging 20 20 import pubsub 21 21 import stats_pubsub 22 + import timestamp 22 23 23 24 /// Convert a Dynamic value (Erlang term) to JSON string 24 25 fn dynamic_to_json(value: Dynamic) -> String { ··· 50 51 } 51 52 } 52 53 } 53 - 54 - /// Convert microseconds since Unix epoch to ISO8601 format 55 - /// Uses the event's original timestamp for accurate indexedAt values 56 - @external(erlang, "event_handler_ffi", "microseconds_to_iso8601") 57 - fn microseconds_to_iso8601(time_us: Int) -> String 58 54 59 55 /// Serialize a commit event to JSON string for activity logging 60 56 fn serialize_commit_event( ··· 104 100 105 101 // Log activity at entry point - serialize the commit event to JSON 106 102 let event_json = serialize_commit_event(did, time_us, commit) 107 - let timestamp = microseconds_to_iso8601(time_us) 103 + let timestamp = timestamp.microseconds_to_iso8601(time_us) 108 104 109 105 let activity_id = case 110 106 jetstream_activity.log_activity( ··· 269 265 } 270 266 271 267 // Convert event timestamp from microseconds to ISO8601 272 - let indexed_at = microseconds_to_iso8601(time_us) 268 + let indexed_at = 269 + timestamp.microseconds_to_iso8601(time_us) 273 270 274 271 let event = 275 272 pubsub.RecordEvent( ··· 587 584 588 585 // Publish delete event to PubSub for GraphQL subscriptions 589 586 // Use the event timestamp from the Jetstream event 590 - let indexed_at = microseconds_to_iso8601(time_us) 587 + let indexed_at = timestamp.microseconds_to_iso8601(time_us) 591 588 592 589 let event = 593 590 pubsub.RecordEvent(
+1 -1
server/src/event_handler_ffi.erl server/src/timestamp_ffi.erl
··· 1 - -module(event_handler_ffi). 1 + -module(timestamp_ffi). 2 2 -export([microseconds_to_iso8601/1]). 3 3 4 4 %% Convert microseconds since Unix epoch to ISO8601 format
+35
server/src/graphql/lexicon/mutations.gleam
··· 21 21 import honk 22 22 import honk/errors 23 23 import lib/oauth/did_cache 24 + import pubsub 24 25 import swell/schema 25 26 import swell/value 27 + import timestamp 26 28 27 29 /// Context for mutation execution 28 30 pub type MutationContext { ··· 500 502 |> result.map_error(fn(_) { "Failed to index record in database" }), 501 503 ) 502 504 505 + // Publish event for GraphQL subscriptions 506 + pubsub.publish(pubsub.RecordEvent( 507 + uri: uri, 508 + cid: cid, 509 + did: auth.user_info.did, 510 + collection: collection, 511 + value: record_json_string, 512 + indexed_at: timestamp.current_iso8601(), 513 + operation: pubsub.Create, 514 + )) 515 + 503 516 Ok( 504 517 value.Object([ 505 518 #("uri", value.String(uri)), ··· 613 626 |> result.map_error(fn(_) { "Failed to update record in database" }), 614 627 ) 615 628 629 + // Publish event for GraphQL subscriptions 630 + pubsub.publish(pubsub.RecordEvent( 631 + uri: uri, 632 + cid: cid, 633 + did: auth.user_info.did, 634 + collection: collection, 635 + value: record_json_string, 636 + indexed_at: timestamp.current_iso8601(), 637 + operation: pubsub.Update, 638 + )) 639 + 616 640 Ok( 617 641 value.Object([ 618 642 #("uri", value.String(uri)), ··· 680 704 records.delete(ctx.db, uri) 681 705 |> result.map_error(fn(_) { "Failed to delete record from database" }), 682 706 ) 707 + 708 + // Publish event for GraphQL subscriptions 709 + pubsub.publish(pubsub.RecordEvent( 710 + uri: uri, 711 + cid: "", 712 + did: auth.user_info.did, 713 + collection: collection, 714 + value: "", 715 + indexed_at: timestamp.current_iso8601(), 716 + operation: pubsub.Delete, 717 + )) 683 718 684 719 Ok(value.Object([#("uri", value.String(uri))])) 685 720 }
+14
server/src/timestamp.gleam
··· 1 + /// Timestamp utilities for ISO8601 formatting 2 + /// Convert microseconds since Unix epoch to ISO8601 format 3 + @external(erlang, "timestamp_ffi", "microseconds_to_iso8601") 4 + pub fn microseconds_to_iso8601(time_us: Int) -> String 5 + 6 + /// Get current timestamp in nanoseconds 7 + @external(erlang, "os", "system_time") 8 + fn system_time_native() -> Int 9 + 10 + /// Get current time as ISO8601 string 11 + pub fn current_iso8601() -> String { 12 + // os:system_time() returns nanoseconds, convert to microseconds 13 + microseconds_to_iso8601(system_time_native() / 1000) 14 + }