A tool for tailing the firehose, matching images against known perceptual hashes, and labeling the posts and accounts that match.
at main 383 lines 11 kB view raw
import fs from "node:fs";
import type { CommitCreateEvent } from "@skyware/jetstream";
import Redis from "ioredis";
import { Jetstream } from "@skyware/jetstream";
import { BLOB_CHECKS } from "../rules/blobs";
import { agent, isLoggedIn } from "./agent";
import { ModerationClaims } from "./cache/moderation-claims";
import { PhashCache } from "./cache/phash-cache";
import { config } from "./config/index";
import { logger } from "./logger/index";
import { MetricsCollector } from "./metrics/collector";
import { createAccountLabel, createAccountReport } from "./moderation/account";
import { createPostLabel, createPostReport } from "./moderation/post";
import { RedisQueue } from "./queue/redis-queue";
import { QueueWorker } from "./queue/worker";
import type { ImageJob } from "./types";

/**
 * Service entry point.
 *
 * Restores the Jetstream cursor from `cursor.txt`, subscribes to
 * `app.bsky.feed.post` creations, enqueues every post that carries image
 * blobs, and runs a queue worker whose match callback applies the configured
 * label/report actions.
 */

/** File the Jetstream cursor is persisted to between runs. */
const CURSOR_FILE = "cursor.txt";

/** How often (ms) worker/cache/metrics stats are logged. */
const STATS_INTERVAL_MS = 60000;

/** A blob reference pulled out of a post embed. */
interface BlobRef {
  cid: string;
  mimeType?: string;
}

logger.info("Starting skywatch-phash service");

let cursor = 0;
let cursorUpdateInterval: NodeJS.Timeout | undefined;

/**
 * Formats a cursor expressed in epoch microseconds as an ISO-8601 string.
 *
 * @param cursor - Timestamp in microseconds since the Unix epoch.
 * @returns The ISO-8601 representation of that instant.
 * @throws RangeError if `cursor` does not map to a valid date (e.g. `NaN`).
 */
function epochUsToDateTime(cursor: number): string {
  return new Date(cursor / 1000).toISOString();
}

/** Narrows an unknown value to a non-null object that can be indexed by key. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/**
 * Collects blob references from an embed's `images` array.
 *
 * Entries are skipped unless they have the shape
 * `{ image: { ref: { $link: string } } }`; `mimeType` is kept only when it is
 * a string.
 *
 * @param images - The raw `images` value from the record (may be anything).
 * @returns One `BlobRef` per well-formed entry, in input order.
 */
function extractImageBlobs(images: unknown): BlobRef[] {
  if (!Array.isArray(images)) {
    return [];
  }

  const found: BlobRef[] = [];
  for (const entry of images) {
    if (!isRecord(entry) || !isRecord(entry.image)) {
      continue;
    }
    const { ref, mimeType } = entry.image;
    if (isRecord(ref) && typeof ref.$link === "string") {
      found.push({
        cid: ref.$link,
        mimeType: typeof mimeType === "string" ? mimeType : undefined,
      });
    }
  }
  return found;
}

