A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
1import { config } from "./config/index.js";
2import { logger } from "./logger/index.js";
3import {
4 initializeDatabase,
5 closeDatabase,
6 getDatabase,
7} from "./database/connection.js";
8import { initializeSchema } from "./database/schema.js";
9import { LabelsRepository } from "./database/labels.repository.js";
10import { FirehoseSubscriber } from "./firehose/subscriber.js";
11import { PostHydrationService } from "./hydration/posts.service.js";
12import { ProfileHydrationService } from "./hydration/profiles.service.js";
13import { HydrationQueue } from "./hydration/queue.js";
14
15async function main() {
16 logger.info("Starting Skywatch Tail...");
17
18 try {
19 await initializeDatabase();
20 await initializeSchema();
21
22 const db = getDatabase();
23 const labelsRepo = new LabelsRepository(db);
24
25 const postHydration = new PostHydrationService(db);
26 const profileHydration = new ProfileHydrationService(db);
27 const hydrationQueue = new HydrationQueue();
28
29 await postHydration.initialize();
30 await profileHydration.initialize();
31
32 hydrationQueue.on("task", async (task) => {
33 try {
34 if (task.type === "post") {
35 await postHydration.hydratePost(task.identifier);
36 } else if (task.type === "profile") {
37 await profileHydration.hydrateProfile(task.identifier);
38 }
39 } catch (error) {
40 logger.error({ error, task }, "Hydration task failed");
41 }
42 });
43
44 const subscriber = new FirehoseSubscriber();
45
46 subscriber.on("label", async (label) => {
47 try {
48 logger.info({ uri: label.uri, val: label.val }, "Received label");
49
50 await labelsRepo.insert({
51 uri: label.uri,
52 cid: label.cid,
53 val: label.val,
54 neg: label.neg || false,
55 cts: label.cts,
56 exp: label.exp,
57 src: label.src,
58 });
59
60 logger.debug({ uri: label.uri }, "Label stored");
61
62 if (label.uri.startsWith("at://")) {
63 const uriParts = label.uri.replace("at://", "").split("/");
64
65 if (uriParts.length === 3) {
66 hydrationQueue.enqueue({
67 type: "post",
68 identifier: label.uri,
69 });
70 } else if (uriParts.length === 1) {
71 hydrationQueue.enqueue({
72 type: "profile",
73 identifier: label.uri.replace("at://", ""),
74 });
75 }
76 } else if (label.uri.startsWith("did:")) {
77 hydrationQueue.enqueue({
78 type: "profile",
79 identifier: label.uri,
80 });
81 }
82 } catch (error) {
83 logger.error({ error, label }, "Failed to process label");
84 }
85 });
86
87 subscriber.on("error", (error) => {
88 logger.error({ error }, "Firehose error");
89 });
90
91 subscriber.on("connected", () => {
92 logger.info("Firehose connected");
93 });
94
95 subscriber.on("disconnected", () => {
96 logger.warn("Firehose disconnected");
97 });
98
99 await subscriber.start();
100
101 logger.info("Application ready and subscribed to firehose");
102
103 process.on("SIGINT", async () => {
104 logger.info("Shutting down gracefully...");
105 subscriber.stop();
106 hydrationQueue.clear();
107 await closeDatabase();
108 process.exit(0);
109 });
110
111 process.on("SIGTERM", async () => {
112 logger.info("Shutting down gracefully...");
113 subscriber.stop();
114 hydrationQueue.clear();
115 await closeDatabase();
116 process.exit(0);
117 });
118 } catch (error) {
119 logger.error({ error }, "Failed to start application");
120 process.exit(1);
121 }
122}
123
124main();