A tool for tailing a labeler's firehose, rehydrating, and storing records for future analysis of moderation decisions.
# Implementation Plan: Skywatch Tail

## Overview
Build a TypeScript/Bun application that subscribes to a Bluesky labeler firehose, captures label events, hydrates associated content (posts/profiles), and stores everything in DuckDB for ML training.

## Architecture

### Core Components

1. **Firehose Subscriber** (`src/firehose/`)
   - WebSocket connection to labeler service
   - DAG-CBOR/CAR decoding using `@atcute/cbor` and `@atcute/car`
   - Label filtering based on `CAPTURE_LABELS` config
   - Event queue management for hydration pipeline

2. **Hydration Service** (`src/hydration/`)
   - Post hydrator: Fetches `app.bsky.feed.post` records via `@atproto/api`
   - Profile hydrator: Fetches `app.bsky.actor.profile` records and resolves handles
   - Blob processor: Extracts blob references from embeds
   - Rate limiting and retry logic for API calls

3. **Blob Handler** (`src/blobs/`)
   - Conditional blob download based on `HYDRATE_BLOBS` flag
   - SHA-256 cryptographic hashing (always computed)
   - Perceptual hashing (pHash) for image similarity (always computed)
   - Storage abstraction: local filesystem or S3
   - Support for images and video blobs

4. **Database Layer** (`src/database/`)
   - DuckDB connection management
   - Schema initialization and migrations
   - **Repository Pattern**: CRUD operations for each entity (labels, posts, etc.) will be encapsulated in a dedicated repository module to isolate data access logic.
   - Transaction handling for atomic writes

5. **Configuration** (`src/config/`)
   - Environment variable parsing with validation (using **Zod**).
   - Creates a single, immutable, type-safe configuration object.
   - Defaults and optional parameters

6. **Logging** (`src/logger/`)
   - Pino logger setup with pretty printing in dev
   - Structured logging for debugging and monitoring

### Architectural Patterns

- **Repository/Data Access Layer (DAL)**: Database interactions will be abstracted into repository modules. Business logic will call these repositories instead of directly querying the database, improving separation of concerns.
- **Dependency Injection (DI)**: Services will receive their dependencies (like other services or database connections) via their constructor. This decouples components and makes unit testing significantly easier by allowing for mock dependencies.

## Data Flow

```
Firehose → Label Event → Filter Check → Hydration Queue

                              [Post OR Profile Hydration]

                                Extract Blob References

                          [Compute Hashes / Optional Download]

                                    Store in DuckDB
```

## Implementation Phases

*Testing will be conducted throughout each phase, not just at the end. Unit and integration tests should be written as components are built.*

*Commit changes after each phase passes testing to allow for easy rollback and debugging.*

### Phase 1: Foundation (Core Infrastructure)
**Goal**: Set up project structure, dependencies, and basic configuration.

- [x] Initialize TypeScript/Bun project
- [ ] Set up Docker and docker-compose.yml
- [ ] Implement type-safe configuration (`src/config`) using Zod.
- [ ] Initialize Pino logger (`src/logger`)
- [ ] Set up DuckDB connection and schema (`src/database`)
- [ ] **Test**: Write integration tests for database connection and schema validation.

**Deliverables**:
- Working Docker setup with volume mounts.
- Type-safe configuration loading and validation.
- Database schema initialized and tested.

### Phase 2: Firehose Connection
**Goal**: Connect to labeler firehose and parse label events.

- [ ] Implement WebSocket client for `com.atproto.label.subscribeLabels`.
- [ ] Implement DAG-CBOR decoding of label events.
- [ ] Implement label filtering logic.
- [ ] Store raw labels in the database using the Label Repository.
- [ ] Implement connection recovery and error handling.
- [ ] **Test**: Write unit tests for the decoder and filter. Write integration tests for firehose connection and data insertion.

**Deliverables**:
- Labels flowing from firehose into the database.
- Filter working correctly.
- Stable reconnection logic.

### Phase 3: Content Hydration
**Goal**: Fetch and store post and profile data.

- [ ] Implement post and profile hydration services (`src/hydration`).
- [ ] Use the Post and Profile Repositories to store data.
- [ ] Link hydrated content to labels via URI/DID.
- [ ] **Test**: Write unit tests for the data extraction and transformation logic.

**Deliverables**:
- Posts and profiles are automatically hydrated when labels are received.
- Data is correctly stored and linked in the database.

### Phase 4: Blob Processing
**Goal**: Handle image/video blobs with hashing and optional download.

- [ ] Implement blob processor to extract blob references.
- [ ] Implement hashing utilities (SHA-256, pHash).
- [ ] Implement conditional blob download and storage (local and S3).
- [ ] Use the Blob Repository to store metadata.
- [ ] **Test**: Write unit tests for hashing logic and integration tests for storage mechanisms.

**Deliverables**:
- Both hash types are captured for all blobs.
- Optional blob download works for both local and S3 storage.
- Blob metadata is linked to posts.

### Phase 5: Rate Limiting & Optimization
**Goal**: Ensure API compliance and performance.

- [ ] Implement a rate-limiting utility using a token bucket or similar algorithm.
- [ ] Integrate rate limiting into the hydration service.
- [ ] Implement a robust hydration queue.
- [ ] Add retry logic with exponential backoff for API calls.
- [ ] **Test**: Write unit tests for the rate limiter and retry logic.

**Deliverables**:
- No API rate limit violations under normal operation.
- Efficient resource usage and observable performance metrics.

### Phase 6: Final Testing & Validation
**Goal**: Ensure end-to-end reliability and expand test coverage.

- [ ] Implement end-to-end tests using a mock firehose.
- [ ] Conduct validation testing with a real labeler firehose.
- [ ] Review and improve test coverage, aiming for >80% on critical paths.
- [ ] Validate schema integrity and data relationships.
- [ ] Test and finalize Docker deployment.

**Deliverables**:
- High test coverage for critical paths.
- Docker deployment verified.
- End-to-end validation complete.

### Phase 7: Documentation & Portability
**Goal**: Make it easy for others to use.

- [ ] Write a comprehensive README with setup and deployment instructions.
- [ ] Create a fully commented `.env.example` file.
- [ ] Document the database schema.
- [ ] Provide a troubleshooting guide.

**Deliverables**:
- A new user can clone, configure, and run the application with minimal effort.

## Technical Decisions

### Why DuckDB?
- Embedded database (no separate server)
- Excellent analytics performance
- Easy to backup (single file)
- Great for ML pipelines
- JSON column support for complex fields

### Why Bun?
- Fast TypeScript runtime
- Native support for TypeScript, JSX, and Web APIs (like WebSocket).
- All-in-one toolchain (runtime, bundler, test runner).

### Blob Storage Strategy
- **Always compute hashes**: Even if not downloading, we need SHA-256 and pHash for data fingerprinting.
- **Conditional download**: A critical safety feature, especially when dealing with CSAM or other sensitive content labels.
- **Storage abstraction**: Allows for easy extension to other storage backends in the future.

### Rate Limiting Approach
- Track API calls per endpoint.
- Implement a token bucket algorithm to manage request rates.
- Queue hydration requests and process them according to rate limits.
- Prioritize queue based on label timestamp (FIFO).

## Risk Mitigation

### Firehose Disconnection
- Implement automatic reconnection with exponential backoff.
- **Persist last processed cursor to a local file (`cursor.txt`) to allow for seamless resume capability.**
- Log all disconnection and reconnection events for monitoring.

### API Rate Limits
- Implement conservative rate limiting to stay well under official limits.
- Graceful degradation: The hydration queue will grow if limits are hit, but the application will not crash.
- Monitor queue depth and API response headers.

### Blob Safety
- `HYDRATE_BLOBS=false` by default to prevent accidental download of sensitive material.
- Provide clear documentation about the risks of enabling blob hydration.
- Hashes are computed from metadata or partial reads where possible, without downloading the full blob unless required.

### Data Integrity
- Use atomic database transactions for related inserts (e.g., a label and its hydrated post).
- Enforce data integrity with foreign key constraints in the database schema.
- Perform validation before writing data to the database.
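
The token bucket named under Rate Limiting Approach and API Rate Limits could look roughly like the sketch below. It is a minimal illustration, not the planned `src/utils/rate-limit.ts`: the class name `TokenBucket`, its methods, and the injectable clock are all hypothetical, and the capacity and refill rate would have to be chosen per endpoint.

```typescript
// Hypothetical token bucket sketch; names and parameters are illustrative
// assumptions and do not come from the plan.
class TokenBucket {
  private readonly capacity: number; // maximum burst size
  private readonly refillPerSecond: number; // sustained request rate
  private readonly now: () => number; // clock in milliseconds, injectable for tests
  private tokens: number;
  private lastRefill: number;

  constructor(capacity: number, refillPerSecond: number, now: () => number = () => Date.now()) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.now = now;
    this.tokens = capacity; // start full so an initial burst is allowed
    this.lastRefill = now();
  }

  // Credit tokens for the time elapsed since the last refill, capped at capacity.
  private refill(): void {
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = current;
  }

  // Consume one token if available; false means the request should stay queued.
  tryRemove(): boolean {
    this.refill();
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }

  // Milliseconds until one token is available (0 if a token is ready now).
  msUntilNextToken(): number {
    this.refill();
    if (this.tokens >= 1) return 0;
    return Math.ceil(((1 - this.tokens) / this.refillPerSecond) * 1000);
  }
}
```

A hydration worker might call `tryRemove()` before each API request and, when it returns `false`, leave the item in the FIFO queue and sleep for `msUntilNextToken()`. That matches the graceful-degradation goal: the queue grows while the bucket is empty, and nothing is dropped.
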

## Project Structure

```
skywatch-capture/
├── src/
│   ├── index.ts                    # Main entry point
│   ├── config/
│   │   └── index.ts                # Configuration object (with Zod validation)
│   ├── logger/
│   │   └── index.ts                # Pino logger setup
│   ├── database/
│   │   ├── connection.ts           # DuckDB connection
│   │   ├── schema.ts               # Table definitions
│   │   ├── labels.repository.ts    # Label repository
│   │   ├── posts.repository.ts     # Post repository
│   │   ├── profiles.repository.ts  # Profile repository
│   │   └── blobs.repository.ts     # Blob repository
│   ├── firehose/
│   │   ├── subscriber.ts           # WebSocket client
│   │   ├── decoder.ts              # CBOR decoding
│   │   └── filter.ts               # Label filtering
│   ├── hydration/
│   │   ├── posts.service.ts        # Post hydration service
│   │   ├── profiles.service.ts     # Profile hydration service
│   │   └── queue.ts                # Hydration queue
│   ├── blobs/
│   │   ├── processor.ts            # Blob extraction
│   │   ├── hasher.ts               # SHA-256 & pHash
│   │   ├── downloader.ts           # Blob download
│   │   └── storage/
│   │       ├── local.ts            # Local filesystem storage
│   │       └── s3.ts               # S3 storage
│   └── utils/
│       ├── rate-limit.ts           # Rate limiting
│       └── retry.ts                # Retry logic
├── tests/
│   ├── unit/
│   └── integration/
├── data/                           # Volume mount point
│   ├── skywatch.duckdb             # Database file
│   ├── cursor.txt                  # Last processed firehose cursor
│   └── blobs/                      # Local blob storage
├── .env.example
├── docker-compose.yml
├── Dockerfile
├── package.json
├── tsconfig.json
└── README.md
```

## Dependencies

### Core
- `bun` - Runtime & Test Runner
- `typescript` - Language
- `@atproto/api` - Bluesky API client
- `@atcute/cbor` - CBOR decoding
- `@atcute/car` - CAR file handling
- `duckdb` - Database
- `pino` & `pino-pretty` - Logging
- `dotenv` - Environment variables
- `zod` - Type-safe validation

### Blob Processing
- `crypto` (built-in) - SHA-256 hashing
- `sharp` or `jimp` - Image processing for pHash
- `@aws-sdk/client-s3` - S3 storage (optional)

### Testing
- `@types/*` - TypeScript definitions

## Success Criteria

1. ✅ Successfully connects to labeler firehose
2. ✅ Correctly parses and stores label events
3. ✅ Hydrates posts and profiles automatically
4. ✅ Computes both SHA-256 and pHash for all blobs
5. ✅ Conditionally downloads blobs based on config
6. ✅ Stores all data in DuckDB with proper relationships
7. ✅ Respects API rate limits
8. ✅ Handles disconnections gracefully
9. ✅ Runs in Docker with persistent storage
10. ✅ Configurable via environment variables
11. ✅ Documented and portable

## Timeline Estimate

- Phase 1: 1-2 days
- Phase 2: 2-3 days
- Phase 3: 2-3 days
- Phase 4: 3-4 days
- Phase 5: 2-3 days
- Phase 6: 2-3 days
- Phase 7: 1-2 days

**Total**: ~13-20 days for complete implementation

## Next Steps

1. Review and approve this plan
2. Set up development environment
3. Begin Phase 1 implementation
4. Iterate based on testing and feedback
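
As a possible starting point for the Phase 1 configuration task, the sketch below shows the intended behavior of the immutable, validated configuration object. It is deliberately dependency-free and hand-rolls the validation that the plan assigns to Zod, so treat it as a stand-in for the real schema. Only `CAPTURE_LABELS` and `HYDRATE_BLOBS` come from the plan. The `AppConfig` shape, the helper names, and the comma-separated format of `CAPTURE_LABELS` are assumptions.

```typescript
// Hypothetical config shape; only the two variable names come from the plan.
interface AppConfig {
  readonly captureLabels: readonly string[];
  readonly hydrateBlobs: boolean;
}

type Env = Record<string, string | undefined>;

// Accept only "true"/"false" so a typo cannot silently enable blob downloads.
function parseBoolean(name: string, raw: string | undefined, fallback: boolean): boolean {
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = raw.trim().toLowerCase();
  if (value === "true") return true;
  if (value === "false") return false;
  throw new Error(`${name} must be "true" or "false", got "${raw}"`);
}

// Assumption: CAPTURE_LABELS is a comma-separated list of label values.
function parseList(raw: string | undefined): string[] {
  if (raw === undefined) return [];
  return raw
    .split(",")
    .map((item) => item.trim())
    .filter((item) => item.length > 0);
}

// Build a frozen config object; HYDRATE_BLOBS defaults to false as the plan requires.
function loadConfig(env: Env): AppConfig {
  return Object.freeze({
    captureLabels: Object.freeze(parseList(env["CAPTURE_LABELS"])),
    hydrateBlobs: parseBoolean("HYDRATE_BLOBS", env["HYDRATE_BLOBS"], false),
  });
}
```

Passing the environment in as an argument (for example `loadConfig(process.env)` at startup) rather than reading it inside the function keeps the loader easy to unit test, which fits the dependency-injection approach described under Architectural Patterns.
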