A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules

feat: Add environment variable validation

Adds a function to validate environment variables. This will catch any
issues early in the startup process.

+181 -106
+181 -106
src/main.ts
··· 1 - import fs from 'node:fs'; 2 3 import type { 4 CommitCreateEvent, 5 CommitUpdateEvent, 6 - IdentityEvent } from '@skyware/jetstream'; 7 - import { 8 - Jetstream, 9 - } from '@skyware/jetstream'; 10 - 11 12 - import { checkHandle } from './checkHandles.js'; 13 - import { checkPosts } from './checkPosts.js'; 14 - import { checkDescription, checkDisplayName } from './checkProfiles.js'; 15 - import { checkStarterPack, checkNewStarterPack } from './checkStarterPack.js'; 16 import { 17 CURSOR_UPDATE_INTERVAL, 18 FIREHOSE_URL, 19 METRICS_PORT, 20 WANTED_COLLECTION, 21 - } from './config.js'; 22 - import logger from './logger.js'; 23 - import { startMetricsServer } from './metrics.js'; 24 - import type { Post, LinkFeature } from './types.js'; 25 26 let cursor = 0; 27 let cursorUpdateInterval: NodeJS.Timeout; ··· 31 } 32 33 try { 34 - logger.info('Trying to read cursor from cursor.txt...'); 35 - cursor = Number(fs.readFileSync('cursor.txt', 'utf8')); 36 - logger.info(`Cursor found: ${cursor.toString()} (${epochUsToDateTime(cursor)})`); 37 } catch (error) { 38 - if (error instanceof Error && 'code' in error && error.code === 'ENOENT') { 39 cursor = Math.floor(Date.now() * 1000); 40 logger.info( 41 `Cursor not found in cursor.txt, setting cursor to: ${cursor.toString()} (${epochUsToDateTime(cursor)})`, 42 ); 43 - fs.writeFileSync('cursor.txt', cursor.toString(), 'utf8'); 44 } else { 45 logger.error(error); 46 process.exit(1); ··· 53 cursor, 54 }); 55 56 - jetstream.on('open', () => { 57 if (jetstream.cursor) { 58 logger.info( 59 `Connected to Jetstream at ${FIREHOSE_URL} with cursor ${jetstream.cursor.toString()} (${epochUsToDateTime(jetstream.cursor)})`, ··· 68 logger.info( 69 `Cursor updated to: ${jetstream.cursor.toString()} (${epochUsToDateTime(jetstream.cursor)})`, 70 ); 71 - fs.writeFile('cursor.txt', jetstream.cursor.toString(), (err) => { 72 if (err) logger.error(err); 73 }); 74 } 75 }, CURSOR_UPDATE_INTERVAL); 76 }); 77 78 - jetstream.on('close', () => { 79 clearInterval(cursorUpdateInterval); 80 - logger.info('Jetstream connection closed.'); 81 }); 82 83 - jetstream.on('error', (error) => { 84 logger.error(`Jetstream error: ${error.message}`); 85 }); 86 87 // Check for post updates 88 89 jetstream.onCreate( 90 - 'app.bsky.feed.post', 91 - (event: CommitCreateEvent<'app.bsky.feed.post'>) => { 92 try { 93 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 94 - const hasFacets = Object.hasOwn(event.commit.record, 'facets'); 95 - const hasText = Object.hasOwn(event.commit.record, 'text'); 96 97 const tasks: Promise<void>[] = []; 98 ··· 100 if (hasFacets && event.commit.record.facets) { 101 const hasLinkType = event.commit.record.facets.some((facet) => 102 facet.features.some( 103 - (feature) => feature.$type === 'app.bsky.richtext.facet#link', 104 ), 105 ); 106 107 if (hasLinkType) { 108 - const urls = event.commit.record.facets.flatMap((facet) => 109 - facet.features.filter( 110 - (feature) => feature.$type === 'app.bsky.richtext.facet#link', 111 - ), 112 - ) 113 .map((feature: LinkFeature) => feature.uri); 114 115 urls.forEach((url) => { ··· 123 cid: event.commit.cid, 124 }, 125 ]; 126 - tasks.push(checkPosts(posts).catch((error: unknown) => { 127 - logger.error(`Error checking post links for ${event.did}:`, error); 128 - })); 129 }); 130 } 131 } else if (hasText && event.commit.record.text) { ··· 139 cid: event.commit.cid, 140 }, 141 ]; 142 - tasks.push(checkPosts(posts).catch((error: unknown) => { 143 - logger.error(`Error checking post text for ${event.did}:`, error); 144 - })); 145 } 146 147 // Wait for all tasks to complete ··· 156 157 // Check for profile updates 158 jetstream.onUpdate( 159 - 'app.bsky.actor.profile', 160 - (event: CommitUpdateEvent<'app.bsky.actor.profile'>) => { 161 try { 162 const tasks: Promise<void>[] = []; 163 164 if (event.commit.record.displayName || event.commit.record.description) { 165 - const displayName = event.commit.record.displayName ?? ''; 166 - const description = event.commit.record.description ?? ''; 167 - 168 tasks.push( 169 - checkDescription(event.did, event.time_us, displayName, description) 170 - .catch((error: unknown) => { 171 - logger.error(`Error checking profile description for ${event.did}:`, error); 172 - }) 173 ); 174 - 175 tasks.push( 176 - checkDisplayName(event.did, event.time_us, displayName, description) 177 - .catch((error: unknown) => { 178 - logger.error(`Error checking profile display name for ${event.did}:`, error); 179 - }) 180 ); 181 } 182 183 if (event.commit.record.joinedViaStarterPack) { 184 tasks.push( 185 - checkStarterPack(event.did, event.time_us, event.commit.record.joinedViaStarterPack.uri) 186 - .catch((error: unknown) => { 187 - logger.error(`Error checking starter pack for ${event.did}:`, error); 188 - }) 189 ); 190 } 191 ··· 194 void Promise.allSettled(tasks); 195 } 196 } catch (error: unknown) { 197 - logger.error(`Error processing profile update event for ${event.did}:`, error); 198 } 199 }, 200 ); ··· 202 // Check for profile updates 203 204 jetstream.onCreate( 205 - 'app.bsky.actor.profile', 206 - (event: CommitCreateEvent<'app.bsky.actor.profile'>) => { 207 try { 208 const tasks: Promise<void>[] = []; 209 210 if (event.commit.record.displayName || event.commit.record.description) { 211 - const displayName = event.commit.record.displayName ?? ''; 212 - const description = event.commit.record.description ?? ''; 213 - 214 tasks.push( 215 - checkDescription(event.did, event.time_us, displayName, description) 216 - .catch((error: unknown) => { 217 - logger.error(`Error checking profile description for ${event.did}:`, error); 218 - }) 219 ); 220 - 221 tasks.push( 222 - checkDisplayName(event.did, event.time_us, displayName, description) 223 - .catch((error: unknown) => { 224 - logger.error(`Error checking profile display name for ${event.did}:`, error); 225 - }) 226 ); 227 228 if (event.commit.record.joinedViaStarterPack) { 229 tasks.push( 230 - checkStarterPack(event.did, event.time_us, event.commit.record.joinedViaStarterPack.uri) 231 - .catch((error: unknown) => { 232 - logger.error(`Error checking starter pack for ${event.did}:`, error); 233 - }) 234 ); 235 } 236 ··· 240 } 241 } 242 } catch (error: unknown) { 243 - logger.error(`Error processing profile creation event for ${event.did}:`, error); 244 } 245 }, 246 ); 247 248 jetstream.onCreate( 249 - 'app.bsky.graph.starterpack', 250 - (event: CommitCreateEvent<'app.bsky.graph.starterpack'>) => { 251 try { 252 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 253 const { name, description } = event.commit.record; ··· 260 name, 261 description, 262 ).catch((error: unknown) => { 263 - logger.error(`Error checking new starter pack for ${event.did}:`, error); 264 }); 265 } catch (error: unknown) { 266 - logger.error(`Error processing starter pack creation event for ${event.did}:`, error); 267 } 268 }, 269 ); 270 271 jetstream.onUpdate( 272 - 'app.bsky.graph.starterpack', 273 - (event: CommitUpdateEvent<'app.bsky.graph.starterpack'>) => { 274 try { 275 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 276 const { name, description } = event.commit.record; ··· 283 name, 284 description, 285 ).catch((error: unknown) => { 286 - logger.error(`Error checking updated starter pack for ${event.did}:`, error); 287 }); 288 } catch (error: unknown) { 289 - logger.error(`Error processing starter pack update event for ${event.did}:`, error); 290 } 291 }, 292 ); 293 294 // Check for handle updates 295 - jetstream.on('identity', (event: IdentityEvent) => { 296 try { 297 if (event.identity.handle) { 298 - void checkHandle(event.identity.did, event.identity.handle, event.time_us) 299 - .catch((error: unknown) => { 300 - logger.error(`Error checking handle for ${event.identity.did}:`, error); 301 - }); 302 } 303 } catch (error: unknown) { 304 - logger.error(`Error processing identity event for ${event.identity.did}:`, error); 305 } 306 }); 307 ··· 311 metricsServer = startMetricsServer(METRICS_PORT); 312 logger.info(`Metrics server started on port ${METRICS_PORT.toString()}`); 313 } catch (error: unknown) { 314 - logger.error('Failed to start metrics server:', error); 315 process.exit(1); 316 } 317 ··· 326 // Start jetstream with error handling 327 try { 328 jetstream.start(); 329 - logger.info('Jetstream started successfully'); 330 } catch (error: unknown) { 331 - logger.error('Failed to start jetstream:', error); 332 process.exit(1); 333 } 334 335 function shutdown() { 336 try { 337 - logger.info('Shutting down gracefully...'); 338 if (jetstream.cursor) { 339 - fs.writeFileSync('cursor.txt', jetstream.cursor.toString(), 'utf8'); 340 } 341 jetstream.close(); 342 if (metricsServer) { 343 metricsServer.close(() => { 344 - logger.info('Metrics server closed'); 345 }); 346 } 347 - logger.info('Shutdown completed successfully'); 348 } catch (error: unknown) { 349 - logger.error('Error shutting down gracefully:', error); 350 process.exit(1); 351 } 352 } 353 354 // Global error handlers 355 - process.on('unhandledRejection', (reason, promise) => { 356 - logger.error('Unhandled Promise Rejection at:', promise, 'reason:', reason); 357 // Don't exit the process for unhandled rejections, just log them 358 }); 359 360 - process.on('uncaughtException', (error) => { 361 - logger.error('Uncaught Exception:', error); 362 shutdown(); 363 }); 364 365 - process.on('SIGINT', shutdown); 366 - process.on('SIGTERM', shutdown);
··· 1 + import fs from "node:fs"; 2 3 import type { 4 CommitCreateEvent, 5 CommitUpdateEvent, 6 + IdentityEvent, 7 + } from "@skyware/jetstream"; 8 + import { Jetstream } from "@skyware/jetstream"; 9 10 + import { checkHandle } from "./checkHandles.js"; 11 + import { checkPosts } from "./checkPosts.js"; 12 + import { checkDescription, checkDisplayName } from "./checkProfiles.js"; 13 + import { checkStarterPack, checkNewStarterPack } from "./checkStarterPack.js"; 14 import { 15 CURSOR_UPDATE_INTERVAL, 16 FIREHOSE_URL, 17 METRICS_PORT, 18 WANTED_COLLECTION, 19 + } from "./config.js"; 20 + import { validateEnvironment } from "./validateEnv.js"; 21 + import logger from "./logger.js"; 22 + import { startMetricsServer } from "./metrics.js"; 23 + import type { Post, LinkFeature } from "./types.js"; 24 + 25 + validateEnvironment(); 26 27 let cursor = 0; 28 let cursorUpdateInterval: NodeJS.Timeout; ··· 32 } 33 34 try { 35 + logger.info("Trying to read cursor from cursor.txt..."); 36 + cursor = Number(fs.readFileSync("cursor.txt", "utf8")); 37 + logger.info( 38 + `Cursor found: ${cursor.toString()} (${epochUsToDateTime(cursor)})`, 39 + ); 40 } catch (error) { 41 + if (error instanceof Error && "code" in error && error.code === "ENOENT") { 42 cursor = Math.floor(Date.now() * 1000); 43 logger.info( 44 `Cursor not found in cursor.txt, setting cursor to: ${cursor.toString()} (${epochUsToDateTime(cursor)})`, 45 ); 46 + fs.writeFileSync("cursor.txt", cursor.toString(), "utf8"); 47 } else { 48 logger.error(error); 49 process.exit(1); ··· 56 cursor, 57 }); 58 59 + jetstream.on("open", () => { 60 if (jetstream.cursor) { 61 logger.info( 62 `Connected to Jetstream at ${FIREHOSE_URL} with cursor ${jetstream.cursor.toString()} (${epochUsToDateTime(jetstream.cursor)})`, ··· 71 logger.info( 72 `Cursor updated to: ${jetstream.cursor.toString()} (${epochUsToDateTime(jetstream.cursor)})`, 73 ); 74 + fs.writeFile("cursor.txt", jetstream.cursor.toString(), (err) => { 75 if (err) logger.error(err); 76 }); 77 } 78 }, CURSOR_UPDATE_INTERVAL); 79 }); 80 81 + jetstream.on("close", () => { 82 clearInterval(cursorUpdateInterval); 83 + logger.info("Jetstream connection closed."); 84 }); 85 86 + jetstream.on("error", (error) => { 87 logger.error(`Jetstream error: ${error.message}`); 88 }); 89 90 // Check for post updates 91 92 jetstream.onCreate( 93 + "app.bsky.feed.post", 94 + (event: CommitCreateEvent<"app.bsky.feed.post">) => { 95 try { 96 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 97 + const hasFacets = Object.hasOwn(event.commit.record, "facets"); 98 + const hasText = Object.hasOwn(event.commit.record, "text"); 99 100 const tasks: Promise<void>[] = []; 101 ··· 103 if (hasFacets && event.commit.record.facets) { 104 const hasLinkType = event.commit.record.facets.some((facet) => 105 facet.features.some( 106 + (feature) => feature.$type === "app.bsky.richtext.facet#link", 107 ), 108 ); 109 110 if (hasLinkType) { 111 + const urls = event.commit.record.facets 112 + .flatMap((facet) => 113 + facet.features.filter( 114 + (feature) => feature.$type === "app.bsky.richtext.facet#link", 115 + ), 116 + ) 117 .map((feature: LinkFeature) => feature.uri); 118 119 urls.forEach((url) => { ··· 127 cid: event.commit.cid, 128 }, 129 ]; 130 + tasks.push( 131 + checkPosts(posts).catch((error: unknown) => { 132 + logger.error( 133 + `Error checking post links for ${event.did}:`, 134 + error, 135 + ); 136 + }), 137 + ); 138 }); 139 } 140 } else if (hasText && event.commit.record.text) { ··· 148 cid: event.commit.cid, 149 }, 150 ]; 151 + tasks.push( 152 + checkPosts(posts).catch((error: unknown) => { 153 + logger.error(`Error checking post text for ${event.did}:`, error); 154 + }), 155 + ); 156 } 157 158 // Wait for all tasks to complete ··· 167 168 // Check for profile updates 169 jetstream.onUpdate( 170 + "app.bsky.actor.profile", 171 + (event: CommitUpdateEvent<"app.bsky.actor.profile">) => { 172 try { 173 const tasks: Promise<void>[] = []; 174 175 if (event.commit.record.displayName || event.commit.record.description) { 176 + const displayName = event.commit.record.displayName ?? ""; 177 + const description = event.commit.record.description ?? ""; 178 + 179 tasks.push( 180 + checkDescription( 181 + event.did, 182 + event.time_us, 183 + displayName, 184 + description, 185 + ).catch((error: unknown) => { 186 + logger.error( 187 + `Error checking profile description for ${event.did}:`, 188 + error, 189 + ); 190 + }), 191 ); 192 + 193 tasks.push( 194 + checkDisplayName( 195 + event.did, 196 + event.time_us, 197 + displayName, 198 + description, 199 + ).catch((error: unknown) => { 200 + logger.error( 201 + `Error checking profile display name for ${event.did}:`, 202 + error, 203 + ); 204 + }), 205 ); 206 } 207 208 if (event.commit.record.joinedViaStarterPack) { 209 tasks.push( 210 + checkStarterPack( 211 + event.did, 212 + event.time_us, 213 + event.commit.record.joinedViaStarterPack.uri, 214 + ).catch((error: unknown) => { 215 + logger.error( 216 + `Error checking starter pack for ${event.did}:`, 217 + error, 218 + ); 219 + }), 220 ); 221 } 222 ··· 225 void Promise.allSettled(tasks); 226 } 227 } catch (error: unknown) { 228 + logger.error( 229 + `Error processing profile update event for ${event.did}:`, 230 + error, 231 + ); 232 } 233 }, 234 ); ··· 236 // Check for profile updates 237 238 jetstream.onCreate( 239 + "app.bsky.actor.profile", 240 + (event: CommitCreateEvent<"app.bsky.actor.profile">) => { 241 try { 242 const tasks: Promise<void>[] = []; 243 244 if (event.commit.record.displayName || event.commit.record.description) { 245 + const displayName = event.commit.record.displayName ?? ""; 246 + const description = event.commit.record.description ?? ""; 247 + 248 tasks.push( 249 + checkDescription( 250 + event.did, 251 + event.time_us, 252 + displayName, 253 + description, 254 + ).catch((error: unknown) => { 255 + logger.error( 256 + `Error checking profile description for ${event.did}:`, 257 + error, 258 + ); 259 + }), 260 ); 261 + 262 tasks.push( 263 + checkDisplayName( 264 + event.did, 265 + event.time_us, 266 + displayName, 267 + description, 268 + ).catch((error: unknown) => { 269 + logger.error( 270 + `Error checking profile display name for ${event.did}:`, 271 + error, 272 + ); 273 + }), 274 ); 275 276 if (event.commit.record.joinedViaStarterPack) { 277 tasks.push( 278 + checkStarterPack( 279 + event.did, 280 + event.time_us, 281 + event.commit.record.joinedViaStarterPack.uri, 282 + ).catch((error: unknown) => { 283 + logger.error( 284 + `Error checking starter pack for ${event.did}:`, 285 + error, 286 + ); 287 + }), 288 ); 289 } 290 ··· 294 } 295 } 296 } catch (error: unknown) { 297 + logger.error( 298 + `Error processing profile creation event for ${event.did}:`, 299 + error, 300 + ); 301 } 302 }, 303 ); 304 305 jetstream.onCreate( 306 + "app.bsky.graph.starterpack", 307 + (event: CommitCreateEvent<"app.bsky.graph.starterpack">) => { 308 try { 309 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 310 const { name, description } = event.commit.record; ··· 317 name, 318 description, 319 ).catch((error: unknown) => { 320 + logger.error( 321 + `Error checking new starter pack for ${event.did}:`, 322 + error, 323 + ); 324 }); 325 } catch (error: unknown) { 326 + logger.error( 327 + `Error processing starter pack creation event for ${event.did}:`, 328 + error, 329 + ); 330 } 331 }, 332 ); 333 334 jetstream.onUpdate( 335 + "app.bsky.graph.starterpack", 336 + (event: CommitUpdateEvent<"app.bsky.graph.starterpack">) => { 337 try { 338 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 339 const { name, description } = event.commit.record; ··· 346 name, 347 description, 348 ).catch((error: unknown) => { 349 + logger.error( 350 + `Error checking updated starter pack for ${event.did}:`, 351 + error, 352 + ); 353 }); 354 } catch (error: unknown) { 355 + logger.error( 356 + `Error processing starter pack update event for ${event.did}:`, 357 + error, 358 + ); 359 } 360 }, 361 ); 362 363 // Check for handle updates 364 + jetstream.on("identity", (event: IdentityEvent) => { 365 try { 366 if (event.identity.handle) { 367 + void checkHandle( 368 + event.identity.did, 369 + event.identity.handle, 370 + event.time_us, 371 + ).catch((error: unknown) => { 372 + logger.error(`Error checking handle for ${event.identity.did}:`, error); 373 + }); 374 } 375 } catch (error: unknown) { 376 + logger.error( 377 + `Error processing identity event for ${event.identity.did}:`, 378 + error, 379 + ); 380 } 381 }); 382 ··· 386 metricsServer = startMetricsServer(METRICS_PORT); 387 logger.info(`Metrics server started on port ${METRICS_PORT.toString()}`); 388 } catch (error: unknown) { 389 + logger.error("Failed to start metrics server:", error); 390 process.exit(1); 391 } 392 ··· 401 // Start jetstream with error handling 402 try { 403 jetstream.start(); 404 + logger.info("Jetstream started successfully"); 405 } catch (error: unknown) { 406 + logger.error("Failed to start jetstream:", error); 407 process.exit(1); 408 } 409 410 function shutdown() { 411 try { 412 + logger.info("Shutting down gracefully..."); 413 if (jetstream.cursor) { 414 + fs.writeFileSync("cursor.txt", jetstream.cursor.toString(), "utf8"); 415 } 416 jetstream.close(); 417 if (metricsServer) { 418 metricsServer.close(() => { 419 + logger.info("Metrics server closed"); 420 }); 421 } 422 + logger.info("Shutdown completed successfully"); 423 } catch (error: unknown) { 424 + logger.error("Error shutting down gracefully:", error); 425 process.exit(1); 426 } 427 } 428 429 // Global error handlers 430 + process.on("unhandledRejection", (reason, promise) => { 431 + logger.error("Unhandled Promise Rejection at:", promise, "reason:", reason); 432 // Don't exit the process for unhandled rejections, just log them 433 }); 434 435 + process.on("uncaughtException", (error) => { 436 + logger.error("Uncaught Exception:", error); 437 shutdown(); 438 }); 439 440 + process.on("SIGINT", shutdown); 441 + process.on("SIGTERM", shutdown);