Bluesky app fork with some witchin' additions 💫 witchsky.app
bluesky fork client
at main 187 lines 4.5 kB view raw
import assert from 'assert'
import {
  Kysely,
  type KyselyPlugin,
  Migrator,
  type PluginTransformQueryArgs,
  type PluginTransformResultArgs,
  PostgresDialect,
  type QueryResult,
  type RootOperationNode,
  type UnknownRow,
} from 'kysely'
import {default as Pg} from 'pg'

import {dbLogger as log} from '../logger.js'
import {default as migrations} from './migrations/index.js'
import {DbMigrationProvider} from './migrations/provider.js'
import {type DbSchema} from './schema.js'

/**
 * Thin wrapper around a Kysely instance backed by a node-postgres pool.
 *
 * Bundles the query builder (`db`), the configuration it was created with
 * (`cfg`) and a `Migrator` wired to this package's migrations.
 */
export class Database {
  /** Migrator using `cfg.schema` as the schema of its bookkeeping tables. */
  migrator: Migrator
  /** Set to `true` once `close()` has successfully destroyed `db`. */
  destroyed = false

  /**
   * @param db Kysely instance (or Kysely transaction) to run queries on.
   * @param cfg Pool and connection settings this instance was created from.
   */
  constructor(
    public db: Kysely<DbSchema>,
    public cfg: PgConfig,
  ) {
    this.migrator = new Migrator({
      db,
      migrationTableSchema: cfg.schema,
      provider: new DbMigrationProvider(migrations),
    })
  }

