# Skyfall

A Ruby gem for streaming data from the Bluesky/ATProto firehose 🦋

> [!NOTE]
> Part of ATProto Ruby SDK: [ruby.sdk.blue](https://ruby.sdk.blue)


## What does it do

Skyfall is a Ruby library for connecting to the *"[firehose](https://atproto.com/specs/event-stream)"* of the Bluesky social network, i.e. a websocket which streams all new posts and everything else happening on the Bluesky network in real time. The code connects to the websocket endpoint, decodes the messages, which are encoded in binary formats such as DAG-CBOR, and returns the data as Ruby objects, which you can filter and save to some kind of database (e.g. in order to create a custom feed).

Since version 0.5, Skyfall also supports connecting to [Jetstream](https://github.com/bluesky-social/jetstream/) sources, which serve the same kind of stream, but as JSON messages instead of CBOR.


## Installation

To use Skyfall, you need a reasonably new version of Ruby – it should run on Ruby 2.6 and above, although it's recommended to use a version that's still getting maintenance updates, i.e. currently 3.2+. A compatible version should be preinstalled on macOS Big Sur and above and on many Linux systems. Otherwise, you can install one using tools such as [RVM](https://rvm.io), [asdf](https://asdf-vm.com), [ruby-install](https://github.com/postmodern/ruby-install) or [ruby-build](https://github.com/rbenv/ruby-build), or `rpm` or `apt-get` on Linux (see more installation options on [ruby-lang.org](https://www.ruby-lang.org/en/downloads/)).


To install the gem, run the command:

    [sudo] gem install skyfall

Or add this to your app's `Gemfile`:

    gem 'skyfall', '~> 0.6'


## Usage

### Standard ATProto firehose

To connect to the firehose, start by creating a `Skyfall::Firehose` object, specifying the server hostname and endpoint name:

```rb
require 'skyfall'

sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)
```

The server name can be just a hostname, or a full URL with a `ws:` or `wss:` scheme, which is useful if you want to use a non-encrypted websocket connection, e.g. `"ws://localhost:8000"`. The endpoint can be either a full NSID string like `"com.atproto.sync.subscribeRepos"`, or one of the defined symbol shortcuts - you will almost always want to pass `:subscribe_repos` here.

Next, set up event listeners to handle incoming messages and get notified of errors. Here are all the available listeners (you will need at least either `on_message` or `on_raw_message`):

```rb
# this gives you a parsed message object, one of subclasses of Skyfall::Firehose::Message
sky.on_message { |msg| p msg }

# this gives you raw binary data as received from the websocket
sky.on_raw_message { |data| p data }

# lifecycle events
sky.on_connecting { |url| puts "Connecting to #{url}..." }
sky.on_connect { puts "Connected" }
sky.on_disconnect { puts "Disconnected" }
sky.on_reconnect { puts "Connection lost, trying to reconnect..." }
sky.on_timeout { puts "Connection stalled, triggering a reconnect..." }

# handling errors (there's a default error handler that does exactly this)
sky.on_error { |e| puts "ERROR: #{e}" }
```

You can also call these as setters accepting a `Proc` - e.g.
to disable default error handling, you can do:

```rb
sky.on_error = nil
```

When you're ready, open the connection by calling `connect`:

```rb
sky.connect
```

The `#connect` method blocks until the connection is explicitly closed with `#disconnect` from an event or interrupt handler. Skyfall uses [EventMachine](https://github.com/eventmachine/eventmachine) under the hood, so in order to run some things in parallel, you can use e.g. `EM::PeriodicTimer`.


### Using a Jetstream source

Alternatively, you can connect to a [Jetstream](https://github.com/bluesky-social/jetstream/) server. Jetstream is a firehose proxy that lets you stream data as simple JSON instead, which uses much less bandwidth, and allows you to pick only a subset of events that you're interested in, e.g. only posts or only from specific accounts. (See the [configuration section](#jetstream-filters) for more info on Jetstream filtering.)

Jetstream connections are made using a `Skyfall::Jetstream` instance, which has more or less the same API as `Skyfall::Firehose`, so it should be possible to switch between those by just changing the line that creates the client instance:

```rb
sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network')

sky.on_message { |msg| ... }
sky.on_error { |e| ... }
sky.on_connect { ... }
...

sky.connect
```

### Cursors

ATProto websocket endpoints implement a "*cursor*" feature to help you make sure that you don't miss anything if your connection is down for a bit (because of a network issue, server restart, deploy etc.). Each message includes a `seq` field, which is the sequence number of the event. You can keep track of the last seq you've seen, and when you reconnect, you pass that number as a cursor parameter - the server will then "replay" all events you might have missed since that last one.
(The `bsky.network` Relay firehose currently has a buffer of about 72 hours, though that's not something required by specification.)

To use a cursor when connecting to the firehose, pass it as the third parameter to `Skyfall::Firehose`. You should then regularly save the `seq` of the last event to some permanent storage, and then load it from there when reconnecting.

A full-network firehose sends many hundreds of events per second, so depending on your use case, it might be enough if you save it every n events (e.g. every 100 or 1000) and on clean shutdown:

```rb
cursor = load_cursor

sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos, cursor)
sky.on_message do |msg|
  save_cursor(msg.seq) if msg.seq % 1000 == 0
  process_message(msg)
end
```

Jetstream has a similar mechanism, except the cursor is the event's timestamp in Unix time microseconds instead of just a number incrementing by 1. For `Skyfall::Jetstream`, pass the cursor as a key in an options hash:

```rb
cursor = load_cursor

sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', { cursor: cursor })
sky.on_message do |msg|
  save_cursor(msg.seq)
  process_message(msg)
end
```


### Processing messages

Each message passed to `on_message` is an instance of a subclass of either `Skyfall::Firehose::Message` or `Skyfall::Jetstream::Message`, depending on the selected source. The supported message types are:

- `CommitMessage` (`#commit`) - represents a change in a user's repo; most messages are of this type
- `IdentityMessage` (`#identity`) - notifies about a change in user's DID document, e.g.
  a handle change or a migration to a new PDS
- `AccountMessage` (`#account`) - notifies about a change of an account's status (de/activation, suspension, deletion)
- `SyncMessage` (`#sync`) - updates repository state, can be used to trigger account resynchronization
- `LabelsMessage` (`#labels`) - only used in `subscribe_labels` endpoint
- `InfoMessage` (`#info`) - a protocol error message, e.g. about an invalid cursor parameter
- `UnknownMessage` is used for other unrecognized message types

`Skyfall::Firehose::Message` and `Skyfall::Jetstream::Message` variants of message classes should have more or less the same interface, except when a given field is not included in one of the formats.

All message objects have the following shared properties:

- `type` (symbol) - the message type identifier, e.g. `:commit`
- `seq` (integer) - a sequential index of the message; Jetstream messages instead have a `time_us` value, which is a Unix timestamp in microseconds (also aliased as `seq` for compatibility)
- `repo` or `did` (string) - DID of the repository (user account)
- `time` (Time) - timestamp of the described action

All properties except `type` may be nil for some message types that aren't related to a specific user, like `#info`.
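
As a rough sketch of how the shared properties might be used, the helper below builds a one-line summary of any message while tolerating `nil` fields. The `summarize_message` helper and the `FakeMessage` struct are made up for this illustration (the struct only stands in for real Skyfall message objects so that the snippet runs without a network connection); only `type`, `seq`, `repo` and `time` come from the list above.

```ruby
require 'time'

# Hypothetical stand-in for a Skyfall message, used only to make this sketch
# runnable offline; real messages are passed to the on_message callback.
FakeMessage = Struct.new(:type, :seq, :repo, :time, keyword_init: true)

# Hypothetical helper: builds a log line from the shared properties,
# substituting "-" for any property that is nil (e.g. on #info messages).
def summarize_message(msg)
  seq  = msg.seq || '-'
  repo = msg.repo || '-'
  time = msg.time ? msg.time.utc.iso8601 : '-'

  "[#{time}] ##{msg.type} seq=#{seq} repo=#{repo}"
end

commit = FakeMessage.new(type: :commit, seq: 1000, repo: 'did:plc:example', time: Time.at(0))
info   = FakeMessage.new(type: :info)

puts summarize_message(commit)
puts summarize_message(info)
```

In a real client, the same helper could presumably be called from `sky.on_message { |msg| puts summarize_message(msg) }`, since it relies only on the shared properties.
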

Commit messages additionally have:

- `commit` - CID of the commit
- `operations` - list of operations (usually one)

Handle and Identity messages additionally have:

- `handle` - the new handle assigned to the DID

Account messages additionally have:

- `active?` - whether the account is active, or inactive for any reason
- `status` - if not active, shows the status of the account (`:deactivated`, `:deleted`, `:takendown`)

Info messages additionally have:

- `name` - identifier of the message/error
- `message` - a human-readable description


### Commit operations

Operations are objects of type `Skyfall::Firehose::Operation` or `Skyfall::Jetstream::Operation` and have the following properties:

- `repo` or `did` (string) - DID of the repository (user account)
- `collection` (string) - name of the relevant collection in the repository, e.g. `app.bsky.feed.post` for posts
- `type` (symbol) - short name of the collection, e.g. `:bsky_post`
- `rkey` (string) - identifier of a record in a collection
- `path` (string) - the path part of the at:// URI - collection name + ID (rkey) of the item
- `uri` (string) - the complete at:// URI
- `action` (symbol) - `:create`, `:update` or `:delete`
- `cid` (CID) - CID of the operation/record (`nil` for delete operations)

Create and update operations will also have an attached record (JSON object) with the details of the item, e.g. the text of a post. The record data is currently available as a Ruby hash via the `raw_record` property (custom types will be added in the future).

So for example, in order to filter only "create post" operations and print their details, you can do something like this:

```rb
sky.on_message do |m|
  next if m.type != :commit

  m.operations.each do |op|
    next unless op.action == :create && op.type == :bsky_post

    puts "#{op.repo}:"
    puts op.raw_record['text']
    puts
  end
end
```

For more examples, see the [examples page](https://ruby.sdk.blue/examples/) on [ruby.sdk.blue](https://ruby.sdk.blue), or the [bluesky-feeds-rb](https://tangled.org/mackuba.eu/bluesky-feeds-rb/blob/master/app/firehose_stream.rb) project, which implements a feed generator service.


### Note on custom lexicons

Note that the `Operation` objects have two properties that tell you the kind of record they're about: `#collection`, which is a string containing the official name of the collection/lexicon, e.g. `"app.bsky.feed.post"`; and `#type`, which is a symbol meant to save you some typing, e.g. `:bsky_post`.

When Skyfall receives a message about a record type that's not on the list, whether in the `app.bsky` namespace or not, the operation `type` will be `:unknown`, while the `collection` will be the original string. So if an app like e.g. "Skygram" appears with a `zz.skygram.*` namespace that lets you share photos on ATProto, the operations will have a type `:unknown` and collection names like `zz.skygram.feed.photo`, and you can check the `collection` field for record types known to you and process them in some appropriate way, even if Skyfall doesn't recognize the record type.

Do not however check if such operations have a `type` equal to `:unknown` first - just ignore the type and only check the `collection` string. The reason is that some next version of Skyfall might start recognizing those records and add a new `type` value for them like e.g. `:skygram_photo`, and then they won't match your condition anymore.
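
The advice above could look roughly like this in practice. The `zz.skygram.feed.photo` collection is the made-up example from this section, and `FakeOperation` and `skygram_photo?` are hypothetical names used only so that the snippet runs on its own; in a real client the operations would come from `msg.operations`.

```ruby
# Hypothetical stand-in for Skyfall's Operation objects, so the sketch runs offline.
FakeOperation = Struct.new(:action, :type, :collection, keyword_init: true)

# Collection name taken from the made-up "Skygram" example above.
SKYGRAM_PHOTO = 'zz.skygram.feed.photo'

# Match on the collection string only; deliberately ignore op.type, since a
# future Skyfall release might change it from :unknown to a dedicated symbol.
def skygram_photo?(op)
  op.action == :create && op.collection == SKYGRAM_PHOTO
end

ops = [
  FakeOperation.new(action: :create, type: :unknown, collection: 'zz.skygram.feed.photo'),
  FakeOperation.new(action: :create, type: :skygram_photo, collection: 'zz.skygram.feed.photo'),
  FakeOperation.new(action: :create, type: :bsky_post, collection: 'app.bsky.feed.post')
]

matching = ops.select { |op| skygram_photo?(op) }
puts "#{matching.length} of #{ops.length} operations matched"
```

Both Skygram operations match here regardless of whether their `type` is `:unknown` or a dedicated symbol, which is the point of checking only `collection`.
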


## Reconnection logic

In a perfect world, the websocket would never disconnect until you disconnect it, but unfortunately we don't live in a perfect world. The socket sometimes disconnects or stops responding, and Skyfall has some built-in protections to make sure it can operate without much oversight.


### Broken connections

If the connection is randomly closed for some reason, Skyfall will by default try to reconnect automatically. If the reconnection fails (e.g. because the network is down), it will wait with an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) up to 5 minute intervals and keep retrying forever until it connects again. The `on_reconnect` callback is triggered when the connection is closed (before the wait delay). This mechanism should generally solve most of the problems.

The auto reconnecting feature is enabled by default, but you can turn it off by setting `auto_reconnect` to `false`.

### Stalled connections & heartbeat

Occasionally, especially during times of very heavy traffic, the websocket can get into a stuck state where it stops receiving any data, but doesn't disconnect and just hangs like this forever. To work around this, there is a "heartbeat" feature which starts a background timer, which periodically checks how much time has passed since the last received event, and if the time exceeds a set limit, it manually disconnects and reconnects the stream.

This feature is not enabled by default, because there are some firehoses which will not be sending events often, possibly only once in a while – e.g. labellers and independent PDS firehoses – and in this case we don't want any heartbeat since it will be completely normal not to have any events for a long time.
It's not really possible to detect easily if we're connecting to a full network relay or one of those, so in order to avoid false alarms, you need to enable this manually using the `check_heartbeat` property.

You can also change the `heartbeat_interval`, i.e. how often the timer is triggered (default: 10s), and the `heartbeat_timeout`, i.e. the amount of time passed without events needed to cause a reconnect (default: 5 min):

```rb
sky.check_heartbeat = true
sky.heartbeat_interval = 5
sky.heartbeat_timeout = 120
```

### Cursors when reconnecting

Skyfall keeps track of the last event's `seq` internally in the `cursor` property, so if the client reconnects for whatever reason, it will automatically use the latest cursor in the URL.

> [!NOTE]
> This only happens if you use the `on_message` callback and not `on_raw_message`, since the event is not parsed from binary data into a `Message` object if you use `on_raw_message`, so Skyfall won't have access to the `seq` field then.


## Streaming from labellers

Apart from `subscribe_repos`, there is a second endpoint `subscribe_labels`, which is used to stream labels from [labellers](https://atproto.com/specs/label) (ATProto moderation services). This endpoint only sends `#labels` events (and possibly `#info`).

To connect to a labeller, pass `:subscribe_labels` as the endpoint name to `Skyfall::Firehose`.
The `on_message` callback will get called with `Skyfall::Firehose::LabelsMessage` events, each of which includes one or more labels as `Skyfall::Label`:

```rb
cursor = load_cursor(service)
sky = Skyfall::Firehose.new(service, :subscribe_labels, cursor)
sky.on_message do |msg|
  if msg.type == :labels
    msg.labels.each do |l|
      puts "[#{l.created_at}] #{l.subject} => #{l.value}"
    end
  end
end
```

See [ATProto label docs](https://atproto.com/specs/label) for info on what fields are included with each label - `Skyfall::Label` includes properties with these original names, and also more friendly aliases for each (e.g. `value` instead of `val`).


## Other configuration

### User agent

Skyfall sends a user agent header when making a connection. This is set by default to `"Skyfall/0.x.y"`, but it's recommended that you override it using the `user_agent` field to something that identifies your app and its author – this will let the owner of the server you're connecting to know who to contact in case the client is causing some problems.

You can also append your user agent info to the default value like this:

```rb
sky.user_agent = "NewsBot (@news.bot) #{sky.version_string}"
```

### Jetstream filters

Jetstream allows you to specify [filters](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#consuming-jetstream) of collection types and/or tracked DIDs when you connect, so it will send you only the events you're interested in. You can e.g. ask only for posts and ignore likes, or only profile events and ignore everything else, or only listen for posts from a few specific accounts.

To use these filters, pass the "wantedCollections" and/or "wantedDids" parameters in the options hash when initializing `Skyfall::Jetstream`.
You can use the original JavaScript param names, or a more Ruby-like snake_case form:

```rb
sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
  wanted_collections: 'app.bsky.feed.post',
  wanted_dids: @dids
})
```

For collections, you can also use the symbol codes used in `Operation#type`, e.g. `:bsky_post`:

```rb
sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
  wanted_collections: [:bsky_post]
})
```

See [Jetstream docs](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#consuming-jetstream) for more info on available filters.

> [!NOTE]
> The `compress` and `requireHello` options (and zstd compression) are not available at the moment. Also the "subscriber sourced messages" aren't implemented yet.


## Credits

Copyright © 2026 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).

The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).

Bug reports and pull requests are welcome 😎