

skyline

A TypeScript toolkit for consuming the Bluesky network in real-time. Provides WebSocket streaming via Jetstream, HTTP API bindings for the Bluesky public API, and typed parsing/formatting utilities for building CLI tools and bots.

This library is read-only. It can observe the network (stream posts, fetch profiles, search, read threads) but cannot create posts, follow users, or perform any write operations. For writes, use the official @atproto/api package, which provides a full-featured agent with methods such as post, like, and follow.

Quick start

npm install
npx tsx examples/hello-jetstream.ts

That's it — no API keys, no auth tokens, no configuration. You'll see live Bluesky posts scrolling by within seconds. Press Ctrl+C to stop.

ATProto concepts

This section covers the AT Protocol concepts you'll encounter throughout the codebase. If you've used Bluesky the app but never touched ATProto code, start here.

DIDs and handles

Every Bluesky user has two identifiers:

  • DID (Decentralized Identifier): A permanent, opaque ID like did:plc:oky5czdrnfjpqslsw2a5iclo. This never changes, even if the user changes their handle or moves to a different server. All data in ATProto is addressed by DID.
  • Handle: A human-readable name like jay.bsky.team. Handles can change at any time — a user might switch from alice.bsky.social to alice.com. Handles are resolved to DIDs via DNS or the resolveHandle API.

Jetstream commit events identify users only by DID. If you need a display name or handle, look it up via the HTTP API: fetchGetProfile returns the profile for a DID, while fetchResolveHandle goes the other way, from a handle to a DID. This is why many examples in this repo work with raw DIDs: the stream doesn't include profile data.
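As a rough sketch of the two lookup directions, the snippet below builds request URLs for the public XRPC methods com.atproto.identity.resolveHandle and app.bsky.actor.getProfile. The helper names (resolveHandleUrl, getProfileUrl, handleForDid) are hypothetical and are not this library's fetchResolveHandle or fetchGetProfile; the response is assumed to carry a handle field.

```typescript
// Hypothetical helpers for illustration; not this library's API.
const PUBLIC_API = "https://public.api.bsky.app/xrpc";

// Build the XRPC URL that resolves a handle to a DID.
function resolveHandleUrl(handle: string): string {
  const params = new URLSearchParams({ handle });
  return `${PUBLIC_API}/com.atproto.identity.resolveHandle?${params.toString()}`;
}

// Build the XRPC URL that fetches a profile; `actor` may be a DID or a handle.
function getProfileUrl(actor: string): string {
  const params = new URLSearchParams({ actor });
  return `${PUBLIC_API}/app.bsky.actor.getProfile?${params.toString()}`;
}

// Look up the current handle for a DID seen on the stream.
// Assumes the profile response contains a `handle` string.
async function handleForDid(did: string): Promise<string> {
  const res = await fetch(getProfileUrl(did));
  if (!res.ok) throw new Error(`getProfile failed: HTTP ${res.status}`);
  const profile = (await res.json()) as { handle: string };
  return profile.handle;
}
```

Because handles can change, a long-running consumer would likely cache these lookups by DID and refresh them occasionally rather than treating the handle as stable.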

AT URIs

AT URIs are the canonical way to address any record in the AT Protocol:

at://did:plc:oky5czdrnfjpqslsw2a5iclo/app.bsky.feed.post/3mffccqrpx22t
     └────────── DID (who) ─────────┘ └── collection ──┘ └── rkey ───┘
  • DID: The repo owner
  • Collection: The Lexicon type (e.g. app.bsky.feed.post, app.bsky.graph.follow)
  • rkey (record key): A unique ID within the collection, usually a TID (timestamp-based ID)

AT URIs are what the protocol uses internally. The bsky.app URLs you see in your browser (https://bsky.app/profile/jay.bsky.team/post/3mffccqrpx22t) are a web-app convenience. This library provides helpers to convert between the two: buildAtUri, parseAtUri, toBskyUrl, atUriToBskyUrl, parseBskyUrl.
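To make the three-part structure concrete, here is a minimal sketch that splits a record-level AT URI and builds the matching bsky.app URL for a post. The names splitAtUri and postAtUriToWebUrl are hypothetical; the library's own parseAtUri and atUriToBskyUrl may differ in naming, validation, and return shape.

```typescript
// Hypothetical helpers for illustration; not the library's implementation.
interface AtUriParts {
  did: string;
  collection: string;
  rkey: string;
}

// Split at://<did>/<collection>/<rkey>; returns null for anything else.
function splitAtUri(uri: string): AtUriParts | null {
  const match = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (!match) return null;
  const [, did, collection, rkey] = match;
  return { did, collection, rkey };
}

// Only post records map to a /post/ page on bsky.app.
function postAtUriToWebUrl(uri: string): string | null {
  const parts = splitAtUri(uri);
  if (!parts || parts.collection !== "app.bsky.feed.post") return null;
  return `https://bsky.app/profile/${parts.did}/post/${parts.rkey}`;
}
```

The web URL here uses the DID rather than a handle, which keeps the link valid if the user later changes their handle.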

Facets (rich text)

ATProto does not use markdown. Post text is stored as plain UTF-8 with a separate facets array that annotates byte ranges with features like links or mentions:

{
  "text": "Check out @jay.bsky.team's work at https://bsky.social",
  "facets": [
    {
      "index": { "byteStart": 10, "byteEnd": 24 },
      "features": [{ "$type": "app.bsky.richtext.facet#mention", "did": "did:plc:..." }]
    },
    {
      "index": { "byteStart": 35, "byteEnd": 54 },
      "features": [{ "$type": "app.bsky.richtext.facet#link", "uri": "https://bsky.social" }]
    }
  ]
}

This design keeps the text human-readable while supporting rich rendering. The extractLinks() and extractMentions() helpers pull these features out of the facets array so you don't have to navigate the structure yourself.
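One detail that is easy to get wrong: facet indices are UTF-8 byte offsets, not JavaScript string indices, so the two diverge as soon as the text contains a non-ASCII character. The sketch below (sliceByBytes is a hypothetical helper, and the sample text is made up) slices by bytes using the standard TextEncoder and TextDecoder.

```typescript
// Facet indices are UTF-8 byte offsets, not JavaScript string indices.
interface ByteRange {
  byteStart: number;
  byteEnd: number; // exclusive
}

// Return the substring covered by a facet's byte range.
function sliceByBytes(text: string, range: ByteRange): string {
  const bytes = new TextEncoder().encode(text);
  return new TextDecoder().decode(bytes.slice(range.byteStart, range.byteEnd));
}

// "\u00e9" (e with acute accent) is one UTF-16 code unit but two UTF-8 bytes.
const sampleText = "H\u00e9llo @alice.com";
const sampleMention: ByteRange = { byteStart: 7, byteEnd: 17 };

console.log(sliceByBytes(sampleText, sampleMention)); // "@alice.com"
// Naive string slicing with the same numbers starts one character too late:
console.log(sampleText.slice(sampleMention.byteStart, sampleMention.byteEnd)); // "alice.com"
```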

Lexicons and the ATProto ecosystem

All record types in ATProto are defined by Lexicons: schema definitions identified by reverse-DNS names like app.bsky.feed.post. The Bluesky app's record lexicons live under app.bsky.*, but the protocol itself is generic. A different app could define com.example.recipes.recipe and use the same infrastructure.

This library is tightly coupled to app.bsky.* lexicons: the post parsers, type guards, and API bindings all reference Bluesky-specific schemas. However, the low-level WebSocket layer (createJetstreamConnection, onMessage) receives events for all collections, so you can use it to observe record types published by other ATProto apps on the network. You'd just need to write your own parsers for those record types.
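One way to organize custom parsers is a small registry keyed by collection name. This is only a sketch: the registry, the describeRecord helper, and the title field on the made-up com.example.recipes.recipe record are all hypothetical and not part of this library.

```typescript
// Hypothetical registry: one parser per collection NSID.
type RecordParser = (record: unknown) => string;

const recordParsers = new Map<string, RecordParser>([
  ["app.bsky.feed.post", (r) => `post: ${(r as { text?: string }).text ?? ""}`],
  // com.example.recipes.recipe is the made-up lexicon from the text above;
  // its `title` field is an assumption for illustration.
  ["com.example.recipes.recipe", (r) => `recipe: ${(r as { title?: string }).title ?? ""}`],
]);

// Returns a one-line description, or null when no parser is registered.
function describeRecord(collection: string, record: unknown): string | null {
  const parse = recordParsers.get(collection);
  return parse ? parse(record) : null;
}
```

In real code each parser would validate the record against a schema (the library uses Zod for its own types) instead of casting.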

Jetstream vs the AT Protocol firehose

This library supports two ways to consume the real-time event stream:

Jetstream (lib/websocket.ts, examples/hello-jetstream.ts)

Jetstream is an official Bluesky service that provides a simplified, JSON-based WebSocket stream. It pre-processes the raw firehose into clean events:

  • Format: JSON messages with kind (commit, identity, account), pre-parsed records, microsecond timestamps
  • Filtering: Server-side filtering by collection and DID via URL params or options_update messages
  • Reconnection: Cursor-based replay from a microsecond timestamp (see Cursors below)
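  • What you lose: Cryptographic proofs. Jetstream drops the signed commits and Merkle tree blocks that let you verify data integrity. Commit events still carry the record's CID, but you cannot check it against a signed repo commit. For most applications, this doesn't matter.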
  • Endpoint: wss://jetstream2.us-east.bsky.network/subscribe

This is what 90% of the library uses.
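For illustration, server-side filtering and cursor replay are both expressed as query parameters on the subscribe URL. The sketch below assumes the wantedCollections, wantedDids, and cursor parameter names from the Jetstream documentation; the StreamFilter shape and subscribeUrl helper are hypothetical, and the library's buildJetstreamUrl(config) may behave differently.

```typescript
// Hypothetical config shape; not the library's own config type.
interface StreamFilter {
  wantedCollections?: string[];
  wantedDids?: string[];
  cursor?: number; // microsecond Unix timestamp
}

const DEFAULT_ENDPOINT = "wss://jetstream2.us-east.bsky.network/subscribe";

// Repeated query parameters carry multiple collections or DIDs.
function subscribeUrl(filter: StreamFilter, endpoint: string = DEFAULT_ENDPOINT): string {
  const url = new URL(endpoint);
  for (const c of filter.wantedCollections ?? []) url.searchParams.append("wantedCollections", c);
  for (const d of filter.wantedDids ?? []) url.searchParams.append("wantedDids", d);
  if (filter.cursor !== undefined) url.searchParams.set("cursor", String(filter.cursor));
  return url.toString();
}
```

Calling subscribeUrl({}) returns the bare endpoint, which corresponds to the unfiltered stream.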

AT Protocol firehose (examples/hello-firehose.ts)

The raw firehose via @atproto/sync gives you the full commit stream with cryptographic proofs:

  • Format: CBOR-encoded repo commits with CID verification
  • What you gain: Content-addressed integrity proofs — you can verify that the data hasn't been tampered with
  • What you lose: Convenience. You're working with raw repo operations, not pre-parsed events. No server-side filtering; you process everything client-side.

Use the raw firehose only if you need cryptographic verification or are building infrastructure-level tooling.

Cursors

Every Jetstream event includes a time_us field — a microsecond Unix timestamp (not milliseconds, not an opaque token). This is the cursor.

When your WebSocket disconnects, the library stores the time_us of the last event it processed. On reconnect, it subtracts a safety buffer (default: 5 seconds / 5_000_000 microseconds) and passes that as the cursor URL parameter. Jetstream replays all events from that point forward.
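The cursor arithmetic is simple enough to show directly. This is a sketch under the defaults described above, not the library's code; reconnectCursor and timeUsToDate are hypothetical names.

```typescript
const DEFAULT_BUFFER_US = 5_000_000; // 5 seconds in microseconds

// Cursor to send on reconnect: rewind the last seen time_us by a safety buffer.
function reconnectCursor(lastTimeUs: number, bufferUs: number = DEFAULT_BUFFER_US): number {
  return Math.max(0, lastTimeUs - bufferUs);
}

// time_us is in microseconds; Date expects milliseconds.
function timeUsToDate(timeUs: number): Date {
  return new Date(Math.floor(timeUs / 1000));
}
```

Current microsecond timestamps are around 1.7e15, comfortably below Number.MAX_SAFE_INTEGER (about 9e15), so a plain number is enough and BigInt is not needed.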

Retention window: Jetstream retains roughly 72 hours of events. If you disconnect for longer than that, you'll resume from the oldest available event, not from where you left off. There is no "I missed 4 days of data" recovery — you'd need to use the relay firehose or a backfill service for that.

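Duplicates: Because of the safety buffer, you may receive some events twice after reconnecting. The library does not deduplicate. If your application needs exactly-once processing, you'll need to track seen events yourself. An rkey alone is not enough, since it is only unique within one collection of one repo; key on the DID, collection, and rkey together, plus the operation or time_us if you need to tell a later update from a replay of the original create.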
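If you do need deduplication, one possible approach is a bounded set of recently seen event keys. The sketch below is an assumption-laden illustration, not part of the library: RecentEvents and CommitLike are hypothetical names, and the key assumes a replayed event carries the same time_us as its first delivery.

```typescript
// Minimal shape of the fields used for the key; real events carry more.
interface CommitLike {
  did: string;
  time_us: number;
  commit: { collection: string; rkey: string; operation: string };
}

// Bounded "seen" set that remembers the most recent `capacity` keys.
class RecentEvents {
  private readonly seen = new Set<string>();
  private readonly capacity: number;

  constructor(capacity: number = 10_000) {
    this.capacity = capacity;
  }

  // Returns true the first time an event is seen, false for a replayed duplicate.
  firstSighting(e: CommitLike): boolean {
    const key = `${e.did}/${e.commit.collection}/${e.commit.rkey}/${e.commit.operation}/${e.time_us}`;
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    if (this.seen.size > this.capacity) {
      // Sets iterate in insertion order, so the first value is the oldest.
      const oldest = this.seen.values().next().value as string;
      this.seen.delete(oldest);
    }
    return true;
  }
}
```

The capacity only needs to cover the events that can arrive within the replay buffer, so a few thousand entries is likely plenty for a filtered stream.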

The startStreamWithReconnect() function handles all of this automatically. If you're using the lower-level createJetstreamConnection + attachLifecycleWithCursor, you manage the CursorRef yourself.

Stream volume and backpressure

An unfiltered Jetstream subscription receives every event on the entire Bluesky network. As of early 2025, that's roughly:

  • ~50–150 post creates per second (varies by time of day)
  • Plus likes, follows, reposts, blocks, list operations, profile updates, etc.
  • Total event throughput can exceed 500 events/second during peak hours

The examples in this repo process events synchronously in the onEvent callback. For a CLI tool printing to stdout, this is fine — the bottleneck is your terminal, not the event rate. But if you're doing anything expensive per event (HTTP calls, database writes), you need to buffer or batch. The library does not provide built-in backpressure handling.
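A minimal batching sketch, assuming your expensive operation can accept many items at once (for example a bulk database insert). Batcher is a hypothetical helper, not something this library exports, and it is batching rather than true backpressure: it does not slow the stream down.

```typescript
// Collects items and hands them to `flush` in batches of up to `maxSize`.
class Batcher<T> {
  private items: T[] = [];
  private readonly maxSize: number;
  private readonly flush: (batch: T[]) => void;

  constructor(maxSize: number, flush: (batch: T[]) => void) {
    this.maxSize = maxSize;
    this.flush = flush;
  }

  // Cheap enough to call from a synchronous onEvent callback.
  push(item: T): void {
    this.items.push(item);
    if (this.items.length >= this.maxSize) this.drain();
  }

  // Flush whatever is buffered; call this on a timer and at shutdown.
  drain(): void {
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.flush(batch);
  }
}
```

A typical setup would call push from onEvent and run setInterval(() => batcher.drain(), 1000) so that quiet periods still get flushed.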

Filtering helps: subscribing to only app.bsky.feed.post cuts volume significantly, and filtering by specific DIDs (via wantedDids) can reduce it to near-zero if you're watching a small set of users.

Dynamic stream filtering with sendOptionsUpdate

You can change your subscription filters on a live WebSocket without reconnecting:

import { startStreamWithReconnect, sendOptionsUpdate } from "./lib/index.js";

const { getWs } = startStreamWithReconnect({
  config: { wantedCollections: ["app.bsky.feed.post"] },
  onEvent: (event) => { /* ... */ },
});

// Later, narrow the stream to specific users:
sendOptionsUpdate(
  getWs(),
  ["app.bsky.feed.post"],           // collections
  ["did:plc:abc...", "did:plc:def..."],  // DIDs
);

This sends a JSON options_update message over the existing WebSocket. The server immediately starts filtering to your new criteria. The user-feed example uses this to progressively narrow the stream as it discovers users from search results.
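For reference, the message on the wire looks roughly like the output of the sketch below. It assumes the options_update format from the Jetstream documentation (a type field plus a payload with wantedCollections and wantedDids); optionsUpdateMessage is a hypothetical name, and the library's sendOptionsUpdate may add or omit fields.

```typescript
// Hypothetical helper; builds the JSON text for an options_update message.
function optionsUpdateMessage(wantedCollections: string[], wantedDids: string[]): string {
  return JSON.stringify({
    type: "options_update",
    payload: { wantedCollections, wantedDids },
  });
}

console.log(optionsUpdateMessage(["app.bsky.feed.post"], ["did:plc:abc"]));
```

With a raw WebSocket you would pass the returned string to ws.send once the socket is open.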

Limits: you can filter by collection and/or DID. There's no server-side keyword or language filter — those happen client-side in the onEvent callback.
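A client-side filter can be as small as the sketch below. It assumes the post record exposes text and an optional langs array of language tags, as app.bsky.feed.post does; matchesFilter is a hypothetical helper, and the language check is an exact match, so "en" will not match "en-US".

```typescript
// Fields of an app.bsky.feed.post record that this filter reads.
interface PostLike {
  text: string;
  langs?: string[];
}

// True when the post is tagged with `lang` (if given) and contains any keyword.
function matchesFilter(post: PostLike, keywords: string[], lang?: string): boolean {
  if (lang !== undefined && !(post.langs ?? []).includes(lang)) return false;
  const haystack = post.text.toLowerCase();
  return keywords.some((k) => haystack.includes(k.toLowerCase()));
}
```

Posts without a langs field are rejected whenever a language is requested, which is a design choice you may want to relax.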

Authentication and the HTTP API

Most Bluesky API endpoints work without authentication through the public API at https://public.api.bsky.app/xrpc. This includes profiles, feeds, threads, followers, and social graph queries.

searchPosts is the exception — it requires a Bearer token. This is a Bluesky policy decision; the search index is more expensive to operate than basic data lookups.

The token is a JWT access token obtained by creating an authenticated session:

# 1. Create an App Password at https://bsky.app/settings/app-passwords
# 2. Exchange it for a JWT:
curl -s -X POST https://bsky.social/xrpc/com.atproto.server.createSession \
  -H "Content-Type: application/json" \
  -d '{"identifier":"your.handle","password":"your-app-password"}' \
  | jq -r .accessJwt
# 3. Export it:
export BSKY_AUTH_TOKEN="eyJ..."

The JWT expires after a few hours. This library does not handle token refresh — if your token expires, you'll get a 401 and need to create a new session.
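Since the library does not refresh tokens, it can help to check the expiry yourself before a long run. The sketch below decodes the JWT payload without verifying the signature and assumes the token carries a standard exp claim in seconds; jwtExpiry and isExpired are hypothetical helpers.

```typescript
// Decode the payload segment of a JWT without verifying the signature.
function jwtExpiry(token: string): Date | null {
  const payload = token.split(".")[1];
  if (!payload) return null;
  // JWTs use base64url; convert to standard base64 and restore padding.
  const b64 = payload.replace(/-/g, "+").replace(/_/g, "/");
  const padded = b64 + "=".repeat((4 - (b64.length % 4)) % 4);
  try {
    const claims = JSON.parse(atob(padded)) as { exp?: number };
    return typeof claims.exp === "number" ? new Date(claims.exp * 1000) : null;
  } catch {
    return null; // not valid base64 or not valid JSON
  }
}

// Treat tokens with no readable expiry as expired.
function isExpired(token: string, now: Date = new Date()): boolean {
  const exp = jwtExpiry(token);
  return exp === null || exp.getTime() <= now.getTime();
}
```

For example, isExpired(process.env.BSKY_AUTH_TOKEN ?? "") lets a script fail early with a clear message instead of hitting a 401 mid-run.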

Rate limits

The Bluesky public API enforces rate limits per IP:

  • Unauthenticated: ~3,000 requests per 5 minutes
  • Authenticated: ~5,000 requests per 5 minutes

When you hit the limit, the API returns HTTP 429. The isRateLimitError() helper detects this, and delay() is a simple sleep utility. The library does not implement automatic retry or backoff — you need to handle this yourself. The user-feed example shows a basic pattern: catch the error, sleep 30 seconds, retry.
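If a fixed 30-second sleep is too blunt, exponential backoff is a common alternative. This is a generic sketch, not the library's behavior: backoffDelayMs, sleep, and fetchWithBackoff are hypothetical names, and the defaults (1 second base, 30 second cap, 5 attempts) are arbitrary choices.

```typescript
// Exponential backoff: baseMs * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs: number = 1_000, maxMs: number = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retry `request` while it returns HTTP 429, up to `maxAttempts` tries in total.
// The last response is returned even if it is still a 429.
async function fetchWithBackoff(
  request: () => Promise<Response>,
  maxAttempts: number = 5,
): Promise<Response> {
  for (let attempt = 0; ; attempt++) {
    const res = await request();
    if (res.status !== 429 || attempt + 1 >= maxAttempts) return res;
    await sleep(backoffDelayMs(attempt));
  }
}
```

Usage would look like fetchWithBackoff(() => fetch(url)); passing a function rather than a promise is what allows the request to be issued again on each retry.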

The Jetstream WebSocket has no rate limit on receiving events — it pushes to you as fast as events happen.

Filtering the stream to specific users

To watch posts from specific accounts, you pass their DIDs to wantedDids:

import { startStreamWithReconnect } from "./lib/index.js";

startStreamWithReconnect({
  config: {
    wantedCollections: ["app.bsky.feed.post"],
    wantedDids: ["did:plc:abc...", "did:plc:def..."],
  },
  onEvent: (event) => { /* only events from these DIDs */ },
});

There is no shortcut for "my followers". You must:

  1. Resolve your handle to a DID via fetchResolveHandle
  2. Paginate through fetchGetFollows (the people you follow) or fetchGetFollowers (people who follow you)
  3. Collect all their DIDs
  4. Pass them to wantedDids or sendOptionsUpdate

The user-feed example demonstrates this pattern with search results instead of followers.
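The pagination in steps 2 and 3 follows the usual cursor loop: request a page, collect its DIDs, and repeat with the returned cursor until none comes back. The sketch below injects the page fetcher so it stays independent of any HTTP client; Page and collectAllDids are hypothetical names, and the real fetchGetFollows and fetchGetFollowers responses may be shaped differently.

```typescript
// One page of a cursor-paginated listing, reduced to the fields used here.
interface Page {
  dids: string[];
  cursor?: string;
}

// Walk pages until the server stops returning a cursor or maxPages is hit.
async function collectAllDids(
  fetchPage: (cursor?: string) => Promise<Page>,
  maxPages: number = 100,
): Promise<string[]> {
  const all = new Set<string>(); // a Set drops DIDs repeated across pages
  let cursor: string | undefined;
  for (let i = 0; i < maxPages; i++) {
    const page = await fetchPage(cursor);
    for (const did of page.dids) all.add(did);
    if (!page.cursor) break; // no cursor means this was the last page
    cursor = page.cursor;
  }
  return Array.from(all);
}
```

The maxPages guard keeps a very large follow list from turning into an unbounded number of requests, which also matters for the rate limits described earlier.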

Reconnection behavior

startStreamWithReconnect handles reconnection automatically:

  1. WebSocket close event fires
  2. Wait reconnectDelay (default: 5 seconds)
  3. Reconnect with cursor = lastEventTimestamp - cursorBufferUs (default buffer: 5 seconds)
  4. Jetstream replays events from the cursor forward

This is fully automatic — you don't need to wire anything up. The lower-level functions (createJetstreamConnection + attachLifecycleWithCursor) give you manual control if you need it.

Guarantees: You will not miss events during brief disconnects (under 72 hours). You may receive duplicates after reconnecting due to the safety buffer. The library does not deduplicate.

Project structure

lib/
├── types.ts        Zod schemas and TypeScript types for all data structures
├── websocket.ts    Jetstream WebSocket connection, cursor tracking, reconnection
├── parsing.ts      Event type guards, post/identity/account parsers, facet extraction
├── formatting.ts   Terminal output: box drawing, keyword highlighting, URL conversion
├── api.ts          HTTP API bindings for the Bluesky public API
├── index.ts        Re-exports everything from the above modules
└── __tests__/      Unit tests (vitest)

examples/
├── hello-jetstream.ts     Minimal Jetstream connection (no auth needed)
├── hello-firehose.ts      Raw AT Protocol firehose with CID proofs
├── keyword-stream.ts      Real-time keyword filtering with cursor reconnection
├── post-lifecycle.ts      Track creates, edits, and deletes with throughput stats
├── identity-monitor.ts    Handle changes and account status events
├── profile-dashboard.ts   Fetch and display a user profile (HTTP API)
├── thread-explorer.ts     Render a post thread as an indented tree
├── search-posts.ts        Search posts with engagement analysis (requires auth)
├── user-feed.ts           Search users → stream their posts (API + WebSocket)
└── README.md              Detailed guide for each example

API reference

WebSocket

| Function | Purpose |
|---|---|
| startStreamWithReconnect(options) | High-level: connect, handle events, auto-reconnect with cursor. This is what you want. |
| createJetstreamConnection(config) | Low-level: open a raw WebSocket to Jetstream |
| onMessage(ws, handler) | Attach a typed event handler (no cursor tracking) |
| onMessageWithCursor(ws, handler, cursorRef) | Attach a handler that updates a cursor ref on each event |
| attachLifecycleWithCursor(ws, options) | Wire up open/error/close handlers with cursor-aware reconnection |
| sendOptionsUpdate(ws, collections, dids) | Change subscription filters on a live connection without reconnecting |
| buildJetstreamUrl(config) | Build the WebSocket URL with query parameters |
| waitForOpen(ws) | Promise that resolves when the WebSocket opens |
| createCursorRef() | Create a `{ current: number |

Parsing

| Function | Purpose |
|---|---|
| isCommitEvent(event) | Type guard: is this a commit (create/update/delete)? |
| isIdentityEvent(event) | Type guard: is this a handle change? |
| isAccountEvent(event) | Type guard: is this an account status change? |
| parsePost(event) | Extract a PostData from a post create event, or null |
| parsePostUpdate(event) | Extract a PostUpdateData from a post update event, or null |
| parsePostDelete(event) | Extract a PostDeleteData from a post delete event, or null |
| parseKeywordPost(event, keywords) | Parse + keyword-match in one step. Returns null if no match |
| parseUserPost(event, registry) | Parse + user-match against a UserRegistry. Returns null if user not in registry |
| extractLinks(facets) | Pull all link URIs from a post's facets array |
| extractMentions(facets) | Pull all mentioned DIDs from a post's facets array |
| buildAtUri(did, collection, rkey) | Construct an at:// URI |
| parseAtUri(uri) | Decompose an at:// URI into { did, collection, rkey } |
| parseBskyUrl(url) | Decompose a bsky.app URL into { id, rkey } |

HTTP API

| Function | Auth required | Purpose |
|---|---|---|
| fetchResolveHandle(handle) | No | Resolve a handle to a DID |
| fetchGetProfile(actor) | No | Fetch a user's full profile |
| fetchGetProfiles(actors) | No | Batch-fetch up to 25 profiles |
| fetchSearchActors(query) | No | Search for users by name/handle |
| fetchGetAuthorFeed(actor, options) | No | Fetch a user's recent posts |
| fetchGetPostThread(uri, options) | No | Fetch a thread (parents + replies) |
| fetchGetFollowers(actor, options) | No | Paginate through a user's followers |
| fetchGetFollows(actor, options) | No | Paginate through who a user follows |
| fetchSearchPosts(query, options) | Yes | Full-text search across all posts |
| fetchGetLikes(uri, options) | No | Who liked a post |
| fetchGetRepostedBy(uri, options) | No | Who reposted a post |
| fetchGetQuotes(uri, options) | No | Posts that quote a given post |

Formatting

| Function | Purpose |
|---|---|
| toBskyUrl(did, rkey) | Build a https://bsky.app/profile/... URL |
| atUriToBskyUrl(atUri) | Convert an AT URI to a bsky.app URL |
| formatPost(post, header) | Box-drawing formatted post for terminal output |
| formatKeywordPost(post) | Format with keyword highlighting |
| formatUserPost(post) | Format with user display name and handle |
| highlightKeywords(text, keywords) | Wrap matched keywords in **bold** markers |
| formatTruncated(text, maxLength) | Truncate with ... |
| formatEngagement(counts) | Compact stats: L:5 R:2 Q:1 Re:3 |

Running tests

npm test

Tests use Vitest and cover parsing, formatting, WebSocket URL building, and API URL construction.