Openstatus
www.openstatus.dev
1import { CloudTasksClient } from "@google-cloud/tasks";
2import type { google } from "@google-cloud/tasks/build/protos/protos";
3import {
4 and,
5 // db,
6 desc,
7 eq,
8 isNull,
9 lte,
10 max,
11 or,
12 schema,
13} from "@openstatus/db";
14import { session, user } from "@openstatus/db/src/schema";
15import {
16 MonitorDeactivationEmail,
17 MonitorPausedEmail,
18} from "@openstatus/emails";
19import { sendWithRender } from "@openstatus/emails/src/send";
20import { Redis } from "@openstatus/upstash";
21import { RateLimiter } from "limiter";
22import { z } from "zod";
23import { env } from "../env";
24import { db } from "../lib/db";
25
26const redis = Redis.fromEnv();
27
28const client = new CloudTasksClient({
29 projectId: env().GCP_PROJECT_ID,
30 credentials: {
31 client_email: env().GCP_CLIENT_EMAIL,
32 private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
33 },
34});
35
36const parent = client.queuePath(
37 env().GCP_PROJECT_ID,
38 env().GCP_LOCATION,
39 "workflow",
40);
41
42const limiter = new RateLimiter({ tokensPerInterval: 15, interval: "second" });
43
44export async function LaunchMonitorWorkflow() {
45 // Expires is one month after last connection, so if we want to reach people who connected 3 months ago we need to check for people with expires 2 months ago
46 const twoMonthAgo = new Date().setMonth(new Date().getMonth() - 2);
47
48 const date = new Date(twoMonthAgo);
49 // User without session
50 const userWithoutSession = db
51 .select({
52 userId: schema.user.id,
53 email: schema.user.email,
54 updatedAt: schema.user.updatedAt,
55 })
56 .from(schema.user)
57 .leftJoin(schema.session, eq(schema.session.userId, schema.user.id))
58 .where(isNull(schema.session.userId))
59 .as("query");
60 // Only free users monitors are paused
61 // We don't need to handle multi users per workspace because free workspaces only have one user
62 // Only free users monitors are paused
63
64 const u1 = await db
65 .select({
66 userId: userWithoutSession.userId,
67 email: userWithoutSession.email,
68 workspaceId: schema.workspace.id,
69 })
70 .from(userWithoutSession)
71 .innerJoin(
72 schema.usersToWorkspaces,
73 eq(userWithoutSession.userId, schema.usersToWorkspaces.userId),
74 )
75 .innerJoin(
76 schema.workspace,
77 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
78 )
79 .where(
80 and(
81 or(
82 lte(userWithoutSession.updatedAt, date),
83 isNull(userWithoutSession.updatedAt),
84 ),
85 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
86 ),
87 );
88
89 console.log(`Found ${u1.length} users without session to start the workflow`);
90 const maxSessionPerUser = db
91 .select({
92 userId: schema.user.id,
93 email: schema.user.email,
94 lastConnection: max(schema.session.expires).as("lastConnection"),
95 })
96 .from(schema.user)
97 .innerJoin(schema.session, eq(schema.session.userId, schema.user.id))
98 .groupBy(schema.user.id)
99 .as("maxSessionPerUser");
100 // Only free users monitors are paused
101 // We don't need to handle multi users per workspace because free workspaces only have one user
102 // Only free users monitors are paused
103
104 const u = await db
105 .select({
106 userId: maxSessionPerUser.userId,
107 email: maxSessionPerUser.email,
108 workspaceId: schema.workspace.id,
109 })
110 .from(maxSessionPerUser)
111 .innerJoin(
112 schema.usersToWorkspaces,
113 eq(maxSessionPerUser.userId, schema.usersToWorkspaces.userId),
114 )
115 .innerJoin(
116 schema.workspace,
117 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
118 )
119 .where(
120 and(
121 lte(maxSessionPerUser.lastConnection, date),
122 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
123 ),
124 );
125 // Let's merge both results
126 const users = [...u, ...u1];
127 // iterate over users
128
129 const allResult = [];
130
131 for (const user of users) {
132 await limiter.removeTokens(1);
133 const workflow = workflowInit({ user });
134 allResult.push(workflow);
135 }
136
137 const allRequests = await Promise.allSettled(allResult);
138
139 const success = allRequests.filter((r) => r.status === "fulfilled").length;
140 const failed = allRequests.filter((r) => r.status === "rejected").length;
141
142 console.log(
143 `End cron with ${allResult.length} jobs with ${success} success and ${failed} failed`,
144 );
145}
146
147async function workflowInit({
148 user,
149}: {
150 user: {
151 userId: number;
152 email: string | null;
153 workspaceId: number;
154 };
155}) {
156 console.log(`Starting workflow for ${user.userId}`);
157 // Let's check if the user is in the workflow
158 const isMember = await redis.sismember("workflow:users", user.userId);
159 if (isMember) {
160 console.log(`user workflow already started for ${user.userId}`);
161 return;
162 }
163 // check if user has some running monitors
164 const nbRunningMonitor = await db.$count(
165 schema.monitor,
166 and(
167 eq(schema.monitor.workspaceId, user.workspaceId),
168 eq(schema.monitor.active, true),
169 isNull(schema.monitor.deletedAt),
170 ),
171 );
172 if (nbRunningMonitor === 0) {
173 console.log(`user has no running monitors for ${user.userId}`);
174 return;
175 }
176 await CreateTask({
177 parent,
178 client: client,
179 step: "14days",
180 userId: user.userId,
181 initialRun: new Date().getTime(),
182 });
183 // // Add our user to the list of users that have started the workflow
184
185 await redis.sadd("workflow:users", user.userId);
186 console.log(`user workflow started for ${user.userId}`);
187}
188
189export async function Step14Days(userId: number, workFlowRunTimestamp: number) {
190 const user = await getUser(userId);
191
192 // Send email saying we are going to pause the monitors
193 // The task has just been created we don't double check if the user has logged in :scary:
194 // send First email
195 // TODO: Send email
196
197 if (user.email) {
198 await sendWithRender({
199 to: [user.email],
200 subject: "Your OpenStatus monitors will be paused soon",
201 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
202 reply_to: "thibault@openstatus.dev",
203
204 react: MonitorDeactivationEmail({
205 deactivateAt: new Date(new Date().setDate(new Date().getDate() + 14)),
206 }),
207 });
208
209 await CreateTask({
210 parent,
211 client: client,
212 step: "3days",
213 userId: user.id,
214 initialRun: workFlowRunTimestamp,
215 });
216 }
217}
218
219export async function Step3Days(userId: number, workFlowRunTimestamp: number) {
220 // check if user has connected
221 const hasConnected = await hasUserLoggedIn({
222 userId,
223 date: new Date(workFlowRunTimestamp),
224 });
225
226 if (hasConnected) {
227 //
228 await redis.srem("workflow:users", userId);
229 return;
230 }
231
232 const user = await getUser(userId);
233
234 if (user.email) {
235 await sendWithRender({
236 to: [user.email],
237 subject: "Your OpenStatus monitors will be paused in 3 days",
238 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
239 reply_to: "thibault@openstatus.dev",
240
241 react: MonitorDeactivationEmail({
242 deactivateAt: new Date(new Date().setDate(new Date().getDate() + 3)),
243 }),
244 });
245 }
246
247 // Send second email
248 //TODO: Send email
249 // Let's schedule the next task
250 await CreateTask({
251 client,
252 parent,
253 step: "paused",
254 userId,
255 initialRun: workFlowRunTimestamp,
256 });
257}
258
259export async function StepPaused(userId: number, workFlowRunTimestamp: number) {
260 const hasConnected = await hasUserLoggedIn({
261 userId,
262 date: new Date(workFlowRunTimestamp),
263 });
264 if (!hasConnected) {
265 // sendSecond pause email
266 const users = await db
267 .select({
268 userId: schema.user.id,
269 email: schema.user.email,
270 workspaceId: schema.workspace.id,
271 })
272 .from(user)
273 .innerJoin(session, eq(schema.user.id, schema.session.userId))
274 .innerJoin(
275 schema.usersToWorkspaces,
276 eq(schema.user.id, schema.usersToWorkspaces.userId),
277 )
278 .innerJoin(
279 schema.workspace,
280 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
281 )
282 .where(
283 and(
284 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
285 eq(schema.user.id, userId),
286 ),
287 )
288 .get();
289 // We should only have one user :)
290 if (!users) {
291 console.error(`No user found for ${userId}`);
292 return;
293 }
294
295 await db
296 .update(schema.monitor)
297 .set({ active: false })
298 .where(eq(schema.monitor.workspaceId, users.workspaceId));
299 // Send last email with pause monitor
300 }
301
302 const currentUser = await getUser(userId);
303 // TODO: Send email
304 // Remove user for workflow
305
306 if (currentUser.email) {
307 await sendWithRender({
308 to: [currentUser.email],
309 subject: "Your monitors have been paused",
310 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>",
311 reply_to: "thibault@openstatus.dev",
312 react: MonitorPausedEmail(),
313 });
314 }
315 await redis.srem("workflow:users", userId);
316}
317
318async function hasUserLoggedIn({
319 userId,
320 date,
321}: {
322 userId: number;
323 date: Date;
324}) {
325 const userResult = await db
326 .select({ lastSession: schema.session.expires })
327 .from(schema.session)
328 .where(eq(schema.session.userId, userId))
329 .orderBy(desc(schema.session.expires));
330
331 if (userResult.length === 0) {
332 return false;
333 }
334 const user = userResult[0];
335 if (user.lastSession === null) {
336 return false;
337 }
338 return user.lastSession > date;
339}
340
341function CreateTask({
342 parent,
343 client,
344 step,
345 userId,
346 initialRun,
347}: {
348 parent: string;
349 client: CloudTasksClient;
350 step: z.infer<typeof workflowStepSchema>;
351 userId: number;
352 initialRun: number;
353}) {
354 const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`;
355 const timestamp = getScheduledTime(step);
356 const newTask: google.cloud.tasks.v2beta3.ITask = {
357 httpRequest: {
358 headers: {
359 "Content-Type": "application/json", // Set content type to ensure compatibility your application's request parsing
360 Authorization: `${env().CRON_SECRET}`,
361 },
362 httpMethod: "GET",
363 url,
364 },
365 scheduleTime: {
366 seconds: timestamp,
367 },
368 };
369
370 const request = { parent: parent, task: newTask };
371 return client.createTask(request);
372}
373
374function getScheduledTime(step: z.infer<typeof workflowStepSchema>) {
375 switch (step) {
376 case "14days":
377 // let's triger it now
378 return new Date().getTime() / 1000;
379 case "3days":
380 // it's 11 days after the 14 days
381 return new Date().setDate(new Date().getDate() + 11) / 1000;
382 case "paused":
383 // it's 3 days after the 3 days step
384 return new Date().setDate(new Date().getDate() + 3) / 1000;
385 default:
386 throw new Error("Invalid step");
387 }
388}
389
390export const workflowStep = ["14days", "3days", "paused"] as const;
391export const workflowStepSchema = z.enum(workflowStep);
392
393async function getUser(userId: number) {
394 const currentUser = await db
395 .select()
396 .from(user)
397 .where(eq(schema.user.id, userId))
398 .get();
399
400 if (!currentUser) {
401 throw new Error("User not found");
402 }
403 if (!currentUser.email) {
404 throw new Error("User email not found");
405 }
406 return currentUser;
407}