try {
  logger.info({ process: "MAIN" }, "Trying to read cursor from cursor.txt");
  const storedCursor = Number(fs.readFileSync(CURSOR_FILE, "utf8"));
  // Reject garbage explicitly instead of relying on toISOString() throwing
  // further down; the outcome (logged error, exit 1) is unchanged.
  if (!Number.isFinite(storedCursor)) {
    throw new Error("cursor.txt does not contain a numeric cursor");
  }
  cursor = storedCursor;
  logger.info(
    { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
    "Cursor found",
  );
} catch (error) {
  if (error instanceof Error && "code" in error && error.code === "ENOENT") {
    // First run: start from "now", expressed in microseconds.
    cursor = Math.floor(Date.now() * 1000);
    logger.info(
      { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
      "Cursor not found in cursor.txt, setting cursor",
    );
    fs.writeFileSync(CURSOR_FILE, cursor.toString(), "utf8");
  } else {
    logger.error({ process: "MAIN", error }, "Failed to read cursor");
    process.exit(1);
  }
}

// Create Jetstream instance at module level
const jetstream = new Jetstream({
  endpoint: config.jetstream.url,
  wantedCollections: ["app.bsky.feed.post"],
  cursor,
});

// Module-level variables for queue and worker
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let redis: Redis;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let queue: RedisQueue;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let worker: QueueWorker;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let cache: PhashCache | undefined;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let claims: ModerationClaims;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let metrics: MetricsCollector;
// biome-ignore lint/style/useConst: These are assigned later via top-level await
let statsInterval: NodeJS.Timeout | undefined;

// Register Jetstream event handlers at module level
jetstream.on("open", () => {
  if (jetstream.cursor) {
    logger.info(
      {
        process: "MAIN",
        url: config.jetstream.url,
        cursor: jetstream.cursor,
        datetime: epochUsToDateTime(jetstream.cursor),
      },
      "Connected to Jetstream with cursor",
    );
  } else {
    logger.info(
      { process: "MAIN", url: config.jetstream.url },
      "Connected to Jetstream, waiting for cursor",
    );
  }

  // Never leave an earlier timer running if "open" fires again without an
  // intervening "close"; otherwise cursor writers would pile up.
  clearInterval(cursorUpdateInterval);
  cursorUpdateInterval = setInterval(() => {
    if (jetstream.cursor) {
      logger.info(
        {
          process: "MAIN",
          cursor: jetstream.cursor,
          datetime: epochUsToDateTime(jetstream.cursor),
        },
        "Cursor updated",
      );
      fs.writeFile(CURSOR_FILE, jetstream.cursor.toString(), (err) => {
        if (err)
          logger.error(
            { process: "MAIN", error: err },
            "Failed to write cursor",
          );
      });
    }
  }, config.jetstream.cursorUpdateInterval);
});

jetstream.on("close", () => {
  clearInterval(cursorUpdateInterval);
  logger.info({ process: "MAIN" }, "Jetstream connection closed");
});

jetstream.on("error", (error) => {
  logger.error({ error: error.message }, "Jetstream error");
});

// Register onCreate handler for posts
jetstream.onCreate(
  "app.bsky.feed.post",
  async (event: CommitCreateEvent<"app.bsky.feed.post">) => {
    const hasEmbed = Object.prototype.hasOwnProperty.call(
      event.commit.record,
      "embed",
    );

    if (!hasEmbed) {
      return;
    }

    try {
      const record = event.commit.record as Record<string, unknown>;
      const embed = record.embed;

      // Extract blob references: images directly on the embed first, then
      // images nested under `embed.media`.
      const blobs: BlobRef[] = [];

      if (isRecord(embed)) {
        blobs.push(...extractImageBlobs(embed.images));
        if (isRecord(embed.media)) {
          blobs.push(...extractImageBlobs(embed.media.images));
        }
      }

      if (blobs.length === 0) {
        return;
      }

      const postUri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`;

      logger.debug(
        { uri: postUri, blobCount: blobs.length },
        "Post with blobs detected",
      );

      const job: ImageJob = {
        postUri,
        postCid: event.commit.cid,
        postDid: event.did,
        blobs,
        timestamp: Date.now(),
        attempts: 0,
      };

      await queue.enqueue(job);
    } catch (error) {
      logger.error({ error, event }, "Error processing jetstream event");
    }
  },
);

// Async setup
logger.info("Authenticating labeler");
await isLoggedIn;
logger.info("Authentication complete");

logger.info("Connecting to Redis");
redis = new Redis(config.redis.url);
queue = new RedisQueue(config.redis.url);

redis.on("connect", () => {
  logger.info("Redis connected");
});

redis.on("error", (error) => {
  logger.error({ error }, "Redis error");
});

cache = config.cache.enabled
  ? new PhashCache(redis, config.cache.ttl)
  : undefined;
claims = new ModerationClaims(redis);
metrics = new MetricsCollector();

if (cache) {
  logger.info({ ttl: config.cache.ttl }, "Phash caching enabled");
}

worker = new QueueWorker(
  queue,
  BLOB_CHECKS,
  {
    concurrency: config.processing.concurrency,
    retryAttempts: config.processing.retryAttempts,
    retryDelay: config.processing.retryDelay,
  },
  agent,
  cache,
  metrics,
);

// Apply whichever actions the matched check enables. The actions run in
// sequence inside one try block, so a failure skips the remaining ones.
worker.onMatchFound(async (postUri, postCid, postDid, match) => {
  const check = match.matchedCheck;

  logger.warn(
    {
      postUri,
      postDid,
      label: check.label,
      comment: check.comment,
      phash: match.phash,
      matchedPhash: match.matchedPhash,
      hammingDistance: match.hammingDistance,
    },
    "Match found - executing moderation actions",
  );

  try {
    if (check.toLabel) {
      await createPostLabel(
        postUri,
        postCid,
        check.label,
        check.comment,
        match.phash,
        claims,
        metrics,
      );
    }

    if (check.reportPost) {
      await createPostReport(
        postUri,
        postCid,
        check.comment,
        match.phash,
        metrics,
      );
    }

    if (check.labelAcct) {
      await createAccountLabel(
        postDid,
        check.label,
        check.comment,
        postUri,
        match.phash,
        claims,
        metrics,
      );
    }

    if (check.reportAcct) {
      await createAccountReport(
        postDid,
        check.comment,
        postUri,
        match.phash,
        metrics,
      );
    }
  } catch (error) {
    logger.error(
      { error, postUri, postDid },
      "Failed to execute moderation actions",
    );
  }
});

logger.info("Starting queue worker");
await worker.start();

logger.info("Starting Jetstream subscription");
jetstream.start();

logger.info("Service started and ready");

/**
 * Logs a snapshot of worker, cache and metrics stats.
 *
 * Failures are logged rather than thrown: this runs from a timer, where a
 * rejected promise would otherwise surface as an unhandled rejection.
 */
async function logServiceStats(): Promise<void> {
  try {
    const workerStats = await worker.getStats();
    const cacheStats = cache ? await cache.getStats() : null;
    const metricsData = metrics.getWithRates();
    logger.info(
      {
        worker: workerStats,
        cache: cacheStats,
        metrics: metricsData,
      },
      "Service stats",
    );
  } catch (error) {
    logger.error({ process: "MAIN", error }, "Failed to collect service stats");
  }
}

statsInterval = setInterval(() => {
  void logServiceStats();
}, STATS_INTERVAL_MS);

/** Set once shutdown has begun so repeated signals do not re-run teardown. */
let shuttingDown = false;

/**
 * Stops the service: persists the current cursor, closes the Jetstream
 * connection, stops the worker, disconnects from Redis, and exits the process
 * (code 0 on success, 1 if any step throws).
 */
async function shutdown(): Promise<void> {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;

  try {
    logger.info({ process: "MAIN" }, "Shutting down gracefully");

    // Stop the timers before tearing anything down so that neither the stats
    // logger nor the async cursor writer runs against closed resources or
    // races the synchronous cursor write below.
    clearInterval(statsInterval);
    clearInterval(cursorUpdateInterval);

    if (jetstream.cursor !== undefined) {
      fs.writeFileSync(CURSOR_FILE, jetstream.cursor.toString(), "utf8");
    }
    jetstream.close();
    await worker.stop();
    await queue.disconnect();
    await redis.quit();
    logger.info({ process: "MAIN" }, "Service stopped");
    process.exit(0);
  } catch (error) {
    logger.error({ process: "MAIN", error }, "Error shutting down gracefully");
    process.exit(1);
  }
}

process.on("SIGINT", () => void shutdown());
process.on("SIGTERM", () => void shutdown());