serverless #atproto jetstream to webhook connector, powered by cloudflare durable objects
# Serverless #atproto Jetstream to Webhook Connector

A Cloudflare Worker that connects to the atproto Jetstream, processes events, and forwards them to a webhook endpoint.

Why might this be useful? This is an experimental setup for small atproto apps running on serverless platforms (e.g. Next.js + Vercel) that want to subscribe to real-time events from the firehose/Jetstream but prefer to consume them as webhooks instead of running a separate service that consumes the Jetstream or firehose directly.

Flow: CF Worker + Durable Object => listens to Jetstream for the specified collection events => adds new events to a CF Queue => the CF Queue consumer processes events and forwards them to your webhook.

## Features

- **Real-time Event Processing**: Connects to the Bluesky Jetstream WebSocket to receive live AT Protocol events
- **Configurable Collections**: Subscribe to a set of collections via environment variables
- **Queue-based Architecture**: Uses Cloudflare Queues for reliable event processing and webhook delivery
- **Webhook Integration**: Forwards events to your webhook endpoint with optional bearer token authentication
- **Cursor Tracking**: Maintains the cursor position for gapless playback during reconnections
- **Stats Collection**: Tracks event counts per collection and total processing stats
- **Web Dashboard**: HTML interface to view processing statistics
- **Auto-Reconnection**: Handles WebSocket disconnections with exponential backoff
- **Persistent Storage**: Uses Durable Object storage to maintain state across deployments
- **Fully Configurable**: No hardcoded webhook URL or collections - easy to adapt for any project

## Quick Start

### Local Development

```bash
# 1. Clone and install dependencies
pnpm install

# 2. Set up local environment
pnpm run setup-local
# This creates .dev.vars from the .dev.vars.example template

# 3. Edit .dev.vars with your configuration
nano .dev.vars

# 4. Create Cloudflare Queues (one-time setup)
pnpm run setup-queues

# 5. Start development server
pnpm run dev

# 6. View dashboard
open http://localhost:8787/stats/html
```

### Production Deployment

```bash
# 1. Configure environment variables and secrets
pnpm run setup-config

# 2. Deploy to Cloudflare
pnpm run deploy
```

## Configuration

This worker is fully configurable via environment variables:

| Variable | Description | Example |
|----------|-------------|---------|
| `WEBHOOK_URL` | **Required** - Your webhook endpoint | `https://example.com/api/webhooks/jetstream` |
| `JETSTREAM_COLLECTIONS` | **Required** - Collections to watch (comma-separated) | `app.bsky.feed.post,app.bsky.graph.follow` |
| `WEBHOOK_BEARER_TOKEN` | **Optional** - Bearer token for webhook authentication | `your-secret-token` |

### Example Collections

```bash
# Social media activity
JETSTREAM_COLLECTIONS=app.bsky.feed.post,app.bsky.graph.follow,app.bsky.feed.like

# Profile updates
JETSTREAM_COLLECTIONS=app.bsky.actor.profile

# Custom AT Protocol collections
JETSTREAM_COLLECTIONS=com.example.app.*,org.myproject.data.*

# Watch everything (high volume!)
JETSTREAM_COLLECTIONS=*
```
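The configuration rules above can be sketched in TypeScript as a small parser plus a Jetstream URL builder. This is an illustrative sketch, not the worker's actual source: the names `ConnectorEnv`, `ConnectorConfig`, `parseConfig`, and `buildJetstreamUrl` are hypothetical, and treating a bare `*` as "send no collection filter" is an assumption. `wantedCollections` and `cursor` are Jetstream's subscribe query parameters.

```typescript
// Hypothetical shape of the environment: only the three documented
// variables (queue and Durable Object bindings are omitted).
interface ConnectorEnv {
  WEBHOOK_URL?: string;
  JETSTREAM_COLLECTIONS?: string;
  WEBHOOK_BEARER_TOKEN?: string;
}

interface ConnectorConfig {
  webhookUrl: string;
  collections: string[]; // empty array = no filter (assumed meaning of "*")
  bearerToken?: string;
}

// Validate required variables and split the comma-separated collection list.
function parseConfig(env: ConnectorEnv): ConnectorConfig {
  const webhookUrl = env.WEBHOOK_URL?.trim();
  if (!webhookUrl) {
    throw new Error("Missing required environment variable: WEBHOOK_URL");
  }
  const raw = env.JETSTREAM_COLLECTIONS?.trim();
  if (!raw) {
    throw new Error("Missing required environment variable: JETSTREAM_COLLECTIONS");
  }
  const collections = raw
    .split(",")
    .map((c) => c.trim())
    .filter((c) => c.length > 0);
  return {
    webhookUrl,
    // Assumption: a bare "*" means "watch everything", so no filter is sent.
    collections: collections.some((c) => c === "*") ? [] : collections,
    bearerToken: env.WEBHOOK_BEARER_TOKEN?.trim() || undefined,
  };
}

// Build the subscribe URL: one wantedCollections parameter per collection,
// plus the saved cursor (time_us) when resuming after a reconnect.
function buildJetstreamUrl(
  base: string,
  collections: string[],
  cursor?: number,
): string {
  const params = collections.map(
    (c) => `wantedCollections=${encodeURIComponent(c)}`,
  );
  if (cursor !== undefined) {
    params.push(`cursor=${cursor}`);
  }
  return params.length > 0 ? `${base}?${params.join("&")}` : base;
}
```

For example, `buildJetstreamUrl("wss://jetstream1.us-west.bsky.network/subscribe", ["app.bsky.feed.post"], 1725911162329308)` returns `wss://jetstream1.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&cursor=1725911162329308`. Failing fast in `parseConfig` is one way to get the "clear error messages for missing required environment variables" behavior.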

### Local Development (.dev.vars)

```bash
# Copy template and edit
cp .dev.vars.example .dev.vars

# Example .dev.vars contents
WEBHOOK_URL=https://example.com/api/webhooks/jetstream-event
JETSTREAM_COLLECTIONS=app.bsky.feed.post,app.bsky.graph.follow
WEBHOOK_BEARER_TOKEN=your-development-token
```

### Production (Cloudflare Secrets)

```bash
# Set via interactive script
pnpm run setup-config

# Or manually
wrangler secret put WEBHOOK_URL
wrangler secret put JETSTREAM_COLLECTIONS
wrangler secret put WEBHOOK_BEARER_TOKEN
```

## Endpoints

- **`GET /`** - API information and available endpoints
- **`GET /stats`** - JSON statistics of processed events
- **`GET /stats/html`** - HTML dashboard with real-time statistics (auto-refreshes every 30s)
- **`GET /status`** - WebSocket connection status
- **`GET /health`** - Health check endpoint
- **`POST /reset`** - Reset all statistics
- **`POST /reconnect`** - Force WebSocket reconnection

## Architecture

### Unified Worker Design

This worker handles both event processing and queue consumption in a single deployment:

1. **Jetstream Processing** (Durable Object): WebSocket connection, event filtering, queueing
2. **Queue Consumption** (Queue Handler): Batch processing and webhook delivery
3. **HTTP API** (Fetch Handler): Stats, dashboard, and control endpoints

```
Jetstream Events → Durable Object → Cloudflare Queue → Queue Handler → Your Webhook
```

### Durable Object: `JetstreamProcessor`

The core processing logic runs in a single Durable Object instance that:

1. **Establishes WebSocket Connection**: Connects to `wss://jetstream1.us-west.bsky.network/subscribe`
2. **Filters Events**: Only receives events from collections specified in `JETSTREAM_COLLECTIONS`
3. **Processes Events**: For each received event:
   - Skips identity and account events (only commits are processed)
   - Updates the cursor with the commit event's `time_us`
   - Increments the per-collection counter
   - Queues the event for webhook delivery
   - Persists statistics every 100 events
4. **Handles Reconnections**: Automatically reconnects on disconnection, resuming from the saved cursor for gapless playback
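
The per-event steps and the reconnect backoff listed above can be sketched as plain functions, independent of the Durable Object runtime. This is a hedged approximation rather than the worker's code: `processEvent`, `reconnectDelayMs`, `PERSIST_EVERY`, `ProcessResult`, and the 1 s base / 30 s cap are hypothetical, and the sketch assumes the every-100 persistence is counted over processed commit events.

```typescript
// Minimal Jetstream event shape (only the fields this sketch reads).
interface JetstreamEvent {
  did: string;
  time_us: number;
  kind: "commit" | "identity" | "account";
  commit?: { collection: string; operation: string };
}

// Same fields as the StoredStats interface documented under Data Persistence.
interface StoredStats {
  cursor: number;
  eventCounts: Record<string, number>;
  totalEvents: number;
  totalReceived: number;
  lastEventTime: string;
}

const PERSIST_EVERY = 100; // "Persists statistics every 100 events"

interface ProcessResult {
  enqueue: JetstreamEvent | null; // event to send to the queue, if any
  persist: boolean; // true when stats should be written to storage now
}

// Apply one parsed event to the stats. `now` is injected for testability.
function processEvent(
  stats: StoredStats,
  event: JetstreamEvent,
  now: Date,
): ProcessResult {
  stats.totalReceived += 1; // counts skipped events too
  if (event.kind !== "commit" || !event.commit) {
    return { enqueue: null, persist: false }; // identity/account: skipped
  }
  stats.cursor = event.time_us; // resume point for the next reconnect
  const collection = event.commit.collection;
  stats.eventCounts[collection] = (stats.eventCounts[collection] ?? 0) + 1;
  stats.totalEvents += 1;
  stats.lastEventTime = now.toISOString();
  return { enqueue: event, persist: stats.totalEvents % PERSIST_EVERY === 0 };
}

// Exponential backoff for reconnects: base * 2^attempt, capped at maxMs.
// The default base and cap are assumptions, not values from this project.
function reconnectDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}
```

In this shape the caller would send `enqueue` to the Cloudflare Queue and write `stats` to Durable Object storage when `persist` is true. Keeping the bookkeeping pure makes it testable without the Workers runtime.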

### Queue Consumer

The queue handler processes events in batches and delivers them to your webhook with:

- **Batch processing**: Up to 10 events per batch
- **Automatic retries**: 3 retry attempts with dead letter queue
- **Bearer token authentication**: Optional secure webhook delivery

### Event Types Processed

The processor handles Jetstream events with these `kind` values:

- **`commit`**: Repository commits with operations (create, update, delete) - **PROCESSED**
- **`identity`**: Identity/handle updates - **SKIPPED**
- **`account`**: Account status changes - **SKIPPED**

### Data Persistence

Statistics and the reconnection cursor are stored in Durable Object storage:

```typescript
interface StoredStats {
  cursor: number;                        // Latest time_us for reconnection
  eventCounts: Record<string, number>;   // Events per collection
  totalEvents: number;                   // Total commit events processed
  totalReceived: number;                 // Total events received (including skipped)
  lastEventTime: string;                 // ISO timestamp of last processing
}
```

## Monitoring

### Real-time Dashboard

Visit `/stats/html` for a web interface showing:

- **Commit events processed** - Only the events you care about
- **Total events received** - All events from Jetstream (including skipped)
- **Processing efficiency** - Percentage of useful vs total events
- **Unique collections** - Number of different collections processed
- **Last event timestamp** - When the most recent event was received
- **Events breakdown by collection** - Detailed stats per collection
- **Auto-refresh every 30 seconds**
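
The dashboard figures can be derived from the stored stats. The README does not spell out the exact formulas, so this sketch assumes processing efficiency is `totalEvents / totalReceived` expressed as a percentage and rounded to one decimal place; `summarize` and `DashboardSummary` are hypothetical names.

```typescript
// Same fields as the documented StoredStats interface.
interface StoredStats {
  cursor: number;
  eventCounts: Record<string, number>;
  totalEvents: number;
  totalReceived: number;
  lastEventTime: string;
}

interface DashboardSummary {
  commitEvents: number;
  totalReceived: number;
  efficiencyPercent: number; // commit events as a share of all received events
  uniqueCollections: number;
  lastEventTime: string;
  breakdown: Array<[string, number]>; // collections sorted by count, descending
}

function summarize(stats: StoredStats): DashboardSummary {
  // Avoid dividing by zero before any event has arrived.
  const efficiency =
    stats.totalReceived === 0 ? 0 : (stats.totalEvents / stats.totalReceived) * 100;
  const breakdown = Object.keys(stats.eventCounts)
    .map((k): [string, number] => [k, stats.eventCounts[k]])
    .sort((a, b) => b[1] - a[1]);
  return {
    commitEvents: stats.totalEvents,
    totalReceived: stats.totalReceived,
    efficiencyPercent: Math.round(efficiency * 10) / 10,
    uniqueCollections: breakdown.length,
    lastEventTime: stats.lastEventTime,
    breakdown,
  };
}
```

With 100 commit events out of 400 received, `efficiencyPercent` is 25. A low value simply means most of the stream was identity or account events that the processor skips.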

### API Monitoring

```bash
# Check connection status
curl http://localhost:8787/status

# Get current statistics
curl http://localhost:8787/stats

# Check health
curl http://localhost:8787/health

# Force reconnection if needed
curl -X POST http://localhost:8787/reconnect
```

## Webhook Integration

Each commit event is posted to your webhook endpoint as JSON, with optional bearer token authentication (the `Authorization` header applies when `WEBHOOK_BEARER_TOKEN` is set):

```http
POST {WEBHOOK_URL}
Content-Type: application/json
Authorization: Bearer {WEBHOOK_BEARER_TOKEN}
User-Agent: Jetstream-Unified/1.0

{
  "did": "did:plc:...",
  "time_us": 1725911162329308,
  "kind": "commit",
  "commit": {
    "rev": "3l3qo2vutsw2b",
    "operation": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "3l3qo2vuowo2b",
    "record": {
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-09-09T19:46:02.102Z",
      "text": "Hello, world!",
      // ... record data
    },
    "cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
  }
}
```
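On the receiving side (for example, a Next.js route handler on Vercel), your endpoint needs to check the bearer token and parse the payload shown above. The framework-agnostic sketch below takes the raw `Authorization` header and request body; `handleJetstreamWebhook`, `tokensMatch`, `CommitEvent`, and the chosen status codes are assumptions for illustration, not part of this project.

```typescript
// Subset of the payload shown above; only what this sketch checks is typed.
interface CommitEvent {
  did: string;
  time_us: number;
  kind: "commit";
  commit: {
    rev: string;
    operation: "create" | "update" | "delete";
    collection: string;
    rkey: string;
    record?: Record<string, unknown>; // assumed absent on deletes
    cid?: string;
  };
}

interface WebhookResult {
  status: number; // HTTP status your endpoint would return
  event?: CommitEvent; // set only when the request is accepted
}

// Checks every character position instead of returning at the first
// mismatch, so timing does not reveal where two tokens differ (the loop
// length still depends on the longer string).
function tokensMatch(a: string, b: string): boolean {
  let diff = a.length ^ b.length;
  const len = Math.max(a.length, b.length);
  for (let i = 0; i < len; i++) {
    diff |= (a.charCodeAt(i) || 0) ^ (b.charCodeAt(i) || 0);
  }
  return diff === 0;
}

// Hypothetical handler: pass the raw Authorization header, the raw body,
// and your WEBHOOK_BEARER_TOKEN (omit it if you do not use a token).
function handleJetstreamWebhook(
  authorization: string | null,
  rawBody: string,
  expectedToken?: string,
): WebhookResult {
  if (expectedToken !== undefined) {
    const prefix = "Bearer ";
    if (
      !authorization ||
      authorization.indexOf(prefix) !== 0 ||
      !tokensMatch(authorization.slice(prefix.length), expectedToken)
    ) {
      return { status: 401 }; // missing or wrong token
    }
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(rawBody);
  } catch {
    return { status: 400 }; // body is not valid JSON
  }

  // The connector only forwards commit events; reject anything else.
  const ev = parsed as Partial<CommitEvent> | null;
  if (!ev || ev.kind !== "commit" || !ev.commit || typeof ev.commit.collection !== "string") {
    return { status: 400 };
  }
  return { status: 200, event: ev as CommitEvent };
}
```

Since the queue consumer retries failed deliveries, it may be worth returning 200 for payloads you deliberately ignore, assuming non-2xx responses are counted as failures and redelivered.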

## Error Handling

- **WebSocket Errors**: Automatic reconnection with exponential backoff
- **Webhook Failures**: Automatic retries via Cloudflare Queues with dead letter queue
- **Parse Errors**: Individual event failures don't crash the processor
- **Storage Errors**: Graceful degradation with in-memory fallback
- **Configuration Errors**: Clear error messages for missing required environment variables

## Development Commands

```bash
# Local development
pnpm run dev            # Start development server
pnpm run setup-local    # Set up local environment

# Configuration
pnpm run setup-config   # Interactive production setup
pnpm run setup-queues   # Create Cloudflare Queues (one-time)

# Deployment
pnpm run deploy         # Deploy to Cloudflare
pnpm run cf-typegen     # Regenerate TypeScript types
```

## Project Structure

```
src/
├── types.ts          # Shared TypeScript interfaces
└── index.ts          # Main worker (Durable Object + Queue Consumer)

wrangler.jsonc        # Cloudflare Worker configuration
.dev.vars.example     # Environment variables template
setup-*.sh            # Setup scripts for queues and configuration
```

## Deployment

The worker automatically starts processing events upon deployment. The Durable Object ensures only one instance runs globally, maintaining connection state across worker invocations.

## Adapting for Your Project

This worker is designed to be easily adaptable:

1. **Fork the repository**
2. **Configure your environment variables**:
   - Set your webhook URL
   - Choose your AT Protocol collections
   - Add authentication if needed
3. **Deploy to Cloudflare**
4. **Monitor via the dashboard**

No code changes required - everything is configurable via environment variables!

## License

MIT