Bluesky app fork with some witchin' additions 💫
witchsky.app
bluesky
fork
client
1import assert from 'assert'
2import {
3 Kysely,
4 type KyselyPlugin,
5 Migrator,
6 type PluginTransformQueryArgs,
7 type PluginTransformResultArgs,
8 PostgresDialect,
9 type QueryResult,
10 type RootOperationNode,
11 type UnknownRow,
12} from 'kysely'
13import {default as Pg} from 'pg'
14
15import {dbLogger as log} from '../logger.js'
16import {default as migrations} from './migrations/index.js'
17import {DbMigrationProvider} from './migrations/provider.js'
18import {type DbSchema} from './schema.js'
19
/**
 * Wrapper around a Kysely handle plus the Postgres configuration it was built
 * from. The same class is used for the pooled, top-level handle and for the
 * transaction-scoped handles created by {@link Database.transaction}.
 */
export class Database {
  migrator: Migrator
  destroyed = false

  /**
   * @param db Kysely handle (pooled or transaction-scoped) to issue queries on.
   * @param cfg Connection settings; `cfg.schema` is also used as the schema
   *   that holds the migration bookkeeping table.
   */
  constructor(
    public db: Kysely<DbSchema>,
    public cfg: PgConfig,
  ) {
    this.migrator = new Migrator({
      db,
      migrationTableSchema: cfg.schema,
      provider: new DbMigrationProvider(migrations),
    })
  }

