// openstatus (www.openstatus.dev)
// Snapshot at commit 4c0f4c00a38753a5d0dfd7e7b7b7706dec6f1503 (332 lines, 9.3 kB).
import { CloudTasksClient } from "@google-cloud/tasks";
import type { google } from "@google-cloud/tasks/build/protos/protos";
import { z } from "zod";

import { and, eq, gte, lte, notInArray } from "@openstatus/db";
import {
  type MonitorStatus,
  maintenance,
  maintenancesToMonitors,
  monitor,
  monitorStatusTable,
  selectMonitorSchema,
  selectMonitorStatusSchema,
} from "@openstatus/db/src/schema";
import type { Region } from "@openstatus/db/src/schema/constants";
import { regionDict } from "@openstatus/regions";
import { db } from "../lib/db";

import { getSentry } from "@hono/sentry";
import { getLogger } from "@logtape/logtape";
import type { monitorPeriodicitySchema } from "@openstatus/db/src/schema/constants";
import {
  type DNSPayloadSchema,
  type httpPayloadSchema,
  type tpcPayloadSchema,
  transformHeaders,
} from "@openstatus/utils";
import type { Context } from "hono";
import { env } from "../env";

/**
 * Returns `true` when `url` has the same origin (scheme, host and port) as
 * `SITE_URL`.
 *
 * Both values are parsed with the WHATWG `URL` parser rather than compared as
 * substrings: a plain `includes` check also accepts look-alikes such as
 * `https://<site-host>.attacker.example` or
 * `https://attacker.example/?next=<SITE_URL>`. Input that cannot be parsed as
 * an absolute URL is never authorized.
 */
export const isAuthorizedDomain = (url: string): boolean => {
  const siteUrl = env().SITE_URL;

  let target: URL;
  try {
    target = new URL(url);
  } catch {
    return false;
  }

  try {
    return target.origin === new URL(siteUrl).origin;
  } catch {
    // NOTE(review): SITE_URL is presumably a full URL. If it is ever configured
    // as a bare host (no scheme), fall back to comparing hosts — confirm.
    return target.host === siteUrl;
  }
};

const logger = getLogger("workflow");

/**
 * gRPC channel options passed to the Cloud Tasks client.
 * NOTE(review): the client below is built with `fallback: "rest"`, so these
 * gRPC-level keepalive settings presumably have no effect — confirm.
 */
const channelOptions = {
  // Conservative 5-minute keepalive (gRPC best practice).
  "grpc.keepalive_time_ms": 300000,
  // A ping not acknowledged within 5 seconds marks the connection as dead.
  "grpc.keepalive_timeout_ms": 5000,
  // 1 = keepalive pings MAY be sent even when there are no active calls.
  "grpc.keepalive_permit_without_calls": 1,
};

/**
 * Enqueues one Cloud Tasks HTTP task per (monitor, region) for every active
 * monitor with the given `periodicity` that is not in a maintenance window
 * right now. Monitors with the "30s" periodicity get a second task scheduled
 * 30 seconds after the first.
 *
 * Individual task-creation failures do not make this function throw. They are
 * counted, logged and reported to Sentry.
 *
 * @param periodicity - Monitor periodicity; also used as the Cloud Tasks queue name.
 * @param c - Hono context, used to reach the Sentry client.
 * @throws Error when the fetched monitor rows fail `selectMonitorSchema` validation.
 */
