tangled
alpha
login
or
join now
mackuba.eu
/
skyfall
6
fork
atom
A Ruby gem for streaming data from the Bluesky/ATProto firehose
6
fork
atom
overview
issues
pulls
pipelines
removed faye_ext temporary hack
mackuba.eu
9 months ago
3bdc2754
636f9309
-48
1 changed file
expand all
collapse all
unified
split
lib
skyfall
faye_ext.rb
-48
lib/skyfall/faye_ext.rb
···
1
1
-
require 'websocket/driver'
2
2
-
require_relative 'firehose'
3
3
-
4
4
-
module WebSocket
5
5
-
class Driver
6
6
-
class Hybi
7
7
-
def emit_message
8
8
-
message = @extensions.process_incoming_message(@message)
9
9
-
@message = nil
10
10
-
11
11
-
payload = message.data
12
12
-
13
13
-
case message.opcode
14
14
-
when OPCODES[:text] then
15
15
-
payload = Driver.encode(payload, Encoding::UTF_8)
16
16
-
payload = nil unless payload.valid_encoding?
17
17
-
# when OPCODES[:binary]
18
18
-
# payload = payload.bytes.to_a
19
19
-
end
20
20
-
21
21
-
if payload
22
22
-
emit(:message, MessageEvent.new(payload))
23
23
-
else
24
24
-
fail(:encoding_error, 'Could not decode a text frame as UTF-8')
25
25
-
end
26
26
-
rescue ::WebSocket::Extensions::ExtensionError => error
27
27
-
fail(:extension_error, error.message)
28
28
-
end
29
29
-
end
30
30
-
end
31
31
-
end
32
32
-
33
33
-
module Skyfall
34
34
-
class Firehose
35
35
-
def handle_message(msg)
36
36
-
data = msg.data #.pack('C*')
37
37
-
@handlers[:raw_message]&.call(data)
38
38
-
39
39
-
if @handlers[:message]
40
40
-
atp_message = Message.new(data)
41
41
-
@cursor = atp_message.seq
42
42
-
@handlers[:message].call(atp_message)
43
43
-
else
44
44
-
@cursor = nil
45
45
-
end
46
46
-
end
47
47
-
end
48
48
-
end