  /**
   * Builds a Database backed by Postgres, reusing `opts.pool` when provided and
   * creating a new pool from `opts.url` otherwise.
   *
   * @throws Error when `opts.schema` contains anything other than letters and
   *   underscores.
   */
  static postgres(opts: PgOptions): Database {
    const {schema, url, txLockNonce} = opts

    // Validate up front so an invalid schema name fails before a pool is
    // allocated or the process-wide pg type parser is modified.
    // Schemas are used primarily for test parallelism (each test suite runs in
    // its own pg schema).
    if (schema && !/^[a-z_]+$/i.test(schema)) {
      throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`)
    }

    log.info(
      {
        schema,
        poolSize: opts.poolSize,
        poolMaxUses: opts.poolMaxUses,
        poolIdleTimeoutMs: opts.poolIdleTimeoutMs,
      },
      'Creating database connection',
    )

    const pool =
      opts.pool ??
      new Pg.Pool({
        connectionString: url,
        max: opts.poolSize,
        maxUses: opts.poolMaxUses,
        idleTimeoutMillis: opts.poolIdleTimeoutMs,
      })

    // Select count(*) and other pg bigints as js integer.
    // NOTE(review): parseInt yields a JS number, so values above
    // Number.MAX_SAFE_INTEGER would lose precision — confirm none are expected.
    Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10))

    pool.on('error', onPoolError)

    const db = new Kysely<DbSchema>({
      dialect: new PostgresDialect({pool}),
    })

    return new Database(db, {
      pool,
      schema,
      url,
      txLockNonce,
    })
  }

  /**
   * Runs `fn` inside a transaction, handing it a Database bound to that
   * transaction, and resolves with whatever `fn` resolves with.
   *
   * When `fn` fails — by rejecting or by throwing synchronously — the
   * transaction handle is first marked as over, so any query still issued
   * through it throws `tx already failed`; the error is rethrown only after
   * the connection has been waited on.
   */
  async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
    const leakyTxPlugin = new LeakyTxPlugin()
    return this.db
      .withPlugin(leakyTxPlugin)
      .transaction()
      .execute(async txn => {
        const dbTxn = new Database(txn, this.cfg)
        try {
          // `await` inside `try` so synchronous throws and rejections from
          // `fn` take the same cleanup path.
          return await fn(dbTxn)
        } catch (err) {
          leakyTxPlugin.endTx()
          // ensure that all in-flight queries are flushed & the connection is open
          await dbTxn.db.getExecutor().provideConnection(async () => {})
          throw err
        } finally {
          leakyTxPlugin.endTx()
        }
      })
  }

  /** Postgres schema this database was configured with, if any. */
  get schema(): string | undefined {
    return this.cfg.schema
  }

  /** Whether the underlying Kysely handle is a transaction. */
  get isTransaction(): boolean {
    return this.db.isTransaction
  }

  /** Throws an assertion error unless this handle is a transaction. */
  assertTransaction() {
    assert(this.isTransaction, 'Transaction required')
  }

  /** Throws an assertion error if this handle is a transaction. */
  assertNotTransaction() {
    assert(!this.isTransaction, 'Cannot be in a transaction')
  }

  /** Destroys the Kysely handle; later calls after a successful close are no-ops. */
  async close(): Promise<void> {
    if (this.destroyed) return
    await this.db.destroy()
    this.destroyed = true
  }

  /**
   * Migrates to the named migration, creating the configured schema first if
   * it does not exist.
   *
   * @returns The per-migration results reported by the migrator.
   * @throws The migrator's error, or a generic Error when no results are
   *   reported.
   */
  async migrateToOrThrow(migration: string) {
    return this.runMigration(() => this.migrator.migrateTo(migration))
  }

  /**
   * Migrates to the latest migration, creating the configured schema first if
   * it does not exist.
   *
   * @returns The per-migration results reported by the migrator.
   * @throws The migrator's error, or a generic Error when no results are
   *   reported.
   */
  async migrateToLatestOrThrow() {
    return this.runMigration(() => this.migrator.migrateToLatest())
  }

  /** Shared migration driver: ensures the schema exists, runs `migrate`, unwraps the result set. */
  private async runMigration(
    migrate: () => ReturnType<Migrator['migrateToLatest']>,
  ) {
    if (this.schema) {
      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
    }
    const {error, results} = await migrate()
    if (error) {
      throw error
    }
    if (!results) {
      throw new Error('An unknown failure occurred while migrating')
    }
    return results
  }
}

export default Database
148
/** Connection settings stored on every `Database` instance as `cfg`. */
export type PgConfig = {
  /** Postgres connection string. */
  url: string
  /** The pg pool that backs the Kysely dialect. */
  pool: Pg.Pool
  /** Optional Postgres schema; also used for the migration table. */
  schema?: string
  /** Carried through from the options; not read in this file. */
  txLockNonce?: string
}
155
/** Options accepted by `Database.postgres`. */
type PgOptions = {
  /** Postgres connection string, used when no `pool` is supplied. */
  url: string
  /** Optional Postgres schema; must contain only letters and underscores. */
  schema?: string
  /** Carried through to the resulting config; not read in this file. */
  txLockNonce?: string

  /** Pre-built pool to reuse instead of creating one from `url`. */
  pool?: Pg.Pool
  /** Passed to the created pool as `max`. */
  poolSize?: number
  /** Passed to the created pool as `maxUses`. */
  poolMaxUses?: number
  /** Passed to the created pool as `idleTimeoutMillis`. */
  poolIdleTimeoutMs?: number
}
165
/**
 * Kysely plugin that refuses to build further queries once `endTx()` has been
 * called, so work issued through a finished transaction handle fails loudly
 * instead of running. Query results pass through untouched.
 */
class LeakyTxPlugin implements KyselyPlugin {
  private finished = false

  /** Marks the transaction as over; safe to call more than once. */
  endTx() {
    this.finished = true
  }

  /**
   * Returns the query node unchanged while the transaction is live.
   *
   * @throws Error with message `tx already failed` after `endTx()`.
   */
  transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
    if (!this.finished) {
      const {node} = args
      return node
    }
    throw new Error('tx already failed')
  }

  /** Passes the query result through without modification. */
  async transformResult(
    args: PluginTransformResultArgs,
  ): Promise<QueryResult<UnknownRow>> {
    const {result} = args
    return result
  }
}
186
/** Pool `error` listener: records the error through the database logger. */
function onPoolError(err: Error) {
  return log.error({err}, 'db pool error')
}