A decentralized music tracking and discovery platform built on AT Protocol 🎵

feat: add session expiration handling and cron job for refreshing sessions

+97 -3
+2 -1
apps/api/.gitignore
··· 175 175 credentials.json 176 176 *.sqlite 177 177 *.ddb 178 - *.ddb.wal 178 + *.ddb.wal 179 + *.bak
+2
apps/api/package.json
··· 14 14 "sync:library": "tsx ./src/scripts/sync-library.ts", 15 15 "avatar": "tsx ./src/scripts/avatar.ts", 16 16 "genres": "tsx ./src/scripts/genres.ts", 17 + "exp": "tsx ./src/scripts/exp.ts", 17 18 "pkl:eval": "pkl eval -f json", 18 19 "pkl:gen": "tsx ./scripts/pkl.ts", 19 20 "dev:xrpc": "tsx --watch ./src/server.ts", ··· 66 67 "kysely": "^0.27.5", 67 68 "lodash": "^4.17.21", 68 69 "nats": "^2.29.2", 70 + "node-cron": "^4.2.1", 69 71 "pg": "^8.13.3", 70 72 "ramda": "^0.30.1", 71 73 "redis": "^4.7.0",
+4 -2
apps/api/src/auth/storage.ts
··· 45 45 const session = JSON.stringify(val); 46 46 await this.db 47 47 .insertInto("auth_session") 48 - .values({ key, session }) 49 - .onConflict((oc) => oc.doUpdateSet({ session })) 48 + .values({ key, session, expiresAt: val.tokenSet.expires_at }) 49 + .onConflict((oc) => 50 + oc.doUpdateSet({ session, expiresAt: val.tokenSet.expires_at }) 51 + ) 50 52 .execute(); 51 53 } 52 54 async del(key: string) {
+70
apps/api/src/db.ts
··· 1 1 import SqliteDb from "better-sqlite3"; 2 + import chalk from "chalk"; 3 + import type { Context } from "context"; 2 4 import { 3 5 Kysely, 4 6 type Migration, ··· 6 8 Migrator, 7 9 SqliteDialect, 8 10 } from "kysely"; 11 + import { createAgent } from "lib/agent"; 9 12 10 13 // Types 11 14 ··· 26 29 export type AuthSession = { 27 30 key: string; 28 31 session: AuthSessionJson; 32 + expiresAt?: string | null; 29 33 }; 30 34 31 35 export type AuthState = { ··· 75 79 }, 76 80 }; 77 81 82 + migrations["002"] = { 83 + async up(db: Kysely<unknown>) { 84 + await db.schema 85 + .alterTable("auth_session") 86 + .addColumn("expiresAt", "text", (col) => col.defaultTo("NULL")) 87 + .execute(); 88 + }, 89 + async down(db: Kysely<unknown>) { 90 + await db.schema 91 + .alterTable("auth_session") 92 + .dropColumn("expiresAt") 93 + .execute(); 94 + }, 95 + }; 96 + 78 97 // APIs 79 98 80 99 export const createDb = (location: string): Database => { ··· 89 108 const migrator = new Migrator({ db, provider: migrationProvider }); 90 109 const { error } = await migrator.migrateToLatest(); 91 110 if (error) throw error; 111 + }; 112 + 113 + // create a function that update expiresAt to value in auth_session 114 + export const updateExpiresAt = async (db: Database) => { 115 + // get all sessions that have expiresAt is null 116 + const sessions = await db.selectFrom("auth_session").selectAll().execute(); 117 + console.log("Found", sessions.length, "sessions to update"); 118 + for (const session of sessions) { 119 + const data = JSON.parse(session.session) as { 120 + tokenSet: { expires_at?: string | null }; 121 + }; 122 + console.log(session.key, data.tokenSet.expires_at); 123 + await db 124 + .updateTable("auth_session") 125 + .set({ expiresAt: data.tokenSet.expires_at }) 126 + .where("key", "=", session.key) 127 + .execute(); 128 + } 129 + 130 + console.log(`Updated ${chalk.greenBright(sessions.length)} sessions`); 131 + }; 132 + 133 + export const refreshSessionsAboutToExpire = async ( 134 + db: Database, 135 + ctx: Context 136 + ) => { 137 + const inOneHour = new Date(Date.now() + 60 * 60 * 1000).toISOString(); 138 + const now = new Date().toISOString(); 139 + 140 + const sessions = await db 141 + .selectFrom("auth_session") 142 + .selectAll() 143 + .where("expiresAt", "is not", "NULL") 144 + .where("expiresAt", ">", now) 145 + .where("expiresAt", "<=", inOneHour) 146 + .orderBy("expiresAt", "asc") 147 + .execute(); 148 + 149 + for (const session of sessions) { 150 + console.log( 151 + "Session about to expire:", 152 + chalk.cyan(session.key), 153 + session.expiresAt 154 + ); 155 + await createAgent(ctx.oauthClient, session.key); 156 + await new Promise((r) => setTimeout(r, 100)); 157 + } 158 + 159 + console.log( 160 + `Found ${chalk.yellowBright(sessions.length)} sessions to refresh` 161 + ); 92 162 }; 93 163 94 164 export type Database = Kysely<DatabaseSchema>;
+16
apps/api/src/scripts/exp.ts
··· 1 + import { ctx, db } from "context"; 2 + import { refreshSessionsAboutToExpire, updateExpiresAt } from "db"; 3 + import { env } from "lib/env"; 4 + import cron from "node-cron"; 5 + 6 + console.log("DB Path:", env.DB_PATH); 7 + 8 + await updateExpiresAt(db); 9 + 10 + await refreshSessionsAboutToExpire(db, ctx); 11 + 12 + // run every 5 minutes 13 + cron.schedule("*/5 * * * *", async () => { 14 + console.log("Running session refresh job..."); 15 + await refreshSessionsAboutToExpire(db, ctx); 16 + });
+3
bun.lock
··· 57 57 "kysely": "^0.27.5", 58 58 "lodash": "^4.17.21", 59 59 "nats": "^2.29.2", 60 + "node-cron": "^4.2.1", 60 61 "pg": "^8.13.3", 61 62 "ramda": "^0.30.1", 62 63 "redis": "^4.7.0", ··· 2205 2206 "nkeys.js": ["nkeys.js@1.1.0", "", { "dependencies": { "tweetnacl": "1.0.3" } }, "sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg=="], 2206 2207 2207 2208 "node-abi": ["node-abi@3.75.0", "", { "dependencies": { "semver": "^7.3.5" } }, "sha512-OhYaY5sDsIka7H7AtijtI9jwGYLyl29eQn/W623DiN/MIv5sUqc4g7BIDThX+gb7di9f6xK02nkp8sdfFWZLTg=="], 2209 + 2210 + "node-cron": ["node-cron@4.2.1", "", {}, "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg=="], 2208 2211 2209 2212 "node-fetch": ["node-fetch@2.7.0", "", { "dependencies": { "whatwg-url": "^5.0.0" }, "peerDependencies": { "encoding": "^0.1.0" }, "optionalPeers": ["encoding"] }, "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A=="], 2210 2213