Ruby CLI tool for accessing Bluesky API / ATProto
1#!/usr/bin/env ruby
2
3$LOAD_PATH.unshift File.expand_path('../lib', __dir__)
4
5require 'didkit'
6require 'json'
7require 'minisky'
8require 'optparse'
9require 'time'
10require 'uri'
11
12require 'ratproto/version'
13
# Syntax check for DIDs given on the command line: "did:", a lowercase method
# name, then a method-specific identifier. The identifier may contain ':' and
# '%' (e.g. a percent-encoded port in did:web) but must not end with either.
DID_REGEXP = /\Adid:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]\z/

# Syntax check for collection NSIDs: dot-separated segments, at least two.
# Authority segments may contain hyphens (not at either end); the final name
# segment is letters and digits only and may be camelCase.
NSID_REGEXP = /\A[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?)*\.[a-zA-Z0-9]+\z/
16
# Prints the top-level usage screen to standard output: the program version,
# the available commands, and the options accepted by `rat stream`.
#
# The text is laid out in columns (indented entries, aligned descriptions,
# continuation lines under the description they belong to).
def print_help
  puts <<~HELP
    rat #{RatProto::VERSION} 🐀

    Usage:
      rat fetch at://uri
      rat stream <firehose-host> [options]
      rat resolve <did>|<handle>

    Commands:
      fetch     Fetch a single record given its at:// URI
      stream    Stream events from a relay / PDS firehose
      resolve   Resolve a DID or @handle
      help      Show this help
      version   Show version

    Stream options:
      -j, --jetstream                    Use a Jetstream source instead of a CBOR firehose
      -r, --cursor CURSOR                Start from a given cursor
      -d, --did DID[,DID...]             Filter only events from given DID(s)
                                         (can be passed multiple times)
      -c, --collection NSID[,NSID...]    Filter only events of given collection(s)
                                         (can be passed multiple times)
      -h, --help                         Show stream-specific help
  HELP
end
42
# Reports a fatal error and terminates the program with exit status 1.
#
# The message is written to standard error rather than standard output, so it
# cannot end up inside output that is being piped to another tool.
#
# @param message [String] text shown to the user
# @return [void] does not return; `exit 1` raises SystemExit
def abort_with_error(message)
  $stderr.puts message
  exit 1
end
47
# Checks that +did+ matches DID_REGEXP; if it does not, the problem is
# reported through abort_with_error.
#
# @param did [String] value to check
def validate_did(did)
  return if did =~ DID_REGEXP

  abort_with_error "Error: #{did.inspect} is not a valid DID"
end
53
# Checks that +collection+ matches NSID_REGEXP; if it does not, the problem
# is reported through abort_with_error.
#
# @param collection [String] value to check
def validate_nsid(collection)
  return if collection =~ NSID_REGEXP

  abort_with_error "Error: #{collection.inspect} is not a valid collection NSID"
