Openstatus www.openstatus.dev
at e04c855d4ded3de361d0d758827dbfc50ff511d0 321 lines 9.1 kB view raw
import { CloudTasksClient } from "@google-cloud/tasks";
import type { google } from "@google-cloud/tasks/build/protos/protos";
import { z } from "zod";

import { and, eq, gte, lte, notInArray } from "@openstatus/db";
import {
  type MonitorStatus,
  maintenance,
  maintenancesToMonitors,
  monitor,
  monitorStatusTable,
  selectMonitorSchema,
  selectMonitorStatusSchema,
} from "@openstatus/db/src/schema";
import type { Region } from "@openstatus/db/src/schema/constants";
import { regionDict } from "@openstatus/regions";
import { db } from "../lib/db";

import { getSentry } from "@hono/sentry";
import { getLogger } from "@logtape/logtape";
import type { monitorPeriodicitySchema } from "@openstatus/db/src/schema/constants";
import {
  type DNSPayloadSchema,
  type httpPayloadSchema,
  type tpcPayloadSchema,
  transformHeaders,
} from "@openstatus/utils";
import type { Context } from "hono";
import { env } from "../env";

/**
 * Reports whether `url` contains the configured `SITE_URL`.
 *
 * NOTE(review): this is a plain substring test, not a host comparison, so a
 * URL that merely embeds `SITE_URL` (for example in its query string) also
 * passes. Confirm that callers do not rely on it as a strict origin check.
 *
 * @param url - The URL string to inspect.
 * @returns `true` when `SITE_URL` occurs anywhere inside `url`.
 */
export const isAuthorizedDomain = (url: string) => {
  return url.includes(env().SITE_URL);
};

const logger = getLogger("workflow");

const channelOptions = {
  // Conservative 5-minute keepalive (gRPC best practice)
  "grpc.keepalive_time_ms": 300000,
  // 5-second timeout sufficient for ping response
  "grpc.keepalive_timeout_ms": 5000,
  // A value of 1 permits keepalive pings even when no call is active.
  // NOTE(review): the previous comment said "disable"; confirm the intent.
  "grpc.keepalive_permit_without_calls": 1,
};

/**
 * Enqueues one Cloud Task per (monitor, region) pair for every active monitor
 * with the given periodicity that is not inside a current maintenance window.
 *
 * For the "30s" periodicity a second task is enqueued for each pair, scheduled
 * 30 seconds after the first one.
 *
 * Individual task-creation failures do not abort the run: they are counted,
 * logged and reported to Sentry once all requests have settled.
 *
 * @param periodicity - Monitor periodicity; also used as the Cloud Tasks queue name.
 * @param c - Hono context, used to obtain the Sentry client.
 * @throws Error when the fetched monitors do not match `selectMonitorSchema`.
 */
