Openstatus www.openstatus.dev

๐Ÿ› notification (#506)

* ๐Ÿ› notification

* ๐Ÿงน clean code

* ๐Ÿงน clean code

authored by

Thibault Le Ouay and committed by
GitHub
5e2c577d 6f2de396

+12 -606
-142
apps/server/src/checker/checker.test.ts
··· 1 - import { expect, mock, test } from "bun:test"; 2 - 3 - import { checkerRetryPolicy } from "./checker"; 4 - 5 - mock.module("./ping.ts", () => { 6 - return { 7 - publishPing: () => {}, 8 - }; 9 - }); 10 - 11 - test("should call upsertMonitorStatus when we can fetch", async () => { 12 - const fn = mock(() => {}); 13 - 14 - mock.module("./monitor-handler.ts", () => { 15 - return { 16 - handleMonitorFailed: mock(() => {}), 17 - handleMonitorRecovered: fn, 18 - }; 19 - }); 20 - 21 - await checkerRetryPolicy({ 22 - workspaceId: "1", 23 - monitorId: "1", 24 - url: "https://www.google.com", 25 - cronTimestamp: 1, 26 - status: "error", 27 - pageIds: [], 28 - method: "GET", 29 - }); 30 - expect(fn).toHaveBeenCalledTimes(1); 31 - }); 32 - 33 - test("should call upsertMonitorStatus when status error", async () => { 34 - const fn = mock(() => {}); 35 - 36 - mock.module("./monitor-handler.ts", () => { 37 - return { 38 - handleMonitorFailed: fn, 39 - handleMonitorRecovered: mock(() => {}), 40 - }; 41 - }); 42 - try { 43 - await checkerRetryPolicy({ 44 - workspaceId: "1", 45 - monitorId: "1", 46 - url: "https://xxxxxxx.fake", 47 - cronTimestamp: 1, 48 - status: "active", 49 - pageIds: [], 50 - method: "GET", 51 - }); 52 - } catch (e) { 53 - expect(e).toBeInstanceOf(Error); 54 - } 55 - expect(fn).toHaveBeenCalledTimes(1); 56 - }); 57 - 58 - test("What should we do when redirect ", async () => { 59 - const fn = mock(() => {}); 60 - 61 - mock.module("./monitor-handler.ts", () => { 62 - return { 63 - handleMonitorFailed: fn, 64 - }; 65 - }); 66 - try { 67 - await checkerRetryPolicy({ 68 - workspaceId: "1", 69 - monitorId: "1", 70 - url: "https://www.openstatus.dev/toto", 71 - cronTimestamp: 1, 72 - status: "active", 73 - pageIds: [], 74 - method: "GET", 75 - }); 76 - } catch (e) { 77 - expect(e).toBeInstanceOf(Error); 78 - } 79 - expect(fn).toHaveBeenCalledTimes(0); 80 - }); 81 - 82 - test("When 404 we should trigger alerting ", async () => { 83 - const fn = mock(() => {}); 84 - const fn1 = mock(() => {}); 85 - 86 - mock.module("./ping.ts", () => { 87 - return { 88 - publishPing: fn, 89 - }; 90 - }); 91 - mock.module("./monitor-handler.ts", () => { 92 - return { 93 - handleMonitorFailed: fn1, 94 - }; 95 - }); 96 - try { 97 - await checkerRetryPolicy({ 98 - workspaceId: "1", 99 - monitorId: "1", 100 - url: "https://www.openstat.us/404", 101 - cronTimestamp: 1, 102 - status: "active", 103 - pageIds: [], 104 - method: "GET", 105 - }); 106 - } catch (e) { 107 - expect(e).toBeInstanceOf(Error); 108 - } 109 - expect(fn).toHaveBeenCalledTimes(1); 110 - expect(fn1).toHaveBeenCalledTimes(1); 111 - }); 112 - 113 - test("When error 404 we should not trigger alerting ", async () => { 114 - const fn = mock(() => {}); 115 - const fn1 = mock(() => {}); 116 - 117 - mock.module("./ping.ts", () => { 118 - return { 119 - publishPing: fn, 120 - }; 121 - }); 122 - mock.module("./monitor-handler.ts", () => { 123 - return { 124 - handleMonitorFailed: fn1, 125 - }; 126 - }); 127 - try { 128 - await checkerRetryPolicy({ 129 - workspaceId: "1", 130 - monitorId: "1", 131 - url: "https://www.openstat.us/404", 132 - cronTimestamp: 1, 133 - status: "error", 134 - pageIds: [], 135 - method: "GET", 136 - }); 137 - } catch (e) { 138 - expect(e).toBeInstanceOf(Error); 139 - } 140 - expect(fn).toHaveBeenCalledTimes(1); 141 - expect(fn1).toHaveBeenCalledTimes(0); 142 - });
-136
apps/server/src/checker/checker.ts
··· 1 - import { handleMonitorFailed, handleMonitorRecovered } from "./monitor-handler"; 2 - import type { PublishPingType } from "./ping"; 3 - import { getHeaders, publishPing } from "./ping"; 4 - import type { Payload } from "./schema"; 5 - 6 - // we could have a 'retry' parameter to know how often we should retry 7 - // we could use a setTimeout to retry after a certain amount of time - can be random between 500ms and 10s 8 - export const publishPingRetryPolicy = async ({ 9 - payload, 10 - latency, 11 - statusCode, 12 - message, 13 - }: PublishPingType) => { 14 - try { 15 - console.log( 16 - `1๏ธโƒฃ try publish ping to tb - attempt 1 ${JSON.stringify( 17 - payload, 18 - )} with latency ${latency} and status code ${statusCode}`, 19 - ); 20 - await publishPing({ 21 - payload, 22 - statusCode, 23 - latency, 24 - message, 25 - }); 26 - } catch { 27 - try { 28 - console.log( 29 - "2๏ธโƒฃ try publish ping to tb - attempt 2 ", 30 - JSON.stringify(payload), 31 - ); 32 - 33 - await publishPing({ 34 - payload, 35 - statusCode, 36 - latency, 37 - message, 38 - }); 39 - } catch (e) { 40 - throw e; 41 - } 42 - } 43 - console.log( 44 - `๐Ÿ—ƒ๏ธ Successfully published ${JSON.stringify( 45 - payload, 46 - )} with latency ${latency} and status code ${statusCode}`, 47 - ); 48 - }; 49 - 50 - const run = async (data: Payload, retry: number) => { 51 - let startTime = 0; 52 - let endTime = 0; 53 - let res = null; 54 - let message = undefined; 55 - // We are doing these for wrong urls 56 - try { 57 - const headers = getHeaders(data); 58 - console.log(`๐Ÿ†• fetch is about to start for ${JSON.stringify(data)}`); 59 - startTime = performance.now(); 60 - res = await fetch(data.url, { 61 - method: data.method, 62 - keepalive: false, 63 - cache: "no-store", 64 - headers, 65 - // Avoid having "TypeError: Request with a GET or HEAD method cannot have a body." error 66 - ...(data.method === "POST" && { body: data?.body }), 67 - }); 68 - 69 - endTime = performance.now(); 70 - console.log(`โœ… fetch is done for ${JSON.stringify(data)}`); 71 - } catch (e) { 72 - endTime = performance.now(); 73 - message = `${e}`; 74 - console.log( 75 - `๐Ÿšจ error on pingEndpoint for ${JSON.stringify(data)} error: `, 76 - e, 77 - ); 78 - } 79 - 80 - const latency = Number((endTime - startTime).toFixed(0)); 81 - if (res?.ok) { 82 - await publishPingRetryPolicy({ 83 - payload: data, 84 - latency, 85 - statusCode: res.status, 86 - message: undefined, 87 - }); 88 - if (data?.status === "error") { 89 - await handleMonitorRecovered(data, res); 90 - } 91 - } else { 92 - if (retry < 2) { 93 - throw new Error(`error on ping for ${data.monitorId}`); 94 - } 95 - // Store the error on third task retry 96 - if (retry === 2) { 97 - console.log( 98 - `๐Ÿ› error on fetching data for ${JSON.stringify( 99 - data, 100 - )} with result ${JSON.stringify(res)}`, 101 - ); 102 - await publishPingRetryPolicy({ 103 - payload: data, 104 - latency, 105 - statusCode: res?.status, 106 - message, 107 - }); 108 - if (data.status === "active") { 109 - await handleMonitorFailed(data, res, message); 110 - } 111 - } 112 - } 113 - return { res, latency }; 114 - }; 115 - 116 - // We have this extensible retry policy because sometimes we have this error : 117 - // ConnectionClosed: The socket connection was closed unexpectedly. 118 - export const checkerRetryPolicy = async (data: Payload) => { 119 - try { 120 - console.log("๐Ÿฅ‡ try run checker - attempt 1 ", JSON.stringify(data)); 121 - await run(data, 0); 122 - } catch { 123 - try { 124 - console.log("๐Ÿฅˆ try run checker - attempt 2 ", JSON.stringify(data)); 125 - await run(data, 1); 126 - } catch (e) { 127 - try { 128 - console.log("๐Ÿฅ‰ try run checker - attempt 3 ", JSON.stringify(data)); 129 - await run(data, 2); 130 - } catch (e) { 131 - throw e; 132 - } 133 - } 134 - } 135 - console.log("๐Ÿ”ฅ successfully run checker ", JSON.stringify(data)); 136 - };
+12 -115
apps/server/src/checker/index.ts
··· 5 5 6 6 import { env } from "../env"; 7 7 import { checkerAudit } from "../utils/audit-log"; 8 - import { upsertMonitorStatus } from "./alerting"; 9 - import { checkerRetryPolicy } from "./checker"; 10 - import { payloadSchema } from "./schema"; 11 - import type { Payload } from "./schema"; 12 - 13 - export type Variables = { 14 - payload: Payload; 15 - }; 16 - 17 - export const checkerRoute = new Hono<{ Variables: Variables }>(); 18 - 19 - // TODO: only use checkerRoute.post("/checker", checker); 20 - 21 - checkerRoute.post("/checker", async (c) => { 22 - const json = await c.req.json(); 23 - const auth = c.req.header("Authorization"); 24 - if (auth !== `Basic ${env.CRON_SECRET}`) { 25 - console.error("Unauthorized"); 26 - return c.text("Unauthorized", 401); 27 - } 28 - console.log("Retry : ", c.req.header("X-CloudTasks-TaskRetryCount")); 29 - 30 - const result = payloadSchema.safeParse(json); 31 - 32 - if (!result.success) { 33 - console.error(result.error); 34 - return c.text("Unprocessable Entity", 422); 35 - } 36 - const retry = Number(c.req.header("X-CloudTasks-TaskRetryCount") || 0); 37 - if (retry > 3) { 38 - console.error( 39 - `catchTooManyRetry for ${JSON.stringify(result.data)} 40 - )}`, 41 - ); 42 - return c.text("Ok", 200); // finish the task 43 - } 8 + import { triggerAlerting, upsertMonitorStatus } from "./alerting"; 44 9 45 - try { 46 - console.log( 47 - `start checker URL: ${result.data.url} monitorId ${result.data.monitorId}`, 48 - ); 49 - await checkerRetryPolicy(result.data); 50 - console.log( 51 - `end checker URL: ${result.data.url} monitorId ${result.data.monitorId}`, 52 - ); 53 - return c.text("Ok", 200); 54 - } catch (e) { 55 - console.error( 56 - `fail checker URL: ${result.data.url} monitorId ${result.data.monitorId}`, 57 - JSON.stringify(result.data), 58 - e, 59 - ); 60 - if (result.data.status === "error") { 61 - console.log( 62 - `The monitor was already in error we should not retry checker URL: ${result.data}`, 63 - ); 64 - return c.text("Ok", 200); 65 - } 66 - return c.text("Internal Server Error", 500); 67 - } 68 - }); 69 - 70 - checkerRoute.post("/checkerV2", async (c) => { 71 - const json = await c.req.json(); 72 - 73 - const auth = c.req.header("Authorization"); 74 - if (auth !== `Basic ${env.CRON_SECRET}`) { 75 - console.error("Unauthorized"); 76 - return c.text("Unauthorized", 401); 77 - } 78 - 79 - const result = payloadSchema.safeParse(json); 80 - 81 - if (!result.success) { 82 - console.error(result.error); 83 - return c.text("Unprocessable Entity", 422); 84 - } 85 - const retry = Number(c.req.header("X-CloudTasks-TaskRetryCount") || 0); 86 - if (retry > 2) { 87 - console.error( 88 - `โ›” Too many retry for ${JSON.stringify(result.data)} 89 - )}`, 90 - ); 91 - // catchTooManyRetry(result.data); 92 - return c.text("Ok", 200); // finish the task 93 - } 94 - if (retry > 0 && result.data.status === "error") { 95 - console.log( 96 - `๐Ÿ—‘๏ธ The monitor was already in error we should not checked the URL: ${result.data}`, 97 - ); 98 - return c.text("Ok", 200); 99 - } 100 - 101 - try { 102 - console.log(`๐Ÿงญ start checker for: ${JSON.stringify(result.data)}`); 103 - await checkerRetryPolicy(result.data); 104 - console.log(`๐Ÿ”š end checker for: ${JSON.stringify(result.data)} `); 105 - return c.text("Ok", 200); 106 - } catch (e) { 107 - if (result.data.status === "error") { 108 - console.log( 109 - `๐Ÿ—‘๏ธ The monitor was already in error we should not retry checker URL: ${result.data}`, 110 - ); 111 - return c.text("Ok", 200); 112 - } 113 - console.error( 114 - `๐Ÿ”ด fail checker URL: ${result.data.url} monitorId ${result.data.monitorId}`, 115 - JSON.stringify(result.data), 116 - e, 117 - ); 118 - return c.text("Internal Server Error", 500); 119 - } 120 - }); 10 + export const checkerRoute = new Hono(); 121 11 122 12 checkerRoute.post("/updateStatus", async (c) => { 123 13 const auth = c.req.header("Authorization"); ··· 146 36 147 37 switch (status) { 148 38 case "active": 39 + if (!statusCode) { 40 + return; 41 + } 149 42 await upsertMonitorStatus({ 150 43 monitorId: monitorId, 151 44 status: "active", 152 45 region: region, 153 46 }); 154 - if (!statusCode) { 155 - return; 156 - } 47 + 157 48 await checkerAudit.publishAuditLog({ 158 49 id: `monitor:${monitorId}`, 159 50 action: "monitor.recovered", ··· 177 68 statusCode: statusCode, 178 69 message, 179 70 }, 71 + }); 72 + await triggerAlerting({ 73 + monitorId: monitorId, 74 + region: env.FLY_REGION, 75 + statusCode, 76 + message, 180 77 }); 181 78 break; 182 79 }
-48
apps/server/src/checker/monitor-handler.ts
··· 1 - import { env } from "../env"; 2 - import { checkerAudit } from "../utils/audit-log"; 3 - import { triggerAlerting, upsertMonitorStatus } from "./alerting"; 4 - import type { Payload } from "./schema"; 5 - 6 - export async function handleMonitorRecovered(data: Payload, res: Response) { 7 - // await upsertMonitorStatus({ 8 - // monitorId: data.monitorId, 9 - // status: "active", 10 - // }); 11 - // ALPHA 12 - await checkerAudit.publishAuditLog({ 13 - id: `monitor:${data.monitorId}`, 14 - action: "monitor.recovered", 15 - targets: [{ id: data.monitorId, type: "monitor" }], 16 - metadata: { region: env.FLY_REGION, statusCode: res.status }, 17 - }); 18 - // 19 - } 20 - 21 - export async function handleMonitorFailed( 22 - data: Payload, 23 - res: Response | null, 24 - message?: string, 25 - ) { 26 - // await upsertMonitorStatus({ 27 - // monitorId: data.monitorId, 28 - // status: "error", 29 - // }); 30 - // ALPHA 31 - await checkerAudit.publishAuditLog({ 32 - id: `monitor:${data.monitorId}`, 33 - action: "monitor.failed", 34 - targets: [{ id: data.monitorId, type: "monitor" }], 35 - metadata: { 36 - region: env.FLY_REGION, 37 - statusCode: res?.status, 38 - message, 39 - }, 40 - }); 41 - // 42 - await triggerAlerting({ 43 - monitorId: data.monitorId, 44 - region: env.FLY_REGION, 45 - statusCode: res?.status, 46 - message, 47 - }); 48 - }
-60
apps/server/src/checker/ping.ts
··· 1 - import { publishPingResponse } from "@openstatus/tinybird"; 2 - 3 - import { env } from "../env"; 4 - import { fakePromiseWithRandomResolve } from "../utils/random-promise"; 5 - import type { Payload } from "./schema"; 6 - 7 - const region = env.FLY_REGION; 8 - 9 - export function getHeaders(data?: Payload) { 10 - const customHeaders = 11 - data?.headers?.reduce((o, v) => { 12 - // removes empty keys from the header 13 - if (v.key.trim() === "") return o; 14 - return { ...o, [v.key]: v.value }; 15 - }, {}) || {}; 16 - return { 17 - "OpenStatus-Ping": "true", 18 - ...customHeaders, 19 - }; 20 - } 21 - 22 - export type PublishPingType = { 23 - payload: Payload; 24 - latency: number; 25 - statusCode?: number | undefined; 26 - message?: string | undefined; 27 - }; 28 - 29 - export async function publishPing({ 30 - payload, 31 - latency, 32 - message, 33 - statusCode, 34 - }: PublishPingType) { 35 - const { monitorId, cronTimestamp, url, workspaceId } = payload; 36 - 37 - if ( 38 - process.env.NODE_ENV === "production" || 39 - process.env.NODE_ENV === "test" 40 - ) { 41 - const res = await publishPingResponse({ 42 - timestamp: Date.now(), 43 - statusCode, 44 - latency, 45 - region, 46 - url, 47 - monitorId, 48 - cronTimestamp, 49 - workspaceId, 50 - message: message, 51 - }); 52 - if (res.successful_rows === 0) { 53 - throw new Error(`error 0 rows on publish ping for ${payload.monitorId}`); 54 - } 55 - return res; 56 - } 57 - 58 - const res = await fakePromiseWithRandomResolve(); 59 - return res; 60 - }
-17
apps/server/src/checker/schema.ts
··· 1 - import { z } from "zod"; 2 - 3 - import { monitorMethods, monitorStatus } from "@openstatus/db/src/schema"; 4 - 5 - export const payloadSchema = z.object({ 6 - workspaceId: z.string(), 7 - monitorId: z.string(), 8 - method: z.enum(monitorMethods), 9 - body: z.string().optional(), 10 - headers: z.array(z.object({ key: z.string(), value: z.string() })).optional(), 11 - url: z.string(), 12 - cronTimestamp: z.number(), 13 - pageIds: z.array(z.string()), 14 - status: z.enum(monitorStatus), 15 - }); 16 - 17 - export type Payload = z.infer<typeof payloadSchema>;
-6
apps/server/src/index.ts
··· 6 6 import { env } from "./env"; 7 7 import { publicRoute } from "./public"; 8 8 import { api } from "./v1"; 9 - import { VercelIngest } from "./vercel"; 10 9 11 10 const app = new Hono(); 12 11 app.use("*", sentry({ dsn: process.env.SENTRY_DSN })); 13 - 14 - /** 15 - * Vercel Integration 16 - */ 17 - app.post("/integration/vercel", VercelIngest); 18 12 19 13 /** 20 14 * Public Routes
-46
apps/server/src/schema/vercel.ts
··· 1 - import { z } from "zod"; 2 - 3 - // https://vercel.com/docs/observability/log-drains-overview/log-drains-reference#json-log-drains 4 - export const logDrainSchema = z.object({ 5 - id: z.string().optional(), 6 - timestamp: z.number().optional(), 7 - type: z 8 - .enum([ 9 - "middleware-invocation", 10 - "stdout", 11 - "stderr", 12 - "edge-function-invocation", 13 - "fatal", 14 - ]) 15 - .optional(), 16 - edgeType: z.enum(["edge-function", "middleware"]).optional(), 17 - requestId: z.string().optional(), 18 - statusCode: z.number().optional(), 19 - message: z.string().optional(), 20 - projectId: z.string().optional(), 21 - deploymentId: z.string().optional(), 22 - buildId: z.string().optional(), 23 - source: z.enum(["external", "lambda", "edge", "static", "build"]), 24 - host: z.string().optional(), 25 - environment: z.string().optional(), 26 - branch: z.string().optional(), 27 - destination: z.string().optional(), 28 - path: z.string().optional(), 29 - entrypoint: z.string().optional(), 30 - proxy: z 31 - .object({ 32 - timestamp: z.number().optional(), 33 - region: z.string().optional(), // TODO: use regions enum? 34 - method: z.string().optional(), // TODO: use methods enum? 35 - vercelCache: z.string().optional(), // TODO: use "HIT" / "MISS" enum? 36 - statusCode: z.number().optional(), 37 - path: z.string().optional(), 38 - host: z.string().optional(), 39 - scheme: z.string().optional(), 40 - clientIp: z.string().optional(), 41 - userAgent: z.array(z.string()).optional(), 42 - }) 43 - .optional(), 44 - }); 45 - 46 - export const logDrainSchemaArray = z.array(logDrainSchema);
-36
apps/server/src/vercel.ts
··· 1 - import type { Context, Env } from "hono"; 2 - 3 - import { Tinybird } from "@openstatus/tinybird"; 4 - 5 - import { env } from "./env"; 6 - import { logDrainSchema, logDrainSchemaArray } from "./schema/vercel"; 7 - 8 - const tb = new Tinybird({ token: env.TINY_BIRD_API_KEY }); // should we use t3-env? 9 - 10 - export function publishVercelLogDrain() { 11 - return tb.buildIngestEndpoint({ 12 - datasource: "vercel_log_drain__v0", 13 - event: logDrainSchema, 14 - }); 15 - } 16 - 17 - export const VercelIngest = async ( 18 - c: Context<Env, "/integration/vercel", {}>, 19 - ) => { 20 - const json = c.req.json(); 21 - const logDrains = logDrainSchemaArray.safeParse(json); 22 - 23 - if (logDrains.success) { 24 - // We are only pushing the logs that are not stdout or stderr 25 - const data = logDrains.data.filter( 26 - (log) => log.type !== "stdout" && log.type !== "stderr", 27 - ); 28 - 29 - for (const event of data) { 30 - // FIXME: Zod-bird is broken 31 - await publishVercelLogDrain()(event); 32 - } 33 - } 34 - 35 - c.json({ code: "ok" }); 36 - };