A tool for measuring the coverage of Bluesky/ATProto relays
at master 113 lines 2.6 kB view raw
1#!/usr/bin/env ruby 2 3require 'bundler/setup' 4require 'json' 5require 'skyfall' 6require 'yaml' 7 8require_relative 'init' 9require_relative 'opts' 10require_relative 'report' 11 12def get_hosts(entries) 13 if entries 14 entries.map { |x| x.is_a?(Hash) ? x['host'] : x } 15 else 16 [] 17 end 18end 19 20config = YAML.load(File.read(SOURCES)) 21options = parse_options(ARGV) 22 23if options[:relays] || options[:jetstreams] 24 relays = options[:relays] || [] 25 jetstreams = options[:jetstreams] || [] 26else 27 relays = get_hosts(config['relays']) 28 jetstreams = get_hosts(config['jetstreams']) 29end 30 31maxlen = (relays + jetstreams).map(&:length).max 32 33verbose = options[:verbose] 34duration = options[:duration] || config['duration']&.to_i || 60 * 15 35test_start_time = Time.now 36 37Worker = Struct.new(:host, :type, :pid, :pipe) 38workers = [] 39 40sources = relays.map { |h| [:firehose, h] } + jetstreams.map { |h| [:jetstream, h] } 41 42sources.each do |type, host| 43 input, output = IO.pipe 44 45 pid = fork do 46 input.close 47 sky = (type == :firehose) ? Skyfall::Firehose.new(host) : Skyfall::Jetstream.new(host) 48 49 events = 0 50 users = Set.new 51 minute = Time.now.to_i / 60 52 connected = false 53 54 sky.on_message do |msg| 55 events += 1 56 users << msg.did 57 58 if verbose 59 now = Time.now.to_i / 60 60 if now > minute 61 puts "[#{Time.now}] #{host.ljust(maxlen)} | events: #{events.to_s.ljust(8)} | users: #{users.size}" 62 minute = now 63 end 64 end 65 end 66 67 sky.on_connecting { puts "[#{Time.now}] #{host}: Connecting..." } 68 sky.on_connect { puts "[#{Time.now}] #{host}: Connected ✓"; connected = true } 69 sky.on_error { |e| puts "[#{Time.now}] #{host}: ERROR: #{e.message}" } 70 71 trap('SIGINT') { sky.disconnect } 72 73 sky.connect 74 puts "[#{Time.now}] #{host}: Finished." 75 76 output.puts(JSON.generate({ events: events, users: users.size, connected: connected })) 77 end 78 79 output.close 80 81 workers << Worker.new(host, type, pid, input) 82end 83 84begin 85 sleep(duration) 86 87 Process.kill('SIGINT', *workers.map(&:pid)) 88 89 while !workers.empty? 90 pid = Process.wait 91 worker = workers.detect { |w| w.pid == pid } 92 workers.delete(worker) 93 94 line = worker.pipe.gets 95 next if line.nil? 96 97 result = JSON.parse(line) 98 puts "#{worker.host}: #{result.inspect}" if verbose 99 100 Report.create!( 101 start_time: test_start_time, 102 duration: duration, 103 host: worker.host, 104 source_type: worker.type, 105 users: result['users'], 106 events: result['events'], 107 connected: result['connected'] 108 ) 109 end 110rescue Interrupt 111 puts 112 puts "Stopping..." 113end