Openstatus
www.openstatus.dev
1import { CloudTasksClient } from "@google-cloud/tasks";
2import type { google } from "@google-cloud/tasks/build/protos/protos";
3import { z } from "zod";
4
5import { and, eq, gte, lte, notInArray } from "@openstatus/db";
6import {
7 type MonitorStatus,
8 maintenance,
9 maintenancesToMonitors,
10 monitor,
11 monitorStatusTable,
12 selectMonitorSchema,
13 selectMonitorStatusSchema,
14} from "@openstatus/db/src/schema";
15import type { Region } from "@openstatus/db/src/schema/constants";
16import { regionDict } from "@openstatus/regions";
17import { db } from "../lib/db";
18
19import { getSentry } from "@hono/sentry";
20import { getLogger } from "@logtape/logtape";
21import type { monitorPeriodicitySchema } from "@openstatus/db/src/schema/constants";
22import {
23 type DNSPayloadSchema,
24 type httpPayloadSchema,
25 type tpcPayloadSchema,
26 transformHeaders,
27} from "@openstatus/utils";
28import type { Context } from "hono";
29import { env } from "../env";
30
/**
 * Reports whether `url` contains the configured `SITE_URL`.
 *
 * NOTE(review): this is a plain substring match, so a URL that merely embeds
 * `SITE_URL` somewhere (for example in its path or query string) also passes.
 * Confirm that callers do not rely on it as a strict origin check.
 *
 * @param url - The URL string to inspect.
 * @returns `true` when `SITE_URL` occurs anywhere inside `url`.
 */
export const isAuthorizedDomain = (url: string) => {
  const siteUrl = env().SITE_URL;
  return url.indexOf(siteUrl) !== -1;
};
34
/** Module-wide logger, obtained from logtape under the "workflow" category. */
const logger = getLogger("workflow");
36
/**
 * gRPC channel arguments handed to the Cloud Tasks client.
 */
const channelOptions = {
  // Send a keepalive ping every 5 minutes.
  "grpc.keepalive_time_ms": 300_000,
  // Wait up to 5 seconds for the ping acknowledgement.
  "grpc.keepalive_timeout_ms": 5_000,
  // 1 = keepalive pings ARE permitted while no call is in flight.
  // NOTE(review): the previous comment claimed this setting disabled such
  // pings, which would require 0. The value is left unchanged here; confirm
  // which behaviour is actually intended.
  "grpc.keepalive_permit_without_calls": 1,
};
45
/**
 * Schedules Cloud Tasks checker jobs for every active monitor with the given
 * periodicity that is not linked to a maintenance window covering the current
 * instant.
 *
 * One task is created per monitor region. For the "30s" periodicity a second
 * task, scheduled 30 seconds later, is created for each region as well. Task
 * creation is started for all pairs and awaited with `Promise.allSettled`, so a
 * single failing task does not abort the run: failures are counted, logged
 * (with their distinct error messages) and reported to Sentry.
 *
 * @param periodicity - Periodicity to process; also used as the queue name.
 * @param c - Hono context, used to obtain the Sentry client.
 * @throws Error when the fetched monitors do not match `selectMonitorSchema`.
 */
