OpenStatus — www.openstatus.dev
At commit 4c0f4c00a38753a5d0dfd7e7b7b7706dec6f1503 · 333 lines · 9.0 kB
import { Hono } from "hono";
import { z } from "zod";

import { and, count, db, eq, inArray, isNull, schema } from "@openstatus/db";
import { incidentTable } from "@openstatus/db/src/schema";
import {
  monitorStatusSchema,
  selectMonitorSchema,
} from "@openstatus/db/src/schema/monitors/validation";

import { getLogger } from "@logtape/logtape";
import { monitorRegions } from "@openstatus/db/src/schema/constants";
import { env } from "../env";
import type { Env } from "../index";
import { checkerAudit } from "../utils/audit-log";
import { triggerNotifications, upsertMonitorStatus } from "./alerting";

export const checkerRoute = new Hono<Env>();

/** Body posted by the checker after a check has run in a single region. */
const payloadSchema = z.object({
  monitorId: z.string(),
  message: z.string().optional(),
  statusCode: z.number().optional(),
  region: z.enum(monitorRegions),
  cronTimestamp: z.number(),
  status: monitorStatusSchema,
  latency: z.number().optional(),
});

type StatusPayload = z.infer<typeof payloadSchema>;
type MonitorStatus = z.infer<typeof monitorStatusSchema>;
type Monitor = z.infer<typeof selectMonitorSchema>;

const logger = getLogger(["workflow"]);

/** Persists the aggregated status on the monitor row. */
async function setMonitorStatus(
  monitorId: number,
  status: MonitorStatus,
): Promise<void> {
  await db
    .update(schema.monitor)
    .set({ status })
    .where(eq(schema.monitor.id, monitorId));
}

/**
 * Returns the monitor's incident that is neither resolved nor acknowledged, if any.
 *
 * NOTE(review): acknowledged-but-unresolved incidents are excluded by this query,
 * so they are never auto-resolved on recovery and do not prevent a new incident
 * from being opened on the next failure — confirm this is intended.
 */
async function findOpenIncident(monitorId: number) {
  return await db
    .select()
    .from(incidentTable)
    .where(
      and(
        eq(incidentTable.monitorId, monitorId),
        isNull(incidentTable.resolvedAt),
        isNull(incidentTable.acknowledgedAt),
      ),
    )
    .get();
}

/** Records the raw per-region check result in the audit log. */
async function publishStatusAuditLog(payload: StatusPayload): Promise<void> {
  const { monitorId, status, message } = payload;
  const metadata = {
    region: payload.region,
    statusCode: payload.statusCode ?? -1,
    cronTimestamp: payload.cronTimestamp,
    latency: payload.latency,
  };

  switch (status) {
    case "active":
      await checkerAudit.publishAuditLog({
        id: `monitor:${monitorId}`,
        action: "monitor.recovered",
        targets: [{ id: monitorId, type: "monitor" }],
        metadata,
      });
      break;
    case "degraded":
      await checkerAudit.publishAuditLog({
        id: `monitor:${monitorId}`,
        action: "monitor.degraded",
        targets: [{ id: monitorId, type: "monitor" }],
        metadata,
      });
      break;
    case "error":
      await checkerAudit.publishAuditLog({
        id: `monitor:${monitorId}`,
        action: "monitor.failed",
        targets: [{ id: monitorId, type: "monitor" }],
        metadata: { ...metadata, message },
      });
      break;
  }
}

/**
 * Moves the monitor to `active`.
 *
 * If the monitor was in `error`, the open incident is auto-resolved at
 * `cronTimestamp`. A recovery notification is then sent, except when the
 * monitor was in `error` without an open incident: that was a one-off failure
 * that never became an incident.
 */
async function handleRecovery(
  monitor: Monitor,
  payload: StatusPayload,
): Promise<void> {
  const { monitorId, statusCode, message, cronTimestamp, region, latency } =
    payload;

  if (monitor.status === "active") {
    // already active, nothing to resolve
    return;
  }

  logger.info("Monitor status changed to active", {
    monitor_id: monitor.id,
    workspace_id: monitor.workspaceId,
  });
  await setMonitorStatus(monitor.id, "active");

  if (monitor.status === "error") {
    const incident = await findOpenIncident(monitor.id);
    if (!incident) {
      // it was just a single failure, not a proper incident
      return;
    }

    logger.info("Recovering incident", {
      incident_id: incident.id,
      monitor_id: monitorId,
    });

    await db
      .update(incidentTable)
      .set({
        resolvedAt: new Date(cronTimestamp),
        autoResolved: true,
      })
      .where(eq(incidentTable.id, incident.id))
      .run();

    await checkerAudit.publishAuditLog({
      id: `monitor:${monitorId}`,
      action: "incident.resolved",
      targets: [{ id: monitorId, type: "monitor" }],
      metadata: { cronTimestamp, incidentId: incident.id },
    });
  }

  await triggerNotifications({
    monitorId,
    statusCode,
    message,
    notifType: "recovery",
    cronTimestamp,
    region,
    latency,
    incidentId: `${cronTimestamp}`,
  });
}

/** Moves the monitor to `degraded` and sends a degradation notification. */
async function handleDegradation(
  monitor: Monitor,
  payload: StatusPayload,
): Promise<void> {
  const { monitorId, statusCode, message, cronTimestamp, region, latency } =
    payload;

  if (monitor.status === "degraded") {
    // already degraded, the notification was sent on the transition
    return;
  }

  logger.info("Monitor status changed to degraded", {
    monitor_id: monitor.id,
    workspace_id: monitor.workspaceId,
  });
  await setMonitorStatus(monitor.id, "degraded");

  // TODO (carried over): make sure this notification is sent only once —
  // concurrent updates from several regions may presumably both pass the
  // status check above.
  await triggerNotifications({
    monitorId,
    statusCode,
    message,
    notifType: "degraded",
    cronTimestamp,
    latency,
    region,
    incidentId: `${cronTimestamp}`,
  });
}