export async function sendCheckerTasks(
  periodicity: z.infer<typeof monitorPeriodicitySchema>,
  c: Context,
) {
  const client = new CloudTasksClient({
    fallback: "rest",
    channelOptions,
    projectId: env().GCP_PROJECT_ID,
    credentials: {
      client_email: env().GCP_CLIENT_EMAIL,
      private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
    },
  });

  const parent = client.queuePath(
    env().GCP_PROJECT_ID,
    env().GCP_LOCATION,
    periodicity,
  );

  const timestamp = Date.now();
  // Use a single instant for both bounds of the maintenance window.
  const now = new Date(timestamp);

  const currentMaintenance = db
    .select({ id: maintenance.id })
    .from(maintenance)
    .where(and(lte(maintenance.from, now), gte(maintenance.to, now)))
    .as("currentMaintenance");

  const currentMaintenanceMonitors = db
    .select({ id: maintenancesToMonitors.monitorId })
    .from(maintenancesToMonitors)
    .innerJoin(
      currentMaintenance,
      eq(maintenancesToMonitors.maintenanceId, currentMaintenance.id),
    );

  const result = await db
    .select()
    .from(monitor)
    .where(
      and(
        eq(monitor.periodicity, periodicity),
        eq(monitor.active, true),
        notInArray(monitor.id, currentMaintenanceMonitors),
      ),
    )
    .all();

  logger.info("Starting cron job", {
    periodicity,
    monitor_count: result.length,
  });

  const monitors = z.array(selectMonitorSchema).safeParse(result);
  if (!monitors.success) {
    logger.error(`Error while fetching the monitors ${monitors.error}`);
    throw new Error("Error while fetching the monitors");
  }

  // Collect every task first and dispatch them only after all DB reads are
  // done. Starting `createTask` calls while this loop still awaits per-monitor
  // queries would leave early rejections without a handler until
  // `Promise.allSettled` runs. The runtime reports those as unhandled rejections.
  const pending: Omit<CronTaskOptions, "client" | "parent">[] = [];

  for (const row of monitors.data) {
    const statusRows = await db
      .select()
      .from(monitorStatusTable)
      .where(eq(monitorStatusTable.monitorId, row.id))
      .all();
    const monitorStatus = z
      .array(selectMonitorStatusSchema)
      .safeParse(statusRows);
    if (!monitorStatus.success) {
      logger.error("Failed to parse monitor status", {
        monitor_id: row.id,
        error_message: monitorStatus.error.message,
      });
      continue;
    }

    for (const region of row.regions) {
      const r = regionDict[region as keyof typeof regionDict];
      if (!r) {
        logger.error(`Invalid region ${region}`);
        continue;
      }
      // Deprecated regions are no longer scheduled; log them so remaining
      // usages can be found and migrated.
      if (r.deprecated) {
        logger.error(`Deprecated region ${region}`);
        continue;
      }

      // A region with no stored status is treated as "active".
      const status =
        monitorStatus.data.find((m) => m.region === region)?.status ||
        "active";

      pending.push({ row, timestamp, status, region });
      if (periodicity === "30s") {
        // 30s monitors run twice per cron tick: schedule the second check 30s later.
        pending.push({
          row,
          timestamp: timestamp + 30 * 1000,
          status,
          region,
        });
      }
    }
  }

  // All promises are created synchronously here, so `allSettled` attaches its
  // handlers before any of them can reject unobserved.
  const allRequests = await Promise.allSettled(
    pending.map((task) => createCronTask({ ...task, client, parent })),
  );

  const failures = allRequests.filter(
    (r): r is PromiseRejectedResult => r.status === "rejected",
  );
  const failed = failures.length;
  const success = allRequests.length - failed;

  logger.info("Completed cron job", {
    periodicity,
    total_tasks: allRequests.length,
    success_count: success,
    failed_count: failed,
  });
  if (failed > 0) {
    const firstReason: unknown = failures[0]?.reason;
    logger.error("Cron job had failures", {
      periodicity,
      failed_count: failed,
      success_count: success,
      // Surface one concrete cause so failures can be diagnosed from the logs.
      first_error:
        firstReason instanceof Error ? firstReason.message : String(firstReason),
    });
    getSentry(c).captureMessage(
      `sendCheckerTasks for ${periodicity} ended with ${failed} failed tasks`,
      "error",
    );
  }
}

/** Inputs needed by `createCronTask` to enqueue a single checker task. */
type CronTaskOptions = {
  row: z.infer<typeof selectMonitorSchema>;
  /** Scheduled execution time in milliseconds since the Unix epoch. */
  timestamp: number;
  client: CloudTasksClient;
  /** Fully-qualified Cloud Tasks queue path, as returned by `client.queuePath`. */
  parent: string;
  status: MonitorStatus;
  region: Region;
};

/**
 * Builds the checker payload for `row` and enqueues it as a Cloud Tasks HTTP
 * task. The task POSTs to the checker deployment that serves `region`.
 *
 * @returns The promise returned by `client.createTask`.
 * @throws Error (as a rejected promise) when `row.jobType` is not http, tcp or dns.
 */