export async function sendCheckerTasks(
  periodicity: z.infer<typeof monitorPeriodicitySchema>,
  c: Context,
) {
  const client = new CloudTasksClient({
    fallback: "rest",
    channelOptions,
    projectId: env().GCP_PROJECT_ID,
    credentials: {
      client_email: env().GCP_CLIENT_EMAIL,
      // Turn literal "\n" sequences in the configured key into real line breaks.
      private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
    },
  });

  const parent = client.queuePath(
    env().GCP_PROJECT_ID,
    env().GCP_LOCATION,
    periodicity,
  );

  // Read the clock once so the maintenance filter and the task timestamps all
  // refer to the same instant.
  const timestamp = Date.now();
  const now = new Date(timestamp);

  // Maintenance windows that contain `now`.
  const currentMaintenance = db
    .select({ id: maintenance.id })
    .from(maintenance)
    .where(and(lte(maintenance.from, now), gte(maintenance.to, now)))
    .as("currentMaintenance");

  // Ids of the monitors attached to one of those windows.
  const currentMaintenanceMonitors = db
    .select({ id: maintenancesToMonitors.monitorId })
    .from(maintenancesToMonitors)
    .innerJoin(
      currentMaintenance,
      eq(maintenancesToMonitors.maintenanceId, currentMaintenance.id),
    );

  const monitorRows = await db
    .select()
    .from(monitor)
    .where(
      and(
        eq(monitor.periodicity, periodicity),
        eq(monitor.active, true),
        notInArray(monitor.id, currentMaintenanceMonitors),
      ),
    )
    .all();

  logger.info("Starting cron job", {
    periodicity,
    monitor_count: monitorRows.length,
  });

  const monitors = z.array(selectMonitorSchema).safeParse(monitorRows);
  if (!monitors.success) {
    logger.error(`Error while fetching the monitors ${monitors.error}`);
    throw new Error("Error while fetching the monitors");
  }

  // Task creations are started here and only awaited together further down.
  const pendingTasks: Promise<unknown>[] = [];

  for (const row of monitors.data) {
    const statusRows = await db
      .select()
      .from(monitorStatusTable)
      .where(eq(monitorStatusTable.monitorId, row.id))
      .all();
    const monitorStatus = z
      .array(selectMonitorStatusSchema)
      .safeParse(statusRows);
    if (!monitorStatus.success) {
      // Skip this monitor only; the remaining monitors are still scheduled.
      logger.error("Failed to parse monitor status", {
        monitor_id: row.id,
        error_message: monitorStatus.error.message,
      });
      continue;
    }

    for (const region of row.regions) {
      // Regions without a stored status row default to "active".
      const status =
        monitorStatus.data.find((m) => region === m.region)?.status || "active";

      const r = regionDict[region as keyof typeof regionDict];

      if (!r) {
        logger.error(`Invalid region ${region}`);
        continue;
      }
      if (r.deprecated) {
        // Deprecated regions are skipped: no task is scheduled for them.
        logger.error(`Deprecated region ${region}`);
        continue;
      }

      // The "30s" periodicity gets a second task 30 seconds after the first.
      const scheduleTimes =
        periodicity === "30s"
          ? [timestamp, timestamp + 30 * 1000]
          : [timestamp];

      for (const scheduledAt of scheduleTimes) {
        pendingTasks.push(
          createCronTask({
            row,
            timestamp: scheduledAt,
            client,
            parent,
            status,
            region,
          }),
        );
      }
    }
  }

  const allRequests = await Promise.allSettled(pendingTasks);

  const rejected = allRequests.filter(
    (r): r is PromiseRejectedResult => r.status === "rejected",
  );
  const failed = rejected.length;
  const success = allRequests.length - failed;

  logger.info("Completed cron job", {
    periodicity,
    total_tasks: pendingTasks.length,
    success_count: success,
    failed_count: failed,
  });
  if (failed > 0) {
    // Keep the log entry bounded: distinct messages only, at most ten of them.
    const reasons = Array.from(
      new Set(
        rejected.map((r) =>
          r.reason instanceof Error ? r.reason.message : String(r.reason),
        ),
      ),
    ).slice(0, 10);

    logger.error("Cron job had failures", {
      periodicity,
      failed_count: failed,
      success_count: success,
      reasons,
    });
    getSentry(c).captureMessage(
      `sendCheckerTasks for ${periodicity} ended with ${failed} failed tasks`,
      "error",
    );
  }
}
/** Union of the payload shapes built for http, tcp and dns monitors. */
type CheckerPayload =
  | z.infer<typeof httpPayloadSchema>
  | z.infer<typeof tpcPayloadSchema>
  | z.infer<typeof DNSPayloadSchema>;

/**
 * Builds the JSON payload sent to the checker for one monitor row.
 *
 * @throws Error ("Invalid jobType") when the job type is not http, tcp or dns.
 */
