A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at feat/feed-generator 163 lines 4.2 kB view raw
import SqliteDb from "better-sqlite3";
import chalk from "chalk";
import type { Context } from "context";
import {
  Kysely,
  type Migration,
  type MigrationProvider,
  Migrator,
  SqliteDialect,
} from "kysely";
import { createAgent } from "lib/agent";

// Types

/** Maps each table name to its row type for the typed query builder. */
export type DatabaseSchema = {
  status: Status;
  auth_session: AuthSession;
  auth_state: AuthState;
};

/** Row of the `status` table; every column is stored as text. */
export type Status = {
  uri: string;
  authorDid: string;
  status: string;
  createdAt: string;
  indexedAt: string;
};

/** Row of the `auth_session` table; `expiresAt` is added by migration "002". */
export type AuthSession = {
  key: string;
  session: AuthSessionJson;
  expiresAt?: string | null;
};

/** Row of the `auth_state` table. */
export type AuthState = {
  key: string;
  state: AuthStateJson;
};

// Both aliases are plain strings; the names suggest serialized JSON payloads.
type AuthStateJson = string;

type AuthSessionJson = string;

// Migrations

/** Registry of migrations keyed by name; later sections of the file add to it. */
const migrations: Record<string, Migration> = {};

/** Hands the in-memory registry to Kysely's migrator. */
const migrationProvider: MigrationProvider = {
  getMigrations: async () => migrations,
};

/** Initial schema: `status`, `auth_session` and `auth_state` tables. */
migrations["001"] = {
  async up(db: Kysely<unknown>) {
    const statusTable = db.schema
      .createTable("status")
      .addColumn("uri", "varchar", (col) => col.primaryKey())
      .addColumn("authorDid", "varchar", (col) => col.notNull())
      .addColumn("status", "varchar", (col) => col.notNull())
      .addColumn("createdAt", "varchar", (col) => col.notNull())
      .addColumn("indexedAt", "varchar", (col) => col.notNull());
    await statusTable.execute();

    const authSessionTable = db.schema
      .createTable("auth_session")
      .addColumn("key", "varchar", (col) => col.primaryKey())
      .addColumn("session", "varchar", (col) => col.notNull());
    await authSessionTable.execute();

    const authStateTable = db.schema
      .createTable("auth_state")
      .addColumn("key", "varchar", (col) => col.primaryKey())
      .addColumn("state", "varchar", (col) => col.notNull());
    await authStateTable.execute();
  },
  async down(db: Kysely<unknown>) {
    // Drop in the reverse order of creation.
    for (const table of ["auth_state", "auth_session", "status"]) {
      await db.schema.dropTable(table).execute();
    }
  },
};

/**
 * Adds the `expiresAt` text column to `auth_session`.
 *
 * NOTE(review): `defaultTo("NULL")` receives a JS string, so Kysely presumably
 * emits the text default 'NULL' rather than SQL NULL — confirm. It is left
 * unchanged on purpose: editing a migration that has already run does not
 * alter existing databases, and `refreshSessionsAboutToExpire` filters on the
 * same "NULL" string.
 */
migrations["002"] = {
  async up(db: Kysely<unknown>) {
    await db.schema
      .alterTable("auth_session")
      .addColumn("expiresAt", "text", (col) => col.defaultTo("NULL"))
      .execute();
  },
  async down(db: Kysely<unknown>) {
    await db.schema
      .alterTable("auth_session")
      .dropColumn("expiresAt")
      .execute();
  },
};

// APIs

/**
 * Opens (or creates) the SQLite database at `location` and wraps it in a
 * typed Kysely instance.
 *
 * @param location Path handed to better-sqlite3.
 * @returns A Kysely instance typed with `DatabaseSchema`.
 */
export const createDb = (location: string): Database => {
  const sqlite = new SqliteDb(location);
  const dialect = new SqliteDialect({ database: sqlite });
  return new Kysely<DatabaseSchema>({ dialect });
};

/**
 * Runs every registered migration that has not been applied yet.
 *
 * @throws Whatever error the migrator reports, rethrown unchanged.
 */
export const migrateToLatest = async (db: Database) => {
  const migrator = new Migrator({ db, provider: migrationProvider });
  const { error } = await migrator.migrateToLatest();
  if (error) throw error;
};

/**
 * Backfills `auth_session.expiresAt` from the `tokenSet.expires_at` field of
 * each row's JSON-encoded `session` column.
 *
 * Every row is rewritten, not only those whose `expiresAt` is unset. Rows
 * whose session has no `tokenSet.expires_at` get SQL NULL.
 *
 * @throws SyntaxError if a row's `session` column is not valid JSON.
 */
export const updateExpiresAt = async (db: Database) => {
  const sessions = await db.selectFrom("auth_session").selectAll().execute();
  console.log("Found", sessions.length, "sessions to update");

  for (const session of sessions) {
    const data = JSON.parse(session.session) as {
      tokenSet?: { expires_at?: string | null } | null;
    } | null;

    // Kysely ignores `undefined` values in set(); passing one here would leave
    // the UPDATE without any assignment. Coalesce to null so a missing expiry
    // is written as SQL NULL instead.
    const expiresAt = data?.tokenSet?.expires_at ?? null;
    console.log(session.key, expiresAt);

    await db
      .updateTable("auth_session")
      .set({ expiresAt })
      .where("key", "=", session.key)
      .execute();
  }

  console.log(`Updated ${chalk.greenBright(sessions.length)} sessions`);
};

/**
 * Walks the stored sessions whose `expiresAt` is later than the current time,
 * soonest expiry first, and makes one authenticated call per session so the
 * OAuth client gets a chance to refresh its tokens.
 *
 * NOTE(review): the query has no upper bound on `expiresAt`, so it selects
 * every not-yet-expired session rather than only those close to expiry —
 * confirm this is intended.
 *
 * @param db  Database holding the `auth_session` table.
 * @param ctx Application context; only `ctx.oauthClient` is used here.
 */
export const refreshSessionsAboutToExpire = async (
  db: Database,
  ctx: Context,
) => {
  const now = new Date().toISOString();

  const sessions = await db
    .selectFrom("auth_session")
    .selectAll()
    // "NULL" is bound as a string value, so this presumably excludes rows that
    // still carry the text default 'NULL' from migration "002" (which would
    // otherwise compare greater than an ISO timestamp) — TODO confirm.
    .where("expiresAt", "is not", "NULL")
    // Textual comparison; rows holding SQL NULL never satisfy it.
    .where("expiresAt", ">", now)
    .orderBy("expiresAt", "asc")
    .execute();

  for (const session of sessions) {
    console.log(
      "Session about to expire:",
      chalk.cyan(session.key),
      session.expiresAt,
    );
    const agent = await createAgent(ctx.oauthClient, session.key);
    // Trigger a token refresh by fetching preferences
    await agent.getPreferences();
    // Short pause between sessions to space out the requests.
    await new Promise((r) => setTimeout(r, 200));
  }

  console.log(
    `Found ${chalk.yellowBright(sessions.length)} sessions to refresh`,
  );
};

/** Kysely instance typed with this module's schema. */
export type Database = Kysely<DatabaseSchema>;