Openstatus
www.openstatus.dev
1import { CloudTasksClient } from "@google-cloud/tasks";
2import type { google } from "@google-cloud/tasks/build/protos/protos";
3import { z } from "zod";
4
5import { and, eq, gte, lte, notInArray } from "@openstatus/db";
6import {
7 type MonitorStatus,
8 maintenance,
9 maintenancesToMonitors,
10 monitor,
11 monitorStatusTable,
12 selectMonitorSchema,
13 selectMonitorStatusSchema,
14} from "@openstatus/db/src/schema";
15import type { Region } from "@openstatus/db/src/schema/constants";
16import { regionDict } from "@openstatus/regions";
17import { db } from "../lib/db";
18
19import { getSentry } from "@hono/sentry";
20import { getLogger } from "@logtape/logtape";
21import type { monitorPeriodicitySchema } from "@openstatus/db/src/schema/constants";
22import {
23 type DNSPayloadSchema,
24 type httpPayloadSchema,
25 type tpcPayloadSchema,
26 transformHeaders,
27} from "@openstatus/utils";
28import type { Context } from "hono";
29import { env } from "../env";
30
/**
 * Reports whether `url` contains the configured site URL (`env().SITE_URL`).
 *
 * NOTE(review): this is a plain substring match, so the site URL appearing
 * anywhere in `url` (for example inside a query string) also returns `true`.
 * Confirm that this is acceptable for the callers of this check.
 *
 * @param url - The URL string to test.
 * @returns `true` when the configured site URL occurs somewhere in `url`.
 */
export const isAuthorizedDomain = (url: string): boolean => {
  const siteUrl = env().SITE_URL;
  return url.includes(siteUrl);
};
34
// Logger for this module, registered under the "workflow" category.
const logger = getLogger("workflow");

// gRPC channel options handed to the CloudTasksClient constructor.
// NOTE(review): the client in sendCheckerTasks is created with
// `fallback: "rest"` — confirm these gRPC-level options still take effect in
// that mode.
const channelOptions = {
  // Conservative keepalive: send a ping every 5 minutes.
  "grpc.keepalive_time_ms": 300000,
  // Wait up to 5 seconds for the ping acknowledgement.
  "grpc.keepalive_timeout_ms": 5000,
  // 1 = keepalive pings ARE permitted while there are no active calls.
  // (Use 0 to disable pings without active calls.)
  "grpc.keepalive_permit_without_calls": 1,
};
45
/**
 * Creates Cloud Tasks checker jobs for every active monitor with the given
 * periodicity.
 *
 * Monitors linked to a maintenance window that is currently in progress are
 * excluded. For each remaining monitor one task is created per configured
 * region; unknown and deprecated regions are skipped. Monitors on the "30s"
 * periodicity get a second task scheduled 30 seconds after the first one.
 *
 * Failures while creating individual tasks do not reject this function: they
 * are counted, logged (grouped by error message) and reported to Sentry.
 *
 * @param periodicity - Periodicity to process; also used as the queue name.
 * @param c - Hono context, used to obtain the Sentry client.
 * @throws Error when the fetched monitor rows fail `selectMonitorSchema`
 *   validation.
 */
