Openstatus
www.openstatus.dev
1import { CloudTasksClient } from "@google-cloud/tasks";
2import type { google } from "@google-cloud/tasks/build/protos/protos";
3import {
4 and,
5 db,
6 desc,
7 eq,
8 isNull,
9 lte,
10 max,
11 or,
12 schema,
13} from "@openstatus/db";
14import { session, user } from "@openstatus/db/src/schema";
15import {
16 monitorDeactivationEmail,
17 monitorPausedEmail,
18} from "@openstatus/emails";
19import { sendBatchEmailHtml } from "@openstatus/emails/src/send";
20import { Redis } from "@openstatus/upstash";
21import { RateLimiter } from "limiter";
22import { z } from "zod";
23import { env } from "../env";
24
// Redis client built by `Redis.fromEnv()`; it backs the "workflow:users" set
// used by the workflow steps in this file.
const redis = Redis.fromEnv();

// The private key is read from env with literal "\n" sequences; turn them
// back into real line breaks before handing the key to the client.
const gcpPrivateKey = env().GCP_PRIVATE_KEY.split("\\n").join("\n");

// Cloud Tasks client used to schedule every workflow step.
const client = new CloudTasksClient({
  credentials: {
    private_key: gcpPrivateKey,
    client_email: env().GCP_CLIENT_EMAIL,
  },
  projectId: env().GCP_PROJECT_ID,
});

// Fully-qualified path of the "workflow" queue that receives the tasks.
const workflowQueueName = "workflow";
const parent = client.queuePath(
  env().GCP_PROJECT_ID,
  env().GCP_LOCATION,
  workflowQueueName,
);

// Allows at most 15 workflow launches per second (see LaunchMonitorWorkflow).
const limiter = new RateLimiter({ interval: "second", tokensPerInterval: 15 });
42
/**
 * Cron entry point: selects inactive users on a free (or unset) plan and
 * starts the monitor-pausing workflow for each of them.
 *
 * Two groups of users are selected:
 *  - users with no session row whose `updatedAt` is null or at least two
 *    months old;
 *  - users whose most recent session `expires` is at least two months old.
 *
 * Launches are throttled through the module-level `limiter`. Every launch is
 * wrapped so that its failure is logged and counted instead of surfacing as an
 * unhandled promise rejection while the loop is still waiting on the limiter.
 */
export async function LaunchMonitorWorkflow() {
  // Expires is one month after last connection, so to reach people who last
  // connected 3 months ago we look for sessions that expired 2 months ago.
  const date = new Date();
  date.setMonth(date.getMonth() - 2);

  // Users that have no session row at all.
  const userWithoutSession = db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      updatedAt: schema.user.updatedAt,
    })
    .from(schema.user)
    .leftJoin(schema.session, eq(schema.session.userId, schema.user.id))
    .where(isNull(schema.session.userId))
    .as("query");

  // Only free users monitors are paused.
  // We don't need to handle multi users per workspace because free workspaces
  // only have one user.
  const u1 = await db
    .select({
      userId: userWithoutSession.userId,
      email: userWithoutSession.email,
      workspaceId: schema.workspace.id,
    })
    .from(userWithoutSession)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(userWithoutSession.userId, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        or(
          lte(userWithoutSession.updatedAt, date),
          isNull(userWithoutSession.updatedAt),
        ),
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
      ),
    );

  console.log(`Found ${u1.length} users without session to start the workflow`);

  // Latest session expiry per user, for users that have at least one session.
  const maxSessionPerUser = db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      lastConnection: max(schema.session.expires).as("lastConnection"),
    })
    .from(schema.user)
    .innerJoin(schema.session, eq(schema.session.userId, schema.user.id))
    .groupBy(schema.user.id)
    .as("maxSessionPerUser");

  // Same plan restriction as above: only free users monitors are paused.
  const u = await db
    .select({
      userId: maxSessionPerUser.userId,
      email: maxSessionPerUser.email,
      workspaceId: schema.workspace.id,
    })
    .from(maxSessionPerUser)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(maxSessionPerUser.userId, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        lte(maxSessionPerUser.lastConnection, date),
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
      ),
    );

  // Let's merge both results
  const users = [...u, ...u1];

  // Resolves to true on success and false on failure. The rejection is
  // handled right here, so a launch that fails while the loop below is still
  // awaiting the limiter never becomes an unhandled rejection.
  const startWorkflow = async (
    inactiveUser: (typeof users)[number],
  ): Promise<boolean> => {
    try {
      await workflowInit({ user: inactiveUser });
      return true;
    } catch (error: unknown) {
      console.error(
        `Failed to start workflow for ${inactiveUser.userId}`,
        error,
      );
      return false;
    }
  };

  const pending: Promise<boolean>[] = [];
  for (const inactiveUser of users) {
    // Throttle the launches; the work itself runs concurrently.
    await limiter.removeTokens(1);
    pending.push(startWorkflow(inactiveUser));
  }

  const outcomes = await Promise.all(pending);
  const success = outcomes.filter(Boolean).length;
  const failed = outcomes.length - success;

  console.log(
    `End cron with ${outcomes.length} jobs with ${success} success and ${failed} failed`,
  );
}
145
/**
 * Starts the pausing workflow for one user unless it is already running or
 * the user's workspace has no active, non-deleted monitor.
 *
 * Membership of the Redis set "workflow:users" marks a user as being in the
 * workflow. The user is added with `sadd` BEFORE the first task is created and
 * the number of added members is used as an atomic claim: when two callers
 * race for the same user only one of them sees `1` and creates the task. If
 * task creation fails the claim is released so a later run can retry.
 *
 * @param user - id, email and workspace id of the user to process.
 */
