// Openstatus — www.openstatus.dev
// at 4c0f4c00a38753a5d0dfd7e7b7b7706dec6f1503 · 414 lines · 11 kB · view raw
import { CloudTasksClient } from "@google-cloud/tasks";
import type { google } from "@google-cloud/tasks/build/protos/protos";
import {
  and,
  db,
  desc,
  eq,
  isNull,
  lte,
  max,
  or,
  schema,
} from "@openstatus/db";
import { session, user } from "@openstatus/db/src/schema";
import {
  monitorDeactivationEmail,
  monitorPausedEmail,
} from "@openstatus/emails";
import { sendBatchEmailHtml } from "@openstatus/emails/src/send";
import { Redis } from "@openstatus/upstash";
import { RateLimiter } from "limiter";
import { z } from "zod";
import { env } from "../env";

/** Ordered steps of the monitor-deactivation workflow. */
export const workflowStep = ["14days", "3days", "paused"] as const;
export const workflowStepSchema = z.enum(workflowStep);

type WorkflowStep = z.infer<typeof workflowStepSchema>;

/** Minimal user/workspace pair the workflow operates on. */
type WorkflowUser = {
  userId: number;
  email: string | null;
  workspaceId: number;
};

/** Redis set holding the ids of users that currently have a workflow in flight. */
const WORKFLOW_USERS_KEY = "workflow:users";

/** Sender identity shared by every email of the workflow. */
const EMAIL_FROM =
  "Thibault From OpenStatus <thibault@notifications.openstatus.dev>";
const EMAIL_REPLY_TO = "thibault@openstatus.dev";

const redis = Redis.fromEnv();

const client = new CloudTasksClient({
  projectId: env().GCP_PROJECT_ID,
  credentials: {
    client_email: env().GCP_CLIENT_EMAIL,
    // The key is stored with escaped newlines; restore real line breaks.
    private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
  },
});

const parent = client.queuePath(
  env().GCP_PROJECT_ID,
  env().GCP_LOCATION,
  "workflow",
);

// Throttles how fast workflows are started from the cron entry point.
const limiter = new RateLimiter({ tokensPerInterval: 15, interval: "second" });

/**
 * Returns a new `Date` that is `days` calendar days after the current instant
 * (same local time of day).
 */
function daysFromNow(days: number): Date {
  const result = new Date();
  result.setDate(result.getDate() + days);
  return result;
}

/**
 * Cron entry point: finds inactive users on a free (or unset) plan and starts
 * the deactivation workflow for each of them.
 *
 * Two groups are selected and merged:
 * - users without any session row whose `updatedAt` is null or older than the
 *   cutoff;
 * - users whose most recent session `expires` is older than the cutoff.
 *
 * Individual workflow failures do not abort the run; they are logged and
 * counted.
 */
export async function LaunchMonitorWorkflow(): Promise<void> {
  // Expires is one month after last connection, so if we want to reach people
  // who connected 3 months ago we need to check for people with expires
  // 2 months ago.
  const cutoff = new Date();
  cutoff.setMonth(cutoff.getMonth() - 2);

  // Users without any session.
  const userWithoutSession = db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      updatedAt: schema.user.updatedAt,
    })
    .from(schema.user)
    .leftJoin(schema.session, eq(schema.session.userId, schema.user.id))
    .where(isNull(schema.session.userId))
    .as("query");

  // Only free users monitors are paused.
  // We don't need to handle multi users per workspace because free workspaces
  // only have one user.
  const sessionlessUsers = await db
    .select({
      userId: userWithoutSession.userId,
      email: userWithoutSession.email,
      workspaceId: schema.workspace.id,
    })
    .from(userWithoutSession)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(userWithoutSession.userId, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        or(
          lte(userWithoutSession.updatedAt, cutoff),
          isNull(userWithoutSession.updatedAt),
        ),
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
      ),
    );

  console.log(
    `Found ${sessionlessUsers.length} users without session to start the workflow`,
  );

  // Latest session expiry per user.
  const maxSessionPerUser = db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      lastConnection: max(schema.session.expires).as("lastConnection"),
    })
    .from(schema.user)
    .innerJoin(schema.session, eq(schema.session.userId, schema.user.id))
    .groupBy(schema.user.id)
    .as("maxSessionPerUser");

  // Same plan restriction as above: only free (or unset) plans.
  const staleSessionUsers = await db
    .select({
      userId: maxSessionPerUser.userId,
      email: maxSessionPerUser.email,
      workspaceId: schema.workspace.id,
    })
    .from(maxSessionPerUser)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(maxSessionPerUser.userId, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        lte(maxSessionPerUser.lastConnection, cutoff),
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
      ),
    );

  // Let's merge both results.
  const candidates = [...staleSessionUsers, ...sessionlessUsers];

  const pending: Promise<void>[] = [];
  for (const candidate of candidates) {
    // Wait for a rate-limiter token before kicking off each workflow; the
    // workflow itself is not awaited here so they can overlap.
    await limiter.removeTokens(1);
    pending.push(workflowInit({ user: candidate }));
  }

  const outcomes = await Promise.allSettled(pending);

  let success = 0;
  let failed = 0;
  for (const outcome of outcomes) {
    if (outcome.status === "fulfilled") {
      success += 1;
    } else {
      failed += 1;
      // Surface the reason instead of only counting the failure.
      console.error("Workflow init failed", outcome.reason);
    }
  }

  console.log(
    `End cron with ${pending.length} jobs with ${success} success and ${failed} failed`,
  );
}