export async function sendCheckerTasks(
  periodicity: z.infer<typeof monitorPeriodicitySchema>,
  c: Context,
) {
  const config = env();

  const client = new CloudTasksClient({
    fallback: "rest",
    channelOptions,
    projectId: config.GCP_PROJECT_ID,
    credentials: {
      client_email: config.GCP_CLIENT_EMAIL,
      // The key is stored with literal "\n" sequences; turn them back into
      // real line breaks.
      private_key: config.GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
    },
  });

  // The queue is named after the periodicity.
  const parent = client.queuePath(
    config.GCP_PROJECT_ID,
    config.GCP_LOCATION,
    periodicity,
  );

  // Single reference instant (ms) used for both the maintenance window check
  // and the cron timestamp, so the two cannot drift apart.
  const timestamp = Date.now();
  const now = new Date(timestamp);

  // Maintenance windows that are in progress right now.
  const currentMaintenance = db
    .select({ id: maintenance.id })
    .from(maintenance)
    .where(and(lte(maintenance.from, now), gte(maintenance.to, now)))
    .as("currentMaintenance");

  // Ids of the monitors attached to one of those maintenance windows.
  const currentMaintenanceMonitors = db
    .select({ id: maintenancesToMonitors.monitorId })
    .from(maintenancesToMonitors)
    .innerJoin(
      currentMaintenance,
      eq(maintenancesToMonitors.maintenanceId, currentMaintenance.id),
    );

  const monitorRows = await db
    .select()
    .from(monitor)
    .where(
      and(
        eq(monitor.periodicity, periodicity),
        eq(monitor.active, true),
        notInArray(monitor.id, currentMaintenanceMonitors),
      ),
    )
    .all();

  logger.info(`Start cron for ${periodicity}`);

  const monitors = z.array(selectMonitorSchema).safeParse(monitorRows);
  if (!monitors.success) {
    logger.error(`Error while fetching the monitors ${monitors.error}`);
    throw new Error("Error while fetching the monitors");
  }

  // "30s" monitors are checked twice per run: now and 30 seconds from now.
  const scheduleTimes =
    periodicity === "30s" ? [timestamp, timestamp + 30 * 1000] : [timestamp];

  // Task creation promises; deliberately not awaited one by one so that the
  // requests run concurrently and are settled together below.
  const pendingTasks: ReturnType<typeof createCronTask>[] = [];

  for (const row of monitors.data) {
    const statusRows = await db
      .select()
      .from(monitorStatusTable)
      .where(eq(monitorStatusTable.monitorId, row.id))
      .all();
    const monitorStatus = z
      .array(selectMonitorStatusSchema)
      .safeParse(statusRows);
    if (!monitorStatus.success) {
      // Skip only this monitor; the others are still scheduled.
      logger.error(
        `Error while fetching the monitor status ${monitorStatus.error}`,
      );
      continue;
    }

    for (const region of row.regions) {
      // A region without a stored status is treated as "active".
      const status =
        monitorStatus.data.find((m) => region === m.region)?.status ??
        "active";

      const regionInfo = regionDict[region as keyof typeof regionDict];

      if (!regionInfo) {
        logger.error(`Invalid region ${region}`);
        continue;
      }
      if (regionInfo.deprecated) {
        // Deprecated regions are no longer scheduled.
        logger.error(`Deprecated region ${region}`);
        continue;
      }

      for (const scheduledAt of scheduleTimes) {
        pendingTasks.push(
          createCronTask({
            row,
            timestamp: scheduledAt,
            client,
            parent,
            status,
            region,
          }),
        );
      }
    }
  }

  const settled = await Promise.allSettled(pendingTasks);

  const rejected = settled.filter(
    (r): r is PromiseRejectedResult => r.status === "rejected",
  );
  const failed = rejected.length;
  const success = settled.length - failed;

  logger.info(
    `End cron for ${periodicity} with ${pendingTasks.length} jobs with ${success} success and ${failed} failed`,
  );
  if (failed > 0) {
    logger.error("error with cron jobs");

    // Group identical failure messages so a widespread outage produces one
    // log line per cause instead of one per task.
    const failureCounts = new Map<string, number>();
    for (const { reason } of rejected) {
      const message =
        reason instanceof Error ? reason.message : String(reason);
      failureCounts.set(message, (failureCounts.get(message) ?? 0) + 1);
    }
    for (const [message, count] of failureCounts) {
      logger.error("Cron task creation failed ({count}x): {message}", {
        count,
        message,
      });
    }

    getSentry(c).captureMessage(
      `sendCheckerTasks for ${periodicity} ended with ${failed} failed tasks`,
      "error",
    );
  }
}
// Payload shapes accepted by the checker endpoints.
type CheckerPayload =
  | z.infer<typeof httpPayloadSchema>
  | z.infer<typeof tpcPayloadSchema>
  | z.infer<typeof DNSPayloadSchema>;

