Openstatus www.openstatus.dev
at 20d0eeac16db94063a196dbfa8cb02fb172cac98 407 lines 11 kB view raw
1import { CloudTasksClient } from "@google-cloud/tasks"; 2import type { google } from "@google-cloud/tasks/build/protos/protos"; 3import { 4 and, 5 // db, 6 desc, 7 eq, 8 isNull, 9 lte, 10 max, 11 or, 12 schema, 13} from "@openstatus/db"; 14import { session, user } from "@openstatus/db/src/schema"; 15import { 16 MonitorDeactivationEmail, 17 MonitorPausedEmail, 18} from "@openstatus/emails"; 19import { sendWithRender } from "@openstatus/emails/src/send"; 20import { Redis } from "@openstatus/upstash"; 21import { RateLimiter } from "limiter"; 22import { z } from "zod"; 23import { env } from "../env"; 24import { db } from "../lib/db"; 25 26const redis = Redis.fromEnv(); 27 28const client = new CloudTasksClient({ 29 projectId: env().GCP_PROJECT_ID, 30 credentials: { 31 client_email: env().GCP_CLIENT_EMAIL, 32 private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"), 33 }, 34}); 35 36const parent = client.queuePath( 37 env().GCP_PROJECT_ID, 38 env().GCP_LOCATION, 39 "workflow", 40); 41 42const limiter = new RateLimiter({ tokensPerInterval: 15, interval: "second" }); 43 44export async function LaunchMonitorWorkflow() { 45 // Expires is one month after last connection, so if we want to reach people who connected 3 months ago we need to check for people with expires 2 months ago 46 const twoMonthAgo = new Date().setMonth(new Date().getMonth() - 2); 47 48 const date = new Date(twoMonthAgo); 49 // User without session 50 const userWithoutSession = db 51 .select({ 52 userId: schema.user.id, 53 email: schema.user.email, 54 updatedAt: schema.user.updatedAt, 55 }) 56 .from(schema.user) 57 .leftJoin(schema.session, eq(schema.session.userId, schema.user.id)) 58 .where(isNull(schema.session.userId)) 59 .as("query"); 60 // Only free users monitors are paused 61 // We don't need to handle multi users per workspace because free workspaces only have one user 62 // Only free users monitors are paused 63 64 const u1 = await db 65 .select({ 66 userId: userWithoutSession.userId, 67 email: userWithoutSession.email, 68 workspaceId: schema.workspace.id, 69 }) 70 .from(userWithoutSession) 71 .innerJoin( 72 schema.usersToWorkspaces, 73 eq(userWithoutSession.userId, schema.usersToWorkspaces.userId), 74 ) 75 .innerJoin( 76 schema.workspace, 77 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), 78 ) 79 .where( 80 and( 81 or( 82 lte(userWithoutSession.updatedAt, date), 83 isNull(userWithoutSession.updatedAt), 84 ), 85 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")), 86 ), 87 ); 88 89 console.log(`Found ${u1.length} users without session to start the workflow`); 90 const maxSessionPerUser = db 91 .select({ 92 userId: schema.user.id, 93 email: schema.user.email, 94 lastConnection: max(schema.session.expires).as("lastConnection"), 95 }) 96 .from(schema.user) 97 .innerJoin(schema.session, eq(schema.session.userId, schema.user.id)) 98 .groupBy(schema.user.id) 99 .as("maxSessionPerUser"); 100 // Only free users monitors are paused 101 // We don't need to handle multi users per workspace because free workspaces only have one user 102 // Only free users monitors are paused 103 104 const u = await db 105 .select({ 106 userId: maxSessionPerUser.userId, 107 email: maxSessionPerUser.email, 108 workspaceId: schema.workspace.id, 109 }) 110 .from(maxSessionPerUser) 111 .innerJoin( 112 schema.usersToWorkspaces, 113 eq(maxSessionPerUser.userId, schema.usersToWorkspaces.userId), 114 ) 115 .innerJoin( 116 schema.workspace, 117 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), 118 ) 119 .where( 120 and( 121 lte(maxSessionPerUser.lastConnection, date), 122 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")), 123 ), 124 ); 125 // Let's merge both results 126 const users = [...u, ...u1]; 127 // iterate over users 128 129 const allResult = []; 130 131 for (const user of users) { 132 await limiter.removeTokens(1); 133 const workflow = workflowInit({ user }); 134 allResult.push(workflow); 135 } 136 137 const allRequests = await Promise.allSettled(allResult); 138 139 const success = allRequests.filter((r) => r.status === "fulfilled").length; 140 const failed = allRequests.filter((r) => r.status === "rejected").length; 141 142 console.log( 143 `End cron with ${allResult.length} jobs with ${success} success and ${failed} failed`, 144 ); 145} 146 147async function workflowInit({ 148 user, 149}: { 150 user: { 151 userId: number; 152 email: string | null; 153 workspaceId: number; 154 }; 155}) { 156 console.log(`Starting workflow for ${user.userId}`); 157 // Let's check if the user is in the workflow 158 const isMember = await redis.sismember("workflow:users", user.userId); 159 if (isMember) { 160 console.log(`user workflow already started for ${user.userId}`); 161 return; 162 } 163 // check if user has some running monitors 164 const nbRunningMonitor = await db.$count( 165 schema.monitor, 166 and( 167 eq(schema.monitor.workspaceId, user.workspaceId), 168 eq(schema.monitor.active, true), 169 isNull(schema.monitor.deletedAt), 170 ), 171 ); 172 if (nbRunningMonitor === 0) { 173 console.log(`user has no running monitors for ${user.userId}`); 174 return; 175 } 176 await CreateTask({ 177 parent, 178 client: client, 179 step: "14days", 180 userId: user.userId, 181 initialRun: new Date().getTime(), 182 }); 183 // // Add our user to the list of users that have started the workflow 184 185 await redis.sadd("workflow:users", user.userId); 186 console.log(`user workflow started for ${user.userId}`); 187} 188 189export async function Step14Days(userId: number, workFlowRunTimestamp: number) { 190 const user = await getUser(userId); 191 192 // Send email saying we are going to pause the monitors 193 // The task has just been created we don't double check if the user has logged in :scary: 194 // send First email 195 // TODO: Send email 196 197 if (user.email) { 198 await sendWithRender({ 199 to: [user.email], 200 subject: "Your OpenStatus monitors will be paused soon", 201 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>", 202 reply_to: "thibault@openstatus.dev", 203 204 react: MonitorDeactivationEmail({ 205 deactivateAt: new Date(new Date().setDate(new Date().getDate() + 14)), 206 }), 207 }); 208 209 await CreateTask({ 210 parent, 211 client: client, 212 step: "3days", 213 userId: user.id, 214 initialRun: workFlowRunTimestamp, 215 }); 216 } 217} 218 219export async function Step3Days(userId: number, workFlowRunTimestamp: number) { 220 // check if user has connected 221 const hasConnected = await hasUserLoggedIn({ 222 userId, 223 date: new Date(workFlowRunTimestamp), 224 }); 225 226 if (hasConnected) { 227 // 228 await redis.srem("workflow:users", userId); 229 return; 230 } 231 232 const user = await getUser(userId); 233 234 if (user.email) { 235 await sendWithRender({ 236 to: [user.email], 237 subject: "Your OpenStatus monitors will be paused in 3 days", 238 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>", 239 reply_to: "thibault@openstatus.dev", 240 241 react: MonitorDeactivationEmail({ 242 deactivateAt: new Date(new Date().setDate(new Date().getDate() + 3)), 243 }), 244 }); 245 } 246 247 // Send second email 248 //TODO: Send email 249 // Let's schedule the next task 250 await CreateTask({ 251 client, 252 parent, 253 step: "paused", 254 userId, 255 initialRun: workFlowRunTimestamp, 256 }); 257} 258 259export async function StepPaused(userId: number, workFlowRunTimestamp: number) { 260 const hasConnected = await hasUserLoggedIn({ 261 userId, 262 date: new Date(workFlowRunTimestamp), 263 }); 264 if (!hasConnected) { 265 // sendSecond pause email 266 const users = await db 267 .select({ 268 userId: schema.user.id, 269 email: schema.user.email, 270 workspaceId: schema.workspace.id, 271 }) 272 .from(user) 273 .innerJoin(session, eq(schema.user.id, schema.session.userId)) 274 .innerJoin( 275 schema.usersToWorkspaces, 276 eq(schema.user.id, schema.usersToWorkspaces.userId), 277 ) 278 .innerJoin( 279 schema.workspace, 280 eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), 281 ) 282 .where( 283 and( 284 or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")), 285 eq(schema.user.id, userId), 286 ), 287 ) 288 .get(); 289 // We should only have one user :) 290 if (!users) { 291 console.error(`No user found for ${userId}`); 292 return; 293 } 294 295 await db 296 .update(schema.monitor) 297 .set({ active: false }) 298 .where(eq(schema.monitor.workspaceId, users.workspaceId)); 299 // Send last email with pause monitor 300 } 301 302 const currentUser = await getUser(userId); 303 // TODO: Send email 304 // Remove user for workflow 305 306 if (currentUser.email) { 307 await sendWithRender({ 308 to: [currentUser.email], 309 subject: "Your monitors have been paused", 310 from: "Thibault From OpenStatus <thibault@notifications.openstatus.dev>", 311 reply_to: "thibault@openstatus.dev", 312 react: MonitorPausedEmail(), 313 }); 314 } 315 await redis.srem("workflow:users", userId); 316} 317 318async function hasUserLoggedIn({ 319 userId, 320 date, 321}: { 322 userId: number; 323 date: Date; 324}) { 325 const userResult = await db 326 .select({ lastSession: schema.session.expires }) 327 .from(schema.session) 328 .where(eq(schema.session.userId, userId)) 329 .orderBy(desc(schema.session.expires)); 330 331 if (userResult.length === 0) { 332 return false; 333 } 334 const user = userResult[0]; 335 if (user.lastSession === null) { 336 return false; 337 } 338 return user.lastSession > date; 339} 340 341function CreateTask({ 342 parent, 343 client, 344 step, 345 userId, 346 initialRun, 347}: { 348 parent: string; 349 client: CloudTasksClient; 350 step: z.infer<typeof workflowStepSchema>; 351 userId: number; 352 initialRun: number; 353}) { 354 const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`; 355 const timestamp = getScheduledTime(step); 356 const newTask: google.cloud.tasks.v2beta3.ITask = { 357 httpRequest: { 358 headers: { 359 "Content-Type": "application/json", // Set content type to ensure compatibility your application's request parsing 360 Authorization: `${env().CRON_SECRET}`, 361 }, 362 httpMethod: "GET", 363 url, 364 }, 365 scheduleTime: { 366 seconds: timestamp, 367 }, 368 }; 369 370 const request = { parent: parent, task: newTask }; 371 return client.createTask(request); 372} 373 374function getScheduledTime(step: z.infer<typeof workflowStepSchema>) { 375 switch (step) { 376 case "14days": 377 // let's triger it now 378 return new Date().getTime() / 1000; 379 case "3days": 380 // it's 11 days after the 14 days 381 return new Date().setDate(new Date().getDate() + 11) / 1000; 382 case "paused": 383 // it's 3 days after the 3 days step 384 return new Date().setDate(new Date().getDate() + 3) / 1000; 385 default: 386 throw new Error("Invalid step"); 387 } 388} 389 390export const workflowStep = ["14days", "3days", "paused"] as const; 391export const workflowStepSchema = z.enum(workflowStep); 392 393async function getUser(userId: number) { 394 const currentUser = await db 395 .select() 396 .from(user) 397 .where(eq(schema.user.id, userId)) 398 .get(); 399 400 if (!currentUser) { 401 throw new Error("User not found"); 402 } 403 if (!currentUser.email) { 404 throw new Error("User email not found"); 405 } 406 return currentUser; 407}