A tool for tailing a labeler's firehose, rehydrating the labeled records, and storing them for future analysis of moderation decisions.
# Product Requirements Document (PRD)

This document outlines the requirements for the Skywatch Capture application. It serves as a reference for developers, designers, and stakeholders to ensure that the product meets the needs of its users.

`labels.uri` is the URI against which the label is applied. It takes one of two forms: a reference to a post as an at-uri, such as `at://did:plc:7i7s4avtaolnrgc3ubcoqrq3/app.bsky.feed.post/3lf5u32pxwk2f`, or a reference to a user as a did, such as `did:plc:piwuaowuiykzaare644i5fre`.

`labels.val` is the label value being emitted.
`labels.neg` is a boolean indicating whether this label is a negation label, overwriting a previous label.

## Core Use Case

The primary purpose of this application is to subscribe to a Bluesky labeler's firehose, capture all emitted label events, hydrate the associated data (posts and user profiles), and store this comprehensive dataset in a local database. This data is intended for future use in training machine learning classifiers for content moderation.

## Functional Requirements

- **Firehose Subscription:** Connect to and process a DAG-CBOR encoded firehose from a specified Bluesky labeler service.
- **Data Hydration:** For each label received, fetch the full context of the labeled content.
  - **Post Hydration:** If the label URI is an `at-uri` (post), fetch the full `app.bsky.feed.post` record and store the following fields: `did`, `text`, `facets`, `embeds`, `langs`, `tags`, `createdAt`, and reply status.
  - **Profile Hydration:** If the label URI is a `did` (user), fetch the full `app.bsky.actor.profile` record and store the `displayName` and `description`. Additionally, resolve and store the user's `handle`.
- **Image & Blob Handling:**
  - An option (`HYDRATE_BLOBS`) must be provided to control whether to download image/video blobs. This is a safety feature for users labeling sensitive content.
  - In all cases, both a **SHA-256 (cryptographic) hash** and a **perceptual hash (pHash)** of any referenced image blobs must be captured to ensure compatibility with various moderation toolkits.
  - If `HYDRATE_BLOBS` is true, the application must support storing the downloaded blobs either on the local filesystem or in an AWS S3 bucket, configurable via environment variables.
- **Data Storage:**
  - All captured and hydrated data should be stored in a DuckDB database file.
  - The database schema should be structured to link labels to their hydrated content.
- **Filtering:** The user must be able to optionally provide a comma-separated list of labels to capture (`CAPTURE_LABELS`). If provided, any label not in this list will be ignored.

## Technical Requirements

- **Language/Runtime:** Use TypeScript with Bun.
- **Containerization:** The application must be containerized using Docker. The DuckDB database file must be stored on a volume outside the container to ensure data persistence. A `docker-compose.yml` file should be provided to manage services.
- **Key Libraries:**
  - `@atcute/cbor` and `@atcute/car` for parsing the firehose.
  - `@atproto/api` for all Bluesky API interactions.
  - `pino` and `pino-pretty` for logging.
  - `dotenv` for environment variable management.
- **Portability:** The application should be designed to be portable and easily configurable for use by other moderation services or researchers.
- **Rate Limits:** Be mindful of Bluesky API rate limits during hydration.
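
The hydration branching and the `CAPTURE_LABELS` filter described under Functional Requirements can be sketched as a few pure functions. This is a minimal sketch, not the application's actual code: the names `classifyLabelUri`, `parseCaptureLabels`, and `shouldCapture` are hypothetical, and it assumes that any URI which is neither a bare DID nor an `app.bsky.feed.post` at-uri is skipped.

```typescript
// Hypothetical helper names; none of these are defined by the PRD.
type LabelTarget =
  | { kind: "post"; did: string; rkey: string }
  | { kind: "profile"; did: string }
  | { kind: "unknown" };

// Decide which hydration path a label's `uri` should take.
function classifyLabelUri(uri: string): LabelTarget {
  // A bare DID refers to a user account.
  if (uri.startsWith("did:")) return { kind: "profile", did: uri };

  // at://<did>/<collection>/<rkey>
  const match = /^at:\/\/(did:[a-z0-9]+:[^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (match) {
    const [, did = "", collection = "", rkey = ""] = match;
    if (collection === "app.bsky.feed.post") return { kind: "post", did, rkey };
  }

  // Assumption: anything else (other collections, malformed URIs) is skipped.
  return { kind: "unknown" };
}

// Parse CAPTURE_LABELS; an empty or missing value means "capture everything".
function parseCaptureLabels(raw: string | undefined): Set<string> | null {
  const values = (raw ?? "")
    .split(",")
    .map((value) => value.trim())
    .filter((value) => value.length > 0);
  return values.length > 0 ? new Set(values) : null;
}

// Apply the filter to a label's `val`.
function shouldCapture(val: string, allow: Set<string> | null): boolean {
  return allow === null || allow.has(val);
}
```

A consumer loop would typically call `shouldCapture(label.val, allow)` first, then use `classifyLabelUri(label.uri)` to choose between post and profile hydration.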

## Configuration

The application will be configured via a `.env` file with the following variables:

```env
# Bluesky Credentials
BSKY_HANDLE=your-bluesky-handle.bsky.social
BSKY_PASSWORD=your-app-password

# Bluesky PDS and Labeler URL
PDS=bsky.social
WSS_URL=wss://your-labeler-service.com/xrpc/com.atproto.label.subscribeLabels

# Blob & Image Handling
HYDRATE_BLOBS=false # Set to true to download images/videos
BLOB_STORAGE_TYPE=local # 'local' or 's3'
BLOB_STORAGE_PATH=./data/blobs # Path for local storage

# S3 Configuration (only required if BLOB_STORAGE_TYPE is 's3')
S3_BUCKET=your-s3-bucket-name
S3_REGION=us-east-1
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key

# Database
DB_PATH=./data/skywatch.duckdb

# Filtering (Optional)
# Comma-separated list of labels to capture, e.g., "spam,hate-speech"
CAPTURE_LABELS=

# Logging
LOG_LEVEL=info
```

## Data Schema

The database will contain the following tables:

#### `labels`
Stores the raw label event data.
- `id` (INTEGER, Primary Key, Auto-incrementing)
- `uri` (TEXT) - The `at-uri` or `did` of the labeled content.
- `cid` (TEXT) - The CID of the specific record version.
- `val` (TEXT) - The label value (e.g., "spam").
- `neg` (BOOLEAN) - If the label is a negation.
- `cts` (DATETIME) - Timestamp of label creation.
- `exp` (DATETIME, nullable) - Expiration timestamp of the label.
- `src` (TEXT) - The DID of the labeler.

#### `posts`
Stores hydrated data for labeled posts. Linked to `labels.uri`.
- `uri` (TEXT, Primary Key)
- `did` (TEXT) - Author of the post.
- `text` (TEXT)
- `facets` (JSON)
- `embeds` (JSON)
- `langs` (JSON)
- `tags` (JSON)
- `createdAt` (DATETIME)
- `is_reply` (BOOLEAN)

#### `profiles`
Stores hydrated data for labeled user accounts. Linked to `labels.uri`.
- `did` (TEXT, Primary Key)
- `handle` (TEXT)
- `displayName` (TEXT)
- `description` (TEXT)

#### `blobs`
Stores information about image blobs found in posts.
- `post_uri` (TEXT) - Foreign key to `posts.uri`.
- `blob_cid` (TEXT) - CID of the blob.
- `sha256` (TEXT) - Cryptographic hash for exact file matching.
- `phash` (TEXT) - Perceptual hash for finding visually similar images.
- `storage_path` (TEXT, nullable) - Local or S3 path if downloaded.
- `mimetype` (TEXT)
- PRIMARY KEY (`post_uri`, `blob_cid`)

## Lexicons
The following Bluesky lexicons are necessary for this tool:

### `com.atproto.label.subscribeLabels`
Skywatch emits a DAG-CBOR encoded firehose of moderation decisions at `wss://ozone.skywatch.blue/xrpc/com.atproto.label.subscribeLabels`.
A label event looks like the following:

```json
"label": {
  "type": "object",
  "description": "Metadata tag on an atproto resource (eg, repo or record).",
  "required": ["src", "uri", "val", "cts"],
  "properties": {
    "ver": {
      "type": "integer",
      "description": "The AT Protocol version of the label object."
    },
    "src": {
      "type": "string",
      "format": "did",
      "description": "DID of the actor who created this label."
    },
    "uri": {
      "type": "string",
      "format": "uri",
      "description": "AT URI of the record, repository (account), or other resource that this label applies to."
    },
    "cid": {
      "type": "string",
      "format": "cid",
      "description": "Optionally, CID specifying the specific version of 'uri' resource this label applies to."
    },
    "val": {
      "type": "string",
      "maxLength": 128,
      "description": "The short string name of the value or type of this label."
    },
    "neg": {
      "type": "boolean",
      "description": "If true, this is a negation label, overwriting a previous label."
    },
    "cts": {
      "type": "string",
      "format": "datetime",
      "description": "Timestamp when this label was created."
    },
    "exp": {
      "type": "string",
      "format": "datetime",
      "description": "Timestamp at which this label expires (no longer applies)."
    },
    "sig": {
      "type": "bytes",
      "description": "Signature of dag-cbor encoded label."
    }
  }
},
```

### `app.bsky.feed.post`
Posts are structured as follows:

```json
{
  "lexicon": 1,
  "id": "app.bsky.feed.post",
  "defs": {
    "main": {
      "type": "record",
      "description": "Record containing a Bluesky post.",
      "key": "tid",
      "record": {
        "type": "object",
        "required": ["text", "createdAt"],
        "properties": {
          "text": {
            "type": "string",
            "maxLength": 3000,
            "maxGraphemes": 300,
            "description": "The primary post content. May be an empty string, if there are embeds."
          },
          "entities": {
            "type": "array",
            "description": "DEPRECATED: replaced by app.bsky.richtext.facet.",
            "items": { "type": "ref", "ref": "#entity" }
          },
          "facets": {
            "type": "array",
            "description": "Annotations of text (mentions, URLs, hashtags, etc)",
            "items": { "type": "ref", "ref": "app.bsky.richtext.facet" }
          },
          "reply": { "type": "ref", "ref": "#replyRef" },
          "embed": {
            "type": "union",
            "refs": [
              "app.bsky.embed.images",
              "app.bsky.embed.video",
              "app.bsky.embed.external",
              "app.bsky.embed.record",
              "app.bsky.embed.recordWithMedia"
            ]
          },
          "langs": {
            "type": "array",
            "description": "Indicates human language of post primary text content.",
            "maxLength": 3,
            "items": { "type": "string", "format": "language" }
          },
          "labels": {
            "type": "union",
            "description": "Self-label values for this post. Effectively content warnings.",
            "refs": ["com.atproto.label.defs#selfLabels"]
          },
          "tags": {
            "type": "array",
            "description": "Additional hashtags, in addition to any included in post text and facets.",
            "maxLength": 8,
            "items": { "type": "string", "maxLength": 640, "maxGraphemes": 64 }
          },
          "createdAt": {
            "type": "string",
            "format": "datetime",
            "description": "Client-declared timestamp when this post was originally created."
          }
        }
      }
    },
    "replyRef": {
      "type": "object",
      "required": ["root", "parent"],
      "properties": {
        "root": { "type": "ref", "ref": "com.atproto.repo.strongRef" },
        "parent": { "type": "ref", "ref": "com.atproto.repo.strongRef" }
      }
    },
    "entity": {
      "type": "object",
      "description": "Deprecated: use facets instead.",
      "required": ["index", "type", "value"],
      "properties": {
        "index": { "type": "ref", "ref": "#textSlice" },
        "type": {
          "type": "string",
          "description": "Expected values are 'mention' and 'link'."
        },
        "value": { "type": "string" }
      }
    },
    "textSlice": {
      "type": "object",
      "description": "Deprecated. Use app.bsky.richtext instead -- A text segment. Start is inclusive, end is exclusive. Indices are for utf16-encoded strings.",
      "required": ["start", "end"],
      "properties": {
        "start": { "type": "integer", "minimum": 0 },
        "end": { "type": "integer", "minimum": 0 }
      }
    }
  }
}
```

With posts, we are interested in the `app.bsky.embed.images` lexicon in particular. The blob reference can be used to retrieve the image from the PDS, after which the image can be saved to local storage or hashed.

```json
{
  "lexicon": 1,
  "id": "app.bsky.embed.images",
  "description": "A set of images embedded in a Bluesky record (eg, a post).",
  "defs": {
    "main": {
      "type": "object",
      "required": ["images"],
      "properties": {
        "images": {
          "type": "array",
          "items": { "type": "ref", "ref": "#image" },
          "maxLength": 4
        }
      }
    },
    "image": {
      "type": "object",
      "required": ["image", "alt"],
      "properties": {
        "image": {
          "type": "blob",
          "accept": ["image/*"],
          "maxSize": 1000000
        },
        "alt": {
          "type": "string",
          "description": "Alt text description of the image, for accessibility."
        },
        "aspectRatio": {
          "type": "ref",
          "ref": "app.bsky.embed.defs#aspectRatio"
        }
      }
    },
    "view": {
      "type": "object",
      "required": ["images"],
      "properties": {
        "images": {
          "type": "array",
          "items": { "type": "ref", "ref": "#viewImage" },
          "maxLength": 4
        }
      }
    },
    "viewImage": {
      "type": "object",
      "required": ["thumb", "fullsize", "alt"],
      "properties": {
        "thumb": {
          "type": "string",
          "format": "uri",
          "description": "Fully-qualified URL where a thumbnail of the image can be fetched. For example, CDN location provided by the App View."
        },
        "fullsize": {
          "type": "string",
          "format": "uri",
          "description": "Fully-qualified URL where a large version of the image can be fetched. May or may not be the exact original blob. For example, CDN location provided by the App View."
        },
        "alt": {
          "type": "string",
          "description": "Alt text description of the image, for accessibility."
        },
        "aspectRatio": {
          "type": "ref",
          "ref": "app.bsky.embed.defs#aspectRatio"
        }
      }
    }
  }
}
```

### `app.bsky.actor.profile`

```json
{
  "lexicon": 1,
  "id": "app.bsky.actor.profile",
  "defs": {
    "main": {
      "type": "record",
      "description": "A declaration of a Bluesky account profile.",
      "key": "literal:self",
      "record": {
        "type": "object",
        "properties": {
          "displayName": {
            "type": "string",
            "maxGraphemes": 64,
            "maxLength": 640
          },
          "description": {
            "type": "string",
            "description": "Free-form profile description text.",
            "maxGraphemes": 256,
            "maxLength": 2560
          },
          "pronouns": {
            "type": "string",
            "description": "Free-form pronouns text.",
            "maxGraphemes": 20,
            "maxLength": 200
          },
          "website": { "type": "string", "format": "uri" },
          "avatar": {
            "type": "blob",
            "description": "Small image to be displayed next to posts from account. AKA, 'profile picture'",
            "accept": ["image/png", "image/jpeg"],
            "maxSize": 1000000
          },
          "banner": {
            "type": "blob",
            "description": "Larger horizontal image to display behind profile view.",
            "accept": ["image/png", "image/jpeg"],
            "maxSize": 1000000
          },
          "labels": {
            "type": "union",
            "description": "Self-label values, specific to the Bluesky application, on the overall account.",
            "refs": ["com.atproto.label.defs#selfLabels"]
          },
          "joinedViaStarterPack": {
            "type": "ref",
            "ref": "com.atproto.repo.strongRef"
          },
          "pinnedPost": {
            "type": "ref",
            "ref": "com.atproto.repo.strongRef"
          },
          "createdAt": { "type": "string", "format": "datetime" }
        }
      }
    }
  }
}
```
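
To connect the `app.bsky.feed.post` and `app.bsky.embed.images` lexicons to the `posts` and `blobs` tables, the following sketch maps a fetched post record to a row and collects its image blob references. It is an illustration under stated assumptions, not the application's implementation: the helper names (`toPostRow`, `extractImageBlobs`, `sha256Hex`) are hypothetical, blob references are assumed to arrive in their JSON form (`{ "$type": "blob", "ref": { "$link": "<cid>" }, "mimeType": "..." }`), and the record's singular `embed` field is assumed to feed the `embeds` column. Perceptual hashing is left out because it needs an image-decoding library.

```typescript
import { createHash } from "node:crypto";

type Json = Record<string, unknown>;

function isObject(value: unknown): value is Json {
  return typeof value === "object" && value !== null;
}

// Subset of app.bsky.feed.post that the `posts` table stores.
interface PostRecord {
  text: string;
  createdAt: string;
  facets?: unknown[];
  reply?: unknown;
  embed?: unknown;
  langs?: string[];
  tags?: string[];
}

// Map a record to a `posts` row; JSON columns are serialized as strings.
function toPostRow(uri: string, did: string, record: PostRecord) {
  return {
    uri,
    did,
    text: record.text,
    facets: JSON.stringify(record.facets ?? []),
    embeds: JSON.stringify(record.embed ?? null), // assumption: `embed` -> `embeds`
    langs: JSON.stringify(record.langs ?? []),
    tags: JSON.stringify(record.tags ?? []),
    createdAt: record.createdAt,
    is_reply: record.reply !== undefined,
  };
}

// Collect image blob CIDs from an images embed, including images nested
// in the `media` field of app.bsky.embed.recordWithMedia.
function extractImageBlobs(embed: unknown): { blobCid: string; mimetype: string }[] {
  if (!isObject(embed)) return [];
  if (embed["$type"] === "app.bsky.embed.recordWithMedia") {
    return extractImageBlobs(embed["media"]);
  }
  const images = embed["images"];
  if (embed["$type"] !== "app.bsky.embed.images" || !Array.isArray(images)) return [];

  const blobs: { blobCid: string; mimetype: string }[] = [];
  for (const item of images) {
    const blob = isObject(item) ? item["image"] : undefined;
    const ref = isObject(blob) ? blob["ref"] : undefined;
    const link = isObject(ref) ? ref["$link"] : undefined;
    if (isObject(blob) && typeof link === "string") {
      blobs.push({ blobCid: link, mimetype: String(blob["mimeType"] ?? "") });
    }
  }
  return blobs;
}

// SHA-256 of downloaded blob bytes, hex-encoded for the `sha256` column.
function sha256Hex(bytes: Uint8Array): string {
  return createHash("sha256").update(bytes).digest("hex");
}
```

The bytes passed to `sha256Hex` would come from the author's PDS, for example through the `com.atproto.sync.getBlob` XRPC method with the author's DID and the blob CID. Whether those bytes are also written to disk or S3 would depend on `HYDRATE_BLOBS`.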