A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
at main 124 lines 3.6 kB view raw
1import { config } from "./config/index.js"; 2import { logger } from "./logger/index.js"; 3import { 4 initializeDatabase, 5 closeDatabase, 6 getDatabase, 7} from "./database/connection.js"; 8import { initializeSchema } from "./database/schema.js"; 9import { LabelsRepository } from "./database/labels.repository.js"; 10import { FirehoseSubscriber } from "./firehose/subscriber.js"; 11import { PostHydrationService } from "./hydration/posts.service.js"; 12import { ProfileHydrationService } from "./hydration/profiles.service.js"; 13import { HydrationQueue } from "./hydration/queue.js"; 14 15async function main() { 16 logger.info("Starting Skywatch Tail..."); 17 18 try { 19 await initializeDatabase(); 20 await initializeSchema(); 21 22 const db = getDatabase(); 23 const labelsRepo = new LabelsRepository(db); 24 25 const postHydration = new PostHydrationService(db); 26 const profileHydration = new ProfileHydrationService(db); 27 const hydrationQueue = new HydrationQueue(); 28 29 await postHydration.initialize(); 30 await profileHydration.initialize(); 31 32 hydrationQueue.on("task", async (task) => { 33 try { 34 if (task.type === "post") { 35 await postHydration.hydratePost(task.identifier); 36 } else if (task.type === "profile") { 37 await profileHydration.hydrateProfile(task.identifier); 38 } 39 } catch (error) { 40 logger.error({ error, task }, "Hydration task failed"); 41 } 42 }); 43 44 const subscriber = new FirehoseSubscriber(); 45 46 subscriber.on("label", async (label) => { 47 try { 48 logger.info({ uri: label.uri, val: label.val }, "Received label"); 49 50 await labelsRepo.insert({ 51 uri: label.uri, 52 cid: label.cid, 53 val: label.val, 54 neg: label.neg || false, 55 cts: label.cts, 56 exp: label.exp, 57 src: label.src, 58 }); 59 60 logger.debug({ uri: label.uri }, "Label stored"); 61 62 if (label.uri.startsWith("at://")) { 63 const uriParts = label.uri.replace("at://", "").split("/"); 64 65 if (uriParts.length === 3) { 66 hydrationQueue.enqueue({ 67 type: "post", 68 identifier: label.uri, 69 }); 70 } else if (uriParts.length === 1) { 71 hydrationQueue.enqueue({ 72 type: "profile", 73 identifier: label.uri.replace("at://", ""), 74 }); 75 } 76 } else if (label.uri.startsWith("did:")) { 77 hydrationQueue.enqueue({ 78 type: "profile", 79 identifier: label.uri, 80 }); 81 } 82 } catch (error) { 83 logger.error({ error, label }, "Failed to process label"); 84 } 85 }); 86 87 subscriber.on("error", (error) => { 88 logger.error({ error }, "Firehose error"); 89 }); 90 91 subscriber.on("connected", () => { 92 logger.info("Firehose connected"); 93 }); 94 95 subscriber.on("disconnected", () => { 96 logger.warn("Firehose disconnected"); 97 }); 98 99 await subscriber.start(); 100 101 logger.info("Application ready and subscribed to firehose"); 102 103 process.on("SIGINT", async () => { 104 logger.info("Shutting down gracefully..."); 105 subscriber.stop(); 106 hydrationQueue.clear(); 107 await closeDatabase(); 108 process.exit(0); 109 }); 110 111 process.on("SIGTERM", async () => { 112 logger.info("Shutting down gracefully..."); 113 subscriber.stop(); 114 hydrationQueue.clear(); 115 await closeDatabase(); 116 process.exit(0); 117 }); 118 } catch (error) { 119 logger.error({ error }, "Failed to start application"); 120 process.exit(1); 121 } 122} 123 124main();