// Builds the checker request payload matching the monitor's job type.
const buildCheckerPayload = ({
  row,
  timestamp,
  status,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  timestamp: number;
  status: MonitorStatus;
}): CheckerPayload => {
  // Fields shared by every job type. Evaluated lazily so that an unsupported
  // job type fails with "Invalid jobType" before anything else is parsed.
  const common = () => ({
    workspaceId: String(row.workspaceId),
    monitorId: String(row.id),
    cronTimestamp: timestamp,
    status,
    // Assertions are stored as a JSON string on the monitor row.
    assertions: row.assertions ? JSON.parse(row.assertions) : null,
    degradedAfter: row.degradedAfter,
    timeout: row.timeout,
    trigger: "cron" as const,
    otelConfig: row.otelEndpoint
      ? {
          endpoint: row.otelEndpoint,
          headers: transformHeaders(row.otelHeaders),
        }
      : undefined,
    // `||` on purpose: a missing retry AND a retry of 0 both fall back to 3.
    retry: row.retry || 3,
  });

  switch (row.jobType) {
    case "http":
      return {
        ...common(),
        url: row.url,
        method: row.method || "GET",
        body: row.body,
        headers: row.headers,
        // Redirects are followed unless explicitly turned off.
        followRedirects:
          row.followRedirects === null ? true : row.followRedirects,
      };
    case "tcp":
      return { ...common(), uri: row.url };
    case "dns":
      return { ...common(), uri: row.url };
    default:
      throw new Error("Invalid jobType");
  }
};

// Returns the provider-specific header that routes the request to `region`.
const regionRoutingHeader = (region: Region): Record<string, string> => {
  switch (regionDict[region].provider) {
    case "fly":
      return { "fly-prefer-region": region };
    case "koyeb":
      return { "X-KOYEB-REGION-OVERRIDE": region.replace("koyeb_", "") };
    case "railway":
      return { "railway-region": region.replace("railway_", "") };
    default:
      return {};
  }
};

/**
 * Creates one Cloud Tasks HTTP task that asks the checker to run `row` from
 * `region`.
 *
 * @param row - Monitor to check.
 * @param timestamp - Schedule time in **milliseconds** since the epoch; also
 *   sent to the checker as `cronTimestamp`.
 * @param client - Cloud Tasks client used to create the task.
 * @param parent - Fully qualified queue path the task is added to.
 * @param status - Last known status of the monitor in this region.
 * @param region - Region the check should run in.
 * @returns The promise returned by `client.createTask`.
 * @throws Error("Invalid jobType") (as a rejection) when the monitor's job
 *   type is not "http", "tcp" or "dns".
 */
const createCronTask = async ({
  row,
  timestamp,
  client,
  parent,
  status,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  timestamp: number;
  client: CloudTasksClient;
  parent: string;
  status: MonitorStatus;
  region: Region;
}) => {
  const payload = buildCheckerPayload({ row, timestamp, status });
  const regionHeader = regionRoutingHeader(region);

  const newTask: google.cloud.tasks.v2beta3.ITask = {
    httpRequest: {
      headers: {
        // Set the content type so the receiving application parses the body
        // as JSON.
        "Content-Type": "application/json",
        ...regionHeader,
        Authorization: `Basic ${env().CRON_SECRET}`,
      },
      httpMethod: "POST",
      url: generateUrl({ row, region }),
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
    },
    scheduleTime: {
      // Timestamp.seconds is an integer field: convert ms to whole seconds
      // explicitly instead of passing a fractional value.
      seconds: Math.floor(timestamp / 1000),
    },
  };

  return client.createTask({ parent, task: newTask });
};
300
/**
 * Builds the checker endpoint URL for a monitor, choosing the host from the
 * provider that serves `region`.
 *
 * @param row - Monitor whose job type and id are placed in the URL.
 * @param region - Region used to look up the provider in `regionDict`.
 * @returns `<provider host>/checker/<jobType>?monitor_id=<id>`.
 * @throws Error when the region's provider has no known checker host.
 */
function generateUrl({
  row,
  region,
}: {
  row: z.infer<typeof selectMonitorSchema>;
  region: Region;
}) {
  const { provider } = regionDict[region];

  let baseUrl: string;
  switch (provider) {
    case "fly":
      baseUrl = "https://openstatus-checker.fly.dev";
      break;
    case "koyeb":
      baseUrl = "https://openstatus-checker.koyeb.app";
      break;
    case "railway":
      baseUrl = "https://railway-proxy-production-9cb1.up.railway.app";
      break;
    default:
      // This switch is on the provider, so report the provider (not the job
      // type) as the invalid value.
      throw new Error(
        `Invalid provider "${String(provider)}" for region "${region}"`,
      );
  }

  return `${baseUrl}/checker/${row.jobType}?monitor_id=${row.id}`;
}