serverless #atproto jetstream to webhook connector, powered by cloudflare durable objects
# Serverless #atproto Jetstream to Webhook Connector

A Cloudflare Worker that connects to the atproto Jetstream, processes events, and forwards them to a webhook endpoint.

Why might this be useful? This is an experimental setup for small atproto apps running on serverless platforms (e.g. Next.js + Vercel) that want to subscribe to real-time events from the firehose/Jetstream but would rather consume them as webhooks than run a separate service that reads the Jetstream or firehose directly.

Flow:
CF Worker + Durable Object => listens to Jetstream for events in the specified collections => adds new events to a CF Queue => CF Queue worker processes events and forwards them to your webhook.

## Features

- **Real-time Event Processing**: Connects to Bluesky Jetstream WebSocket to receive live AT Protocol events
- **Configurable Collections**: Subscribe to a set of collections via environment variables
- **Queue-based Architecture**: Uses Cloudflare Queues for reliable event processing and webhook delivery
- **Webhook Integration**: Forwards events to your webhook endpoint with optional bearer token authentication
- **Cursor Tracking**: Maintains cursor position for gapless playback during reconnections
- **Stats Collection**: Tracks event counts per collection and total processing stats
- **Web Dashboard**: HTML interface to view processing statistics
- **Auto-Reconnection**: Handles WebSocket disconnections with exponential backoff
- **Persistent Storage**: Uses Durable Object storage to maintain state across deployments
- **Fully Configurable**: No hardcoded URLs or collections - easy to adapt for any project

## Quick Start

### Local Development

```bash
# 1. Clone and install dependencies
pnpm install

# 2. Set up local environment
pnpm run setup-local
# This creates .dev.vars from template

# 3. Edit .dev.vars with your configuration
nano .dev.vars

# 4. Create Cloudflare Queues (one-time setup)
pnpm run setup-queues

# 5. Start development server
pnpm run dev

# 6. View dashboard
open http://localhost:8787/stats/html
```

### Production Deployment

```bash
# 1. Configure environment variables and secrets
pnpm run setup-config

# 2. Deploy to Cloudflare
pnpm run deploy
```

## Configuration

This worker is fully configurable via environment variables:

| Variable | Description | Example |
|----------|-------------|---------|
| `WEBHOOK_URL` | **Required** - Your webhook endpoint | `https://example.com/api/webhooks/jetstream` |
| `JETSTREAM_COLLECTIONS` | **Required** - Collections to watch (comma-separated) | `app.bsky.feed.post,app.bsky.graph.follow` |
| `WEBHOOK_BEARER_TOKEN` | **Optional** - Bearer token for webhook authentication | `your-secret-token` |

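As a rough illustration of how these three variables might be validated at startup, here is a minimal sketch. The `ConnectorEnv` and `ConnectorConfig` types and the `parseConfig` helper are hypothetical names invented for this example; the project's actual validation code may look different.

```typescript
// Hypothetical view of the environment bindings (variable names from the table above).
interface ConnectorEnv {
  WEBHOOK_URL?: string;
  JETSTREAM_COLLECTIONS?: string;
  WEBHOOK_BEARER_TOKEN?: string;
}

// Hypothetical normalized configuration used by the rest of the worker.
interface ConnectorConfig {
  webhookUrl: string;
  collections: string[];
  bearerToken: string | undefined;
}

// Illustrative helper: fails fast with a clear message when a required variable is missing.
function parseConfig(env: ConnectorEnv): ConnectorConfig {
  const webhookUrl = (env.WEBHOOK_URL ?? "").trim();
  if (webhookUrl === "") {
    throw new Error("Missing required environment variable: WEBHOOK_URL");
  }

  // Split the comma-separated list, tolerating spaces and empty entries.
  const collections = (env.JETSTREAM_COLLECTIONS ?? "")
    .split(",")
    .map((name) => name.trim())
    .filter((name) => name.length > 0);
  if (collections.length === 0) {
    throw new Error("Missing required environment variable: JETSTREAM_COLLECTIONS");
  }

  // The bearer token is optional; an empty string is treated as "not set".
  const token = (env.WEBHOOK_BEARER_TOKEN ?? "").trim();
  return { webhookUrl, collections, bearerToken: token === "" ? undefined : token };
}
```

Calling `parseConfig(env)` once, before opening the WebSocket, would surface a misconfiguration immediately rather than on the first delivery attempt.
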
### Example Collections

```bash
# Social media activity
JETSTREAM_COLLECTIONS=app.bsky.feed.post,app.bsky.graph.follow,app.bsky.feed.like

# Profile updates
JETSTREAM_COLLECTIONS=app.bsky.actor.profile

# Custom AT Protocol collections
JETSTREAM_COLLECTIONS=com.example.app.*,org.myproject.data.*

# Watch everything (high volume!)
JETSTREAM_COLLECTIONS=*
```

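Jetstream filters server-side through repeated `wantedCollections` query parameters and resumes from a `cursor` given in Unix microseconds. The sketch below shows one way a collection list could be turned into a subscribe URL. `buildSubscribeUrl` is a hypothetical helper, and treating a bare `*` as "send no filter at all" is an assumption about how this worker interprets that value.

```typescript
// Endpoint quoted in this README's Durable Object section.
const JETSTREAM_URL = "wss://jetstream1.us-west.bsky.network/subscribe";

// Illustrative helper (not part of the project's API).
function buildSubscribeUrl(collections: string[], cursor?: number): string {
  const params: string[] = [];

  // Assumption: "*" means "watch everything", which is expressed by omitting the filter.
  const watchAll = collections.includes("*");
  if (!watchAll) {
    for (const collection of collections) {
      params.push(`wantedCollections=${encodeURIComponent(collection)}`);
    }
  }

  // When reconnecting, pass the last seen time_us so playback resumes from there.
  if (cursor !== undefined) {
    params.push(`cursor=${cursor}`);
  }

  return params.length > 0 ? `${JETSTREAM_URL}?${params.join("&")}` : JETSTREAM_URL;
}
```

For example, `buildSubscribeUrl(["app.bsky.feed.post"], 1725911162329308)` yields the base URL followed by `?wantedCollections=app.bsky.feed.post&cursor=1725911162329308`.
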
### Local Development (.dev.vars)

```bash
# Copy template and edit
cp .dev.vars.example .dev.vars

# Example configuration
WEBHOOK_URL=https://example.com/api/webhooks/jetstream-event
JETSTREAM_COLLECTIONS=app.bsky.feed.post,app.bsky.graph.follow
WEBHOOK_BEARER_TOKEN=your-development-token
```

### Production (Cloudflare Secrets)

```bash
# Set via interactive script
pnpm run setup-config

# Or manually
wrangler secret put WEBHOOK_URL
wrangler secret put JETSTREAM_COLLECTIONS
wrangler secret put WEBHOOK_BEARER_TOKEN
```

## Endpoints

- **`GET /`** - API information and available endpoints
- **`GET /stats`** - JSON statistics of processed events
- **`GET /stats/html`** - HTML dashboard with real-time statistics (auto-refreshes every 30s)
- **`GET /status`** - WebSocket connection status
- **`GET /health`** - Health check endpoint
- **`POST /reset`** - Reset all statistics
- **`POST /reconnect`** - Force WebSocket reconnection

## Architecture

### Unified Worker Design

This worker handles both event processing and queue consumption in a single deployment:

1. **Jetstream Processing** (Durable Object): WebSocket connection, event filtering, queueing
2. **Queue Consumption** (Queue Handler): Batch processing and webhook delivery
3. **HTTP API** (Fetch Handler): Stats, dashboard, and control endpoints

```
Jetstream Events → Durable Object → Cloudflare Queue → Queue Handler → Your Webhook
```

### Durable Object: `JetstreamProcessor`

The core processing logic runs in a single Durable Object instance that:

1. **Establishes WebSocket Connection**: Connects to `wss://jetstream1.us-west.bsky.network/subscribe`
2. **Filters Events**: Only receives events from collections specified in `JETSTREAM_COLLECTIONS`
3. **Processes Events**: For each received event:
   - Skips identity and account events (only processes commits)
   - Updates the cursor with the event's `time_us`
   - Increments collection-specific counters
   - Queues the event for webhook delivery
   - Persists statistics every 100 events
4. **Handles Reconnections**: Automatically reconnects on disconnection with cursor for gapless playback

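The per-event steps in the list above can be sketched as a small state-update function. This is an illustration under stated assumptions, not the project's code: `applyMessage`, `ProcessorState`, and `StepResult` are hypothetical names, and whether skipped events advance the cursor or count toward the persistence interval is a guess.

```typescript
// Minimal event shape covering only the fields this README mentions.
interface JetstreamEvent {
  did: string;
  time_us: number;
  kind: "commit" | "identity" | "account";
  commit?: { collection: string; operation: string; rkey: string };
}

// Hypothetical in-memory state mirroring the counters described in this README.
interface ProcessorState {
  cursor: number;
  eventCounts: Record<string, number>;
  totalEvents: number;
  totalReceived: number;
}

// Tells the caller which event to queue (if any) and whether to persist now.
interface StepResult {
  forward: JetstreamEvent | null;
  persist: boolean;
}

const PERSIST_EVERY = 100; // statistics are persisted every 100 events
const SKIP: StepResult = { forward: null, persist: false };

function applyMessage(state: ProcessorState, raw: string): StepResult {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return SKIP; // a malformed message is dropped so it cannot stop the stream
  }
  if (typeof parsed !== "object" || parsed === null) return SKIP;
  const event = parsed as JetstreamEvent;
  state.totalReceived += 1;

  // Identity and account events are counted as received but otherwise skipped.
  if (event.kind !== "commit" || !event.commit) return SKIP;

  state.cursor = event.time_us;
  const collection = event.commit.collection;
  state.eventCounts[collection] = (state.eventCounts[collection] ?? 0) + 1;
  state.totalEvents += 1;

  return { forward: event, persist: state.totalEvents % PERSIST_EVERY === 0 };
}
```

Keeping this step free of I/O means the WebSocket message handler only has to send `forward` to the queue and write the state to storage when `persist` is true.
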
### Queue Consumer

The queue handler processes events in batches and delivers them to your webhook with:
- **Batch processing**: Up to 10 events per batch
- **Automatic retries**: 3 retry attempts, after which events go to a dead letter queue
- **Bearer token authentication**: Optional secure webhook delivery

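To make the delivery step concrete, the following sketch posts each queued event to the webhook and acknowledges or retries it individually. `QueueMessageLike` is a local stand-in that only mirrors the `body`, `ack()`, and `retry()` members of a Cloudflare Queues message; `deliverBatch`, `DeliveryConfig`, and `PostFn` are hypothetical names. The retry limit and dead letter queue are assumed to be configured on the queue itself rather than in code.

```typescript
// Local stand-in for a Cloudflare Queues message (only the members used here).
interface QueueMessageLike<T> {
  body: T;
  ack(): void;
  retry(): void;
}

interface DeliveryConfig {
  webhookUrl: string;
  bearerToken?: string;
}

// Subset of fetch used here, so the sketch does not depend on a runtime's types.
type PostFn = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number }>;

// Illustrative helper: returns how many messages were delivered successfully.
async function deliverBatch<T>(
  messages: readonly QueueMessageLike<T>[],
  config: DeliveryConfig,
  post: PostFn,
): Promise<number> {
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    "User-Agent": "Jetstream-Unified/1.0",
  };
  if (config.bearerToken) {
    headers["Authorization"] = `Bearer ${config.bearerToken}`;
  }

  let delivered = 0;
  for (const message of messages) {
    try {
      const response = await post(config.webhookUrl, {
        method: "POST",
        headers,
        body: JSON.stringify(message.body),
      });
      if (response.ok) {
        message.ack(); // delivered: remove from the queue
        delivered += 1;
      } else {
        message.retry(); // non-2xx: let the queue redeliver later
      }
    } catch {
      message.retry(); // network failure: also redeliver
    }
  }
  return delivered;
}
```

Inside a Worker's `queue` handler, the global `fetch` could be passed as `post` and `batch.messages` as `messages`.
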
### Event Types Processed

The processor handles Jetstream events with these `kind` values:

- **`commit`**: Repository commits with operations (create, update, delete) - **PROCESSED**
- **`identity`**: Identity/handle updates - **SKIPPED**
- **`account`**: Account status changes - **SKIPPED**

### Data Persistence

Statistics are stored in Durable Object storage:

```typescript
interface StoredStats {
  cursor: number;                      // Latest time_us for reconnection
  eventCounts: Record<string, number>; // Events per collection
  totalEvents: number;                 // Total commit events processed
  totalReceived: number;               // Total events received (including skipped)
  lastEventTime: string;               // ISO timestamp of last processing
}
```

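A minimal sketch of loading and saving this structure follows, assuming a key-value interface shaped like the `get`/`put` methods of Durable Object storage. `StorageLike`, `STATS_KEY`, `emptyStats`, `loadStats`, and `saveStats` are hypothetical names, and falling back to fresh in-memory stats on a storage error is one possible reading of the graceful degradation this README describes.

```typescript
interface StoredStats {
  cursor: number;
  eventCounts: Record<string, number>;
  totalEvents: number;
  totalReceived: number;
  lastEventTime: string;
}

// Local stand-in mirroring the get/put subset of Durable Object storage.
interface StorageLike {
  get<T>(key: string): Promise<T | undefined>;
  put<T>(key: string, value: T): Promise<void>;
}

const STATS_KEY = "stats"; // hypothetical storage key

function emptyStats(): StoredStats {
  return { cursor: 0, eventCounts: {}, totalEvents: 0, totalReceived: 0, lastEventTime: "" };
}

// Returns stored stats, or fresh in-memory stats if nothing is stored or storage fails.
async function loadStats(storage: StorageLike): Promise<StoredStats> {
  try {
    return (await storage.get<StoredStats>(STATS_KEY)) ?? emptyStats();
  } catch {
    return emptyStats();
  }
}

// Returns false instead of throwing, so processing can continue from memory.
async function saveStats(storage: StorageLike, stats: StoredStats): Promise<boolean> {
  try {
    await storage.put(STATS_KEY, stats);
    return true;
  } catch {
    return false;
  }
}
```
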
## Monitoring

### Real-time Dashboard

Visit `/stats/html` for a web interface showing:

- **Commit events processed** - Only the events you care about
- **Total events received** - All events from Jetstream (including skipped)
- **Processing efficiency** - Percentage of useful vs total events
- **Unique collections** - Number of different collections processed
- **Last event timestamp** - When the most recent event was received
- **Events breakdown by collection** - Detailed stats per collection
- **Auto-refresh every 30 seconds**

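The processing efficiency figure is presumably the share of received events that were commits. A small sketch of that calculation, assuming it is `totalEvents / totalReceived` expressed as a percentage and rounded to one decimal place (`processingEfficiency` is a hypothetical helper name):

```typescript
// Assumption: efficiency = commit events processed / all events received, in percent.
function processingEfficiency(totalEvents: number, totalReceived: number): number {
  if (totalReceived <= 0) return 0; // nothing received yet, avoid dividing by zero
  const percent = (totalEvents / totalReceived) * 100;
  return Math.round(percent * 10) / 10; // one decimal place
}
```
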
### API Monitoring

```bash
# Check connection status
curl http://localhost:8787/status

# Get current statistics
curl http://localhost:8787/stats

# Check health
curl http://localhost:8787/health

# Force reconnection if needed
curl -X POST http://localhost:8787/reconnect
```

## Webhook Integration

Each commit event is posted to your webhook endpoint as JSON with optional bearer token authentication:

```http
POST {WEBHOOK_URL}
Content-Type: application/json
Authorization: Bearer {WEBHOOK_BEARER_TOKEN}
User-Agent: Jetstream-Unified/1.0

{
  "did": "did:plc:...",
  "time_us": 1725911162329308,
  "kind": "commit",
  "commit": {
    "rev": "3l3qo2vutsw2b",
    "operation": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "3l3qo2vuowo2b",
    "record": {
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-09-09T19:46:02.102Z",
      "text": "Hello, world!",
      // ... record data
    },
    "cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
  }
}
```

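On the receiving side, a handler needs to check the bearer token and parse the payload. The sketch below is framework-agnostic: it takes headers and a raw body rather than a specific request object, so it could be called from a Next.js route handler or any other server. `handleJetstreamWebhook`, `IncomingWebhook`, and `WebhookResult` are hypothetical names, and header keys are assumed to be lower-cased already.

```typescript
interface IncomingWebhook {
  headers: Record<string, string | undefined>; // assumed lower-cased keys
  body: string;
}

interface CommitEvent {
  did: string;
  time_us: number;
  kind: "commit";
  commit: { operation: string; collection: string; rkey: string; record?: unknown };
}

interface WebhookResult {
  status: number;
  message: string;
}

// Narrow an unknown JSON value to the commit shape shown above.
function isCommitEvent(value: unknown): value is CommitEvent {
  if (typeof value !== "object" || value === null) return false;
  const v = value as { kind?: unknown; commit?: { collection?: unknown } | null };
  return v.kind === "commit" && typeof v.commit === "object" && v.commit !== null
    && typeof v.commit.collection === "string";
}

function handleJetstreamWebhook(
  request: IncomingWebhook,
  expectedToken: string | undefined,
  onCommit: (event: CommitEvent) => void,
): WebhookResult {
  // Only enforce authentication when a token is configured.
  if (expectedToken !== undefined && request.headers["authorization"] !== `Bearer ${expectedToken}`) {
    return { status: 401, message: "unauthorized" };
  }
  let payload: unknown;
  try {
    payload = JSON.parse(request.body);
  } catch {
    return { status: 400, message: "invalid JSON" };
  }
  if (!isCommitEvent(payload)) {
    return { status: 400, message: "not a commit event" };
  }
  onCommit(payload);
  return { status: 200, message: "ok" };
}
```

Since failed deliveries are retried through the queue, the same event may arrive more than once, so a handler like this is best kept idempotent, for instance by keying writes on `did`, `collection`, and `rkey`.
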
## Error Handling

- **WebSocket Errors**: Automatic reconnection with exponential backoff
- **Webhook Failures**: Automatic retries via Cloudflare Queues with dead letter queue
- **Parse Errors**: Individual event failures don't crash the processor
- **Storage Errors**: Graceful degradation with in-memory fallback
- **Configuration Errors**: Clear error messages for missing required environment variables

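Exponential backoff for reconnection can be sketched as a delay that doubles with each failed attempt up to a cap. The base delay of one second and the 30 second cap below are illustrative assumptions, as is the `backoffDelayMs` name; the worker's actual values are not stated in this README.

```typescript
// attempt is zero-based: 0 for the first retry, 1 for the second, and so on.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  // Clamp the exponent so very large attempt counts cannot overflow.
  const exponent = Math.min(Math.max(attempt, 0), 30);
  return Math.min(maxMs, baseMs * 2 ** exponent);
}
```

With these defaults the delays run 1s, 2s, 4s, 8s, 16s, then stay at 30s. Resetting the attempt counter after a successful connection keeps later reconnects fast, and adding random jitter is a common refinement that is left out here to keep the example deterministic.
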
## Development Commands

```bash
# Local development
pnpm run dev              # Start development server
pnpm run setup-local      # Set up local environment

# Configuration
pnpm run setup-config     # Interactive production setup
pnpm run setup-queues     # Create Cloudflare Queues (one-time)

# Deployment
pnpm run deploy           # Deploy to Cloudflare
pnpm run cf-typegen       # Regenerate TypeScript types
```

## Project Structure

```
src/
├── types.ts              # Shared TypeScript interfaces
└── index.ts              # Main worker (Durable Object + Queue Consumer)

wrangler.jsonc            # Cloudflare Worker configuration
.dev.vars.example         # Environment variables template
setup-*.sh                # Setup scripts for queues and configuration
```

## Deployment

The worker automatically starts processing events upon deployment. The Durable Object ensures only one instance runs globally, maintaining connection state across worker invocations.

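A single global instance is typically obtained by always deriving the Durable Object ID from the same fixed name, because `idFromName` maps a given name to the same object on every invocation. The sketch below assumes that pattern. `NamespaceLike` is a local stand-in for the `idFromName`/`get` subset of a Durable Object namespace binding, and `SINGLETON_NAME` and `getProcessorStub` are hypothetical.

```typescript
// Local stand-in for the idFromName/get subset of a Durable Object namespace.
interface NamespaceLike<Id, Stub> {
  idFromName(name: string): Id;
  get(id: Id): Stub;
}

const SINGLETON_NAME = "jetstream-processor"; // hypothetical fixed instance name

// Every caller resolves the same name, so every caller reaches the same object.
function getProcessorStub<Id, Stub>(namespace: NamespaceLike<Id, Stub>): Stub {
  return namespace.get(namespace.idFromName(SINGLETON_NAME));
}
```

A fetch handler could then forward requests such as `/stats` or `/reconnect` to the stub, so that all of them act on the one instance holding the WebSocket.
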
## Adapting for Your Project

This worker is designed to be easily adaptable:

1. **Fork the repository**
2. **Configure your environment variables**:
   - Set your webhook URL
   - Choose your AT Protocol collections
   - Add authentication if needed
3. **Deploy to Cloudflare**
4. **Monitor via the dashboard**

No code changes required - everything is configurable via environment variables!

## License

MIT