export async function sendCheckerTasks(
  periodicity: z.infer<typeof monitorPeriodicitySchema>,
  c: Context,
) {
  const client = new CloudTasksClient({
    fallback: "rest",
    channelOptions,
    projectId: env().GCP_PROJECT_ID,
    credentials: {
      client_email: env().GCP_CLIENT_EMAIL,
      // The key is stored with escaped newlines; restore the real ones.
      private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
    },
  });

  const parent = client.queuePath(
    env().GCP_PROJECT_ID,
    env().GCP_LOCATION,
    periodicity,
  );

  // One instant for the whole run: it is the task schedule time and the
  // reference point for the maintenance-window check.
  const timestamp = Date.now();
  const now = new Date(timestamp);

  const currentMaintenance = db
    .select({ id: maintenance.id })
    .from(maintenance)
    .where(and(lte(maintenance.from, now), gte(maintenance.to, now)))
    .as("currentMaintenance");

  const currentMaintenanceMonitors = db
    .select({ id: maintenancesToMonitors.monitorId })
    .from(maintenancesToMonitors)
    .innerJoin(
      currentMaintenance,
      eq(maintenancesToMonitors.maintenanceId, currentMaintenance.id),
    );

  const result = await db
    .select()
    .from(monitor)
    .where(
      and(
        eq(monitor.periodicity, periodicity),
        eq(monitor.active, true),
        notInArray(monitor.id, currentMaintenanceMonitors),
      ),
    )
    .all();

  logger.info(`Start cron for ${periodicity}`);

  const monitors = z.array(selectMonitorSchema).safeParse(result);
  const allResult: Promise<unknown>[] = [];
  if (!monitors.success) {
    logger.error(`Error while fetching the monitors ${monitors.error}`);
    throw new Error("Error while fetching the monitors");
  }

  for (const row of monitors.data) {
    const statusRows = await db
      .select()
      .from(monitorStatusTable)
      .where(eq(monitorStatusTable.monitorId, row.id))
      .all();
    const monitorStatus = z
      .array(selectMonitorStatusSchema)
      .safeParse(statusRows);
    if (!monitorStatus.success) {
      // Skip this monitor only; the remaining monitors are still scheduled.
      logger.error(
        `Error while fetching the monitor status ${monitorStatus.error}`,
      );
      continue;
    }

    for (const region of row.regions) {
      // A region without a recorded status is treated as "active".
      const status =
        monitorStatus.data.find((m) => region === m.region)?.status || "active";

      const r = regionDict[region as keyof typeof regionDict];

      if (!r) {
        logger.error(`Invalid region ${region}`);
        continue;
      }
      if (r.deprecated) {
        // Deprecated regions are skipped: no task is created for them.
        logger.error(`Deprecated region ${region}`);
        continue;
      }

      allResult.push(
        createCronTask({ row, timestamp, client, parent, status, region }),
      );

      if (periodicity === "30s") {
        // we schedule another task in 30s
        const scheduledAt = timestamp + 30 * 1000;
        allResult.push(
          createCronTask({
            row,
            timestamp: scheduledAt,
            client,
            parent,
            status,
            region,
          }),
        );
      }
    }
  }

  const allRequests = await Promise.allSettled(allResult);

  const success = allRequests.filter((r) => r.status === "fulfilled").length;
  const failed = allRequests.filter((r) => r.status === "rejected").length;

  logger.info(
    `End cron for ${periodicity} with ${allResult.length} jobs with ${success} success and ${failed} failed`,
  );
  if (failed > 0) {
    // Surface the individual rejection reasons so failures can be diagnosed.
    for (const outcome of allRequests) {
      if (outcome.status === "rejected") {
        logger.error(`Cron task creation failed: ${String(outcome.reason)}`);
      }
    }
    logger.error("error with cron jobs");
    getSentry(c).captureMessage(
      `sendCheckerTasks for ${periodicity} ended with ${failed} failed tasks`,
      "error",
    );
  }
}

/**
 * Creates a single Cloud Task that POSTs a check request for one monitor in
 * one region.
 *
 * The payload shape depends on `row.jobType` ("http", "tcp" or "dns"). It is
 * JSON-serialised and base64-encoded into the task body. A provider-specific
 * header is added so the request is routed to the requested region.
 *
 * @param row - The monitor to check.
 * @param timestamp - Schedule time in milliseconds since the epoch; also sent
 *   to the checker as `cronTimestamp`.
 * @param client - Cloud Tasks client used to create the task.
 * @param parent - Fully qualified queue path.
 * @param status - Current status of the monitor in this region.
 * @param region - Region the check should run in.
 * @returns The promise returned by `client.createTask`.
 * @throws Error (as a rejection) when `row.jobType` is not a supported type,
 *   or when `row.assertions` is not valid JSON.
 */