const createCronTask = async ({
  row,
  timestamp,
  client,
  parent,
  status,
  region,
}: CronTaskOptions) => {
  // Fields shared by every job type.
  const assertions = row.assertions ? JSON.parse(row.assertions) : null;
  const otelConfig = row.otelEndpoint
    ? {
        endpoint: row.otelEndpoint,
        headers: transformHeaders(row.otelHeaders),
      }
    : undefined;
  // NOTE(review): `||` maps an explicit retry of 0 to 3. This is kept as-is
  // because how the checker treats 0 attempts is not visible here — confirm
  // before switching to `??`.
  const retry = row.retry || 3;

  let payload:
    | z.infer<typeof httpPayloadSchema>
    | z.infer<typeof tpcPayloadSchema>
    | z.infer<typeof DNSPayloadSchema>;

  switch (row.jobType) {
    case "http":
      payload = {
        workspaceId: String(row.workspaceId),
        monitorId: String(row.id),
        url: row.url,
        method: row.method || "GET",
        cronTimestamp: timestamp,
        body: row.body,
        headers: row.headers,
        status,
        assertions,
        degradedAfter: row.degradedAfter,
        timeout: row.timeout,
        trigger: "cron",
        otelConfig,
        retry,
        followRedirects:
          row.followRedirects === null ? true : row.followRedirects,
      };
      break;
    case "tcp":
      payload = {
        workspaceId: String(row.workspaceId),
        monitorId: String(row.id),
        uri: row.url,
        status,
        assertions,
        cronTimestamp: timestamp,
        degradedAfter: row.degradedAfter,
        timeout: row.timeout,
        trigger: "cron",
        retry,
        otelConfig,
      };
      break;
    case "dns":
      payload = {
        workspaceId: String(row.workspaceId),
        monitorId: String(row.id),
        uri: row.url,
        cronTimestamp: timestamp,
        status,
        assertions,
        degradedAfter: row.degradedAfter,
        timeout: row.timeout,
        trigger: "cron",
        otelConfig,
        retry,
      };
      break;
    default:
      throw new Error("Invalid jobType");
  }

  // Provider-specific header that pins the request to the target region.
  const regionInfo = regionDict[region];
  let regionHeader: Record<string, string> = {};
  switch (regionInfo.provider) {
    case "fly":
      regionHeader = { "fly-prefer-region": region };
      break;
    case "koyeb":
      regionHeader = {
        "X-KOYEB-REGION-OVERRIDE": region.replace("koyeb_", ""),
      };
      break;
    case "railway":
      regionHeader = { "railway-region": region.replace("railway_", "") };
      break;
  }

  const newTask: google.cloud.tasks.v2beta3.ITask = {
    httpRequest: {
      headers: {
        // Set the content type so the checker parses the body as JSON.
        "Content-Type": "application/json",
        ...regionHeader,
        Authorization: `Basic ${env().CRON_SECRET}`,
      },
      httpMethod: "POST",
      url: generateUrl({ row, region }),
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
    },
    // `google.protobuf.Timestamp.seconds` is an integer field. Pass whole
    // seconds and carry the millisecond remainder in `nanos` instead of a
    // fractional `timestamp / 1000`.
    scheduleTime: {
      seconds: Math.floor(timestamp / 1000),
      nanos: (timestamp % 1000) * 1_000_000,
    },
  };

  return client.createTask({ parent, task: newTask });
};

/**
 * Returns the checker endpoint URL for `row.jobType` on the provider that
 * hosts `region`.
 *
 * @throws Error when the region's provider has no known checker deployment.
 */
function generateUrl({
  row,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  region: Region;
}) {
  const { provider } = regionDict[region];
  const path = `checker/${row.jobType}?monitor_id=${row.id}`;

  switch (provider) {
    case "fly":
      return `https://openstatus-checker.fly.dev/${path}`;
    case "koyeb":
      return `https://openstatus-checker.koyeb.app/${path}`;
    case "railway":
      return `https://railway-proxy-production-9cb1.up.railway.app/${path}`;

    default:
      throw new Error(`Invalid provider ${provider}`);
  }
}