  /**
   * Creates a `Database` on top of a Postgres connection pool.
   *
   * Uses `opts.pool` when supplied; otherwise builds a new `Pg.Pool` from
   * `url` and the `pool*` sizing options.
   *
   * @param opts Connection and pool options.
   * @returns A `Database` whose `cfg` holds the pool, schema, url and
   *   txLockNonce.
   * @throws Error if `opts.schema` is non-empty and contains characters other
   *   than `[A-Za-z_]`.
   */
  static postgres(opts: PgOptions): Database {
    const {schema, url, txLockNonce} = opts

    // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema)
    // Validated before anything is created so that a bad schema name does not
    // leave behind a freshly constructed pool that nobody will ever end().
    if (schema && !/^[a-z_]+$/i.test(schema)) {
      throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`)
    }

    log.info(
      {
        schema,
        poolSize: opts.poolSize,
        poolMaxUses: opts.poolMaxUses,
        poolIdleTimeoutMs: opts.poolIdleTimeoutMs,
      },
      'Creating database connection',
    )

    const pool =
      opts.pool ??
      new Pg.Pool({
        connectionString: url,
        max: opts.poolSize,
        maxUses: opts.poolMaxUses,
        idleTimeoutMillis: opts.poolIdleTimeoutMs,
      })

    // Select count(*) and other pg bigints as js integer
    // NOTE: registered on the shared `Pg.types` registry rather than on this
    // pool, and values beyond Number.MAX_SAFE_INTEGER lose precision.
    Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10))

    pool.on('error', onPoolError)

    const db = new Kysely<DbSchema>({
      dialect: new PostgresDialect({pool}),
    })

    return new Database(db, {
      pool,
      schema,
      url,
      txLockNonce,
    })
  }

  /**
   * Runs `fn` inside a Kysely transaction.
   *
   * `fn` receives a new `Database` wrapping the transaction and sharing this
   * instance's `cfg`. A `LeakyTxPlugin` is attached so that queries built on
   * that handle after the transaction has ended throw instead of executing.
   *
   * @param fn Work to perform within the transaction.
   * @returns Whatever `fn` resolves to.
   * @throws Whatever `fn` throws or rejects with; the error is rethrown into
   *   Kysely's `execute` (which is expected to roll back — see Kysely docs).
   */
  async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
    const leakyTxPlugin = new LeakyTxPlugin()
    return this.db
      .withPlugin(leakyTxPlugin)
      .transaction()
      .execute(async txn => {
        const dbTxn = new Database(txn, this.cfg)
        try {
          // `fn` is invoked inside `try` so a synchronous throw takes the
          // same cleanup path as a rejected promise.
          return await fn(dbTxn)
        } catch (err) {
          // Stop accepting new queries before waiting on the connection.
          leakyTxPlugin.endTx()
          // ensure that all in-flight queries are flushed & the connection is open
          await dbTxn.db.getExecutor().provideConnection(async () => {})
          throw err
        } finally {
          leakyTxPlugin.endTx()
        }
      })
  }

  /** Postgres schema configured for this database, if any. */
  get schema(): string | undefined {
    return this.cfg.schema
  }

  /** Whether the wrapped Kysely instance is a transaction. */
  get isTransaction(): boolean {
    return this.db.isTransaction
  }

  /** Asserts that this instance wraps a transaction. */
  assertTransaction() {
    assert(this.isTransaction, 'Transaction required')
  }

  /** Asserts that this instance does not wrap a transaction. */
  assertNotTransaction() {
    assert(!this.isTransaction, 'Cannot be in a transaction')
  }

  /**
   * Destroys the underlying Kysely instance. Subsequent calls are no-ops.
   *
   * `destroyed` is only set after `destroy()` resolves, so a failed attempt
   * can be retried.
   */
  async close(): Promise<void> {
    if (this.destroyed) return
    await this.db.destroy()
    this.destroyed = true
  }

  /**
   * Creates the configured schema if needed, then migrates to `migration`.
   *
   * @param migration Name of the target migration.
   * @returns The migration results reported by the migrator.
   * @throws The migrator's error, or a generic `Error` if it reported neither
   *   an error nor results.
   */
  async migrateToOrThrow(migration: string) {
    return migrateOrThrow(this, migrator => migrator.migrateTo(migration))
  }

  /**
   * Creates the configured schema if needed, then migrates to the latest
   * migration.
   *
   * @returns The migration results reported by the migrator.
   * @throws The migrator's error, or a generic `Error` if it reported neither
   *   an error nor results.
   */
  async migrateToLatestOrThrow() {
    return migrateOrThrow(this, migrator => migrator.migrateToLatest())
  }
}

export default Database

/** Configuration stored on a `Database` instance. */
export type PgConfig = {
  /** Pool backing the Kysely Postgres dialect. */
  pool: Pg.Pool
  /** Connection string the database was configured with. */
  url: string
  /** Schema created before migrating and used for the migration tables. */
  schema?: string
  /** Not read in this file; carried on the config for consumers. */
  txLockNonce?: string
}

/** Options accepted by `Database.postgres`. */
type PgOptions = {
  /** Connection string; used to build a pool when `pool` is not supplied. */
  url: string
  /** Existing pool to use instead of creating one. */
  pool?: Pg.Pool
  /** Schema name; must match `[A-Za-z_]+` when non-empty. */
  schema?: string
  /** Passed as `max` to a newly created pool. */
  poolSize?: number
  /** Passed as `maxUses` to a newly created pool. */
  poolMaxUses?: number
  /** Passed as `idleTimeoutMillis` to a newly created pool. */
  poolIdleTimeoutMs?: number
  /** Copied verbatim onto the resulting `PgConfig`. */
  txLockNonce?: string
}

/**
 * Shared implementation of the `migrate*OrThrow` methods: ensures the
 * configured schema exists, runs the migration and unwraps its result set.
 */
async function migrateOrThrow(
  database: Database,
  run: (migrator: Migrator) => ReturnType<Migrator['migrateToLatest']>,
) {
  if (database.schema) {
    await database.db.schema
      .createSchema(database.schema)
      .ifNotExists()
      .execute()
  }
  const {error, results} = await run(database.migrator)
  if (error) {
    throw error
  }
  if (!results) {
    throw new Error('An unknown failure occurred while migrating')
  }
  return results
}

/**
 * Kysely plugin that refuses to build queries once `endTx()` has been called.
 *
 * `Database.transaction` calls `endTx()` both on failure and in its `finally`,
 * so the error below is also raised for queries issued through the
 * transaction handle after a successful transaction has finished.
 */
class LeakyTxPlugin implements KyselyPlugin {
  private txOver = false

  /** Marks the transaction as finished; safe to call more than once. */
  endTx() {
    this.txOver = true
  }

  /** Passes the query node through unchanged, or throws if the tx is over. */
  transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
    if (this.txOver) {
      throw new Error('tx already failed')
    }
    return args.node
  }

  /** Returns the query result unchanged. */
  async transformResult(
    args: PluginTransformResultArgs,
  ): Promise<QueryResult<UnknownRow>> {
    return args.result
  }
}

/** Handler for the pool's `'error'` event: logs the error. */
const onPoolError = (err: Error) => log.error({err}, 'db pool error')