A tool that tails the Bluesky firehose (via Jetstream), matches post images against known perceptual hashes (phashes), and labels and/or reports the matching posts and accounts.
1import fs from "node:fs";
2import type { CommitCreateEvent } from "@skyware/jetstream";
3import Redis from "ioredis";
4import { Jetstream } from "@skyware/jetstream";
5import { BLOB_CHECKS } from "../rules/blobs";
6import { agent, isLoggedIn } from "./agent";
7import { ModerationClaims } from "./cache/moderation-claims";
8import { PhashCache } from "./cache/phash-cache";
9import { config } from "./config/index";
10import { logger } from "./logger/index";
11import { MetricsCollector } from "./metrics/collector";
12import { createAccountLabel, createAccountReport } from "./moderation/account";
13import { createPostLabel, createPostReport } from "./moderation/post";
14import { RedisQueue } from "./queue/redis-queue";
15import { QueueWorker } from "./queue/worker";
16import type { ImageJob } from "./types";
17
logger.info("Starting skywatch-phash service");

// Jetstream resume position, in microseconds since the Unix epoch. It is
// initialised from cursor.txt (or from the current time) further down.
let cursor = 0;
// Timer that periodically persists the cursor; (re)created on Jetstream "open".
let cursorUpdateInterval: NodeJS.Timeout;
22
/**
 * Render a Jetstream cursor (microseconds since the Unix epoch) as an
 * ISO-8601 UTC timestamp. Sub-millisecond precision is truncated by `Date`.
 *
 * @param epochUs - Cursor value in microseconds.
 * @returns The ISO-8601 string, e.g. `1970-01-01T00:00:00.000Z` for `0`.
 * @throws RangeError when the value does not map to a valid date (e.g. `NaN`).
 */