/**
 * Starts the workflow for one user unless it is already running or the
 * user's workspace has no active, non-deleted monitor.
 *
 * On start, the "14days" task is enqueued and the user id is added to the
 * Redis set of in-flight workflows.
 */
async function workflowInit({
  user: candidate,
}: {
  user: WorkflowUser;
}): Promise<void> {
  console.log(`Starting workflow for ${candidate.userId}`);

  // Let's check if the user is already in the workflow.
  const isMember = await redis.sismember(WORKFLOW_USERS_KEY, candidate.userId);
  if (isMember) {
    console.log(`user workflow already started for ${candidate.userId}`);
    return;
  }

  // Check if the user has some running monitors.
  const nbRunningMonitor = await db.$count(
    schema.monitor,
    and(
      eq(schema.monitor.workspaceId, candidate.workspaceId),
      eq(schema.monitor.active, true),
      isNull(schema.monitor.deletedAt),
    ),
  );
  if (nbRunningMonitor === 0) {
    console.log(`user has no running monitors for ${candidate.userId}`);
    return;
  }

  await CreateTask({
    parent,
    client,
    step: "14days",
    userId: candidate.userId,
    initialRun: Date.now(),
  });

  // Add our user to the list of users that have started the workflow.
  await redis.sadd(WORKFLOW_USERS_KEY, candidate.userId);
  console.log(`user workflow started for ${candidate.userId}`);
}

/**
 * First step: warns the user that monitors will be paused in 14 days and
 * schedules the "3days" step.
 *
 * @param userId - Id of the user the workflow runs for.
 * @param workFlowRunTimestamp - Epoch milliseconds at which the workflow was
 *   started; forwarded unchanged to the next step.
 * @throws If the user does not exist or has no email (see `getUser`).
 */
export async function Step14Days(
  userId: number,
  workFlowRunTimestamp: number,
): Promise<void> {
  const currentUser = await getUser(userId);

  // The task has just been created, so we don't double check whether the
  // user has logged in since.
  if (currentUser.email) {
    await sendBatchEmailHtml([
      {
        to: currentUser.email,
        subject: "Your OpenStatus monitors will be paused in 14 days",
        from: EMAIL_FROM,
        reply_to: EMAIL_REPLY_TO,
        html: monitorDeactivationEmail({
          date: daysFromNow(14).toDateString(),
        }),
      },
    ]);

    await CreateTask({
      parent,
      client,
      step: "3days",
      userId: currentUser.id,
      initialRun: workFlowRunTimestamp,
    });
  }
}

/**
 * Second step: if the user logged in since the workflow started, the workflow
 * ends; otherwise a 3-day warning is sent and the "paused" step is scheduled.
 *
 * @param userId - Id of the user the workflow runs for.
 * @param workFlowRunTimestamp - Epoch milliseconds at which the workflow was
 *   started.
 * @throws If the user does not exist or has no email (see `getUser`).
 */
export async function Step3Days(
  userId: number,
  workFlowRunTimestamp: number,
): Promise<void> {
  // Check if the user has connected since the workflow started.
  const hasConnected = await hasUserLoggedIn({
    userId,
    date: new Date(workFlowRunTimestamp),
  });

  if (hasConnected) {
    await redis.srem(WORKFLOW_USERS_KEY, userId);
    return;
  }

  const currentUser = await getUser(userId);

  if (currentUser.email) {
    await sendBatchEmailHtml([
      {
        to: currentUser.email,
        subject: "Your OpenStatus monitors will be paused in 3 days",
        from: EMAIL_FROM,
        reply_to: EMAIL_REPLY_TO,
        html: monitorDeactivationEmail({
          date: daysFromNow(3).toDateString(),
        }),
      },
    ]);
  }

  // Let's schedule the next task.
  await CreateTask({
    client,
    parent,
    step: "paused",
    userId,
    initialRun: workFlowRunTimestamp,
  });
}

/**
 * Final step: if the user still has not logged in, deactivates every monitor
 * of their free workspace and sends the "paused" email. In all cases the user
 * is removed from the in-flight workflow set.
 *
 * @param userId - Id of the user the workflow runs for.
 * @param workFlowRunTimestamp - Epoch milliseconds at which the workflow was
 *   started.
 * @throws If the user does not exist or has no email (see `getUser`).
 */