async function workflowInit({
  user,
}: {
  user: {
    userId: number;
    email: string | null;
    workspaceId: number;
  };
}) {
  console.log(`Starting workflow for ${user.userId}`);

  // Cheap early exit for users that are already in the workflow.
  const isMember = await redis.sismember("workflow:users", user.userId);
  if (isMember) {
    console.log(`user workflow already started for ${user.userId}`);
    return;
  }

  // check if user has some running monitors
  const nbRunningMonitor = await db.$count(
    schema.monitor,
    and(
      eq(schema.monitor.workspaceId, user.workspaceId),
      eq(schema.monitor.active, true),
      isNull(schema.monitor.deletedAt),
    ),
  );
  if (nbRunningMonitor === 0) {
    console.log(`user has no running monitors for ${user.userId}`);
    return;
  }

  // Atomic claim: SADD reports how many members were actually added, so 0
  // means another caller registered this user after our membership check.
  const added = await redis.sadd("workflow:users", user.userId);
  if (added === 0) {
    console.log(`user workflow already started for ${user.userId}`);
    return;
  }

  try {
    await CreateTask({
      parent,
      client: client,
      step: "14days",
      userId: user.userId,
      initialRun: new Date().getTime(),
    });
  } catch (error: unknown) {
    // No task was scheduled: release the claim so the next run can retry.
    try {
      await redis.srem("workflow:users", user.userId);
    } catch (releaseError: unknown) {
      console.error(
        `Failed to release workflow claim for ${user.userId}`,
        releaseError,
      );
    }
    throw error;
  }

  console.log(`user workflow started for ${user.userId}`);
}
187
/**
 * First workflow step: warns the user by email that their monitors will be
 * paused in 14 days, then schedules the "3days" step.
 *
 * The task has just been created, so this step does not re-check whether the
 * user has logged in since.
 *
 * @param userId - id of the user going through the workflow.
 * @param workFlowRunTimestamp - epoch milliseconds of the workflow's initial
 *   run; forwarded unchanged to the next step.
 */
