A tool for tailing a labeler's firehose, rehydrating the labeled content, and storing records for future analysis of moderation decisions.
# Implementation Plan: Skywatch Tail

## Overview
Build a TypeScript/Bun application that subscribes to a Bluesky labeler firehose, captures label events, hydrates associated content (posts/profiles), and stores everything in DuckDB for ML training.

## Architecture

### Core Components

1. **Firehose Subscriber** (`src/firehose/`)
   - WebSocket connection to labeler service
   - DAG-CBOR/CAR decoding using `@atcute/cbor` and `@atcute/car`
   - Label filtering based on `CAPTURE_LABELS` config
   - Event queue management for hydration pipeline

2. **Hydration Service** (`src/hydration/`)
   - Post hydrator: Fetches `app.bsky.feed.post` records via `@atproto/api`
   - Profile hydrator: Fetches `app.bsky.actor.profile` records and resolves handles
   - Blob processor: Extracts blob references from embeds
   - Rate limiting and retry logic for API calls

3. **Blob Handler** (`src/blobs/`)
   - Conditional blob download based on `HYDRATE_BLOBS` flag
   - SHA-256 cryptographic hashing (always computed)
   - Perceptual hashing (pHash) for image similarity (always computed)
   - Storage abstraction: local filesystem or S3
   - Support for images and video blobs

4. **Database Layer** (`src/database/`)
   - DuckDB connection management
   - Schema initialization and migrations
   - **Repository Pattern**: CRUD operations for each entity (labels, posts, etc.) will be encapsulated in a dedicated repository module to isolate data access logic.
   - Transaction handling for atomic writes

5. **Configuration** (`src/config/`)
   - Environment variable parsing with validation (using **Zod**).
   - Creates a single, immutable, type-safe configuration object.
   - Defaults and optional parameters

6. **Logging** (`src/logger/`)
   - Pino logger setup with pretty printing in dev
   - Structured logging for debugging and monitoring

### Architectural Patterns

- **Repository/Data Access Layer (DAL)**: Database interactions will be abstracted into repository modules. Business logic will call these repositories instead of directly querying the database, improving separation of concerns.
- **Dependency Injection (DI)**: Services will receive their dependencies (like other services or database connections) via their constructor. This decouples components and makes unit testing significantly easier by allowing for mock dependencies.
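The sketch below shows one way the two patterns could combine. It assumes a hypothetical `LabelRecord` row shape and a hypothetical `LabelRepository` interface, since the plan defines neither. `LabelService` depends only on the interface. Unit tests can therefore inject an in-memory repository, while production code injects a DuckDB-backed one.

```typescript
// Hypothetical row shape for a stored label (field names are assumptions).
interface LabelRecord {
  uri: string; // label subject: an at:// URI or a DID
  val: string; // label value, e.g. "spam"
  cts: string; // label creation timestamp (ISO 8601)
}

// Data access contract. A DuckDB-backed class in labels.repository.ts
// would implement this; services never see SQL.
interface LabelRepository {
  insert(label: LabelRecord): Promise<void>;
  findByUri(uri: string): Promise<LabelRecord[]>;
}

// In-memory implementation for unit tests.
class InMemoryLabelRepository implements LabelRepository {
  private readonly rows: LabelRecord[] = [];

  async insert(label: LabelRecord): Promise<void> {
    this.rows.push(label);
  }

  async findByUri(uri: string): Promise<LabelRecord[]> {
    return this.rows.filter((row) => row.uri === uri);
  }
}

// The service receives its repository through the constructor (DI),
// so it works unchanged with either implementation.
class LabelService {
  private readonly labels: LabelRepository;

  constructor(labels: LabelRepository) {
    this.labels = labels;
  }

  async record(label: LabelRecord): Promise<void> {
    await this.labels.insert(label);
  }

  async valuesFor(uri: string): Promise<string[]> {
    const rows = await this.labels.findByUri(uri);
    return rows.map((row) => row.val);
  }
}
```

Tests would construct `new LabelService(new InMemoryLabelRepository())`. The wiring in `src/index.ts` would pass a DuckDB-backed repository instead.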

## Data Flow

```
Firehose → Label Event → Filter Check → Hydration Queue
                                               ↓
                                  [Post OR Profile Hydration]
                                               ↓
                                    Extract Blob References
                                               ↓
                             [Compute Hashes / Optional Download]
                                               ↓
                                        Store in DuckDB
```

## Implementation Phases

*Testing will be conducted throughout each phase, not just at the end. Unit and integration tests should be written as components are built.*

*Commit changes after each phase passes testing to allow for easy rollback and debugging.*

### Phase 1: Foundation (Core Infrastructure)
**Goal**: Set up project structure, dependencies, and basic configuration.

- [x] Initialize TypeScript/Bun project
- [ ] Set up Docker and docker-compose.yml
- [ ] Implement type-safe configuration (`src/config`) using Zod.
- [ ] Initialize Pino logger (`src/logger`)
- [ ] Set up DuckDB connection and schema (`src/database`)
- [ ] **Test**: Write integration tests for database connection and schema validation.

**Deliverables**:
- Working Docker setup with volume mounts.
- Type-safe configuration loading and validation.
- Database schema initialized and tested.

### Phase 2: Firehose Connection
**Goal**: Connect to labeler firehose and parse label events.

- [ ] Implement WebSocket client for `com.atproto.label.subscribeLabels`.
- [ ] Implement DAG-CBOR decoding of label events.
- [ ] Implement label filtering logic.
- [ ] Store raw labels in the database using the Label Repository.
- [ ] Implement connection recovery and error handling.
- [ ] **Test**: Write unit tests for the decoder and filter. Write integration tests for firehose connection and data insertion.

**Deliverables**:
- Labels flowing from firehose into the database.
- Filter working correctly.
- Stable reconnection logic.
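The filter step could look like the sketch below. The `Label` fields follow the `com.atproto.label.defs#label` lexicon. Several details are assumptions: that `CAPTURE_LABELS` is a comma-separated list, that an empty value means "capture everything", and that negation events (`neg: true`, which record a label being retracted) should be kept.

```typescript
// Subset of the com.atproto.label.defs#label fields used here.
interface Label {
  src: string; // DID of the labeler that emitted the label
  uri: string; // subject: at:// URI (record) or DID (account)
  cid?: string; // optional CID pinning a specific record version
  val: string; // label value
  neg?: boolean; // true when this event negates (removes) an earlier label
  cts: string; // creation timestamp
}

// Assumption: CAPTURE_LABELS is comma-separated, e.g. "spam,porn".
// An empty or missing value is treated here as "capture everything".
function parseCaptureLabels(raw: string | undefined): Set<string> {
  return new Set(
    (raw ?? "")
      .split(",")
      .map((s) => s.trim())
      .filter((s) => s.length > 0),
  );
}

// Negations are kept when their value matches, so retractions of a
// moderation decision are stored alongside the original label.
function shouldCapture(label: Label, capture: Set<string>): boolean {
  return capture.size === 0 || capture.has(label.val);
}
```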

### Phase 3: Content Hydration
**Goal**: Fetch and store post and profile data.

- [ ] Implement post and profile hydration services (`src/hydration`).
- [ ] Use the Post and Profile Repositories to store data.
- [ ] Link hydrated content to labels via URI/DID.
- [ ] **Test**: Write unit tests for the data extraction and transformation logic.

**Deliverables**:
- Posts and profiles are automatically hydrated when labels are received.
- Data is correctly stored and linked in the database.
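Linking content to labels starts with the label's `uri` field. That field is either a bare DID (an account-level label) or an `at://` URI naming a specific record. The sketch below routes that subject to the right hydrator, under three assumptions:

- Only posts and profiles are hydrated.
- Other collections, such as lists, are reported as unsupported.
- The URI authority is a DID rather than a handle.

`routeSubject` and `HydrationTarget` are hypothetical names.

```typescript
type HydrationTarget =
  | { kind: "post"; did: string; rkey: string; uri: string }
  | { kind: "profile"; did: string }
  | { kind: "unsupported"; uri: string };

// Decide what to hydrate for a label subject.
function routeSubject(subject: string): HydrationTarget {
  // A bare DID labels the account itself.
  if (subject.startsWith("did:")) {
    return { kind: "profile", did: subject };
  }
  // at://<authority>/<collection>/<rkey>
  const match = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(subject);
  if (!match) return { kind: "unsupported", uri: subject };
  const [, did, collection, rkey] = match;
  if (collection === "app.bsky.feed.post") {
    return { kind: "post", did, rkey, uri: subject };
  }
  if (collection === "app.bsky.actor.profile") {
    return { kind: "profile", did };
  }
  return { kind: "unsupported", uri: subject };
}
```

The `did` and `uri` values on the result are the keys that would link rows in the Post and Profile Repositories back to the label.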

### Phase 4: Blob Processing
**Goal**: Handle image/video blobs with hashing and optional download.

- [ ] Implement blob processor to extract blob references.
- [ ] Implement hashing utilities (SHA-256, pHash).
- [ ] Implement conditional blob download and storage (local and S3).
- [ ] Use the Blob Repository to store metadata.
- [ ] **Test**: Write unit tests for hashing logic and integration tests for storage mechanisms.

**Deliverables**:
- Both hash types are captured for all blobs.
- Optional blob download works for both local and S3 storage.
- Blob metadata is linked to posts.
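Blob references live in the post's `embed`. The sketch below extracts them for four embed types:

- `app.bsky.embed.images`
- `app.bsky.embed.video`
- `app.bsky.embed.external` (the link-card thumbnail)
- `app.bsky.embed.recordWithMedia`

The `ExtractedBlob` shape and the function names are hypothetical. The embed is typed loosely because records arrive as untyped JSON.

```typescript
// JSON shape of an atproto blob reference inside a record.
interface BlobRef {
  $type: "blob";
  ref: { $link: string }; // CID of the blob
  mimeType: string;
  size: number; // bytes
}

// Hypothetical metadata row handed to the Blob Repository.
interface ExtractedBlob {
  cid: string;
  mimeType: string;
  size: number;
}

function fromRef(blob: BlobRef | undefined): ExtractedBlob[] {
  return blob ? [{ cid: blob.ref.$link, mimeType: blob.mimeType, size: blob.size }] : [];
}

// Collect every blob referenced by a post's embed.
function extractBlobs(embed: any): ExtractedBlob[] {
  if (!embed || typeof embed !== "object") return [];
  switch (embed.$type) {
    case "app.bsky.embed.images":
      return (embed.images ?? []).flatMap((img: { image?: BlobRef }) => fromRef(img.image));
    case "app.bsky.embed.video":
      return fromRef(embed.video);
    case "app.bsky.embed.external":
      return fromRef(embed.external?.thumb); // link-card thumbnail, if any
    case "app.bsky.embed.recordWithMedia":
      // The quoted record is another post; only the attached media belongs here.
      return extractBlobs(embed.media);
    default:
      return [];
  }
}
```

The Blob Repository would key on the returned `cid` values and link each row back to the post URI.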

### Phase 5: Rate Limiting & Optimization
**Goal**: Ensure API compliance and performance.

- [ ] Implement a rate-limiting utility using a token bucket or similar algorithm.
- [ ] Integrate rate limiting into the hydration service.
- [ ] Implement a robust hydration queue.
- [ ] Add retry logic with exponential backoff for API calls.
- [ ] **Test**: Write unit tests for the rate limiter and retry logic.

**Deliverables**:
- No API rate limit violations under normal operation.
- Efficient resource usage and observable performance metrics.
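The sketch below outlines a retry helper that uses exponential backoff with "full jitter". Each delay is drawn uniformly from zero up to the exponential ceiling, so many queued requests do not retry in lockstep. `withRetry`, `backoffDelay`, and the `isRetryable` hook are hypothetical. Which errors count as retryable is a policy choice left to the caller. For example, HTTP 429 and 5xx responses might be retried, but not a 404 for a deleted post.

```typescript
interface RetryOptions {
  maxAttempts: number; // total attempts including the first (>= 1)
  baseDelayMs: number; // ceiling for the first retry delay
  maxDelayMs: number; // cap on any single delay
}

// Full jitter: a random delay in [0, min(maxDelay, base * 2^attempt)).
function backoffDelay(attempt: number, opts: RetryOptions, random: () => number = Math.random): number {
  const ceiling = Math.min(opts.maxDelayMs, opts.baseDelayMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retry an async operation until it succeeds, a non-retryable error
// occurs, or attempts run out; the last error is rethrown.
async function withRetry<T>(
  fn: () => Promise<T>,
  opts: RetryOptions,
  isRetryable: (err: unknown) => boolean = () => true,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (!isRetryable(err) || attempt === opts.maxAttempts - 1) break;
      await sleep(backoffDelay(attempt, opts));
    }
  }
  throw lastError;
}
```

Passing `random` as a parameter keeps `backoffDelay` deterministic in unit tests.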

### Phase 6: Final Testing & Validation
**Goal**: Ensure end-to-end reliability and expand test coverage.

- [ ] Implement end-to-end tests using a mock firehose.
- [ ] Conduct validation testing with a real labeler firehose.
- [ ] Review and improve test coverage, aiming for >80% on critical paths.
- [ ] Validate schema integrity and data relationships.
- [ ] Test and finalize Docker deployment.

**Deliverables**:
- High test coverage for critical paths.
- Docker deployment verified.
- End-to-end validation complete.

### Phase 7: Documentation & Portability
**Goal**: Make it easy for others to use.

- [ ] Write a comprehensive README with setup and deployment instructions.
- [ ] Create a fully commented `.env.example` file.
- [ ] Document the database schema.
- [ ] Provide a troubleshooting guide.

**Deliverables**:
- A new user can clone, configure, and run the application with minimal effort.

## Technical Decisions

### Why DuckDB?
- Embedded database (no separate server)
- Excellent analytics performance
- Easy to back up (single file)
- Great for ML pipelines
- JSON column support for complex fields

### Why Bun?
- Fast TypeScript runtime
- Native support for TypeScript, JSX, and Web APIs (like WebSocket).
- All-in-one toolchain (runtime, bundler, test runner).

### Blob Storage Strategy
- **Always compute hashes**: Even if not downloading, we need SHA-256 and pHash for data fingerprinting.
- **Conditional download**: A critical safety feature, especially when dealing with CSAM or other sensitive content labels.
- **Storage abstraction**: Allows for easy extension to other storage backends in the future.
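Once an image has been fetched, decoded, converted to grayscale, and shrunk to 32×32 pixels, the pHash itself is short. The shrinking could be done with `sharp`'s `grayscale()`, `resize()`, and `raw()` pipeline, for example. The sketch below takes those 1,024 pixels as input and implements a common DCT-based variant:

1. Apply a 2-D DCT.
2. Keep the 8×8 lowest-frequency coefficients.
3. Emit one bit per coefficient, depending on whether it lies above the median of the non-DC terms.

The 32×32 input, 8×8 block, and median threshold are conventional choices, not requirements from the plan. Hashes from different pHash libraries are generally not comparable. The function names are hypothetical.

```typescript
const SIZE = 32; // expected input: SIZE x SIZE grayscale pixels
const LOW = 8; // keep the LOW x LOW lowest-frequency DCT coefficients

// Unnormalised 1-D DCT-II; uniform scaling does not change the hash bits.
function dct1d(input: number[]): number[] {
  const n = input.length;
  const out: number[] = [];
  for (let k = 0; k < n; k++) {
    let sum = 0;
    for (let i = 0; i < n; i++) {
      sum += input[i] * Math.cos((Math.PI / n) * (i + 0.5) * k);
    }
    out.push(sum);
  }
  return out;
}

// 64-bit perceptual hash of a row-major SIZE*SIZE grayscale image (0-255).
function pHash(pixels: ArrayLike<number>): bigint {
  if (pixels.length !== SIZE * SIZE) {
    throw new Error(`expected ${SIZE * SIZE} pixels, got ${pixels.length}`);
  }
  // Separable 2-D DCT: transform every row, then every column.
  const rows: number[][] = [];
  for (let y = 0; y < SIZE; y++) {
    rows.push(dct1d(Array.from({ length: SIZE }, (_, x) => pixels[y * SIZE + x])));
  }
  const coeffs: number[][] = Array.from({ length: SIZE }, () => new Array<number>(SIZE).fill(0));
  for (let x = 0; x < SIZE; x++) {
    const column = dct1d(rows.map((row) => row[x]));
    for (let y = 0; y < SIZE; y++) coeffs[y][x] = column[y];
  }
  // Keep the top-left LOW x LOW block of low frequencies.
  const low: number[] = [];
  for (let y = 0; y < LOW; y++) {
    for (let x = 0; x < LOW; x++) low.push(coeffs[y][x]);
  }
  // Threshold at the median of the AC terms; the DC term (index 0) only
  // reflects average brightness, so it is left out of the median.
  const ac = low.slice(1).sort((a, b) => a - b);
  const mid = Math.floor(ac.length / 2);
  const median = ac.length % 2 === 1 ? ac[mid] : (ac[mid - 1] + ac[mid]) / 2;
  // One bit per coefficient: 1 when above the median.
  let hash = 0n;
  for (const c of low) hash = (hash << 1n) | (c > median ? 1n : 0n);
  return hash;
}

// Number of differing bits; small distances indicate visually similar images.
function hammingDistance(a: bigint, b: bigint): number {
  let x = a ^ b;
  let count = 0;
  while (x > 0n) {
    count += Number(x & 1n);
    x >>= 1n;
  }
  return count;
}
```

Because a uniform brightness shift only changes the DC coefficient, such a shift leaves the hash unchanged. Similarity queries in DuckDB could then compare hashes by Hamming distance rather than by equality.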

### Rate Limiting Approach
- Track API calls per endpoint.
- Implement a token bucket algorithm to manage request rates.
- Queue hydration requests and process them according to rate limits.
- Process the queue in FIFO order by label timestamp.
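A token bucket holds up to `capacity` tokens and refills at a steady rate. It lets a request through only when a whole token is available, which permits short bursts while bounding the long-run rate. The sketch below injects a clock (`now`) so tests can control time. The class name and parameters are hypothetical. The capacity and refill rate would come from configuration rather than from any documented Bluesky limit. To track calls per endpoint, the service could keep one bucket per XRPC method in a `Map<string, TokenBucket>`.

```typescript
// Token bucket: holds up to `capacity` tokens, refilled continuously at
// `refillPerSecond`. Each request spends one token.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;
  private readonly capacity: number;
  private readonly refillPerMs: number;
  private readonly now: () => number;

  constructor(capacity: number, refillPerSecond: number, now: () => number = Date.now) {
    this.capacity = capacity;
    this.refillPerMs = refillPerSecond / 1000;
    this.now = now;
    this.tokens = capacity; // start full so an initial burst is allowed
    this.lastRefill = now();
  }

  // Add the tokens earned since the last refill, never exceeding capacity.
  private refill(): void {
    const t = this.now();
    this.tokens = Math.min(this.capacity, this.tokens + (t - this.lastRefill) * this.refillPerMs);
    this.lastRefill = t;
  }

  // Spend a token if one is available; never blocks.
  tryTake(): boolean {
    this.refill();
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }

  // Milliseconds until a whole token is available (0 if one is ready now).
  msUntilNextToken(): number {
    this.refill();
    return this.tokens >= 1 ? 0 : Math.ceil((1 - this.tokens) / this.refillPerMs);
  }
}
```

A queue worker would call `tryTake()` before each hydration request. When it returns `false`, the worker would sleep for `msUntilNextToken()`, so items still leave the queue in FIFO order.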

## Risk Mitigation

### Firehose Disconnection
- Implement automatic reconnection with exponential backoff.
- **Persist the last processed cursor to a local file (`cursor.txt`) so the subscriber can resume where it left off after a restart.**
- Log all disconnection and reconnection events for monitoring.
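The sketch below covers cursor handling. It assumes a hypothetical `service` URL for the labeler and uses the `cursor.txt` file from the plan. `com.atproto.label.subscribeLabels` accepts a `cursor` query parameter holding the last seen sequence number. The file is rewritten through a temporary file and a rename, so a crash mid-write cannot leave a truncated cursor. If the cursor is saved only after a batch's labels are committed, the result is at-least-once delivery, so inserts should tolerate replays. The function names are hypothetical.

```typescript
import { existsSync, readFileSync, renameSync, writeFileSync } from "node:fs";

// Build the subscription URL, resuming from the last persisted sequence number.
function subscribeUrl(service: string, cursor: number | null): string {
  const url = new URL("/xrpc/com.atproto.label.subscribeLabels", service);
  if (cursor !== null) url.searchParams.set("cursor", String(cursor));
  return url.toString();
}

// Read the cursor file; a missing or corrupt file means "start from live".
function loadCursor(path: string): number | null {
  if (!existsSync(path)) return null;
  const value = Number.parseInt(readFileSync(path, "utf8").trim(), 10);
  return Number.isSafeInteger(value) && value >= 0 ? value : null;
}

// Write to a temp file, then rename over the real one (atomic on POSIX).
function saveCursor(path: string, seq: number): void {
  const tmp = `${path}.tmp`;
  writeFileSync(tmp, `${seq}\n`);
  renameSync(tmp, path);
}
```

On each reconnect, the subscriber would call `subscribeUrl(service, loadCursor("data/cursor.txt"))`. The reconnect delay itself could reuse the same exponential backoff as the API retries.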

### API Rate Limits
- Implement conservative rate limiting to stay well under official limits.
- Graceful degradation: The hydration queue will grow if limits are hit, but the application will not crash.
- Monitor queue depth and API response headers.

### Blob Safety
- `HYDRATE_BLOBS=false` by default to prevent accidental download of sensitive material.
- Provide clear documentation about the risks of enabling blob hydration.
- Compute hashes without storing the blob where possible. The SHA-256 digest can be read directly from the blob's CID, because atproto blob CIDs embed a sha2-256 multihash. pHash cannot be derived from metadata, so the image bytes must still be fetched to compute it, even when they are never written to storage.
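SHA-256 needs no download at all. Atproto blob references point at blobs by CID, and blob CIDs are CIDv1 with the `raw` codec and a sha2-256 multihash, so the digest can be read straight out of a blob ref's `$link` string. The sketch below decodes the CID by hand (base32 multibase, then varints) to stay dependency-free. The function names are hypothetical, and a maintained multiformats library would be the safer choice in production.

```typescript
const BASE32 = "abcdefghijklmnopqrstuvwxyz234567"; // RFC 4648 alphabet, lowercase

// Decode unpadded base32 (the multibase "b" encoding used for CIDv1 strings).
function base32Decode(input: string): Uint8Array {
  const out: number[] = [];
  let buffer = 0;
  let bits = 0;
  for (const ch of input) {
    const value = BASE32.indexOf(ch);
    if (value === -1) throw new Error(`invalid base32 character: ${ch}`);
    buffer = ((buffer << 5) | value) & 0xffff; // at most 12 pending bits are needed
    bits += 5;
    if (bits >= 8) {
      bits -= 8;
      out.push((buffer >> bits) & 0xff);
    }
  }
  return Uint8Array.from(out); // leftover padding bits are dropped
}

// Read an unsigned varint (LEB128), as used in CID and multihash headers.
function readVarint(bytes: Uint8Array, offset: number): [number, number] {
  let value = 0;
  let shift = 0;
  for (let pos = offset; pos < bytes.length; pos++) {
    const byte = bytes[pos];
    value += (byte & 0x7f) * 2 ** shift;
    if ((byte & 0x80) === 0) return [value, pos + 1];
    shift += 7;
  }
  throw new Error("truncated varint");
}

// Return the hex SHA-256 digest embedded in a blob CID (a blob ref's `$link`).
function sha256FromBlobCid(cid: string): string {
  if (!cid.startsWith("b")) throw new Error("expected a base32 CIDv1 (multibase prefix 'b')");
  const bytes = base32Decode(cid.slice(1));
  const [version, p1] = readVarint(bytes, 0);
  const [codec, p2] = readVarint(bytes, p1);
  const [hashCode, p3] = readVarint(bytes, p2);
  const [length, start] = readVarint(bytes, p3);
  if (version !== 1) throw new Error(`unsupported CID version ${version}`);
  if (codec !== 0x55) throw new Error(`expected raw codec 0x55, got 0x${codec.toString(16)}`);
  if (hashCode !== 0x12 || length !== 32) throw new Error("expected a sha2-256 multihash");
  const digest = bytes.subarray(start, start + length);
  if (digest.length !== length) throw new Error("truncated digest");
  return Array.from(digest, (b) => b.toString(16).padStart(2, "0")).join("");
}
```

When a blob is downloaded, recomputing SHA-256 over the bytes and comparing it with this value also verifies the download.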

### Data Integrity
- Use atomic database transactions for related inserts (e.g., a label and its hydrated post).
- Enforce data integrity with foreign key constraints in the database schema.
- Perform validation before writing data to the database.
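The sketch below wraps the label-plus-post case in a transaction. `SqlConnection` is a hypothetical minimal interface, not the API of any particular DuckDB client; an adapter over the chosen driver would implement `run`. The helper assumes the connection is not shared with concurrent work, since one connection can hold only one open transaction at a time.

```typescript
// Hypothetical minimal connection contract (not a real driver API).
interface SqlConnection {
  run(sql: string, params?: unknown[]): Promise<void>;
}

// Run `work` inside a transaction: commit if it resolves, roll back and
// rethrow if any statement inside it fails.
async function withTransaction<T>(
  conn: SqlConnection,
  work: (tx: SqlConnection) => Promise<T>,
): Promise<T> {
  await conn.run("BEGIN TRANSACTION");
  try {
    const result = await work(conn);
    await conn.run("COMMIT");
    return result;
  } catch (err) {
    await conn.run("ROLLBACK");
    throw err;
  }
}
```

A caller would insert the label row and its hydrated post inside the `work` callback. If either insert throws, for example on a foreign key violation, neither row is kept.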

## Project Structure

```
skywatch-capture/
├── src/
│   ├── index.ts                   # Main entry point
│   ├── config/
│   │   └── index.ts               # Configuration object (with Zod validation)
│   ├── logger/
│   │   └── index.ts               # Pino logger setup
│   ├── database/
│   │   ├── connection.ts          # DuckDB connection
│   │   ├── schema.ts              # Table definitions
│   │   ├── labels.repository.ts   # Label repository
│   │   ├── posts.repository.ts    # Post repository
│   │   ├── profiles.repository.ts # Profile repository
│   │   └── blobs.repository.ts    # Blob repository
│   ├── firehose/
│   │   ├── subscriber.ts          # WebSocket client
│   │   ├── decoder.ts             # CBOR decoding
│   │   └── filter.ts              # Label filtering
│   ├── hydration/
│   │   ├── posts.service.ts       # Post hydration service
│   │   ├── profiles.service.ts    # Profile hydration service
│   │   └── queue.ts               # Hydration queue
│   ├── blobs/
│   │   ├── processor.ts           # Blob extraction
│   │   ├── hasher.ts              # SHA-256 & pHash
│   │   ├── downloader.ts          # Blob download
│   │   └── storage/
│   │       ├── local.ts           # Local filesystem storage
│   │       └── s3.ts              # S3 storage
│   └── utils/
│       ├── rate-limit.ts          # Rate limiting
│       └── retry.ts               # Retry logic
├── tests/
│   ├── unit/
│   └── integration/
├── data/                          # Volume mount point
│   ├── skywatch.duckdb            # Database file
│   ├── cursor.txt                 # Last processed firehose cursor
│   └── blobs/                     # Local blob storage
├── .env.example
├── docker-compose.yml
├── Dockerfile
├── package.json
├── tsconfig.json
└── README.md
```

## Dependencies

### Core
- `bun` - Runtime & Test Runner
- `typescript` - Language
- `@atproto/api` - Bluesky API client
- `@atcute/cbor` - CBOR decoding
- `@atcute/car` - CAR file handling
- `duckdb` - Database
- `pino` & `pino-pretty` - Logging
- `dotenv` - Environment variables (optional, since Bun loads `.env` files automatically)
- `zod` - Type-safe validation

### Blob Processing
- `crypto` (built-in) - SHA-256 hashing
- `sharp` or `jimp` - Image processing for pHash
- `@aws-sdk/client-s3` - S3 storage (optional)

### Testing
- `@types/*` - TypeScript definitions

## Success Criteria

1. ✅ Successfully connects to labeler firehose
2. ✅ Correctly parses and stores label events
3. ✅ Hydrates posts and profiles automatically
4. ✅ Computes both SHA-256 and pHash for all blobs
5. ✅ Conditionally downloads blobs based on config
6. ✅ Stores all data in DuckDB with proper relationships
7. ✅ Respects API rate limits
8. ✅ Handles disconnections gracefully
9. ✅ Runs in Docker with persistent storage
10. ✅ Configurable via environment variables
11. ✅ Documented and portable

## Timeline Estimate

- Phase 1: 1-2 days
- Phase 2: 2-3 days
- Phase 3: 2-3 days
- Phase 4: 3-4 days
- Phase 5: 2-3 days
- Phase 6: 2-3 days
- Phase 7: 1-2 days

**Total**: ~13-20 days for complete implementation

## Next Steps

1. Review and approve this plan
2. Set up development environment
3. Begin Phase 1 implementation
4. Iterate based on testing and feedback