export async function StepPaused(
  userId: number,
  workFlowRunTimestamp: number,
): Promise<void> {
  const hasConnected = await hasUserLoggedIn({
    userId,
    date: new Date(workFlowRunTimestamp),
  });

  if (hasConnected) {
    // The user came back: nothing is paused, so no "paused" email must be
    // sent. Just end the workflow, like Step3Days does.
    await redis.srem(WORKFLOW_USERS_KEY, userId);
    return;
  }

  // No join on sessions here: users enrolled through the "without session"
  // branch of LaunchMonitorWorkflow have no session row and must still match.
  const target = await db
    .select({
      userId: schema.user.id,
      email: schema.user.email,
      workspaceId: schema.workspace.id,
    })
    .from(user)
    .innerJoin(
      schema.usersToWorkspaces,
      eq(schema.user.id, schema.usersToWorkspaces.userId),
    )
    .innerJoin(
      schema.workspace,
      eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
    )
    .where(
      and(
        or(isNull(schema.workspace.plan), eq(schema.workspace.plan, "free")),
        eq(schema.user.id, userId),
      ),
    )
    .get();

  // We should only have one user :)
  if (!target) {
    console.error(`No user found for ${userId}`);
    // Nothing to pause; drop the user from the set so the entry does not
    // linger forever.
    await redis.srem(WORKFLOW_USERS_KEY, userId);
    return;
  }

  await db
    .update(schema.monitor)
    .set({ active: false })
    .where(eq(schema.monitor.workspaceId, target.workspaceId));

  // Send last email telling the user the monitors are paused.
  const currentUser = await getUser(userId);
  if (currentUser.email) {
    await sendBatchEmailHtml([
      {
        to: currentUser.email,
        subject: "Your monitors have been paused",
        from: EMAIL_FROM,
        reply_to: EMAIL_REPLY_TO,
        html: monitorPausedEmail(),
      },
    ]);
  }

  // Remove user from the workflow.
  await redis.srem(WORKFLOW_USERS_KEY, userId);
}

/**
 * Tells whether the user's most recent session expires strictly after `date`.
 *
 * @returns `false` when the user has no session or the latest `expires` is
 *   null.
 */
async function hasUserLoggedIn({
  userId,
  date,
}: {
  userId: number;
  date: Date;
}): Promise<boolean> {
  // Only the newest session matters, so fetch a single row.
  const [latest] = await db
    .select({ lastSession: session.expires })
    .from(session)
    .where(eq(session.userId, userId))
    .orderBy(desc(session.expires))
    .limit(1);

  if (!latest || latest.lastSession === null) {
    return false;
  }
  return latest.lastSession > date;
}

/**
 * Enqueues a Cloud Task that issues a GET request to the workflow endpoint of
 * the given step, scheduled according to `getScheduledTime(step)`.
 *
 * @param parent - Fully qualified queue path the task is created in.
 * @param client - Cloud Tasks client used to create the task.
 * @param step - Workflow step the task triggers.
 * @param userId - Sent as the `userId` query parameter.
 * @param initialRun - Sent as the `initialRun` query parameter.
 * @returns The promise returned by `client.createTask`.
 */
function CreateTask({
  parent,
  client,
  step,
  userId,
  initialRun,
}: {
  parent: string;
  client: CloudTasksClient;
  step: WorkflowStep;
  userId: number;
  initialRun: number;
}) {
  const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`;
  const newTask: google.cloud.tasks.v2beta3.ITask = {
    httpRequest: {
      headers: {
        // Set content type to ensure compatibility with the application's
        // request parsing.
        "Content-Type": "application/json",
        Authorization: `${env().CRON_SECRET}`,
      },
      httpMethod: "GET",
      url,
    },
    scheduleTime: {
      seconds: getScheduledTime(step),
    },
  };

  return client.createTask({ parent, task: newTask });
}

/**
 * Returns when the task of the given step must run, as whole seconds since
 * the Unix epoch, relative to the current time.
 *
 * @throws If `step` is not a known workflow step.
 */
function getScheduledTime(step: WorkflowStep): number {
  switch (step) {
    case "14days":
      // Let's trigger it now.
      return Math.floor(Date.now() / 1000);
    case "3days":
      // It's 11 days after the 14 days step.
      return Math.floor(daysFromNow(11).getTime() / 1000);
    case "paused":
      // It's 3 days after the 3 days step.
      return Math.floor(daysFromNow(3).getTime() / 1000);
    default:
      throw new Error("Invalid step");
  }
}

/**
 * Loads a user row by id.
 *
 * @throws If no user matches `userId`, or if the user has no email.
 */
async function getUser(userId: number) {
  const currentUser = await db
    .select()
    .from(user)
    .where(eq(schema.user.id, userId))
    .get();

  if (!currentUser) {
    throw new Error("User not found");
  }
  if (!currentUser.email) {
    throw new Error("User email not found");
  }
  return currentUser;
}