A Ruby gem for streaming data from the Bluesky/ATProto firehose

added YARD documentation for most classes

+762 -9
+18
lib/skyfall/collection.rb
··· 1 1 module Skyfall 2 + 3 + # 4 + # This module defines constants for known Bluesky record collection types, and a mapping of those 5 + # names to symbol short codes which can be used as shorthand when processing events or in 6 + # Jetstream filters. 7 + # 8 + 2 9 module Collection 3 10 BSKY_PROFILE = "app.bsky.actor.profile" 4 11 BSKY_ACTOR_STATUS = "app.bsky.actor.status" ··· 19 26 20 27 BSKY_NOTIF_DECLARATION = "app.bsky.notification.declaration" 21 28 BSKY_CHAT_DECLARATION = "chat.bsky.actor.declaration" 29 + 30 + # Mapping of NSID collection names to symbol short codes 22 31 23 32 SHORT_CODES = { 24 33 BSKY_ACTOR_STATUS => :bsky_actor_status, ··· 41 50 BSKY_NOTIF_DECLARATION => :bsky_notif_declaration 42 51 } 43 52 53 + # Returns a symbol short code for a given collection NSID, or `:unknown` 54 + # if NSID is not on the list. 55 + # @param collection [String] collection NSID 56 + # @return [Symbol] short code or :unknown 57 + 44 58 def self.short_code(collection) 45 59 SHORT_CODES[collection] || :unknown 46 60 end 61 + 62 + # Returns a collection NSID assigned to a given short code symbol, if one is defined. 63 + # @param code [Symbol] one of the symbols listed in {SHORT_CODES} 64 + # @return [String, nil] assigned NSID string, or nil when code is not known 47 65 48 66 def self.from_short_code(code) 49 67 SHORT_CODES.detect { |k, v| v == code }&.first
+35 -1
lib/skyfall/errors.rb
··· 1 1 module Skyfall 2 + # 3 + # Wrapper base class for Skyfall error classes. 4 + # 2 5 class Error < StandardError 3 6 end 4 7 8 + # 9 + # Raised when some part of the message being decoded has invalid format. 10 + # 5 11 class DecodeError < Error 6 12 end 7 13 14 + # 15 + # Raised when the server sends a message which is formatted correctly, but written in a version 16 + # that's not supported by this library. 17 + # 8 18 class UnsupportedError < Error 9 19 end 10 20 21 + # 22 + # Raised when {Stream#connect} is called and there's already another instance of {Stream} or its 23 + # subclass like {Firehose} that's connected to another websocket. 24 + # 25 + # This is currently not supported in Skyfall, because it uses EventMachine behind the scenes, which 26 + # runs everything on a single "reactor" thread, and there can be only one such reactor thread in 27 + # a given process. In theory, it should be possible for two connections to run inside a single 28 + # shared EventMachine event loop, but it would require some more coordination and it might have 29 + # unexpected side effects - e.g. synchronous work (including I/O and network requests) done during 30 + # processing of an event from one connection would be blocking the other connection. 31 + # 11 32 class ReactorActiveError < Error 12 33 def initialize 13 34 super( ··· 17 38 end 18 39 end 19 40 41 + # 42 + # Raised when the server sends a message which is formatted correctly, but describes some kind of 43 + # error condition that the server has detected. 44 + # 20 45 class SubscriptionError < Error 21 - attr_reader :error_type, :error_message 22 46 47 + # @return [String] a short machine-readable error code 48 + attr_reader :error_type 49 + 50 + # @return [String] a human-readable error message 51 + attr_reader :error_message 52 + 53 + # 54 + # @param error_type [String] a short machine-readable error code 55 + # @param error_message [String, nil] a human-readable error message 56 + # 23 57 def initialize(error_type, error_message = nil) 24 58 @error_type = error_type 25 59 @error_message = error_message
+2
lib/skyfall/extensions.rb
··· 2 2 require 'stringio' 3 3 4 4 module Skyfall 5 + 6 + # @private 5 7 module Extensions 6 8 7 9 refine StringIO do
+74
lib/skyfall/firehose.rb
··· 2 2 require 'uri' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # Client of a standard AT Protocol firehose websocket. 8 + # 9 + # This is the main Skyfall class to use to connect to a CBOR-based firehose 10 + # websocket endpoint like `subscribeRepos` (on a PDS or a relay). 11 + # 12 + # To connect to the firehose, you need to: 13 + # 14 + # * create an instance of {Firehose}, passing it the hostname/URL of the server, 15 + # name of the endpoint (normally `:subscribe_repos`) and optionally a cursor 16 + # * set up callbacks to be run when connecting, disconnecting, when a message 17 + # is received etc. (you need to set at least a message handler) 18 + # * call {#connect} to start the connection 19 + # * handle the received messages (instances of a {Skyfall::Firehose::Message} 20 + # subclass) 21 + # 22 + # @example 23 + # client = Skyfall::Firehose.new('bsky.network', :subscribe_repos, last_cursor) 24 + # 25 + # client.on_message do |msg| 26 + # next unless msg.type == :commit 27 + # 28 + # msg.operations.each do |op| 29 + # if op.type == :bsky_post && op.action == :create 30 + # puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}" 31 + # end 32 + # end 33 + # end 34 + # 35 + # client.connect 36 + # 37 + # # You might also want to set some or all of these lifecycle callback handlers: 38 + # 39 + # client.on_connecting { |url| puts "Connecting to #{url}..." } 40 + # client.on_connect { puts "Connected" } 41 + # client.on_disconnect { puts "Disconnected" } 42 + # client.on_reconnect { puts "Connection lost, trying to reconnect..." } 43 + # client.on_timeout { puts "Connection stalled, triggering a reconnect..." } 44 + # client.on_error { |e| puts "ERROR: #{e}" } 45 + # 46 + # @note Most of the methods of this class that you might want to use are defined in {Skyfall::Stream}. 47 + # 48 + 5 49 class Firehose < Stream 50 + 51 + # the main firehose endpoint on a PDS or relay 6 52 SUBSCRIBE_REPOS = "com.atproto.sync.subscribeRepos" 53 + 54 + # only used with moderation services (labellers) 7 55 SUBSCRIBE_LABELS = "com.atproto.label.subscribeLabels" 8 56 9 57 NAMED_ENDPOINTS = { ··· 11 59 :subscribe_labels => SUBSCRIBE_LABELS 12 60 } 13 61 62 + # Current cursor (seq of the last seen message) 63 + # @return [Integer, nil] 14 64 attr_accessor :cursor 15 65 66 + # 67 + # @param server [String] Address of the server to connect to. 68 + # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path. 69 + # 70 + # @param endpoint [Symbol, String] XRPC method name. 71 + # Pass either a full NSID, or a symbol shorthand from {NAMED_ENDPOINTS} 72 + # 73 + # @param cursor [Integer, String, nil] sequence number from which to resume 74 + # 75 + # @raise [ArgumentError] if any of the parameters is invalid 76 + # 77 + 16 78 def initialize(server, endpoint, cursor = nil) 17 79 require_relative 'firehose/message' 18 80 super(server) ··· 24 86 25 87 26 88 protected 89 + 90 + # Returns the full URL of the websocket endpoint to connect to. 91 + # @return [String] 27 92 28 93 def build_websocket_url 29 94 @root_url + "/xrpc/" + @endpoint + (@cursor ? "?cursor=#{@cursor}" : "") 30 95 end 96 + 97 + # Processes a single message received from the websocket. Passes the received data to the 98 + # {#on_raw_message} handler, builds a {Skyfall::Firehose::Message} object, and passes it to 99 + # the {#on_message} handler (if defined). Also updates the {#cursor} to this message's sequence 100 + # number (note: this is skipped if {#on_message} is not set). 101 + # 102 + # @param msg 103 + # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent} 104 + # @return [nil] 31 105 32 106 def handle_message(msg) 33 107 data = msg.data
+19
lib/skyfall/firehose/account_message.rb
··· 1 1 require_relative '../firehose' 2 2 3 3 module Skyfall 4 + 5 + # 6 + # Firehose message sent when the status of an account changes. This can be: 7 + # 8 + # - an account being created, sending its initial state (should be active) 9 + # - an account being deactivated or suspended 10 + # - an account being restored back to an active state from deactivation/suspension 11 + # - an account being deleted (the status returning `:deleted`) 12 + # 13 + 4 14 class Firehose::AccountMessage < Firehose::Message 15 + 16 + # 17 + # @private 18 + # @param type_object [Hash] first decoded CBOR frame with metadata 19 + # @param data_object [Hash] second decoded CBOR frame with payload 20 + # @raise [DecodeError] if the message doesn't include required data 21 + # 5 22 def initialize(type_object, data_object) 6 23 super 7 24 raise DecodeError.new("Missing event details") if @data_object['active'].nil? ··· 10 27 @status = @data_object['status']&.to_sym 11 28 end 12 29 30 + # @return [Boolean] true if the account is active, false if it's deactivated/suspended etc. 13 31 def active? 14 32 @active 15 33 end 16 34 35 + # @return [Symbol, nil] for inactive accounts, specifies the exact state; nil for active accounts 17 36 attr_reader :status 18 37 end 19 38 end
+16
lib/skyfall/firehose/commit_message.rb
··· 4 4 require_relative 'operation' 5 5 6 6 module Skyfall 7 + 8 + # 9 + # Firehose message which includes one or more operations on records in the repo (a record was 10 + # created, updated or deleted). In most cases this is a single record operation. 11 + # 12 + # Most of the messages received from the firehose are of this type, and this is the type you 13 + # will usually be most interested in. 14 + # 15 + 7 16 class Firehose::CommitMessage < Firehose::Message 17 + 18 + # @return [CID] CID (Content Identifier) of the commit 8 19 def commit 9 20 @commit ||= @data_object['commit'] && CID.from_cbor_tag(@data_object['commit']) 10 21 end 11 22 23 + # @return [Skyfall::CarArchive] commit data in the form of a parsed CAR archive 12 24 def blocks 13 25 @blocks ||= CarArchive.new(@data_object['blocks']) 14 26 end 15 27 28 + # @return [Array<Firehose::Operation>] record operations (usually one) included in the commit 16 29 def operations 17 30 @operations ||= @data_object['ops'].map { |op| Firehose::Operation.new(self, op) } 18 31 end 19 32 33 + # Looks up record data assigned to a given operation in the commit's CAR archive. 34 + # @param op [Firehose::Operation] 35 + # @return [Hash, nil] 20 36 def raw_record_for_operation(op) 21 37 op.cid && blocks.section_with_cid(op.cid) 22 38 end
+14
lib/skyfall/firehose/identity_message.rb
··· 1 1 require_relative '../firehose' 2 2 3 3 module Skyfall 4 + 5 + # 6 + # Firehose message sent when a new DID is created or when the details of someone's DID document 7 + # are changed (usually either a handle change or a migration to a different PDS). The message 8 + # should include currently assigned handle (though the field is not required). 9 + # 10 + # Note: the message is originally emitted from the account's PDS and is passed as is by relays, 11 + # which means you can't fully trust that the handle is actually correctly assigned to the DID 12 + # and verified by DNS or well-known. To confirm that, use `DID.resolve_handle` from 13 + # [DIDKit](https://ruby.sdk.blue/didkit/). 14 + # 15 + 4 16 class Firehose::IdentityMessage < Firehose::Message 17 + 18 + # @return [String, nil] current handle assigned to the DID 5 19 def handle 6 20 @data_object['handle'] 7 21 end
+28 -1
lib/skyfall/firehose/info_message.rb
··· 1 1 require_relative '../firehose' 2 2 3 3 module Skyfall 4 + 5 + # 6 + # An informational firehose message from the websocket service itself, unrelated to any repos. 7 + # 8 + # Currently there is only one type of message defined, `"OutdatedCursor"`, which is sent when 9 + # the client connects with a cursor that is older than the oldest event currently kept in the 10 + # backfill buffer. This message means that you're likely missing some events that were sent 11 + # since the last time the client was connected but which were already deleted from the buffer. 12 + # 13 + # Note: the {#did}, {#seq} and {#time} properties are always `nil` for `#info` messages. 14 + # 15 + 4 16 class Firehose::InfoMessage < Firehose::Message 5 - attr_reader :name, :message 17 + 18 + # @return [String] short machine-readable code of the info message 19 + attr_reader :name 20 + 21 + # @return [String, nil] a human-readable description 22 + attr_reader :message 6 23 24 + # Message which means that the cursor passed when connecting is older than the oldest event 25 + # currently kept in the backfill buffer, and that you've likely missed some events that have 26 + # already been deleted 7 27 OUTDATED_CURSOR = "OutdatedCursor" 8 28 29 + # 30 + # @private 31 + # @param type_object [Hash] first decoded CBOR frame with metadata 32 + # @param data_object [Hash] second decoded CBOR frame with payload 33 + # 9 34 def initialize(type_object, data_object) 10 35 super 11 36 ··· 13 38 @message = @data_object['message'] 14 39 end 15 40 41 + # @return [String] a formatted summary 16 42 def to_s 17 43 (@name || "InfoMessage") + (@message ? ": #{@message}" : "") 18 44 end 19 45 20 46 protected 21 47 48 + # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output 22 49 def inspectable_variables 23 50 super - [:@did, :@seq] 24 51 end
+16
lib/skyfall/firehose/labels_message.rb
··· 2 2 require_relative '../label' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # A message which includes one or more labels (as {Skyfall::Label}). This type of message 8 + # is only sent from a `:subscribe_labels` firehose from a labeller service. 9 + # 10 + # Note: the {#did} and {#time} properties are always `nil` for `#labels` messages. 11 + # 12 + 5 13 class Firehose::LabelsMessage < Firehose::Message 6 14 15 + # @return [Array<Skyfall::Label>] labels included in the batch 7 16 attr_reader :labels 8 17 18 + # 19 + # @private 20 + # @param type_object [Hash] first decoded CBOR frame with metadata 21 + # @param data_object [Hash] second decoded CBOR frame with payload 22 + # @raise [DecodeError] if the message doesn't include required data 23 + # 9 24 def initialize(type_object, data_object) 10 25 super 11 26 raise DecodeError.new("Missing event details") unless @data_object['labels'].is_a?(Array) ··· 15 30 16 31 protected 17 32 33 + # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output 18 34 def inspectable_variables 19 35 super - [:@did] 20 36 end
+73 -3
lib/skyfall/firehose/message.rb
··· 6 6 require 'time' 7 7 8 8 module Skyfall 9 + 10 + # @abstract 11 + # Abstract base class representing a CBOR firehose message. 12 + # 13 + # Actual messages are returned as instances of one of the subclasses of this class, 14 + # depending on the type of message, most commonly as {Skyfall::Firehose::CommitMessage}. 15 + # 16 + # The {new} method is overridden here so that it can be called with a binary data message 17 + # from the websocket, and it parses the type from the appropriate frame and builds an 18 + # instance of a matching subclass. 19 + # 20 + # You normally don't need to call this class directly, unless you're building a custom 21 + # subclass of {Skyfall::Stream}, or reading raw data packets from the websocket through 22 + # the {Skyfall::Stream#on_raw_message} event handler. 23 + 9 24 class Firehose::Message 10 25 using Skyfall::Extensions 11 26 ··· 17 32 require_relative 'sync_message' 18 33 require_relative 'unknown_message' 19 34 20 - attr_reader :type, :did, :seq 35 + # Type of the message (e.g. `:commit`, `:identity` etc.) 36 + # @return [Symbol] 37 + attr_reader :type 38 + 39 + # DID of the account (repo) that the event is sent by. 40 + # @return [String, nil] 41 + attr_reader :did 42 + 43 + # Sequential number of the message, to be used as a cursor when reconnecting. 44 + # @return [Integer, nil] 45 + attr_reader :seq 46 + 21 47 alias repo did 22 48 23 - # :nodoc: - consider this as semi-private API 24 - attr_reader :type_object, :data_object 49 + # First of the two CBOR objects forming the message payload, which mostly just includes the type field. 50 + # @api private 51 + # @return [Hash] 52 + attr_reader :type_object 25 53 54 + # Second of the two CBOR objects forming the message payload, which contains the rest of the data. 55 + # @api private 56 + # @return [Hash] 57 + attr_reader :data_object 58 + 59 + # 60 + # Parses the CBOR objects from the binary data and returns an instance of an appropriate subclass. 61 + # 62 + # {Skyfall::Firehose::UnknownMessage} is returned if the message type is not recognized. 63 + # 64 + # @param data [String] binary payload of a firehose websocket message 65 + # @return [Skyfall::Firehose::Message] 66 + # @raise [Skyfall::DecodeError] if the structure of the message is invalid 67 + # @raise [Skyfall::UnsupportedError] if the message has an unknown future version 68 + # @raise [Skyfall::SubscriptionError] if the data contains an error message from the server 69 + # 26 70 def self.new(data) 27 71 type_object, data_object = decode_cbor_objects(data) 28 72 ··· 41 85 message 42 86 end 43 87 88 + # 89 + # @private 90 + # @param type_object [Hash] first decoded CBOR frame with metadata 91 + # @param data_object [Hash] second decoded CBOR frame with payload 92 + # 44 93 def initialize(type_object, data_object) 45 94 @type_object = type_object 46 95 @data_object = data_object ··· 50 99 @seq = @data_object['seq'] 51 100 end 52 101 102 + # 103 + # List of operations on records included in the message. Only `#commit` messages include 104 + # operations, but for convenience the method is declared here and returns an empty array 105 + # in other messages. 106 + # @return [Array<Firehose::Operation>] 107 + # 53 108 def operations 54 109 [] 55 110 end 56 111 112 + # 113 + # @return [Boolean] true if the message is {Firehose::UnknownMessage} (of unrecognized type) 114 + # 57 115 def unknown? 58 116 self.is_a?(Firehose::UnknownMessage) 59 117 end 60 118 119 + # 120 + # Timestamp decoded from the message. 121 + # 122 + # Note: this represents the time when the message was emitted from the original PDS, which 123 + # might differ a lot from the `created_at` time saved in the record data, e.g. if user's local 124 + # time is set incorrectly, or if an archive of existing posts was imported from another platform. 125 + # 126 + # @return [Time, nil] 127 + # 61 128 def time 62 129 @time ||= @data_object['time'] && Time.parse(@data_object['time']) 63 130 end 64 131 132 + # Returns a string with a representation of the object for debugging purposes. 133 + # @return [String] 65 134 def inspect 66 135 vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 67 136 "#<#{self.class}:0x#{object_id} #{vars}>" ··· 70 139 71 140 protected 72 141 142 + # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output 73 143 def inspectable_variables 74 144 instance_variables - [:@type_object, :@data_object, :@blocks] 75 145 end
+40
lib/skyfall/firehose/operation.rb
··· 2 2 require_relative '../firehose' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # A single record operation from a firehose commit event. An operation is a new record being 8 + # created, or an existing record modified or deleted. It includes the URI and other details of 9 + # the record in question, type of the action taken, and record data for "created" and "update" 10 + # actions. 11 + # 12 + # Note: when a record is deleted, the previous record data is *not* included in the commit, only 13 + # its URI. This means that if you're tracking records which are referencing other records, e.g. 14 + # follow, block, or like records, you need to store information about this referencing record 15 + # including an URI or rkey, because in case of a delete, you will not get information about which 16 + # post was unliked or which account was unfollowed, only which like/follow record was deleted. 17 + # 18 + # At the moment, Skyfall doesn't parse the record data into any rich models specific for a given 19 + # record type with a convenient API, but simply returns them as `Hash` objects (see {#raw_record}). 20 + # In the future, a separate `#record` method might be added which returns a parsed record model. 21 + # 22 + 5 23 class Firehose::Operation 24 + 25 + # 26 + # @param message [Skyfall::Firehose::Message] commit message the operation is included in 27 + # @param json [Hash] operation data 28 + # 6 29 def initialize(message, json) 7 30 @message = message 8 31 @json = json 9 32 end 10 33 34 + # @return [String] DID of the account/repository in which the operation happened 11 35 def repo 12 36 @message.repo 13 37 end 14 38 15 39 alias did repo 16 40 41 + # @return [String] path part of the record URI (collection + rkey) 17 42 def path 18 43 @json['path'] 19 44 end 20 45 46 + # @return [Symbol] type of the operation (`:create`, `:update` or `:delete`) 21 47 def action 22 48 @json['action'].to_sym 23 49 end 24 50 51 + # @return [String] record collection NSID 25 52 def collection 26 53 @json['path'].split('/')[0] 27 54 end 28 55 56 + # @return [String] record rkey 29 57 def rkey 30 58 @json['path'].split('/')[1] 31 59 end 32 60 61 + # @return [String] full AT URI of the record 33 62 def uri 34 63 "at://#{repo}/#{path}" 35 64 end 36 65 66 + # @return [CID, nil] CID (Content Identifier) of the record (nil for delete operations) 37 67 def cid 38 68 @cid ||= @json['cid'] && CID.from_cbor_tag(@json['cid']) 39 69 end 40 70 71 + # @return [Hash, nil] record data as a plain Ruby Hash (nil for delete operations) 41 72 def raw_record 42 73 @raw_record ||= @message.raw_record_for_operation(self) 43 74 end 44 75 76 + # Symbol short code of the collection, like `:bsky_post`. If the collection NSID is not 77 + # recognized, the type is `:unknown`. The full NSID is always available through the 78 + # `#collection` property. 79 + # 80 + # @return [Symbol] 81 + # @see Skyfall::Collection 82 + # 45 83 def type 46 84 Collection.short_code(collection) 47 85 end 48 86 87 + # Returns a string with a representation of the object for debugging purposes. 88 + # @return [String] 49 89 def inspect 50 90 vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 51 91 "#<#{self.class}:0x#{object_id} #{vars}>"
+5
lib/skyfall/firehose/unknown_message.rb
··· 1 1 require_relative '../firehose' 2 2 3 3 module Skyfall 4 + 5 + # 6 + # Firehose message of an unrecognized type. 7 + # 8 + 4 9 class Firehose::UnknownMessage < Firehose::Message 5 10 end 6 11 end
+81
lib/skyfall/jetstream.rb
··· 5 5 require 'uri' 6 6 7 7 module Skyfall 8 + 9 + # 10 + # Client of a Jetstream service (JSON-based firehose). 11 + # 12 + # This is an equivalent of {Skyfall::Firehose} for Jetstream sources, mirroring its API. 13 + # It returns messages as instances of subclasses of {Skyfall::Jetstream::Message}, which 14 + # are generally equivalent to the respective {Skyfall::Firehose::Message} variants as much 15 + # as possible. 16 + # 17 + # To connect to a Jetstream websocket, you need to: 18 + # 19 + # * create an instance of Jetstream, passing it the hostname/URL of the server, and optionally 20 + # parameters such as cursor or collection/DID filters 21 + # * set up callbacks to be run when connecting, disconnecting, when a message is received etc. 22 + # (you need to set at least a message handler) 23 + # * call {#connect} to start the connection 24 + # * handle the received messages 25 + # 26 + # @example 27 + # client = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', { 28 + # wanted_collections: 'app.bsky.feed.post', 29 + # wanted_dids: @dids 30 + # }) 31 + # 32 + # client.on_message do |msg| 33 + # next unless msg.type == :commit 34 + # 35 + # op = msg.operation 36 + # 37 + # if op.type == :bsky_post && op.action == :create 38 + # puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}" 39 + # end 40 + # end 41 + # 42 + # client.connect 43 + # 44 + # # You might also want to set some or all of these lifecycle callback handlers: 45 + # 46 + # client.on_connecting { |url| puts "Connecting to #{url}..." } 47 + # client.on_connect { puts "Connected" } 48 + # client.on_disconnect { puts "Disconnected" } 49 + # client.on_reconnect { puts "Connection lost, trying to reconnect..." } 50 + # client.on_timeout { puts "Connection stalled, triggering a reconnect..." } 51 + # client.on_error { |e| puts "ERROR: #{e}" } 52 + # 53 + # @note Most of the methods of this class that you might want to use are defined in {Skyfall::Stream}. 54 + # 55 + 8 56 class Jetstream < Stream 57 + 58 + # Current cursor (time of the last seen message) 59 + # @return [Integer, nil] 9 60 attr_accessor :cursor 10 61 62 + # 63 + # @param server [String] Address of the server to connect to. 64 + # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path. 65 + # @param params [Hash] options, see below: 66 + # 67 + # @option params [Integer] :cursor 68 + # cursor from which to resume 69 + # 70 + # @option params [Array<String>] :wanted_dids 71 + # DID filter to pass to the server (`:wantedDids` is also accepted); 72 + # value should be a DID string or an array of those 73 + # 74 + # @option params [Array<String, Symbol>] :wanted_collections 75 + # collection filter to pass to the server (`:wantedCollections` is also accepted); 76 + # value should be an NSID string or a symbol shorthand, or an array of those 77 + # 78 + # @raise [ArgumentError] if the server parameter or the options are invalid 79 + # 11 80 def initialize(server, params = {}) 12 81 require_relative 'jetstream/message' 13 82 super(server) ··· 20 89 21 90 protected 22 91 92 + # Returns the full URL of the websocket endpoint to connect to. 93 + # @return [String] 94 + 23 95 def build_websocket_url 24 96 params = @cursor ? @params.merge(cursor: @cursor) : @params 25 97 query = URI.encode_www_form(params) 26 98 27 99 @root_url + "/subscribe" + (query.length > 0 ? "?#{query}" : '') 28 100 end 101 + 102 + # Processes a single message received from the websocket. Passes the received data to the 103 + # {#on_raw_message} handler, builds a {Skyfall::Jetstream::Message} object, and passes it to 104 + # the {#on_message} handler (if defined). Also updates the {#cursor} to this message's 105 + # microsecond timestamp (note: this is skipped if {#on_message} is not set). 106 + # 107 + # @param msg 108 + # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent} 109 + # @return [nil] 29 110 30 111 def handle_message(msg) 31 112 data = msg.data
+17
lib/skyfall/jetstream/account_message.rb
··· 2 2 require_relative '../jetstream' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # Jetstream message sent when the status of an account changes. This can be: 8 + # 9 + # - an account being created, sending its initial state (should be active) 10 + # - an account being deactivated or suspended 11 + # - an account being restored back to an active state from deactivation/suspension 12 + # - an account being deleted (the status returning `:deleted`) 13 + # 14 + 5 15 class Jetstream::AccountMessage < Jetstream::Message 16 + 17 + # 18 + # @param json [Hash] message JSON decoded from the websocket message 19 + # @raise [DecodeError] if the message doesn't include required data 20 + # 6 21 def initialize(json) 7 22 raise DecodeError.new("Missing event details") if json['account'].nil? 8 23 super 9 24 end 10 25 26 + # @return [Boolean] true if the account is active, false if it's deactivated/suspended etc. 11 27 def active? 12 28 @json['account']['active'] 13 29 end 14 30 31 + # @return [Symbol, nil] for inactive accounts, specifies the exact state; nil for active accounts 15 32 def status 16 33 @json['account']['status']&.to_sym 17 34 end
+21
lib/skyfall/jetstream/commit_message.rb
··· 3 3 require_relative 'operation' 4 4 5 5 module Skyfall 6 + 7 + # 8 + # Jetstream message which includes a single operation on a record in the repo (a record was 9 + # created, updated or deleted). Most of the messages received from Jetstream are of this type, 10 + # and this is the type you will usually be most interested in. 11 + # 12 + 6 13 class Jetstream::CommitMessage < Jetstream::Message 14 + 15 + # 16 + # @param json [Hash] message JSON decoded from the websocket message 17 + # @raise [DecodeError] if the message doesn't include required data 18 + # 7 19 def initialize(json) 8 20 raise DecodeError.new("Missing event details") if json['commit'].nil? 9 21 super 10 22 end 11 23 24 + # Returns the record operation included in the commit. 25 + # @return [Jetstream::Operation] 26 + # 12 27 def operation 13 28 @operation ||= Jetstream::Operation.new(self, json['commit']) 14 29 end 15 30 16 31 alias op operation 17 32 33 + # Returns record operations included in the commit. Currently a `:commit` message from 34 + # Jetstream always includes exactly one operation, but for compatibility with 35 + # {Skyfall::Firehose}'s API it's also returned in an array here. 36 + # 37 + # @return [Array<Jetstream::Operation>] 38 + # 18 39 def operations 19 40 [operation] 20 41 end
+18
lib/skyfall/jetstream/identity_message.rb
··· 2 2 require_relative '../jetstream' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # Jetstream message sent when a new DID is created or when the details of someone's DID document 8 + # are changed (usually either a handle change or a migration to a different PDS). The message 9 + # should include currently assigned handle (though the field is not required). 10 + # 11 + # Note: the message is originally emitted from the account's PDS and is passed as is by relays, 12 + # which means you can't fully trust that the handle is actually correctly assigned to the DID 13 + # and verified by DNS or well-known. To confirm that, use `DID.resolve_handle` from 14 + # [DIDKit](https://ruby.sdk.blue/didkit/). 15 + # 16 + 5 17 class Jetstream::IdentityMessage < Jetstream::Message 18 + 19 + # 20 + # @param json [Hash] message JSON decoded from the websocket message 21 + # @raise [DecodeError] if the message doesn't include required data 22 + # 6 23 def initialize(json) 7 24 raise DecodeError.new("Missing event details") if json['identity'].nil? 8 25 super 9 26 end 10 27 28 + # @return [String, nil] current handle assigned to the DID 11 29 def handle 12 30 @json['identity']['handle'] 13 31 end
+69 -2
lib/skyfall/jetstream/message.rb
··· 4 4 require 'time' 5 5 6 6 module Skyfall 7 + 8 + # @abstract 9 + # Abstract base class representing a Jetstream message. 10 + # 11 + # Actual messages are returned as instances of one of the subclasses of this class, 12 + # depending on the type of message, most commonly as {Skyfall::Jetstream::CommitMessage}. 13 + # 14 + # The {new} method is overridden here so that it can be called with a JSON message from 15 + # the websocket, and it parses the type from the JSON and builds an instance of a matching 16 + # subclass. 17 + # 18 + # You normally don't need to call this class directly, unless you're building a custom 19 + # subclass of {Skyfall::Stream} or reading raw data packets from the websocket through 20 + # the {Skyfall::Stream#on_raw_message} event handler. 21 + 7 22 class Jetstream::Message 8 23 require_relative 'account_message' 9 24 require_relative 'commit_message' 10 25 require_relative 'identity_message' 11 26 require_relative 'unknown_message' 12 27 13 - attr_reader :did, :type, :time_us 28 + # Type of the message (e.g. `:commit`, `:identity` etc.) 29 + # @return [Symbol] 30 + attr_reader :type 31 + 32 + # DID of the account (repo) that the event is sent by 33 + # @return [String] 34 + attr_reader :did 35 + 36 + # Server timestamp of the message (in Unix time microseconds), which serves as a cursor 37 + # when reconnecting; an equivalent of {Skyfall::Firehose::Message#seq} in CBOR firehose 38 + # messages. 39 + # @return [Integer] 40 + attr_reader :time_us 41 + 14 42 alias repo did 15 43 alias seq time_us 16 44 17 - # :nodoc: - consider this as semi-private API 45 + # The raw JSON of the message as parsed from the websocket packet. 18 46 attr_reader :json 19 47 48 + # 49 + # Parses the JSON data from a websocket message and returns an instance of an appropriate subclass. 50 + # 51 + # {Skyfall::Jetstream::UnknownMessage} is returned if the message type is not recognized. 52 + # 53 + # @param data [String] plain text payload of a Jetstream websocket message 54 + # @return [Skyfall::Jetstream::Message] 55 + # @raise [DecodeError] if the message doesn't include required data 56 + # 20 57 def self.new(data) 21 58 json = JSON.parse(data) 22 59 ··· 32 69 message 33 70 end 34 71 72 + # 73 + # @param json [Hash] message JSON decoded from the websocket message 74 + # 35 75 def initialize(json) 36 76 @json = json 37 77 @type = @json['kind'].to_sym ··· 39 79 @time_us = @json['time_us'] 40 80 end 41 81 82 + # 83 + # @return [Boolean] true if the message is {Jetstream::UnknownMessage} (of unrecognized type) 84 + # 42 85 def unknown? 43 86 self.is_a?(Jetstream::UnknownMessage) 44 87 end 45 88 89 + # Returns a record operation included in the message. Only `:commit` messages include 90 + # operations, but for convenience the method is declared here and returns nil in other messages. 91 + # 92 + # @return [nil] 93 + # 46 94 def operation 47 95 nil 48 96 end 49 97 50 98 alias op operation 51 99 100 + # List of operations on records included in the message. Only `:commit` messages include 101 + # operations, but for convenience the method is declared here and returns an empty array 102 + # in other messages. 103 + # 104 + # @return [Array<Jetstream::Operation>] 105 + # 52 106 def operations 53 107 [] 54 108 end 55 109 110 + # 111 + # Timestamp decoded from the message. 112 + # 113 + # Note: the time is read from the {#time_us} field, which stores the event time as an integer in 114 + # Unix time microseconds, and which is used as an equivalent of {Skyfall::Firehose::Message#seq} 115 + # in CBOR firehose messages. This timestamp represents the time when the message was received 116 + # and stored by Jetstream, which might differ a lot from the `created_at` time saved in the 117 + # record data, e.g. if user's local time is set incorrectly or if an archive of existing posts 118 + # was imported from another platform. It will also differ (usually only slightly) from the 119 + # timestamp of the original CBOR message emitted from the PDS and passed through the relay. 120 + # 121 + # @return [Time] 122 + # 56 123 def time 57 124 @time ||= @json['time_us'] && Time.at(@json['time_us'] / 1_000_000.0) 58 125 end
+40
lib/skyfall/jetstream/operation.rb
··· 2 2 require_relative '../jetstream' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # A single record operation from a Jetstream commit event. An operation is a new record being 8 + # created, or an existing record modified or deleted. It includes the URI and other details of 9 + # the record in question, type of the action taken, and record data for "created" and "update" 10 + # actions. 11 + # 12 + # Note: when a record is deleted, the previous record data is *not* included in the commit, only 13 + # its URI. This means that if you're tracking records which are referencing other records, e.g. 14 + # follow, block, or like records, you need to store information about this referencing record 15 + # including an URI or rkey, because in case of a delete, you will not get information about which 16 + # post was unliked or which account was unfollowed, only which like/follow record was deleted. 17 + # 18 + # At the moment, Skyfall doesn't parse the record data into any rich models specific for a given 19 + # record type with a convenient API, but simply returns them as `Hash` objects (see {#raw_record}). 20 + # In the future, a second `#record` method might be added which returns a parsed record model. 21 + # 22 + 5 23 class Jetstream::Operation 24 + 25 + # 26 + # @param message [Skyfall::Jetstream::Message] commit message the operation is parsed from 27 + # @param json [Hash] operation data 28 + # 6 29 def initialize(message, json) 7 30 @message = message 8 31 @json = json 9 32 end 10 33 34 + # @return [String] DID of the account/repository in which the operation happened 11 35 def repo 12 36 @message.repo 13 37 end 14 38 15 39 alias did repo 16 40 41 + # @return [String] path part of the record URI (collection + rkey) 17 42 def path 18 43 @json['collection'] + '/' + @json['rkey'] 19 44 end 20 45 46 + # @return [Symbol] type of the operation (`:create`, `:update` or `:delete`) 21 47 def action 22 48 @json['operation'].to_sym 23 49 end 24 50 51 + # @return [String] record collection NSID 25 52 def collection 26 53 @json['collection'] 27 54 end 28 55 56 + # @return [String] record rkey 29 57 def rkey 30 58 @json['rkey'] 31 59 end 32 60 61 + # @return [String] full AT URI of the record 33 62 def uri 34 63 "at://#{repo}/#{collection}/#{rkey}" 35 64 end 36 65 66 + # @return [CID, nil] CID (Content Identifier) of the record (nil for delete operations) 37 67 def cid 38 68 @cid ||= @json['cid'] && CID.from_json(@json['cid']) 39 69 end 40 70 71 + # @return [Hash, nil] record data as a plain Ruby Hash (nil for delete operations) 41 72 def raw_record 42 73 @json['record'] 43 74 end 44 75 76 + # Symbol short code of the collection, like `:bsky_post`. If the collection NSID is not 77 + # recognized, the type is `:unknown`. The full NSID is always available through the 78 + # `#collection` property. 79 + # 80 + # @return [Symbol] 81 + # @see Skyfall::Collection 82 + # 45 83 def type 46 84 Collection.short_code(collection) 47 85 end 48 86 87 + # Returns a string with a representation of the object for debugging purposes. 88 + # @return [String] 49 89 def inspect 50 90 vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 51 91 "#<#{self.class}:0x#{object_id} #{vars}>"
+5
lib/skyfall/jetstream/unknown_message.rb
··· 1 1 require_relative '../jetstream' 2 2 3 3 module Skyfall 4 + 5 + # 6 + # Jetstream message of an unrecognized type. 7 + # 8 + 4 9 class Jetstream::UnknownMessage < Jetstream::Message 5 10 end 6 11 end
+31
lib/skyfall/label.rb
··· 2 2 require 'time' 3 3 4 4 module Skyfall 5 + 6 + # 7 + # A single label emitted from the "subscribeLabels" firehose of a labeller service. 8 + # 9 + # The label assigns some specific value - from a list of available values defined by this 10 + # labeller - to a specific target (at:// URI or a DID). In general, this will usually be either 11 + # a "badge" that a user requested to be assigned to themselves from a fun/informative labeller, 12 + # or some kind of (likely negative) label assigned to a user or post by a moderation labeller. 13 + # 14 + # You generally don't need to create instances of this class manually, but will receive them 15 + # from {Skyfall::Firehose} that's connected to `:subscribe_labels` in the {Stream#on_message} 16 + # callback handler (wrapped in a {Skyfall::Firehose::LabelsMessage}). 17 + # 18 + 5 19 class Label 20 + 21 + # @return [Hash] the label's JSON data 6 22 attr_reader :data 7 23 24 + # 25 + # @param data [Hash] raw label JSON 26 + # @raise [Skyfall::DecodeError] if the data has an invalid format 27 + # @raise [Skyfall::UnsupportedError] if the label is in an unsupported future version 28 + # 8 29 def initialize(data) 9 30 @data = data 10 31 ··· 20 41 raise DecodeError.new("Invalid uri: #{uri}") unless uri.start_with?('at://') || uri.start_with?('did:') 21 42 end 22 43 44 + # @return [Integer] label format version number 23 45 def version 24 46 @data['ver'] 25 47 end 26 48 49 + # DID of the labelling authority (the labeller service). 50 + # @return [String] 27 51 def authority 28 52 @data['src'] 29 53 end 30 54 55 + # AT URI or DID of the labelled subject (e.g. a user or post). 56 + # @return [String] 31 57 def subject 32 58 @data['uri'] 33 59 end 34 60 61 + # @return [CID, nil] CID of the specific version of the subject that this label applies to 35 62 def cid 36 63 @cid ||= @data['cid'] && CID.from_json(@data['cid']) 37 64 end 38 65 66 + # @return [String] label value 39 67 def value 40 68 @data['val'] 41 69 end 42 70 71 + # @return [Boolean] if true, then this is a negation (delete) of an existing label 43 72 def negation? 44 73 !!@data['neg'] 45 74 end 46 75 76 + # @return [Time] timestamp when the label was created 47 77 def created_at 48 78 @created_at ||= Time.parse(@data['cts']) 49 79 end 50 80 81 + # @return [Time, nil] optional timestamp when the label expires 51 82 def expires_at 52 83 @expires_at ||= @data['exp'] && Time.parse(@data['exp']) 53 84 end
+140 -2
lib/skyfall/stream.rb
··· 5 5 require_relative 'version' 6 6 7 7 module Skyfall 8 + 9 + # Base class of a websocket client. It provides basic websocket client functionality such as 10 + # connecting to the service, keeping the connection alive and running lifecycle callbacks. 11 + # 12 + # In most cases, you will not create instances of this class directly, but rather use either 13 + # {Firehose} or {Jetstream}. Use this class as a superclass if you need to implement some 14 + # custom client for a websocket API that isn't supported yet. 15 + 8 16 class Stream 9 17 EVENTS = %w(message raw_message connecting connect disconnect reconnect error timeout) 10 18 MAX_RECONNECT_INTERVAL = 300 11 19 12 - attr_accessor :auto_reconnect, :last_update, :user_agent 13 - attr_accessor :heartbeat_timeout, :heartbeat_interval, :check_heartbeat 20 + # If enabled, the client will try to reconnect if the connection is closed unexpectedly. 21 + # (Default: true) 22 + # 23 + # When the reconnect attempt fails, it will wait with an exponential backoff delay before 24 + # retrying again, up to {MAX_RECONNECT_INTERVAL} seconds. 25 + # 26 + # @return [Boolean] 27 + attr_accessor :auto_reconnect 28 + 29 + # User agent sent in the header when connecting. 30 + # 31 + # Default value is {#default_user_agent} = {#version_string} `(Skyfall/x.y)`. It's recommended 32 + # to set it or extend it with some information that indicates what service this is and who is 33 + # running it (e.g. a Bluesky handle). 34 + # 35 + # @return [String] 36 + # @example 37 + # client.user_agent = "my.service (@my.handle) #{client.version_string}" 38 + attr_accessor :user_agent 39 + 40 + # If enabled, runs a timer which does periodical "heatbeat checks". 41 + # 42 + # The heartbeat timer is started when the client connects to the service, and checks if the stream 43 + # hasn't stalled and is still regularly sending new messages. If no messages are detected for some 44 + # period of time, the client forces a reconnect. 45 + # 46 + # This is **not** enabled by default, because depending on the service you're connecting to, it 47 + # might be normal to not receive any messages for a while. 48 + # 49 + # @see #heartbeat_timeout 50 + # @see #heartbeat_interval 51 + # @return [Boolean] 52 + attr_accessor :check_heartbeat 53 + 54 + # Interval in seconds between heartbeat checks (default: 10). Only used if {#check_heartbeat} is set. 55 + # @return [Numeric] 56 + attr_accessor :heartbeat_interval 57 + 58 + # Number of seconds without messages after which reconnect is triggered (default: 300). 59 + # Only used if {#check_heartbeat} is set. 60 + # @return [Numeric] 61 + attr_accessor :heartbeat_timeout 62 + 63 + # Time when the most recent message was received from the websocket. 64 + # 65 + # Note: this is _local time_ when the message was received; this is different from the timestamp 66 + # of the message, which is the server time of the original source (PDS) when emitting the message, 67 + # and different from a potential `created_at` saved in the record. 68 + # 69 + # @return [Time, nil] 70 + attr_accessor :last_update 14 71 72 + # 73 + # @param server [String] Address of the server to connect to. 74 + # Expects a string with either just a hostname, or a ws:// or wss:// URL. 75 + # 76 + # @raise [ArgumentError] if the server parameter is invalid 77 + # 15 78 def initialize(server) 16 79 @root_url = build_root_url(server) 17 80 ··· 27 90 @handlers[:error] = proc { |e| puts "ERROR: #{e}" } 28 91 end 29 92 93 + # 94 + # Opens a connection to the configured websocket. 95 + # 96 + # This method starts an EventMachine reactor on the current thread, and will only return 97 + # once the connection is closed. 98 + # 99 + # @return [nil] 100 + # @raise [ReactorActiveError] if another stream is already running 101 + # 30 102 def connect 31 103 return if @ws 32 104 ··· 86 158 end 87 159 end 88 160 161 + # 162 + # Forces a reconnect, closing the connection and calling {#connect} again. 163 + # @return [nil] 164 + # 89 165 def reconnect 90 166 @reconnecting = true 91 167 @connection_attempts = 0 ··· 93 169 @ws ? @ws.close : connect 94 170 end 95 171 172 + # 173 + # Closes the connection and stops the EventMachine reactor thread. 174 + # @return [nil] 175 + # 96 176 def disconnect 97 177 return unless EM.reactor_running? 98 178 ··· 103 183 104 184 alias close disconnect 105 185 186 + # 187 + # Default user agent sent when connecting to the service. (Currently `"#{version_string}"`) 188 + # @return [String] 189 + # 106 190 def default_user_agent 107 191 version_string 108 192 end 109 193 194 + # 195 + # Skyfall version string for use in user agent strings (`"Skyfall/x.y"`). 196 + # @return [String] 197 + # 110 198 def version_string 111 199 "Skyfall/#{Skyfall::VERSION}" 112 200 end ··· 132 220 end 133 221 134 222 223 + # Returns a string with a representation of the object for debugging purposes. 224 + # @return [String] 135 225 def inspect 136 226 vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 137 227 "#<#{self.class}:0x#{object_id} #{vars}>" ··· 140 230 141 231 protected 142 232 233 + # @note This method is designed to be overridden in subclasses. 234 + # 235 + # Returns the full URL of the websocket endpoint to connect to, with path and query parameters 236 + # if needed. The base implementation simply returns the base URL passed to the initializer. 237 + # 238 + # Override this method in subclasses to point to the specific endpoint and add necessary 239 + # parameters like cursor or filters, depending on the arguments passed to the constructor. 240 + # 241 + # @return [String] 242 + 143 243 def build_websocket_url 144 244 @root_url 145 245 end 146 246 247 + # Builds and configures a websocket client object that is used to connect to the requested service. 248 + # 249 + # @return [Faye::WebSocket::Client] 250 + # see {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/Client Faye::WebSocket::Client} 251 + 147 252 def build_websocket_client(url) 148 253 Faye::WebSocket::Client.new(url, nil, { headers: { 'User-Agent' => user_agent }.merge(request_headers) }) 149 254 end 150 255 256 + # @note This method is designed to be overridden in subclasses. 257 + # 258 + # Processes a single message received from the websocket. The implementation is expected to 259 + # parse the message from a plain text or binary form, build an appropriate message object, 260 + # and call the `:message` and/or `:raw_message` callback handlers, passing the right parameters. 261 + # 262 + # The base implementation simply takes the message data and passes it as is to `:raw_message`, 263 + # and does not call `:message` at all. 264 + # 265 + # @param msg 266 + # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent} 267 + # @return [nil] 268 + 151 269 def handle_message(msg) 152 270 data = msg.data 153 271 @handlers[:raw_message]&.call(data) 154 272 end 155 273 274 + # Additional headers to pass with the request when connecting to the websocket endpoint. 275 + # The user agent header (built from {#user_agent}) is added separately. 276 + # 277 + # The base implementation returns an empty hash. 278 + # 279 + # @return [Hash] a hash of `{ header_name => header_value }` 280 + 156 281 def request_headers 157 282 {} 158 283 end 159 284 285 + # Returns the underlying websocket client object. It can be used e.g. to send messages back 286 + # to the server (but see also: {#send_data}). 287 + # 288 + # @return [Faye::WebSocket::Client] 289 + # see {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/Client Faye::WebSocket::Client} 290 + 160 291 def socket 161 292 @ws 162 293 end 163 294 295 + # Sends a message back to the server. 296 + # 297 + # @param data [String, Array] the message to send - 298 + # a string for text websockets, a binary string or byte array for binary websockets 299 + # @return [Boolean] true if the message was sent successfully 300 + 164 301 def send_data(data) 165 302 @ws.send(data) 166 303 end 167 304 305 + # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output 168 306 def inspectable_variables 169 307 instance_variables - [:@handlers, :@ws] 170 308 end