#!/usr/bin/env ruby

# Command-line entry point of the `rat` tool. Provides three commands:
# `fetch` (print a single record), `resolve` (print a DID document) and
# `stream` (print events from a firehose or Jetstream source).

$LOAD_PATH.unshift File.expand_path('../lib', __dir__)

require 'didkit'
require 'json'
require 'minisky'
require 'optparse'
require 'time'
require 'uri'

require 'ratproto/version'

DID_REGEXP = /\Adid:[a-z]+:[a-zA-Z0-9.\-_]+\z/
NSID_REGEXP = /\A[a-z0-9]+(\.[a-z0-9]+)+\z/

# Prints the general help text (usage, commands and stream options) to stdout.
def print_help
  puts <<~HELP
    rat #{RatProto::VERSION} 🐀

    Usage:
      rat fetch at://uri
      rat stream <host> [options]
      rat resolve <did>|<handle>

    Commands:
      fetch     Fetch a single record given its at:// URI
      stream    Stream events from a relay / PDS firehose
      resolve   Resolve a DID or @handle
      help      Show this help
      version   Show version

    Stream options:
      -j, --jetstream                      Use a Jetstream source instead of a CBOR firehose
      -r, --cursor CURSOR                  Start from a given cursor
      -d, --did DID[,DID...]               Filter only events from given DID(s)
                                           (can be passed multiple times)
      -c, --collection NSID[,NSID...]      Filter only events of given collection(s)
                                           (can be passed multiple times)
  HELP
end

# Prints `message` to stderr and terminates the process with exit status 1.
# Errors go to stderr so that they never mix with data printed on stdout
# (e.g. when the output is piped into another program).
def abort_with_error(message)
  warn message
  exit 1
end

# Aborts the program unless `did` matches DID_REGEXP.
def validate_did(did)
  return if DID_REGEXP.match?(did)

  abort_with_error "Error: #{did.inspect} is not a valid DID"
end

# Aborts the program unless `collection` matches NSID_REGEXP.
def validate_nsid(collection)
  return if NSID_REGEXP.match?(collection)

  abort_with_error "Error: #{collection.inspect} is not a valid collection NSID"
end