end
59
# Splits an at:// URI of the form at://<repo>/<collection>/<rkey> into its
# three components. Anything that does not start with "at://" or does not
# have exactly three non-empty, slash-separated parts is reported through
# abort_with_error.
#
# @param uri [String] the URI to split
# @return [Array<String>] [repo, collection, rkey]
def parse_at_uri(uri)
  abort_with_error "Error: not an at:// URI: #{uri.inspect}" unless uri.start_with?('at://')

  parts = uri.match(%r{\Aat://([^/]+)/([^/]+)/([^/]+)\z})
  abort_with_error "Error: invalid at:// URI: #{uri.inspect}" if parts.nil?

  parts.captures
end
71
# Implements `rat fetch <at://uri>`: loads a single record with
# com.atproto.repo.getRecord from the PDS host listed in the repo's DID
# document and prints the record value as pretty-printed JSON.
#
# The repo part of the URI may be a DID or a handle. A handle is resolved
# with DID.resolve_handle first; a nil result is treated as "could not
# resolve". Any StandardError raised while resolving or fetching is reported
# through abort_with_error.
#
# @param args [Array<String>] arguments following the `fetch` command;
#   exactly one (the at:// URI) is expected
def run_fetch(args)
  uri = args.shift

  abort_with_error "Usage: #{$PROGRAM_NAME} fetch <at://uri>" if uri.nil?
  abort_with_error "Error: Unexpected arguments for fetch: #{args.join(' ')}" unless args.empty?

  repo, collection, rkey = parse_at_uri(uri)

  begin
    # Anything that looks like a DID is handed to DID.new unchanged, so DID
    # validation and its error messages stay exactly as they were.
    did = repo.start_with?('did:') ? DID.new(repo) : DID.resolve_handle(repo)
    abort_with_error "Couldn't resolve #{repo}" if did.nil?

    sky = Minisky.new(did.document.pds_host, nil)

    response = sky.get_request('com.atproto.repo.getRecord', {
      repo: repo, collection: collection, rkey: rkey
    })
  rescue StandardError => e
    abort_with_error "Error loading record: #{e.class}: #{e.message}"
  end

  puts JSON.pretty_generate(response['value'])
end
98
# Implements `rat resolve <did>|<handle>`: passes the argument to
# DID.resolve_handle and prints the JSON of the resulting DID document,
# pretty-printed. A nil result, or any StandardError raised along the way,
# is reported through abort_with_error.
#
# @param args [Array<String>] arguments following the `resolve` command;
#   exactly one is expected
def run_resolve(args)
  target = args.shift

  abort_with_error "Usage: #{$PROGRAM_NAME} resolve <did>|<handle>" if target.nil?
  abort_with_error "Error: Unexpected arguments for resolve: #{args.join(' ')}" unless args.empty?

  identity = DID.resolve_handle(target)
  abort_with_error "Couldn't resolve #{target}" if identity.nil?

  document_json = identity.document.json
  puts JSON.pretty_generate(document_json)
rescue StandardError => error
  abort_with_error "Error resolving #{target.inspect}: #{error.class}: #{error.message}"
end
120
# Parses the arguments of `rat stream` into an options hash and the list of
# positional arguments.
#
# Keys that may be present in the returned options hash:
#   :jetstream   - true when -j/--jetstream was given
#   :cursor      - the String passed to -r/--cursor (not validated here)
#   :dids        - Array<String> accumulated from every -d/--did
#   :collections - Array<String> accumulated from every -c/--collection
#
# -h/--help prints the stream usage and exits. Every option parsing error
# (unknown, ambiguous, missing or needless argument, ...) is reported together
# with the usage text through abort_with_error.
#
# @param args [Array<String>] arguments following `stream`; consumed in place
# @return [Array] two elements: the options Hash and the Array of positional
#   arguments (including anything that followed a literal `--`)
def parse_stream_options(args)
  options = {}

  # Splits a comma-separated option value into trimmed, non-empty items and
  # rejects values that contain no items at all.
  split_list = lambda do |list, flag|
    items = list.split(',').map(&:strip).reject(&:empty?)
    abort_with_error "Error: empty argument to #{flag}" if items.empty?
    items
  end

  parser = OptionParser.new do |opts|
    opts.banner = "Usage: #{$PROGRAM_NAME} stream <relay.host> [options]"

    opts.on('-j', '--jetstream', 'Use a Jetstream source') do
      options[:jetstream] = true
    end

    opts.on('-rCURSOR', '--cursor=CURSOR', 'Start from a given cursor') do |cursor|
      options[:cursor] = cursor
    end

    opts.on('-dLIST', '--did=LIST', 'Filter only events from given DID(s) (comma-separated or repeated)') do |list|
      items = split_list.call(list, '-d/--did')
      (options[:dids] ||= []).concat(items)
    end

    opts.on('-cLIST', '--collection=LIST', 'Filter only events of given collections') do |list|
      items = split_list.call(list, '-c/--collection')
      (options[:collections] ||= []).concat(items)
    end

    opts.on('-h', '--help', 'Show stream-specific help') do
      puts opts
      exit
    end
  end

  remaining = []

  begin
    # With a block, order! keeps parsing options after positional arguments
    # and yields each positional argument instead of stopping at the first.
    parser.order!(args) { |other| remaining << other }
  rescue OptionParser::ParseError => e
    # ParseError is the common base class of all OptionParser errors.
    abort_with_error "Error: #{e.message}\n#{parser}"
  end

  # OptionParser stops at a literal `--` and leaves what follows in args;
  # those are positional arguments too.
  remaining.concat(args)

  [options, remaining]
end
175
# Implements `rat stream <firehose-host> [options]`.
#
# Validates the cursor, DID and collection options, builds either a
# Skyfall::Jetstream or a Skyfall::Firehose client for the given host, and
# prints one line per record operation of every commit message received.
# SIGINT disconnects the client.
#
# @param args [Array<String>] arguments following the `stream` command
def run_stream(args)
  options, arguments = parse_stream_options(args)
  service = arguments.shift

  abort_with_error "Usage: #{$PROGRAM_NAME} stream <firehose-host> [options]" if service.nil?
  abort_with_error "Error: Unexpected arguments for stream: #{arguments.join(' ')}" unless arguments.empty?

  cursor = options[:cursor]
  dids = options[:dids]
  collections = options[:collections]

  if cursor && cursor !~ /\A\d+\z/
    abort_with_error "Error: cursor must be a decimal integer, got: #{cursor.inspect}"
  end

  dids&.each { |did| validate_did(did) }
  collections&.each { |collection| validate_nsid(collection) }

  sky =
    if options[:jetstream]
      jet_opts = {}
      jet_opts[:cursor] = cursor.to_i if cursor

      # pass DID/collection filters to Jetstream to filter events server-side
      jet_opts[:wanted_dids] = dids if dids
      jet_opts[:wanted_collections] = collections if collections

      Skyfall::Jetstream.new(service, jet_opts)
    else
      Skyfall::Firehose.new(service, :subscribe_repos, cursor&.to_i)
    end

  # Connection lifecycle is logged to standard output.
  sky.on_connecting { |url| puts "Connecting to #{url}..." }
  sky.on_connect { puts "Connected" }
  sky.on_disconnect { puts "Disconnected" }
  sky.on_reconnect { puts "Connection lost, trying to reconnect..." }
  sky.on_timeout { puts "Connection stalled, triggering a reconnect..." }
  sky.on_error { |e| puts "ERROR: #{e}" }

  sky.on_message do |msg|
    # Only commit messages are printed; the DID and collection filters are
    # also applied here, for both kinds of source.
    next unless msg.type == :commit
    next unless dids.nil? || dids.include?(msg.repo)

    timestamp = msg.time.getlocal.iso8601

    msg.operations.each do |op|
      next unless collections.nil? || collections.include?(op.collection)

      # Operations without a raw record print nothing after the rkey.
      json = op.raw_record && JSON.generate(op.raw_record)
      puts "[#{timestamp}] #{msg.repo} #{op.action.inspect} #{op.collection} #{op.rkey} #{json}"
    end
  end

  trap('SIGINT') do
    puts 'Disconnecting...'
    sky.disconnect
  end

  sky.connect
end
243
# Command dispatch: the first argument selects the command and the rest of
# ARGV is handed to its handler. Without any arguments the help is printed.
cmd = ARGV.shift

if cmd.nil?
  print_help
  exit
end

case cmd
when 'help', '--help', '-h' then print_help
when 'version', '--version' then puts "RatProto #{RatProto::VERSION} 🐀"
when 'fetch'                then run_fetch(ARGV)
when 'resolve'              then run_resolve(ARGV)
when 'stream'
  # skyfall is loaded only when streaming is actually requested
  require 'skyfall'
  run_stream(ARGV)
else
  abort_with_error "Error: unknown command: #{cmd}"
end