export async function Step14Days(userId: number, workFlowRunTimestamp: number) {
  const recipient = await getUser(userId);

  // Without an address there is nothing to send and no follow-up is scheduled.
  if (!recipient.email) {
    return;
  }

  // Date shown in the email: 14 days from now.
  const pauseDate = new Date();
  pauseDate.setDate(pauseDate.getDate() + 14);

  const warningEmail = {
    to: recipient.email,
    subject: "Your OpenStatus monitors will be paused in 14 days",
    from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
    reply_to: "thibault@openstatus.dev",
    html: monitorDeactivationEmail({ date: pauseDate.toDateString() }),
  };
  await sendBatchEmailHtml([warningEmail]);

  // Schedule the reminder step.
  await CreateTask({
    parent,
    client: client,
    step: "3days",
    userId: recipient.id,
    initialRun: workFlowRunTimestamp,
  });
}
220
/**
 * Second workflow step: if the user has logged in since the workflow started,
 * removes them from the workflow; otherwise sends the "3 days left" reminder
 * (when an email address is available) and schedules the "paused" step.
 *
 * @param userId - id of the user going through the workflow.
 * @param workFlowRunTimestamp - epoch milliseconds of the workflow's initial
 *   run; used as the reference date for the login check.
 */
export async function Step3Days(userId: number, workFlowRunTimestamp: number) {
  const workflowStart = new Date(workFlowRunTimestamp);
  const cameBack = await hasUserLoggedIn({ userId, date: workflowStart });

  if (cameBack) {
    // The user is active again: drop them from the workflow and stop here.
    await redis.srem("workflow:users", userId);
    return;
  }

  const recipient = await getUser(userId);

  if (recipient.email) {
    // Date shown in the email: 3 days from now.
    const pauseDate = new Date();
    pauseDate.setDate(pauseDate.getDate() + 3);

    const reminderEmail = {
      to: recipient.email,
      subject: "Your OpenStatus monitors will be paused in 3 days",
      from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
      reply_to: "thibault@openstatus.dev",
      html: monitorDeactivationEmail({ date: pauseDate.toDateString() }),
    };
    await sendBatchEmailHtml([reminderEmail]);
  }

  // Let's schedule the next task
  await CreateTask({
    client,
    parent,
    step: "paused",
    userId,
    initialRun: workFlowRunTimestamp,
  });
}
263
/**
 * Final workflow step: deactivates every monitor of the user's free-plan
 * workspace and tells the user by email, unless the user has logged in since
 * the workflow started.
 *
 * In every outcome the user is removed from the "workflow:users" set so they
 * can enter the workflow again later:
 *  - the user logged in: nothing is paused and no email is sent;
 *  - no matching free-plan workspace is found for the user: an error is logged
 *    and nothing is paused;
 *  - otherwise: monitors are deactivated and the "paused" email is sent when
 *    an address is available.
 *
 * @param userId - id of the user going through the workflow.
 * @param workFlowRunTimestamp - epoch milliseconds of the workflow's initial
 *   run; used as the reference date for the login check.
 */
export async function StepPaused(userId: number, workFlowRunTimestamp: number) {
  const hasConnected = await hasUserLoggedIn({
    userId,
    date: new Date(workFlowRunTimestamp),
  });

  if (hasConnected) {
    // The user is active again: leave the monitors alone and do not send the
    // "paused" email, which would be untrue.
    await redis.srem("workflow:users", userId);
    return;
  }

  // Look up the user's workspace, restricted to free (or unset) plans.
  // There is deliberately no join on the session table: users that never had
  // a session are also put through this workflow and must be found here.
  const target = await db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      workspaceId: schema.workspace.id,
    })
    .from(schema.user)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(schema.user.id, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
        eq(schema.user.id, userId),
      ),
    )
    .get();

  if (!target) {
    console.error(`No user found for ${userId}`);
    // Nothing to pause; still leave the workflow so the user is not stuck in
    // the set forever.
    await redis.srem("workflow:users", userId);
    return;
  }

  // Pause every monitor of the workspace.
  await db
    .update(schema.monitor)
    .set({ active: false })
    .where(eq(schema.monitor.workspaceId, target.workspaceId));

  // Send last email with pause monitor
  if (target.email) {
    await sendBatchEmailHtml([
      {
        to: target.email,
        subject: "Your monitors have been paused",
        from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
        reply_to: "thibault@openstatus.dev",
        html: monitorPausedEmail(),
      },
    ]);
  }

  // Remove user from workflow
  await redis.srem("workflow:users", userId);
}
324
/**
 * Tells whether the user's latest session expiry is strictly after `date`.
 *
 * @param userId - id of the user whose sessions are inspected.
 * @param date - reference date to compare the latest session expiry with.
 * @returns `false` when the user has no session or the latest expiry is null;
 *   otherwise whether that expiry is later than `date`.
 */