/**
 * Moves the monitor to `error` and, if no open incident exists, opens one.
 * The new incident is audit-logged and an alert notification is sent.
 */
async function handleFailure(
  monitor: Monitor,
  payload: StatusPayload,
): Promise<void> {
  const { monitorId, statusCode, message, cronTimestamp, region, latency } =
    payload;

  if (monitor.status === "error") {
    // already in error, the incident was opened on the transition
    return;
  }

  logger.info("Monitor status changed to error", {
    monitor_id: monitor.id,
    workspace_id: monitor.workspaceId,
  });
  await setMonitorStatus(monitor.id, "error");

  try {
    const existing = await findOpenIncident(monitor.id);
    if (existing) {
      logger.info("we are already in incident");
      return;
    }

    const [newIncident] = await db
      .insert(incidentTable)
      .values({
        monitorId: monitor.id,
        workspaceId: monitor.workspaceId,
        startedAt: new Date(cronTimestamp),
      })
      .returning();

    if (!newIncident?.id) {
      // Previously this returned `undefined` from the route handler, which Hono
      // turns into a 500; now the request still completes normally.
      logger.warning("Incident insert returned no row", {
        monitor_id: monitor.id,
      });
      return;
    }

    await checkerAudit.publishAuditLog({
      id: `monitor:${monitorId}`,
      action: "incident.created",
      targets: [{ id: monitorId, type: "monitor" }],
      metadata: { cronTimestamp, incidentId: newIncident.id },
    });

    await triggerNotifications({
      monitorId,
      statusCode,
      message,
      notifType: "alert",
      cronTimestamp,
      latency,
      region,
      incidentId: String(newIncident.id),
    });
  } catch (error: unknown) {
    // Best-effort: presumably the common cause is a concurrent request that
    // already created the incident. The error is logged so other failures
    // (e.g. in notifications) are not hidden.
    logger.warning("incident was already created", {
      monitor_id: monitor.id,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}

/**
 * Receives one region's check result, stores it, and, once enough of the
 * monitor's regions agree on the same status (at least half), transitions the
 * monitor and handles incidents and notifications.
 *
 * Responses:
 * - 401 on a bad `Authorization` header.
 * - 422 on malformed or invalid JSON.
 * - 404 when the monitor does not exist.
 * - 200 otherwise.
 */
checkerRoute.post("/updateStatus", async (c) => {
  const auth = c.req.header("Authorization");
  if (auth !== `Basic ${env().CRON_SECRET}`) {
    logger.error("Unauthorized");
    return c.text("Unauthorized", 401);
  }

  const event = c.get("event");
  // Malformed JSON is rejected like any other invalid payload (422) instead of
  // escaping as an unhandled exception.
  const json: unknown = await c.req.json().catch(() => undefined);

  const result = payloadSchema.safeParse(json);
  if (!result.success) {
    logger.warning("Invalid status update payload", {
      issues: result.error.issues,
    });
    return c.text("Unprocessable Entity", 422);
  }

  const payload = result.data;
  const { monitorId, region, statusCode, cronTimestamp, status, latency } =
    payload;

  event.status_update = {
    status,
    message: payload.message,
    region,
    status_code: statusCode,
    cron_timestamp: cronTimestamp,
    latency_ms: latency,
  };

  logger.info("Updating monitor status", {
    monitor_id: monitorId,
    region,
    status,
    status_code: statusCode,
    cron_timestamp: cronTimestamp,
    latency_ms: latency,
  });

  // First we upsert this region's status
  await upsertMonitorStatus({ monitorId, status, region });

  const currentMonitor = await db
    .select()
    .from(schema.monitor)
    .where(eq(schema.monitor.id, Number(monitorId)))
    .get();

  if (!currentMonitor) {
    logger.error("Monitor not found", { monitor_id: monitorId });
    return c.text("Not Found", 404);
  }

  const monitor = selectMonitorSchema.parse(currentMonitor);
  const numberOfRegions = monitor.regions.length;

  if (numberOfRegions === 0) {
    // No configured regions to aggregate over.
    return c.json({ success: true }, 200);
  }

  const affectedRegion = await db
    .select({ count: count() })
    .from(schema.monitorStatusTable)
    .where(
      and(
        eq(schema.monitorStatusTable.monitorId, monitor.id),
        eq(schema.monitorStatusTable.status, status),
        inArray(schema.monitorStatusTable.region, monitor.regions),
      ),
    )
    .get();

  if (!affectedRegion?.count) {
    return c.json({ success: true }, 200);
  }

  // audit log the current state of the ping
  await publishStatusAuditLog(payload);

  // Quorum: at least half of the configured regions report this status. Since
  // count >= 1 here, a single-region monitor always reaches quorum.
  if (affectedRegion.count < numberOfRegions / 2) {
    return c.text("Ok", 200);
  }

  switch (status) {
    case "active":
      await handleRecovery(monitor, payload);
      break;
    case "degraded":
      await handleDegradation(monitor, payload);
      break;
    case "error":
      await handleFailure(monitor, payload);
      break;
    default:
      logger.error("should not happen");
      break;
  }

  return c.text("Ok", 200);
});