A Ruby gem for streaming data from the Bluesky/ATProto firehose

more detailed message validations

+56 -15
+1 -1
lib/skyfall/firehose/account_message.rb
··· 21 21 # 22 22 def initialize(type_object, data_object) 23 23 super 24 - raise DecodeError.new("Missing event details") if @data_object['active'].nil? 24 + check_if_not_nil :seq, :did, :time, :active 25 25 26 26 @active = @data_object['active'] 27 27 @status = @data_object['status']&.to_sym
+12 -1
lib/skyfall/firehose/commit_message.rb
··· 15 15 16 16 class Firehose::CommitMessage < Firehose::Message 17 17 18 + # 19 + # @private 20 + # @param type_object [Hash] first decoded CBOR frame with metadata 21 + # @param data_object [Hash] second decoded CBOR frame with payload 22 + # @raise [DecodeError] if the message doesn't include required data 23 + # 24 + def initialize(type_object, data_object) 25 + super 26 + check_if_not_nil :seq, :repo, :commit, :blocks, :ops, :time 27 + end 28 + 18 29 # @return [CID] CID (Content Identifier) of the commit 19 30 def commit 20 - @commit ||= @data_object['commit'] && CID.from_cbor_tag(@data_object['commit']) 31 + @commit ||= CID.from_cbor_tag(@data_object['commit']) 21 32 end 22 33 23 34 # @return [Skyfall::CarArchive] commit data in the form of a parsed CAR archive
+14 -3
lib/skyfall/firehose/identity_message.rb
··· 15 15 16 16 class Firehose::IdentityMessage < Firehose::Message 17 17 18 - # @return [String, nil] current handle assigned to the DID 19 - def handle 20 - @data_object['handle'] 18 + # 19 + # @private 20 + # @param type_object [Hash] first decoded CBOR frame with metadata 21 + # @param data_object [Hash] second decoded CBOR frame with payload 22 + # @raise [DecodeError] if the message doesn't include required data 23 + # 24 + def initialize(type_object, data_object) 25 + super 26 + check_if_not_nil :seq, :did, :time 27 + 28 + @handle = @data_object['handle'] 21 29 end 30 + 31 + # @return [String, nil] current handle assigned to the DID 32 + attr_reader :handle 22 33 end 23 34 end
+2
lib/skyfall/firehose/info_message.rb
··· 30 30 # @private 31 31 # @param type_object [Hash] first decoded CBOR frame with metadata 32 32 # @param data_object [Hash] second decoded CBOR frame with payload 33 + # @raise [DecodeError] if the message doesn't include required data 33 34 # 34 35 def initialize(type_object, data_object) 35 36 super 37 + check_if_not_nil :name 36 38 37 39 @name = @data_object['name'] 38 40 @message = @data_object['message']
+1 -1
lib/skyfall/firehose/labels_message.rb
··· 23 23 # 24 24 def initialize(type_object, data_object) 25 25 super 26 - raise DecodeError.new("Missing event details") unless @data_object['labels'].is_a?(Array) 26 + check_if_not_nil :seq, :labels 27 27 28 28 @labels = @data_object['labels'].map { |x| Label.new(x) } 29 29 end
+16 -5
lib/skyfall/firehose/message.rb
··· 145 145 instance_variables - [:@type_object, :@data_object, :@blocks] 146 146 end 147 147 148 + # Checks if all required fields are set in the data object. 149 + # @param fields [Array<Symbol, String>] list of fields to check 150 + # @raise [DecodeError] if any of the fields is nil or not set 151 + def check_if_not_nil(*fields) 152 + missing = fields.select { |f| @data_object[f.to_s].nil? } 153 + 154 + raise DecodeError.new("Missing event details (#{missing.map(&:to_s).join(', ')})") if missing.length > 0 155 + end 156 + 148 157 149 158 private 150 159 ··· 163 172 raise SubscriptionError.new(data['error'], data['message']) 164 173 end 165 174 166 - raise DecodeError.new("Invalid object type: #{type}") unless type.is_a?(Hash) 167 - raise UnsupportedError.new("Unexpected CBOR object: #{type}") unless type['op'] == 1 168 - raise DecodeError.new("Missing data: #{type} #{objects.inspect}") unless type['op'] && type['t'] 169 - raise DecodeError.new("Invalid message type: #{type['t']}") unless type['t'].start_with?('#') 170 - raise DecodeError.new("Invalid object type: #{data}") unless data.is_a?(Hash) 175 + raise DecodeError.new("Invalid object type: #{type.inspect}") unless type.is_a?(Hash) 176 + raise DecodeError.new("Missing data: #{type.inspect}") unless type['op'] && type['t'] 177 + raise DecodeError.new("Invalid object type: #{type['op'].inspect}") unless type['op'].is_a?(Integer) 178 + raise DecodeError.new("Invalid object type: #{type['t'].inspect}") unless type['t'].is_a?(String) 179 + raise DecodeError.new("Invalid message type: #{type['t'].inspect}") unless type['t'].start_with?('#') 180 + raise UnsupportedError.new("Unsupported version: #{type['op']}") unless type['op'] == 1 181 + raise DecodeError.new("Invalid object type: #{data.inspect}") unless data.is_a?(Hash) 171 182 172 183 [type, data] 173 184 end
+1 -1
lib/skyfall/jetstream/account_message.rb
··· 19 19 # @raise [DecodeError] if the message doesn't include required data 20 20 # 21 21 def initialize(json) 22 - raise DecodeError.new("Missing event details") if json['account'].nil? 22 + raise DecodeError.new("Missing event details (account)") if json['account'].nil? || json['account']['active'].nil? 23 23 super 24 24 end 25 25
+4 -1
lib/skyfall/jetstream/commit_message.rb
··· 17 17 # @raise [DecodeError] if the message doesn't include required data 18 18 # 19 19 def initialize(json) 20 - raise DecodeError.new("Missing event details") if json['commit'].nil? 20 + raise DecodeError.new("Missing event details (commit)") if json['commit'].nil? 21 + 22 + %w(collection rkey operation).each { |f| raise DecodeError.new("Missing event details (#{f})") if json['commit'][f].nil? } 23 + 21 24 super 22 25 end 23 26
+1 -1
lib/skyfall/jetstream/identity_message.rb
··· 21 21 # @raise [DecodeError] if the message doesn't include required data 22 22 # 23 23 def initialize(json) 24 - raise DecodeError.new("Missing event details") if json['identity'].nil? 24 + raise DecodeError.new("Missing event details (identity)") if json['identity'].nil? 25 25 super 26 26 end 27 27
+4 -1
lib/skyfall/jetstream/message.rb
··· 72 72 73 73 # 74 74 # @param json [Hash] message JSON decoded from the websocket message 75 + # @raise [DecodeError] if the message doesn't include required data 75 76 # 76 77 def initialize(json) 78 + %w(kind did time_us).each { |f| raise DecodeError.new("Missing event details (#{f})") if json[f].nil? } 79 + 77 80 @json = json 78 81 @type = @json['kind'].to_sym 79 82 @did = @json['did'] ··· 122 125 # @return [Time] 123 126 # 124 127 def time 125 - @time ||= @json['time_us'] && Time.at(@json['time_us'] / 1_000_000.0) 128 + @time ||= Time.at(@time_us / 1_000_000.0) 126 129 end 127 130 end 128 131 end