A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
1import { decodeFirst } from "@atcute/cbor";
2import { logger } from "../logger/index.js";
3
4export interface LabelEvent {
5 ver?: number;
6 src: string;
7 uri: string;
8 cid?: string;
9 val: string;
10 neg?: boolean;
11 cts: string;
12 exp?: string;
13 sig?: Uint8Array;
14}
15
16export interface FirehoseMessage {
17 op?: number;
18 t?: string;
19 seq?: number;
20 labels?: LabelEvent[];
21 [key: string]: any;
22}
23
24export function decodeFirehoseMessage(data: Buffer): FirehoseMessage | null {
25 try {
26 const buffer = new Uint8Array(data);
27 const [header, remainder] = decodeFirst(buffer);
28 const [body] = decodeFirst(remainder);
29
30 return body as FirehoseMessage;
31 } catch (err) {
32 logger.error(
33 {
34 err: err instanceof Error ? err.message : String(err),
35 errorStack: err instanceof Error ? err.stack : undefined,
36 dataLength: data.length,
37 dataPreview: data.slice(0, 50).toString("hex")
38 },
39 "Failed to decode CBOR message"
40 );
41 return null;
42 }
43}
44
45export function extractLabelsFromMessage(message: FirehoseMessage): LabelEvent[] {
46 if (!message) {
47 return [];
48 }
49
50 if (message.labels && Array.isArray(message.labels)) {
51 return message.labels;
52 }
53
54 return [];
55}
56
57export function validateLabel(label: LabelEvent): boolean {
58 if (!label.src || !label.uri || !label.val || !label.cts) {
59 logger.warn({ label }, "Invalid label: missing required fields");
60 return false;
61 }
62
63 return true;
64}