function epochUsToDateTime(epochUs: number): string {
  const epochMs = epochUs / 1000;
  const instant = new Date(epochMs);
  return instant.toISOString();
}
26
// Restore the Jetstream cursor (microseconds since the Unix epoch) from
// cursor.txt. If the file is missing, or does not hold a positive integer
// (for example an empty file left behind by an interrupted write), start from
// the current time and persist that value. Any other read failure is fatal.
try {
  logger.info({ process: "MAIN" }, "Trying to read cursor from cursor.txt");
  const rawCursor = fs.readFileSync("cursor.txt", "utf8").trim();
  const storedCursor = Number(rawCursor);
  if (Number.isSafeInteger(storedCursor) && storedCursor > 0) {
    cursor = storedCursor;
    logger.info(
      { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
      "Cursor found",
    );
  } else {
    // Previously NaN crashed on toISOString() and "" silently became cursor 0.
    cursor = Math.floor(Date.now() * 1000);
    logger.warn(
      {
        process: "MAIN",
        rawCursor,
        cursor,
        datetime: epochUsToDateTime(cursor),
      },
      "Invalid cursor in cursor.txt, resetting cursor",
    );
    fs.writeFileSync("cursor.txt", cursor.toString(), "utf8");
  }
} catch (error) {
  if (error instanceof Error && "code" in error && error.code === "ENOENT") {
    cursor = Math.floor(Date.now() * 1000);
    logger.info(
      { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
      "Cursor not found in cursor.txt, setting cursor",
    );
    fs.writeFileSync("cursor.txt", cursor.toString(), "utf8");
  } else {
    logger.error({ process: "MAIN", error }, "Failed to read cursor");
    process.exit(1);
  }
}
47
// Module-level Jetstream client: subscribes only to post records and resumes
// from the cursor determined above.
const jetstream = new Jetstream({
  cursor,
  endpoint: config.jetstream.url,
  wantedCollections: ["app.bsky.feed.post"],
});
54
// Module-level handles for the Redis connection, queue, worker, cache, claims,
// metrics and the stats timer. They are declared up front and assigned during
// the top-level-await setup further down, hence `let`.
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let redis: Redis;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let queue: RedisQueue;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let worker: QueueWorker;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let cache: PhashCache | undefined;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let claims: ModerationClaims;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let metrics: MetricsCollector;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let statsInterval: NodeJS.Timeout;
72
// Register Jetstream event handlers at module level

/**
 * Persist a Jetstream cursor to cursor.txt. The value is first written to a
 * temporary file that is then renamed over cursor.txt, so an interrupted
 * write cannot leave cursor.txt empty or truncated (rename replaces the target
 * atomically on POSIX filesystems). Failures are logged, never thrown.
 *
 * @param value - Cursor in microseconds since the Unix epoch.
 */
function persistCursor(value: number): void {
  const tmpPath = "cursor.txt.tmp";
  fs.writeFile(tmpPath, value.toString(), "utf8", (writeErr) => {
    if (writeErr) {
      logger.error(
        { process: "MAIN", error: writeErr },
        "Failed to write cursor",
      );
      return;
    }
    fs.rename(tmpPath, "cursor.txt", (renameErr) => {
      if (renameErr) {
        logger.error(
          { process: "MAIN", error: renameErr },
          "Failed to write cursor",
        );
      }
    });
  });
}

// When the Jetstream socket opens: log the resume position and start a timer
// that periodically persists `jetstream.cursor` for the next restart.
jetstream.on("open", () => {
  if (jetstream.cursor) {
    logger.info(
      {
        process: "MAIN",
        url: config.jetstream.url,
        cursor: jetstream.cursor,
        datetime: epochUsToDateTime(jetstream.cursor),
      },
      "Connected to Jetstream with cursor",
    );
  } else {
    logger.info(
      { process: "MAIN", url: config.jetstream.url },
      "Connected to Jetstream, waiting for cursor",
    );
  }

  // Clear any timer left over from a previous connection so repeated "open"
  // events can never stack up concurrent cursor writers.
  clearInterval(cursorUpdateInterval);
  cursorUpdateInterval = setInterval(() => {
    const current = jetstream.cursor;
    if (!current) {
      return;
    }
    logger.info(
      {
        process: "MAIN",
        cursor: current,
        datetime: epochUsToDateTime(current),
      },
      "Cursor updated",
    );
    persistCursor(current);
  }, config.jetstream.cursorUpdateInterval);
});
111
// On disconnect, stop the periodic cursor writer; the "open" handler starts a
// fresh one when the socket reconnects.
jetstream.on("close", () => {
  clearInterval(cursorUpdateInterval);
  logger.info({ process: "MAIN" }, "Jetstream connection closed");
});

// Connection/stream errors are logged by message only.
jetstream.on("error", (error) => {
  const { message } = error;
  logger.error({ error: message }, "Jetstream error");
});
120
// Register onCreate handler for posts

/** A blob referenced by a post image, as queued for phash matching. */
type ImageBlobRef = { cid: string; mimeType?: string };

/**
 * Collect blob references from an embed's `images` array (a direct image
 * embed, or the `media` of e.g. an `app.bsky.embed.recordWithMedia` embed).
 *
 * Entries are skipped unless they have an object `image` whose `ref.$link` is
 * a string. `mimeType` is kept only when it is a string.
 *
 * @param images - The raw `images` value; anything but an array yields `[]`.
 * @returns The extracted blob references, in array order.
 */
function extractImageBlobs(images: unknown): ImageBlobRef[] {
  if (!Array.isArray(images)) {
    return [];
  }
  const blobs: ImageBlobRef[] = [];
  for (const entry of images) {
    if (typeof entry !== "object" || entry === null) continue;
    const image = (entry as Record<string, unknown>).image;
    if (typeof image !== "object" || image === null) continue;
    const imageObj = image as Record<string, unknown>;
    const ref = imageObj.ref;
    if (typeof ref !== "object" || ref === null) continue;
    const link = (ref as Record<string, unknown>).$link;
    if (typeof link !== "string") continue;
    blobs.push({
      cid: link,
      mimeType:
        typeof imageObj.mimeType === "string" ? imageObj.mimeType : undefined,
    });
  }
  return blobs;
}

// For every new post carrying image blobs, enqueue one ImageJob for the worker.
jetstream.onCreate(
  "app.bsky.feed.post",
  async (event: CommitCreateEvent<"app.bsky.feed.post">) => {
    try {
      const record = event.commit.record as Record<string, unknown>;
      const embed = record.embed;
      // Posts without an embed object cannot carry images.
      if (typeof embed !== "object" || embed === null) {
        return;
      }
      const embedObj = embed as Record<string, unknown>;
      const media = embedObj.media;

      // Images embedded directly, plus images nested under `embed.media`.
      const blobs = [
        ...extractImageBlobs(embedObj.images),
        ...(typeof media === "object" && media !== null
          ? extractImageBlobs((media as Record<string, unknown>).images)
          : []),
      ];

      if (blobs.length === 0) {
        return;
      }

      const postUri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`;

      logger.debug(
        { uri: postUri, blobCount: blobs.length },
        "Post with blobs detected",
      );

      const job: ImageJob = {
        postUri,
        postCid: event.commit.cid,
        postDid: event.did,
        blobs,
        timestamp: Date.now(),
        attempts: 0,
      };

      await queue.enqueue(job);
    } catch (error) {
      logger.error({ error, event }, "Error processing jetstream event");
    }
  },
);
231
// Async setup, step 1: wait for the labeler session (`isLoggedIn` from
// ./agent) before any Redis connection or worker is created below.
logger.info("Authenticating labeler");
await isLoggedIn;
logger.info("Authentication complete");
236
// Async setup, step 2: shared Redis client, the queue, and the helpers built
// on top of the client.
logger.info("Connecting to Redis");
redis = new Redis(config.redis.url);
// NOTE(review): RedisQueue receives the URL rather than the shared client,
// presumably opening its own connection — confirm in ./queue/redis-queue.
queue = new RedisQueue(config.redis.url);

redis
  .on("connect", () => {
    logger.info("Redis connected");
  })
  .on("error", (error) => {
    logger.error({ error }, "Redis error");
  });

// The phash cache is optional and controlled by config.cache.enabled.
if (config.cache.enabled) {
  cache = new PhashCache(redis, config.cache.ttl);
} else {
  cache = undefined;
}
claims = new ModerationClaims(redis);
metrics = new MetricsCollector();

if (cache !== undefined) {
  logger.info({ ttl: config.cache.ttl }, "Phash caching enabled");
}
258
// Async setup, step 3: the queue worker, which checks queued images against
// BLOB_CHECKS using the processing limits from config.
const workerOptions = {
  concurrency: config.processing.concurrency,
  retryAttempts: config.processing.retryAttempts,
  retryDelay: config.processing.retryDelay,
};
worker = new QueueWorker(
  queue,
  BLOB_CHECKS,
  workerOptions,
  agent,
  cache,
  metrics,
);
271
// When an image matches a known phash, apply every moderation action enabled
// on the matched check (post label/report, account label/report).
worker.onMatchFound(async (postUri, postCid, postDid, match) => {
  const check = match.matchedCheck;

  logger.warn(
    {
      postUri,
      postDid,
      label: check.label,
      comment: check.comment,
      phash: match.phash,
      matchedPhash: match.matchedPhash,
      hammingDistance: match.hammingDistance,
    },
    "Match found - executing moderation actions",
  );

  // Run one moderation action, logging (not propagating) any failure so the
  // remaining actions for this match are still attempted. Previously the
  // first failure skipped every action after it.
  const attempt = async (action: string, run: () => unknown): Promise<void> => {
    try {
      await run();
    } catch (error) {
      logger.error(
        { error, postUri, postDid, action },
        "Failed to execute moderation actions",
      );
    }
  };

  if (check.toLabel) {
    await attempt("createPostLabel", () =>
      createPostLabel(
        postUri,
        postCid,
        check.label,
        check.comment,
        match.phash,
        claims,
        metrics,
      ),
    );
  }

  if (check.reportPost) {
    await attempt("createPostReport", () =>
      createPostReport(postUri, postCid, check.comment, match.phash, metrics),
    );
  }

  if (check.labelAcct) {
    await attempt("createAccountLabel", () =>
      createAccountLabel(
        postDid,
        check.label,
        check.comment,
        postUri,
        match.phash,
        claims,
        metrics,
      ),
    );
  }

  if (check.reportAcct) {
    await attempt("createAccountReport", () =>
      createAccountReport(postDid, check.comment, postUri, match.phash, metrics),
    );
  }
});
339
// Start the consumer before the producer: the worker processes the queue that
// the Jetstream onCreate handler fills.
logger.info("Starting queue worker");
await worker.start();

logger.info("Starting Jetstream subscription");
jetstream.start();

logger.info("Service started and ready");
347
// Every 60 s, log worker stats, cache stats (when caching is enabled) and
// metric rates. Errors are caught here: a rejection escaping this async
// callback would be an unhandled rejection, which terminates the process under
// Node's default `--unhandled-rejections=throw` mode (Node 15+).
statsInterval = setInterval(async () => {
  try {
    // The two stats sources are independent, so query them concurrently.
    const [workerStats, cacheStats] = await Promise.all([
      worker.getStats(),
      cache ? cache.getStats() : null,
    ]);
    const metricsData = metrics.getWithRates();
    logger.info(
      {
        worker: workerStats,
        cache: cacheStats,
        metrics: metricsData,
      },
      "Service stats",
    );
  } catch (error) {
    logger.error({ error }, "Failed to collect service stats");
  }
}, 60000);
361
/** Set once shutdown begins so a second signal cannot start a second teardown. */
let shuttingDown = false;

/**
 * Gracefully stop the service. The steps are:
 * 1. Stop the timers.
 * 2. Persist the current Jetstream cursor.
 * 3. Close the Jetstream subscription.
 * 4. Stop the worker.
 * 5. Disconnect the queue and quit Redis.
 * 6. Exit with code 0, or with code 1 if any step throws.
 *
 * Repeated invocations (e.g. SIGINT followed by SIGTERM) are ignored.
 */
async function shutdown(): Promise<void> {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;
  try {
    logger.info({ process: "MAIN" }, "Shutting down gracefully");
    // Stop timers first so neither the stats reporter nor the cursor writer
    // fires against connections that are being torn down below.
    clearInterval(statsInterval);
    clearInterval(cursorUpdateInterval);
    if (jetstream.cursor !== undefined) {
      fs.writeFileSync("cursor.txt", jetstream.cursor.toString(), "utf8");
    }
    jetstream.close();
    await worker.stop();
    await queue.disconnect();
    await redis.quit();
    logger.info({ process: "MAIN" }, "Service stopped");
    process.exit(0);
  } catch (error) {
    logger.error({ process: "MAIN", error }, "Error shutting down gracefully");
    process.exit(1);
  }
}

// `void` marks the promise as intentionally not awaited; shutdown() handles
// its own errors.
process.on("SIGINT", () => void shutdown());
process.on("SIGTERM", () => void shutdown());