function buildCheckerPayload(
  row: z.infer<typeof selectMonitorSchema>,
  timestamp: number,
  status: MonitorStatus,
): CheckerPayload {
  const { jobType } = row;
  // Validate first so an unsupported job type fails before any other work
  // (such as parsing the stored assertions) is attempted.
  if (jobType !== "http" && jobType !== "tcp" && jobType !== "dns") {
    throw new Error("Invalid jobType");
  }

  // Fields that are identical for all three job types.
  const shared = {
    workspaceId: String(row.workspaceId),
    monitorId: String(row.id),
    cronTimestamp: timestamp,
    status,
    assertions: row.assertions ? JSON.parse(row.assertions) : null,
    degradedAfter: row.degradedAfter,
    timeout: row.timeout,
    trigger: "cron" as const,
    otelConfig: row.otelEndpoint
      ? {
          endpoint: row.otelEndpoint,
          headers: transformHeaders(row.otelHeaders),
        }
      : undefined,
    // A falsy retry value (including 0) falls back to 3.
    retry: row.retry || 3,
  };

  if (jobType === "http") {
    return {
      ...shared,
      url: row.url,
      method: row.method || "GET",
      body: row.body,
      headers: row.headers,
      // Redirects are followed unless the monitor explicitly stores `false`.
      followRedirects:
        row.followRedirects === null ? true : row.followRedirects,
    };
  }

  // tcp and dns payloads carry the target as `uri` and nothing else extra.
  return { ...shared, uri: row.url };
}

/**
 * Returns the provider-specific request header that carries the target region,
 * or an empty object when the region's provider has none.
 */
function regionRoutingHeader(region: Region): Record<string, string> {
  switch (regionDict[region].provider) {
    case "fly":
      return { "fly-prefer-region": region };
    case "koyeb":
      return { "X-KOYEB-REGION-OVERRIDE": region.replace("koyeb_", "") };
    case "railway":
      return { "railway-region": region.replace("railway_", "") };
    default:
      return {};
  }
}

/**
 * Creates one Cloud Tasks HTTP task that POSTs the checker payload for a
 * monitor in a given region.
 *
 * @param row - The monitor to check.
 * @param timestamp - Cron timestamp in milliseconds; sent in the payload and
 *   used (converted to whole seconds) as the task's schedule time.
 * @param client - Cloud Tasks client used to create the task.
 * @param parent - Queue path the task is created in.
 * @param status - Status of the monitor in `region`, sent in the payload.
 * @param region - Region the check is scheduled for.
 * @returns The promise returned by `client.createTask`.
 * @throws Error when the job type is unsupported or no checker URL can be
 *   built; as this is an async function these surface as a rejected promise.
 */
// timestamp needs to be in ms
const createCronTask = async ({
  row,
  timestamp,
  client,
  parent,
  status,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  timestamp: number;
  client: CloudTasksClient;
  parent: string;
  status: MonitorStatus;
  region: Region;
}) => {
  const payload = buildCheckerPayload(row, timestamp, status);

  const newTask: google.cloud.tasks.v2beta3.ITask = {
    httpRequest: {
      headers: {
        "Content-Type": "application/json", // Set content type to ensure compatibility with your application's request parsing
        ...regionRoutingHeader(region),
        Authorization: `Basic ${env().CRON_SECRET}`,
      },
      httpMethod: "POST",
      url: generateUrl({ row, region }),
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
    },
    scheduleTime: {
      // `seconds` is an integer field, so drop the millisecond remainder
      // instead of passing a fractional number.
      seconds: Math.floor(timestamp / 1000),
    },
  };

  return client.createTask({ parent, task: newTask });
};
311
/** Base URL of the checker service for each supported region provider. */
const checkerBaseUrls = new Map<string, string>([
  ["fly", "https://openstatus-checker.fly.dev"],
  ["koyeb", "https://openstatus-checker.koyeb.app"],
  ["railway", "https://railway-proxy-production-9cb1.up.railway.app"],
]);

/**
 * Builds the checker endpoint URL for a monitor in a given region.
 *
 * The host is chosen by the region's provider; the path is
 * `/checker/<jobType>` with the monitor id passed as `monitor_id`.
 *
 * @param row - The monitor; its `jobType` and `id` are used in the URL.
 * @param region - Region whose provider selects the checker host.
 * @returns The full checker URL.
 * @throws Error when the region's provider has no known checker host.
 */
function generateUrl({
  row,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  region: Region;
}) {
  const { provider } = regionDict[region];
  const baseUrl = checkerBaseUrls.get(provider);

  if (baseUrl === undefined) {
    // The failure here is an unsupported provider, not an invalid job type.
    throw new Error(`Invalid provider "${provider}" for region ${region}`);
  }

  return `${baseUrl}/checker/${row.jobType}?monitor_id=${row.id}`;
}