async function hasUserLoggedIn({
  userId,
  date,
}: {
  userId: number;
  date: Date;
}) {
  // Sessions ordered by expiry, most recent first.
  const sessions = await db
    .select({ lastSession: schema.session.expires })
    .from(schema.session)
    .where(eq(schema.session.userId, userId))
    .orderBy(desc(schema.session.expires));

  const [latest] = sessions;
  if (latest === undefined || latest.lastSession === null) {
    return false;
  }
  return latest.lastSession > date;
}
347
/**
 * Creates a Cloud Task that performs a GET request on the workflow endpoint of
 * the given step, at the time returned by `getScheduledTime(step)`.
 *
 * @param parent - queue path the task is created in.
 * @param client - Cloud Tasks client used to create the task.
 * @param step - workflow step; becomes the last path segment of the URL.
 * @param userId - sent as the `userId` query parameter.
 * @param initialRun - sent as the `initialRun` query parameter.
 * @returns whatever `client.createTask` returns.
 */
function CreateTask({
  parent,
  client,
  step,
  userId,
  initialRun,
}: {
  parent: string;
  client: CloudTasksClient;
  step: z.infer<typeof workflowStepSchema>;
  userId: number;
  initialRun: number;
}) {
  const scheduledAtSeconds = getScheduledTime(step);
  const targetUrl = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`;

  const task: google.cloud.tasks.v2beta3.ITask = {
    scheduleTime: { seconds: scheduledAtSeconds },
    httpRequest: {
      url: targetUrl,
      httpMethod: "GET",
      headers: {
        // Set content type to ensure compatibility with the application's
        // request parsing.
        "Content-Type": "application/json",
        // The cron secret is sent as the Authorization header value.
        Authorization: `${env().CRON_SECRET}`,
      },
    },
  };

  return client.createTask({ parent, task });
}
380
/**
 * Computes when the task of a workflow step must run, as whole Unix epoch
 * seconds.
 *
 *  - "14days": now;
 *  - "3days": 11 days from now (i.e. 11 days after the "14days" step);
 *  - "paused": 3 days from now (i.e. 3 days after the "3days" step).
 *
 * The value is floored to an integer because it is used as the `seconds`
 * field of the task's schedule time, which is an integer field.
 *
 * @param step - workflow step the task is created for.
 * @returns the scheduled time in integer seconds since the Unix epoch.
 * @throws Error("Invalid step") when `step` is not a known step.
 */
function getScheduledTime(step: z.infer<typeof workflowStepSchema>) {
  // Single clock read so that the day arithmetic is based on one instant.
  const runAt = new Date();

  switch (step) {
    case "14days":
      // let's trigger it now
      break;
    case "3days":
      // it's 11 days after the 14 days
      runAt.setDate(runAt.getDate() + 11);
      break;
    case "paused":
      // it's 3 days after the 3 days step
      runAt.setDate(runAt.getDate() + 3);
      break;
    default:
      throw new Error("Invalid step");
  }

  return Math.floor(runAt.getTime() / 1000);
}
396
// Identifiers of the workflow steps, in the order they are scheduled. The
// step name is also the last path segment of the URL built in CreateTask.
export const workflowStep = ["14days", "3days", "paused"] as const;
// Zod enum over the step identifiers; its inferred type is the `step`
// parameter type of CreateTask and getScheduledTime.
export const workflowStepSchema = z.enum(workflowStep);
399
/**
 * Loads a user row by id.
 *
 * @param userId - id of the user to load.
 * @returns the user row; its `email` is guaranteed to be truthy.
 * @throws Error("User not found") when no row matches the id.
 * @throws Error("User email not found") when the row has no email.
 */
async function getUser(userId: number) {
  const byId = eq(schema.user.id, userId);
  const row = await db.select().from(user).where(byId).get();

  if (!row) {
    throw new Error("User not found");
  }
  if (!row.email) {
    throw new Error("User email not found");
  }
  return row;
}