A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules

fix: eliminate fire-and-forget async patterns in moderation actions

Convert void-discarded promises to properly awaited calls with error
handling and aggregation. This prevents phantom state where Redis
succeeds but Ozone API fails.

Changes:
- Add ModerationResult type for structured error tracking
- Add moderationActionsFailedCounter metric for monitoring failures
- Modify moderation functions to rethrow errors after logging
- Update checkProfiles.ts ProfileChecker with async/await pattern
- Update checkPosts.ts to use for...of loop with proper awaiting
- Update countStarterPacks.ts with error handling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Skywatch 1862f394 afc8fc66

+229 -69
+4
src/accountModeration.ts
··· 101 { process: "MODERATION", error: e }, 102 "Failed to create account label", 103 ); 104 } 105 }); 106 }; ··· 161 { process: "MODERATION", error: e }, 162 "Failed to create account comment", 163 ); 164 } 165 }); 166 }; ··· 206 { process: "MODERATION", error: e }, 207 "Failed to create account report", 208 ); 209 } 210 }); 211 }; ··· 270 { process: "MODERATION", error: e }, 271 "Failed to negate account label", 272 ); 273 } 274 }); 275 };
··· 101 { process: "MODERATION", error: e }, 102 "Failed to create account label", 103 ); 104 + throw e; 105 } 106 }); 107 }; ··· 162 { process: "MODERATION", error: e }, 163 "Failed to create account comment", 164 ); 165 + throw e; 166 } 167 }); 168 }; ··· 208 { process: "MODERATION", error: e }, 209 "Failed to create account report", 210 ); 211 + throw e; 212 } 213 }); 214 }; ··· 273 { process: "MODERATION", error: e }, 274 "Failed to negate account label", 275 ); 276 + throw e; 277 } 278 }); 279 };
+7
src/metrics.ts
··· 68 registers: [register], 69 }); 70 71 const app = express(); 72 73 app.get("/metrics", (req, res) => {
··· 68 registers: [register], 69 }); 70 71 + export const moderationActionsFailedCounter = new Counter({ 72 + name: "skywatch_moderation_actions_failed_total", 73 + help: "Total number of moderation actions that failed", 74 + labelNames: ["action", "target_type"], 75 + registers: [register], 76 + }); 77 + 78 const app = express(); 79 80 app.get("/metrics", (req, res) => {
+3 -1
src/moderation.ts
··· 130 { process: "MODERATION", error: e }, 131 "Failed to create post label", 132 ); 133 } 134 }); 135 }; ··· 178 } catch (e) { 179 logger.error( 180 { process: "MODERATION", error: e }, 181 - "Failed to create post label", 182 ); 183 } 184 }); 185 };
··· 130 { process: "MODERATION", error: e }, 131 "Failed to create post label", 132 ); 133 + throw e; 134 } 135 }); 136 }; ··· 179 } catch (e) { 180 logger.error( 181 { process: "MODERATION", error: e }, 182 + "Failed to create post report", 183 ); 184 + throw e; 185 } 186 }); 187 };
+18 -8
src/rules/account/countStarterPacks.ts
··· 2 import { agent, isLoggedIn } from "../../agent.js"; 3 import { limit } from "../../limits.js"; 4 import { logger } from "../../logger.js"; 5 6 - const ALLOWED_DIDS = [ 7 - "did:plc:example" 8 - ]; 9 10 export const countStarterPacks = async (did: string, time: number) => { 11 await isLoggedIn; ··· 34 "Labeling account with excessive starter packs", 35 ); 36 37 - void createAccountLabel( 38 - did, 39 - "follow-farming", 40 - `${time.toString()}: Account has ${starterPacks.toString()} starter packs`, 41 - ); 42 } 43 } catch (error) { 44 const errorInfo =
··· 2 import { agent, isLoggedIn } from "../../agent.js"; 3 import { limit } from "../../limits.js"; 4 import { logger } from "../../logger.js"; 5 + import { moderationActionsFailedCounter } from "../../metrics.js"; 6 7 + const ALLOWED_DIDS = ["did:plc:example"]; 8 9 export const countStarterPacks = async (did: string, time: number) => { 10 await isLoggedIn; ··· 33 "Labeling account with excessive starter packs", 34 ); 35 36 + try { 37 + await createAccountLabel( 38 + did, 39 + "follow-farming", 40 + `${time.toString()}: Account has ${starterPacks.toString()} starter packs`, 41 + ); 42 + } catch (labelError) { 43 + logger.error( 44 + { process: "COUNTSTARTERPACKS", did, time, error: labelError }, 45 + "Failed to apply follow-farming label", 46 + ); 47 + moderationActionsFailedCounter.inc({ 48 + action: "label", 49 + target_type: "account", 50 + }); 51 + } 52 } 53 } catch (error) { 54 const errorInfo =
+86 -25
src/rules/posts/checkPosts.ts
··· 6 } from "../../accountModeration.js"; 7 import { checkAccountThreshold } from "../../accountThreshold.js"; 8 import { logger } from "../../logger.js"; 9 import { createPostLabel, createPostReport } from "../../moderation.js"; 10 - import type { Post } from "../../types.js"; 11 import { getFinalUrl } from "../../utils/getFinalUrl.js"; 12 import { getLanguage } from "../../utils/getLanguage.js"; 13 import { countStarterPacks } from "../account/countStarterPacks.js"; ··· 74 const lang = await getLanguage(post[0].text); 75 76 // iterate through the checks 77 - POST_CHECKS.forEach((checkPost) => { 78 if (checkPost.language) { 79 if (!checkPost.language.includes(lang)) { 80 - return; 81 } 82 } 83 ··· 87 { process: "CHECKPOSTS", did: post[0].did, atURI: post[0].atURI }, 88 "Whitelisted DID", 89 ); 90 - return; 91 } 92 } 93 ··· 99 { process: "CHECKPOSTS", did: post[0].did, atURI: post[0].atURI }, 100 "Whitelisted phrase found", 101 ); 102 - return; 103 } 104 } 105 106 - void countStarterPacks(post[0].did, post[0].time); 107 108 const postURL = `https://pdsls.dev/${post[0].atURI}`; 109 const formattedComment = `${checkPost.comment}\n\nPost: ${postURL}\n\nText: "${post[0].text}"`; 110 111 if (checkPost.toLabel) { 112 - void createPostLabel( 113 - post[0].atURI, 114 - post[0].cid, 115 - checkPost.label, 116 - formattedComment, 117 - checkPost.duration, 118 - post[0].did, 119 - post[0].time, 120 - ); 121 } else if (checkPost.trackOnly) { 122 - void checkAccountThreshold( 123 - post[0].did, 124 - post[0].atURI, 125 - checkPost.label, 126 - post[0].time, 127 - ); 128 } 129 130 if (checkPost.reportPost === true) { ··· 137 }, 138 "Reporting post", 139 ); 140 - void createPostReport(post[0].atURI, post[0].cid, formattedComment); 141 } 142 143 if (checkPost.reportAcct) { ··· 150 }, 151 "Reporting account", 152 ); 153 - void createAccountReport(post[0].did, formattedComment); 154 } 155 156 if (checkPost.commentAcct) { 157 - void createAccountComment(post[0].did, formattedComment, post[0].atURI); 158 } 159 } 160 - }); 161 };
··· 6 } from "../../accountModeration.js"; 7 import { checkAccountThreshold } from "../../accountThreshold.js"; 8 import { logger } from "../../logger.js"; 9 + import { moderationActionsFailedCounter } from "../../metrics.js"; 10 import { createPostLabel, createPostReport } from "../../moderation.js"; 11 + import type { ModerationResult, Post } from "../../types.js"; 12 import { getFinalUrl } from "../../utils/getFinalUrl.js"; 13 import { getLanguage } from "../../utils/getLanguage.js"; 14 import { countStarterPacks } from "../account/countStarterPacks.js"; ··· 75 const lang = await getLanguage(post[0].text); 76 77 // iterate through the checks 78 + for (const checkPost of POST_CHECKS) { 79 if (checkPost.language) { 80 if (!checkPost.language.includes(lang)) { 81 + continue; 82 } 83 } 84 ··· 88 { process: "CHECKPOSTS", did: post[0].did, atURI: post[0].atURI }, 89 "Whitelisted DID", 90 ); 91 + continue; 92 } 93 } 94 ··· 100 { process: "CHECKPOSTS", did: post[0].did, atURI: post[0].atURI }, 101 "Whitelisted phrase found", 102 ); 103 + continue; 104 } 105 } 106 107 + await countStarterPacks(post[0].did, post[0].time); 108 109 const postURL = `https://pdsls.dev/${post[0].atURI}`; 110 const formattedComment = `${checkPost.comment}\n\nPost: ${postURL}\n\nText: "${post[0].text}"`; 111 112 + const results: ModerationResult = { success: true, errors: [] }; 113 + 114 if (checkPost.toLabel) { 115 + try { 116 + await createPostLabel( 117 + post[0].atURI, 118 + post[0].cid, 119 + checkPost.label, 120 + formattedComment, 121 + checkPost.duration, 122 + post[0].did, 123 + post[0].time, 124 + ); 125 + } catch (error) { 126 + results.success = false; 127 + results.errors.push({ action: "label", error }); 128 + } 129 } else if (checkPost.trackOnly) { 130 + try { 131 + await checkAccountThreshold( 132 + post[0].did, 133 + post[0].atURI, 134 + checkPost.label, 135 + post[0].time, 136 + ); 137 + } catch (error) { 138 + // Threshold check failures are logged but don't add to results.errors 139 + // since it's not a direct moderation action 140 + logger.error( 141 + { 142 + process: "CHECKPOSTS", 143 + did: post[0].did, 144 + atURI: post[0].atURI, 145 + error, 146 + }, 147 + "Account threshold check failed", 148 + ); 149 + } 150 } 151 152 if (checkPost.reportPost === true) { ··· 159 }, 160 "Reporting post", 161 ); 162 + try { 163 + await createPostReport(post[0].atURI, post[0].cid, formattedComment); 164 + } catch (error) { 165 + results.success = false; 166 + results.errors.push({ action: "report", error }); 167 + } 168 } 169 170 if (checkPost.reportAcct) { ··· 177 }, 178 "Reporting account", 179 ); 180 + try { 181 + await createAccountReport(post[0].did, formattedComment); 182 + } catch (error) { 183 + results.success = false; 184 + results.errors.push({ action: "report", error }); 185 + } 186 } 187 188 if (checkPost.commentAcct) { 189 + try { 190 + await createAccountComment( 191 + post[0].did, 192 + formattedComment, 193 + post[0].atURI, 194 + ); 195 + } catch (error) { 196 + results.success = false; 197 + results.errors.push({ action: "comment", error }); 198 + } 199 + } 200 + 201 + // Log and track any failures 202 + if (!results.success) { 203 + for (const error of results.errors) { 204 + logger.error( 205 + { 206 + process: "CHECKPOSTS", 207 + did: post[0].did, 208 + atURI: post[0].atURI, 209 + action: error.action, 210 + error: error.error, 211 + }, 212 + "Moderation action failed", 213 + ); 214 + moderationActionsFailedCounter.inc({ 215 + action: error.action, 216 + target_type: "post", 217 + }); 218 + } 219 } 220 } 221 + } 222 };
+101 -35
src/rules/profiles/checkProfiles.ts
··· 7 negateAccountLabel, 8 } from "../../accountModeration.js"; 9 import { logger } from "../../logger.js"; 10 - import type { Checks } from "../../types.js"; 11 import { getLanguage } from "../../utils/getLanguage.js"; 12 13 export class ProfileChecker { ··· 21 this.time = time; 22 } 23 24 - checkDescription(description: string): void { 25 if (!description) return; 26 - this.performActions(description, "CHECKDESCRIPTION"); 27 } 28 29 - checkDisplayName(displayName: string): void { 30 if (!displayName) return; 31 - this.performActions(displayName, "CHECKDISPLAYNAME"); 32 } 33 34 - checkBoth(displayName: string, description: string): void { 35 const profile = `${displayName} ${description}`; 36 if (!profile) return; 37 - this.performActions(profile, "CHECKPROFILE"); 38 } 39 40 - private performActions( 41 content: string, 42 processType: "CHECKPROFILE" | "CHECKDESCRIPTION" | "CHECKDISPLAYNAME", 43 - ): void { 44 const matched = this.check.check.test(content); 45 46 if (matched) { ··· 52 return; 53 } 54 55 - this.applyActions(content, processType); 56 } else { 57 if (this.check.unlabel) { 58 - this.removeLabel(content, processType); 59 } 60 } 61 } 62 63 - private applyActions(content: string, processType: string): void { 64 const formattedComment = `${this.time.toString()}: ${this.check.comment}\n\nContent: ${content}`; 65 66 if (this.check.toLabel) { 67 - void createAccountLabel(this.did, this.check.label, formattedComment); 68 } 69 70 if (this.check.reportAcct) { 71 - void createAccountReport(this.did, formattedComment); 72 - logger.info( 73 - { 74 - process: processType, 75 - did: this.did, 76 - time: this.time, 77 - label: this.check.label, 78 - }, 79 - "Reporting account", 80 - ); 81 } 82 83 if (this.check.commentAcct) { 84 - void createAccountComment( 85 - this.did, 86 - formattedComment, 87 - `profile:${this.did}`, 88 - ); 89 } 90 } 91 92 - private removeLabel(content: string, _processType: string): void { 93 const formattedComment = `${this.check.comment}\n\nContent: ${content}`; 94 - void negateAccountLabel(this.did, this.check.label, formattedComment); 95 } 96 } 97 ··· 129 130 if (checkRule.description === true) { 131 const checker = new ProfileChecker(checkRule, did, time); 132 - checker.checkDescription(description); 133 } 134 } 135 }; ··· 168 169 if (checkRule.displayName === true) { 170 const checker = new ProfileChecker(checkRule, did, time); 171 - checker.checkDisplayName(displayName); 172 } 173 } 174 }; ··· 213 const checker = new ProfileChecker(checkRule, did, time); 214 215 if (checkRule.description === true && checkRule.displayName === true) { 216 - checker.checkBoth(displayName, description); 217 } else if (checkRule.description === true) { 218 - checker.checkDescription(description); 219 } else if (checkRule.displayName === true) { 220 - checker.checkDisplayName(displayName); 221 } 222 } 223 };
··· 7 negateAccountLabel, 8 } from "../../accountModeration.js"; 9 import { logger } from "../../logger.js"; 10 + import { moderationActionsFailedCounter } from "../../metrics.js"; 11 + import type { Checks, ModerationResult } from "../../types.js"; 12 import { getLanguage } from "../../utils/getLanguage.js"; 13 14 export class ProfileChecker { ··· 22 this.time = time; 23 } 24 25 + async checkDescription(description: string): Promise<void> { 26 if (!description) return; 27 + await this.performActions(description, "CHECKDESCRIPTION"); 28 } 29 30 + async checkDisplayName(displayName: string): Promise<void> { 31 if (!displayName) return; 32 + await this.performActions(displayName, "CHECKDISPLAYNAME"); 33 } 34 35 + async checkBoth(displayName: string, description: string): Promise<void> { 36 const profile = `${displayName} ${description}`; 37 if (!profile) return; 38 + await this.performActions(profile, "CHECKPROFILE"); 39 } 40 41 + private async performActions( 42 content: string, 43 processType: "CHECKPROFILE" | "CHECKDESCRIPTION" | "CHECKDISPLAYNAME", 44 + ): Promise<void> { 45 const matched = this.check.check.test(content); 46 47 if (matched) { ··· 53 return; 54 } 55 56 + const result = await this.applyActions(content, processType); 57 + if (!result.success) { 58 + for (const error of result.errors) { 59 + logger.error( 60 + { 61 + process: processType, 62 + did: this.did, 63 + action: error.action, 64 + error: error.error, 65 + }, 66 + "Moderation action failed", 67 + ); 68 + moderationActionsFailedCounter.inc({ 69 + action: error.action, 70 + target_type: "account", 71 + }); 72 + } 73 + } 74 } else { 75 if (this.check.unlabel) { 76 + const result = await this.removeLabel(content, processType); 77 + if (!result.success) { 78 + for (const error of result.errors) { 79 + logger.error( 80 + { 81 + process: processType, 82 + did: this.did, 83 + action: error.action, 84 + error: error.error, 85 + }, 86 + "Moderation action failed", 87 + ); 88 + moderationActionsFailedCounter.inc({ 89 + action: error.action, 90 + target_type: "account", 91 + }); 92 + } 93 + } 94 } 95 } 96 } 97 98 + private async applyActions( 99 + content: string, 100 + processType: string, 101 + ): Promise<ModerationResult> { 102 + const results: ModerationResult = { success: true, errors: [] }; 103 const formattedComment = `${this.time.toString()}: ${this.check.comment}\n\nContent: ${content}`; 104 105 if (this.check.toLabel) { 106 + try { 107 + await createAccountLabel(this.did, this.check.label, formattedComment); 108 + } catch (error) { 109 + results.success = false; 110 + results.errors.push({ action: "label", error }); 111 + } 112 } 113 114 if (this.check.reportAcct) { 115 + try { 116 + await createAccountReport(this.did, formattedComment); 117 + logger.info( 118 + { 119 + process: processType, 120 + did: this.did, 121 + time: this.time, 122 + label: this.check.label, 123 + }, 124 + "Reporting account", 125 + ); 126 + } catch (error) { 127 + results.success = false; 128 + results.errors.push({ action: "report", error }); 129 + } 130 } 131 132 if (this.check.commentAcct) { 133 + try { 134 + await createAccountComment( 135 + this.did, 136 + formattedComment, 137 + `profile:${this.did}`, 138 + ); 139 + } catch (error) { 140 + results.success = false; 141 + results.errors.push({ action: "comment", error }); 142 + } 143 } 144 + 145 + return results; 146 } 147 148 + private async removeLabel( 149 + content: string, 150 + _processType: string, 151 + ): Promise<ModerationResult> { 152 + const results: ModerationResult = { success: true, errors: [] }; 153 const formattedComment = `${this.check.comment}\n\nContent: ${content}`; 154 + try { 155 + await negateAccountLabel(this.did, this.check.label, formattedComment); 156 + } catch (error) { 157 + results.success = false; 158 + results.errors.push({ action: "unlabel", error }); 159 + } 160 + return results; 161 } 162 } 163 ··· 195 196 if (checkRule.description === true) { 197 const checker = new ProfileChecker(checkRule, did, time); 198 + await checker.checkDescription(description); 199 } 200 } 201 }; ··· 234 235 if (checkRule.displayName === true) { 236 const checker = new ProfileChecker(checkRule, did, time); 237 + await checker.checkDisplayName(displayName); 238 } 239 } 240 }; ··· 279 const checker = new ProfileChecker(checkRule, did, time); 280 281 if (checkRule.description === true && checkRule.displayName === true) { 282 + await checker.checkBoth(displayName, description); 283 } else if (checkRule.description === true) { 284 + await checker.checkDescription(description); 285 } else if (checkRule.displayName === true) { 286 + await checker.checkDisplayName(displayName); 287 } 288 } 289 };
+10
src/types.ts
··· 89 commentAcct?: boolean; 90 allowlist?: string[]; 91 }
··· 89 commentAcct?: boolean; 90 allowlist?: string[]; 91 } 92 + 93 + export interface ModerationError { 94 + action: "label" | "report" | "comment" | "unlabel"; 95 + error: unknown; 96 + } 97 + 98 + export interface ModerationResult { 99 + success: boolean; 100 + errors: ModerationError[]; 101 + }