# Splits an `at://repo/collection/rkey` URI into its three path components.
# Returns them as a [repo, collection, rkey] array; aborts the program if the
# string doesn't start with at:// or doesn't have exactly three segments.
def parse_at_uri(uri)
  unless uri.start_with?('at://')
    abort_with_error "Error: not an at:// URI: #{uri.inspect}"
  end

  match = %r{\Aat://([^/]+)/([^/]+)/([^/]+)\z}.match(uri)

  if match.nil?
    abort_with_error "Error: invalid at:// URI: #{uri.inspect}"
  end

  match.captures
end

# Builds a DID object for `identifier`, which may be either a DID string or
# a handle. Strings matching DID_REGEXP are wrapped directly with DID.new,
# anything else is passed to DID.resolve_handle. The result may be nil if
# handle resolution doesn't find anything, so callers have to check for that.
def resolve_identifier(identifier)
  if DID_REGEXP.match?(identifier)
    DID.new(identifier)
  else
    DID.resolve_handle(identifier)
  end
end

# Implements `rat fetch <at://uri>`: locates the PDS hosting the repo named in
# the URI, loads the record with com.atproto.repo.getRecord and pretty-prints
# its `value` as JSON. `args` is the list of remaining command line arguments
# (it is modified in place). Aborts with exit status 1 on any failure.
def run_fetch(args)
  uri = args.shift

  abort_with_error "Usage: #{$PROGRAM_NAME} fetch <at://uri>" if uri.nil?

  unless args.empty?
    abort_with_error "Error: Unexpected arguments for fetch: #{args.join(' ')}"
  end

  repo, collection, rkey = parse_at_uri(uri)

  begin
    # the authority part of an at:// URI can be either a DID or a handle
    did = resolve_identifier(repo)
    abort_with_error "Couldn't resolve #{repo}" if did.nil?

    pds = did.document.pds_host
    sky = Minisky.new(pds, nil)
    response = sky.get_request('com.atproto.repo.getRecord', { repo: repo, collection: collection, rkey: rkey })
  rescue StandardError => e
    # note: abort_with_error above exits via SystemExit, which isn't caught here
    abort_with_error "Error loading record: #{e.class}: #{e.message}"
  end

  puts JSON.pretty_generate(response['value'])
end

# Implements `rat resolve <did>|<handle>`: resolves the given DID or handle
# and pretty-prints the JSON of its DID document. `args` is the list of
# remaining command line arguments (it is modified in place). Aborts with
# exit status 1 if the target can't be resolved or an error is raised.
def run_resolve(args)
  target = args.shift

  abort_with_error "Usage: #{$PROGRAM_NAME} resolve <did>|<handle>" if target.nil?

  unless args.empty?
    abort_with_error "Error: Unexpected arguments for resolve: #{args.join(' ')}"
  end

  did = resolve_identifier(target)
  abort_with_error "Couldn't resolve #{target}" if did.nil?

  puts JSON.pretty_generate(did.document.json)
rescue StandardError => e
  abort_with_error "Error resolving #{target.inspect}: #{e.class}: #{e.message}"
end

# Splits a comma-separated option value into a list of non-empty, stripped
# items. Aborts if nothing is left; `option_name` is only used in the message.
def split_list_argument(list, option_name)
  items = list.split(',').map(&:strip).reject(&:empty?)
  abort_with_error "Error: empty argument to #{option_name}" if items.empty?
  items
end

# Parses the options of the `stream` command out of `args`.
#
# Returns a two-element array: a hash of parsed options (possible keys are
# :jetstream, :cursor, :dids and :collections) and the list of remaining
# positional arguments. Values are not validated here beyond being non-empty.
# On any option parsing error prints the error and the option summary to
# stderr and exits with status 1.
def parse_stream_options(args)
  options = {}

  parser = OptionParser.new do |opts|
    opts.banner = "Usage: #{$PROGRAM_NAME} stream <host> [options]"

    opts.on('-j', '--jetstream', 'Use a Jetstream source') do
      options[:jetstream] = true
    end

    opts.on('-rCURSOR', '--cursor=CURSOR', 'Start from a given cursor') do |cursor|
      options[:cursor] = cursor
    end

    opts.on('-dLIST', '--did=LIST', 'Filter only events from given DID(s) (comma-separated or repeated)') do |list|
      (options[:dids] ||= []).concat(split_list_argument(list, '-d/--did'))
    end

    opts.on('-cLIST', '--collection=LIST', 'Filter only events of given collections') do |list|
      (options[:collections] ||= []).concat(split_list_argument(list, '-c/--collection'))
    end

    opts.on('-h', '--help', 'Show stream-specific help') do
      puts opts
      exit
    end
  end

  remaining = []

  begin
    # with a block, non-option arguments are collected instead of stopping the parsing
    parser.order!(args) { |other| remaining << other }
  rescue OptionParser::ParseError => e
    # ParseError is the common base class, so this also covers ambiguous
    # options, needless arguments etc., not only unknown options
    warn "Error: #{e.message}"
    warn parser.help
    exit 1
  end

  [options, remaining]
end

# Checks the values collected by parse_stream_options and aborts the program
# on the first invalid one: the cursor must consist of decimal digits only,
# DIDs must match DID_REGEXP and collections must match NSID_REGEXP.
def validate_stream_options(options)
  if options[:cursor] && options[:cursor] !~ /\A\d+\z/
    abort_with_error "Error: cursor must be a decimal integer, got: #{options[:cursor].inspect}"
  end

  options[:dids]&.each { |did| validate_did(did) }
  options[:collections]&.each { |collection| validate_nsid(collection) }
end

# Creates the Skyfall client for `service`: a Skyfall::Jetstream if the
# :jetstream option is set, otherwise a Skyfall::Firehose subscribed to
# :subscribe_repos. The cursor (if any) is passed as an integer.
def build_stream_client(service, options)
  cursor = options[:cursor]&.to_i

  if options[:jetstream]
    jet_opts = {}
    jet_opts[:cursor] = cursor if cursor

    # pass DID/collection filters to Jetstream to filter events server-side
    jet_opts[:wanted_dids] = options[:dids] if options[:dids]
    jet_opts[:wanted_collections] = options[:collections] if options[:collections]

    Skyfall::Jetstream.new(service, jet_opts)
  else
    Skyfall::Firehose.new(service, :subscribe_repos, cursor)
  end
end

# Implements `rat stream <host> [options]`: connects to the given service and
# prints one line for every operation in every received commit event, until
# interrupted with Ctrl-C. `args` is the list of remaining command line
# arguments. The DID and collection filters are also applied here on the
# client side, regardless of which kind of source is used.
def run_stream(args)
  options, arguments = parse_stream_options(args)
  service = arguments.shift

  abort_with_error "Usage: #{$PROGRAM_NAME} stream <host> [options]" if service.nil?

  unless arguments.empty?
    abort_with_error "Error: Unexpected arguments for stream: #{arguments.join(' ')}"
  end

  validate_stream_options(options)

  sky = build_stream_client(service, options)

  sky.on_connecting { |url| puts "Connecting to #{url}..." }
  sky.on_connect { puts "Connected" }
  sky.on_disconnect { puts "Disconnected" }
  sky.on_reconnect { puts "Connection lost, trying to reconnect..." }
  sky.on_timeout { puts "Connection stalled, triggering a reconnect..." }
  sky.on_error { |e| puts "ERROR: #{e}" }

  sky.on_message do |msg|
    next unless msg.type == :commit
    next if options[:dids] && !options[:dids].include?(msg.repo)

    time = msg.time.getlocal.iso8601

    msg.operations.each do |op|
      next if options[:collections] && !options[:collections].include?(op.collection)

      # raw_record can be nil/false, in which case nothing is printed for it
      json = op.raw_record && JSON.generate(op.raw_record)
      puts "[#{time}] #{msg.repo} #{op.action.inspect} #{op.collection} #{op.rkey} #{json}"
    end
  end

  trap('SIGINT') do
    puts 'Disconnecting...'
    sky.disconnect
  end

  sky.connect
end

# Command dispatch: the first argument selects the command, the rest of ARGV
# is handed over to the command's implementation.

if ARGV.empty?
  print_help
  exit
end

cmd = ARGV.shift

case cmd
when 'help', '--help', '-h'
  print_help
when 'version', '--version'
  puts "RatProto #{RatProto::VERSION} 🐀"
when 'fetch'
  run_fetch(ARGV)
when 'resolve'
  run_resolve(ARGV)
when 'stream'
  # loaded lazily, since only this command needs it
  require 'skyfall'
  run_stream(ARGV)
else
  abort_with_error "Error: unknown command: #{cmd}"
end