A Ruby gem for streaming data from the Bluesky/ATProto firehose

added optional speedup patch for websocket-driver

not having to first convert an ascii string to a byte array and then back into an ascii string saves a *ton* of time

+48
+48
lib/skyfall/faye_ext.rb
··· 1 + require 'websocket/driver' 2 + require_relative 'firehose' 3 + 4 + module WebSocket 5 + class Driver 6 + class Hybi 7 + def emit_message 8 + message = @extensions.process_incoming_message(@message) 9 + @message = nil 10 + 11 + payload = message.data 12 + 13 + case message.opcode 14 + when OPCODES[:text] then 15 + payload = Driver.encode(payload, Encoding::UTF_8) 16 + payload = nil unless payload.valid_encoding? 17 + # when OPCODES[:binary] 18 + # payload = payload.bytes.to_a 19 + end 20 + 21 + if payload 22 + emit(:message, MessageEvent.new(payload)) 23 + else 24 + fail(:encoding_error, 'Could not decode a text frame as UTF-8') 25 + end 26 + rescue ::WebSocket::Extensions::ExtensionError => error 27 + fail(:extension_error, error.message) 28 + end 29 + end 30 + end 31 + end 32 + 33 + module Skyfall 34 + class Firehose 35 + def handle_message(msg) 36 + data = msg.data #.pack('C*') 37 + @handlers[:raw_message]&.call(data) 38 + 39 + if @handlers[:message] 40 + atp_message = Message.new(data) 41 + @cursor = atp_message.seq 42 + @handlers[:message].call(atp_message) 43 + else 44 + @cursor = nil 45 + end 46 + end 47 + end 48 + end