A tool for measuring the coverage of Bluesky/ATProto relays
1#!/usr/bin/env ruby
2
3require 'bundler/setup'
4require 'json'
5require 'skyfall'
6require 'yaml'
7
8require_relative 'init'
9require_relative 'opts'
10require_relative 'report'
11
12def get_hosts(entries)
13 if entries
14 entries.map { |x| x.is_a?(Hash) ? x['host'] : x }
15 else
16 []
17 end
18end
19
20config = YAML.load(File.read(SOURCES))
21options = parse_options(ARGV)
22
23if options[:relays] || options[:jetstreams]
24 relays = options[:relays] || []
25 jetstreams = options[:jetstreams] || []
26else
27 relays = get_hosts(config['relays'])
28 jetstreams = get_hosts(config['jetstreams'])
29end
30
31maxlen = (relays + jetstreams).map(&:length).max
32
33verbose = options[:verbose]
34duration = options[:duration] || config['duration']&.to_i || 60 * 15
35test_start_time = Time.now
36
37Worker = Struct.new(:host, :type, :pid, :pipe)
38workers = []
39
40sources = relays.map { |h| [:firehose, h] } + jetstreams.map { |h| [:jetstream, h] }
41
42sources.each do |type, host|
43 input, output = IO.pipe
44
45 pid = fork do
46 input.close
47 sky = (type == :firehose) ? Skyfall::Firehose.new(host) : Skyfall::Jetstream.new(host)
48
49 events = 0
50 users = Set.new
51 minute = Time.now.to_i / 60
52 connected = false
53
54 sky.on_message do |msg|
55 events += 1
56 users << msg.did
57
58 if verbose
59 now = Time.now.to_i / 60
60 if now > minute
61 puts "[#{Time.now}] #{host.ljust(maxlen)} | events: #{events.to_s.ljust(8)} | users: #{users.size}"
62 minute = now
63 end
64 end
65 end
66
67 sky.on_connecting { puts "[#{Time.now}] #{host}: Connecting..." }
68 sky.on_connect { puts "[#{Time.now}] #{host}: Connected ✓"; connected = true }
69 sky.on_error { |e| puts "[#{Time.now}] #{host}: ERROR: #{e.message}" }
70
71 trap('SIGINT') { sky.disconnect }
72
73 sky.connect
74 puts "[#{Time.now}] #{host}: Finished."
75
76 output.puts(JSON.generate({ events: events, users: users.size, connected: connected }))
77 end
78
79 output.close
80
81 workers << Worker.new(host, type, pid, input)
82end
83
84begin
85 sleep(duration)
86
87 Process.kill('SIGINT', *workers.map(&:pid))
88
89 while !workers.empty?
90 pid = Process.wait
91 worker = workers.detect { |w| w.pid == pid }
92 workers.delete(worker)
93
94 line = worker.pipe.gets
95 next if line.nil?
96
97 result = JSON.parse(line)
98 puts "#{worker.host}: #{result.inspect}" if verbose
99
100 Report.create!(
101 start_time: test_start_time,
102 duration: duration,
103 host: worker.host,
104 source_type: worker.type,
105 users: result['users'],
106 events: result['events'],
107 connected: result['connected']
108 )
109 end
110rescue Interrupt
111 puts
112 puts "Stopping..."
113end