A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
at main 64 lines 1.5 kB view raw
1import { decodeFirst } from "@atcute/cbor"; 2import { logger } from "../logger/index.js"; 3 4export interface LabelEvent { 5 ver?: number; 6 src: string; 7 uri: string; 8 cid?: string; 9 val: string; 10 neg?: boolean; 11 cts: string; 12 exp?: string; 13 sig?: Uint8Array; 14} 15 16export interface FirehoseMessage { 17 op?: number; 18 t?: string; 19 seq?: number; 20 labels?: LabelEvent[]; 21 [key: string]: any; 22} 23 24export function decodeFirehoseMessage(data: Buffer): FirehoseMessage | null { 25 try { 26 const buffer = new Uint8Array(data); 27 const [header, remainder] = decodeFirst(buffer); 28 const [body] = decodeFirst(remainder); 29 30 return body as FirehoseMessage; 31 } catch (err) { 32 logger.error( 33 { 34 err: err instanceof Error ? err.message : String(err), 35 errorStack: err instanceof Error ? err.stack : undefined, 36 dataLength: data.length, 37 dataPreview: data.slice(0, 50).toString("hex") 38 }, 39 "Failed to decode CBOR message" 40 ); 41 return null; 42 } 43} 44 45export function extractLabelsFromMessage(message: FirehoseMessage): LabelEvent[] { 46 if (!message) { 47 return []; 48 } 49 50 if (message.labels && Array.isArray(message.labels)) { 51 return message.labels; 52 } 53 54 return []; 55} 56 57export function validateLabel(label: LabelEvent): boolean { 58 if (!label.src || !label.uri || !label.val || !label.cts) { 59 logger.warn({ label }, "Invalid label: missing required fields"); 60 return false; 61 } 62 63 return true; 64}