const createCronTask = async ({
  row,
  timestamp,
  client,
  parent,
  status,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  timestamp: number;
  client: CloudTasksClient;
  parent: string;
  status: MonitorStatus;
  region: Region;
}) => {
  let payload:
    | z.infer<typeof httpPayloadSchema>
    | z.infer<typeof tpcPayloadSchema>
    | z.infer<typeof DNSPayloadSchema>
    | null = null;

  if (row.jobType === "http") {
    payload = {
      workspaceId: String(row.workspaceId),
      monitorId: String(row.id),
      url: row.url,
      method: row.method || "GET",
      cronTimestamp: timestamp,
      body: row.body,
      headers: row.headers,
      status: status,
      assertions: row.assertions ? JSON.parse(row.assertions) : null,
      degradedAfter: row.degradedAfter,
      timeout: row.timeout,
      trigger: "cron",
      otelConfig: row.otelEndpoint
        ? {
            endpoint: row.otelEndpoint,
            headers: transformHeaders(row.otelHeaders),
          }
        : undefined,
      retry: row.retry || 3,
      // Only an explicit null falls back to following redirects.
      followRedirects:
        row.followRedirects === null ? true : row.followRedirects,
    };
  } else if (row.jobType === "tcp") {
    payload = {
      workspaceId: String(row.workspaceId),
      monitorId: String(row.id),
      uri: row.url,
      status: status,
      assertions: row.assertions ? JSON.parse(row.assertions) : null,
      cronTimestamp: timestamp,
      degradedAfter: row.degradedAfter,
      timeout: row.timeout,
      trigger: "cron",
      retry: row.retry || 3,
      otelConfig: row.otelEndpoint
        ? {
            endpoint: row.otelEndpoint,
            headers: transformHeaders(row.otelHeaders),
          }
        : undefined,
    };
  } else if (row.jobType === "dns") {
    payload = {
      workspaceId: String(row.workspaceId),
      monitorId: String(row.id),
      uri: row.url,
      cronTimestamp: timestamp,
      status: status,
      assertions: row.assertions ? JSON.parse(row.assertions) : null,
      degradedAfter: row.degradedAfter,
      timeout: row.timeout,
      trigger: "cron",
      otelConfig: row.otelEndpoint
        ? {
            endpoint: row.otelEndpoint,
            headers: transformHeaders(row.otelHeaders),
          }
        : undefined,
      retry: row.retry || 3,
    };
  }

  if (!payload) {
    throw new Error("Invalid jobType");
  }

  // Each provider uses its own header to pin the request to a region.
  const regionInfo = regionDict[region];
  let regionHeader: Record<string, string> = {};
  if (regionInfo.provider === "fly") {
    regionHeader = { "fly-prefer-region": region };
  }
  if (regionInfo.provider === "koyeb") {
    regionHeader = { "X-KOYEB-REGION-OVERRIDE": region.replace("koyeb_", "") };
  }
  if (regionInfo.provider === "railway") {
    regionHeader = { "railway-region": region.replace("railway_", "") };
  }

  const newTask: google.cloud.tasks.v2beta3.ITask = {
    httpRequest: {
      headers: {
        // Set the content type so the checker parses the body as JSON.
        "Content-Type": "application/json",
        ...regionHeader,
        Authorization: `Basic ${env().CRON_SECRET}`,
      },
      httpMethod: "POST",
      url: generateUrl({ row, region }),
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
    },
    scheduleTime: {
      // `timestamp` is in milliseconds, but the protobuf Timestamp `seconds`
      // field is an integer, so convert to whole seconds explicitly.
      seconds: Math.floor(timestamp / 1000),
    },
  };

  const request = { parent: parent, task: newTask };
  return client.createTask(request);
};

/**
 * Builds the checker endpoint URL for a monitor, based on the provider that
 * hosts the requested region.
 *
 * @param row - The monitor; its `jobType` and `id` are embedded in the URL.
 * @param region - The region whose provider selects the checker host.
 * @returns The URL the Cloud Task should POST to.
 * @throws Error when the region's provider has no known checker host.
 */
function generateUrl({
  row,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  region: Region;
}) {
  const regionInfo = regionDict[region];

  switch (regionInfo.provider) {
    case "fly":
      return `https://openstatus-checker.fly.dev/checker/${row.jobType}?monitor_id=${row.id}`;
    case "koyeb":
      return `https://openstatus-checker.koyeb.app/checker/${row.jobType}?monitor_id=${row.id}`;
    case "railway":
      return `https://railway-proxy-production-9cb1.up.railway.app/checker/${row.jobType}?monitor_id=${row.id}`;

    default:
      // The switch is over the provider, so report that rather than the job type.
      throw new Error(`Invalid region provider for region ${region}`);
  }
}