this repo has no description
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8
9 _ "github.com/mattn/go-sqlite3"
10)
11
// DB is this package's database handle. It embeds *sql.DB, so all methods of
// the underlying handle are promoted onto DB.
type DB struct{ *sql.DB }
15
// Execer is the set of statement-running methods shared by database handles
// and transactions, in both plain and context-aware form. Both *sql.DB and
// *sql.Tx provide this method set, so helpers that accept an Execer can run
// either directly against the database or inside a transaction.
type Execer interface {
	// statements that do not return rows
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

	// statements that return any number of rows
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

	// statements expected to return at most one row
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

	// prepared statements
	Prepare(query string) (*sql.Stmt, error)
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}
26
27func Make(dbPath string) (*DB, error) {
28 db, err := sql.Open("sqlite3", dbPath)
29 if err != nil {
30 return nil, err
31 }
32 _, err = db.Exec(`
33 pragma journal_mode = WAL;
34 pragma synchronous = normal;
35 pragma foreign_keys = on;
36 pragma temp_store = memory;
37 pragma mmap_size = 30000000000;
38 pragma page_size = 32768;
39 pragma auto_vacuum = incremental;
40 pragma busy_timeout = 5000;
41
42 create table if not exists registrations (
43 id integer primary key autoincrement,
44 domain text not null unique,
45 did text not null,
46 secret text not null,
47 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
48 registered text
49 );
50 create table if not exists public_keys (
51 id integer primary key autoincrement,
52 did text not null,
53 name text not null,
54 key text not null,
55 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
56 unique(did, name, key)
57 );
58 create table if not exists repos (
59 id integer primary key autoincrement,
60 did text not null,
61 name text not null,
62 knot text not null,
63 rkey text not null,
64 at_uri text not null unique,
65 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
66 unique(did, name, knot, rkey)
67 );
68 create table if not exists collaborators (
69 id integer primary key autoincrement,
70 did text not null,
71 repo integer not null,
72 foreign key (repo) references repos(id) on delete cascade
73 );
74 create table if not exists follows (
75 user_did text not null,
76 subject_did text not null,
77 rkey text not null,
78 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
79 primary key (user_did, subject_did),
80 check (user_did <> subject_did)
81 );
82 create table if not exists issues (
83 id integer primary key autoincrement,
84 owner_did text not null,
85 repo_at text not null,
86 issue_id integer not null,
87 title text not null,
88 body text not null,
89 open integer not null default 1,
90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
91 issue_at text,
92 unique(repo_at, issue_id),
93 foreign key (repo_at) references repos(at_uri) on delete cascade
94 );
95 create table if not exists comments (
96 id integer primary key autoincrement,
97 owner_did text not null,
98 issue_id integer not null,
99 repo_at text not null,
100 comment_id integer not null,
101 comment_at text not null,
102 body text not null,
103 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
104 unique(issue_id, comment_id),
105 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade
106 );
107 create table if not exists pulls (
108 -- identifiers
109 id integer primary key autoincrement,
110 pull_id integer not null,
111
112 -- at identifiers
113 repo_at text not null,
114 owner_did text not null,
115 rkey text not null,
116 pull_at text,
117
118 -- content
119 title text not null,
120 body text not null,
121 target_branch text not null,
122 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed
123
124 -- meta
125 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
126
127 -- constraints
128 unique(repo_at, pull_id),
129 foreign key (repo_at) references repos(at_uri) on delete cascade
130 );
131
132 -- every pull must have atleast 1 submission: the initial submission
133 create table if not exists pull_submissions (
134 -- identifiers
135 id integer primary key autoincrement,
136 pull_id integer not null,
137
138 -- at identifiers
139 repo_at text not null,
140
141 -- content, these are immutable, and require a resubmission to update
142 round_number integer not null default 0,
143 patch text,
144
145 -- meta
146 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
147
148 -- constraints
149 unique(repo_at, pull_id, round_number),
150 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade
151 );
152
153 create table if not exists pull_comments (
154 -- identifiers
155 id integer primary key autoincrement,
156 pull_id integer not null,
157 submission_id integer not null,
158
159 -- at identifiers
160 repo_at text not null,
161 owner_did text not null,
162 comment_at text not null,
163
164 -- content
165 body text not null,
166
167 -- meta
168 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
169
170 -- constraints
171 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade,
172 foreign key (submission_id) references pull_submissions(id) on delete cascade
173 );
174
175 create table if not exists _jetstream (
176 id integer primary key autoincrement,
177 last_time_us integer not null
178 );
179
180 create table if not exists repo_issue_seqs (
181 repo_at text primary key,
182 next_issue_id integer not null default 1
183 );
184
185 create table if not exists repo_pull_seqs (
186 repo_at text primary key,
187 next_pull_id integer not null default 1
188 );
189
190 create table if not exists stars (
191 id integer primary key autoincrement,
192 starred_by_did text not null,
193 repo_at text not null,
194 rkey text not null,
195 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
196 foreign key (repo_at) references repos(at_uri) on delete cascade,
197 unique(starred_by_did, repo_at)
198 );
199
200 create table if not exists emails (
201 id integer primary key autoincrement,
202 did text not null,
203 email text not null,
204 verified integer not null default 0,
205 verification_code text not null,
206 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
207 is_primary integer not null default 0,
208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
209 unique(did, email)
210 );
211
212 create table if not exists artifacts (
213 -- id
214 id integer primary key autoincrement,
215 did text not null,
216 rkey text not null,
217
218 -- meta
219 repo_at text not null,
220 tag binary(20) not null,
221 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
222
223 -- data
224 blob_cid text not null,
225 name text not null,
226 size integer not null default 0,
227 mimetype string not null default "*/*",
228
229 -- constraints
230 unique(did, rkey), -- record must be unique
231 unique(repo_at, tag, name), -- for a given tag object, each file must be unique
232 foreign key (repo_at) references repos(at_uri) on delete cascade
233 );
234
235 create table if not exists profile (
236 -- id
237 id integer primary key autoincrement,
238 did text not null,
239
240 -- data
241 description text not null,
242 include_bluesky integer not null default 0,
243 location text,
244
245 -- constraints
246 unique(did)
247 );
248 create table if not exists profile_links (
249 -- id
250 id integer primary key autoincrement,
251 did text not null,
252
253 -- data
254 link text not null,
255
256 -- constraints
257 foreign key (did) references profile(did) on delete cascade
258 );
259 create table if not exists profile_stats (
260 -- id
261 id integer primary key autoincrement,
262 did text not null,
263
264 -- data
265 kind text not null check (kind in (
266 "merged-pull-request-count",
267 "closed-pull-request-count",
268 "open-pull-request-count",
269 "open-issue-count",
270 "closed-issue-count",
271 "repository-count"
272 )),
273
274 -- constraints
275 foreign key (did) references profile(did) on delete cascade
276 );
277 create table if not exists profile_pinned_repositories (
278 -- id
279 id integer primary key autoincrement,
280 did text not null,
281
282 -- data
283 at_uri text not null,
284
285 -- constraints
286 unique(did, at_uri),
287 foreign key (did) references profile(did) on delete cascade,
288 foreign key (at_uri) references repos(at_uri) on delete cascade
289 );
290
291 create table if not exists oauth_requests (
292 id integer primary key autoincrement,
293 auth_server_iss text not null,
294 state text not null,
295 did text not null,
296 handle text not null,
297 pds_url text not null,
298 pkce_verifier text not null,
299 dpop_auth_server_nonce text not null,
300 dpop_private_jwk text not null
301 );
302
303 create table if not exists oauth_sessions (
304 id integer primary key autoincrement,
305 did text not null,
306 handle text not null,
307 pds_url text not null,
308 auth_server_iss text not null,
309 access_jwt text not null,
310 refresh_jwt text not null,
311 dpop_pds_nonce text,
312 dpop_auth_server_nonce text not null,
313 dpop_private_jwk text not null,
314 expiry text not null
315 );
316
317 create table if not exists punchcard (
318 did text not null,
319 date text not null, -- yyyy-mm-dd
320 count integer,
321 primary key (did, date)
322 );
323
324 create table if not exists spindles (
325 id integer primary key autoincrement,
326 owner text not null,
327 instance text not null,
328 verified text, -- time of verification
329 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
330
331 unique(instance)
332 );
333
334 create table if not exists pipelines (
335 -- identifiers
336 id integer primary key autoincrement,
337 knot text not null,
338 rkey text not null,
339
340 repo_owner text not null,
341 repo_name text not null,
342
343 -- every pipeline must be associated with exactly one commit
344 sha text not null check (length(sha) = 40),
345
346 -- trigger data
347 trigger_id integer not null,
348
349 unique(knot, rkey),
350 foreign key (trigger_id) references triggers(id) on delete cascade
351 );
352
353 create table if not exists triggers (
354 -- primary key
355 id integer primary key autoincrement,
356
357 -- top-level fields
358 kind text not null,
359
360 -- pushTriggerData fields
361 push_ref text,
362 push_new_sha text check (length(push_new_sha) = 40),
363 push_old_sha text check (length(push_old_sha) = 40),
364
365 -- pullRequestTriggerData fields
366 pr_source_branch text,
367 pr_target_branch text,
368 pr_source_sha text check (length(pr_source_sha) = 40),
369 pr_action text
370 );
371
372 create table if not exists pipeline_statuses (
373 -- identifiers
374 id integer primary key autoincrement,
375 spindle text not null,
376 rkey text not null,
377
378 -- referenced pipeline. these form the (did, rkey) pair
379 pipeline_knot text not null,
380 pipeline_rkey text not null,
381
382 -- content
383 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
384 workflow text not null,
385 status text not null,
386 error text,
387 exit_code integer not null default 0,
388
389 unique (spindle, rkey),
390 foreign key (pipeline_knot, pipeline_rkey)
391 references pipelines (knot, rkey)
392 on delete cascade
393 );
394
395 create table if not exists migrations (
396 id integer primary key autoincrement,
397 name text unique
398 );
399 `)
400 if err != nil {
401 return nil, err
402 }
403
404 // run migrations
405 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error {
406 tx.Exec(`
407 alter table repos add column description text check (length(description) <= 200);
408 `)
409 return nil
410 })
411
412 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error {
413 // add unconstrained column
414 _, err := tx.Exec(`
415 alter table public_keys
416 add column rkey text;
417 `)
418 if err != nil {
419 return err
420 }
421
422 // backfill
423 _, err = tx.Exec(`
424 update public_keys
425 set rkey = ''
426 where rkey is null;
427 `)
428 if err != nil {
429 return err
430 }
431
432 return nil
433 })
434
435 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error {
436 _, err := tx.Exec(`
437 alter table comments drop column comment_at;
438 alter table comments add column rkey text;
439 `)
440 return err
441 })
442
443 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error {
444 _, err := tx.Exec(`
445 alter table comments add column deleted text; -- timestamp
446 alter table comments add column edited text; -- timestamp
447 `)
448 return err
449 })
450
451 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error {
452 _, err := tx.Exec(`
453 alter table pulls add column source_branch text;
454 alter table pulls add column source_repo_at text;
455 alter table pull_submissions add column source_rev text;
456 `)
457 return err
458 })
459
460 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error {
461 _, err := tx.Exec(`
462 alter table repos add column source text;
463 `)
464 return err
465 })
466
467 // disable foreign-keys for the next migration
468 // NOTE: this cannot be done in a transaction, so it is run outside [0]
469 //
470 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys
471 db.Exec("pragma foreign_keys = off;")
472 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error {
473 _, err := tx.Exec(`
474 create table pulls_new (
475 -- identifiers
476 id integer primary key autoincrement,
477 pull_id integer not null,
478
479 -- at identifiers
480 repo_at text not null,
481 owner_did text not null,
482 rkey text not null,
483
484 -- content
485 title text not null,
486 body text not null,
487 target_branch text not null,
488 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted
489
490 -- source info
491 source_branch text,
492 source_repo_at text,
493
494 -- stacking
495 stack_id text,
496 change_id text,
497 parent_change_id text,
498
499 -- meta
500 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
501
502 -- constraints
503 unique(repo_at, pull_id),
504 foreign key (repo_at) references repos(at_uri) on delete cascade
505 );
506
507 insert into pulls_new (
508 id, pull_id,
509 repo_at, owner_did, rkey,
510 title, body, target_branch, state,
511 source_branch, source_repo_at,
512 created
513 )
514 select
515 id, pull_id,
516 repo_at, owner_did, rkey,
517 title, body, target_branch, state,
518 source_branch, source_repo_at,
519 created
520 FROM pulls;
521
522 drop table pulls;
523 alter table pulls_new rename to pulls;
524 `)
525 return err
526 })
527 db.Exec("pragma foreign_keys = on;")
528
529 // run migrations
530 runMigration(db, "add-spindle-to-repos", func(tx *sql.Tx) error {
531 tx.Exec(`
532 alter table repos add column spindle text;
533 `)
534 return nil
535 })
536
537 return &DB{db}, nil
538}
539
// migrationFn is one schema migration step. It receives the transaction the
// migration runs in and reports failure through its error result.
type migrationFn = func(*sql.Tx) error

// runMigration applies the migration called name at most once.
//
// It opens a transaction on d and looks name up in the migrations table. When
// the name is already recorded the migration is skipped and nil is returned.
// Otherwise migrationFn runs inside the transaction, name is inserted into the
// migrations table, and the transaction is committed. If any step fails, that
// step's error is returned and the deferred rollback discards the
// transaction's changes.
func runMigration(d *sql.DB, name string, migrationFn migrationFn) error {
	txn, beginErr := d.Begin()
	if beginErr != nil {
		return beginErr
	}
	// covers every early return below; once Commit has succeeded there is
	// nothing left for this to undo
	defer txn.Rollback()

	var applied bool
	lookup := txn.QueryRow("select exists (select 1 from migrations where name = ?)", name)
	if scanErr := lookup.Scan(&applied); scanErr != nil {
		return scanErr
	}

	if applied {
		log.Printf("skipped migration %s, already applied", name)
		return nil
	}

	// run migration
	if runErr := migrationFn(txn); runErr != nil {
		log.Printf("Failed to run migration %s: %v", name, runErr)
		return runErr
	}

	// mark migration as complete
	if _, markErr := txn.Exec("insert into migrations (name) values (?)", name); markErr != nil {
		log.Printf("Failed to mark migration %s as complete: %v", name, markErr)
		return markErr
	}

	// commit the transaction
	if commitErr := txn.Commit(); commitErr != nil {
		return commitErr
	}

	log.Printf("migration %s applied successfully", name)
	return nil
}
582
// filter is a single "<key> <cmp> ?" comparison for a where clause: key is
// the left-hand side, cmp the operator, and arg the value intended for the
// placeholder.
type filter struct {
	key string
	arg any
	cmp string
}

// newFilter builds a filter that compares key against arg using cmp.
func newFilter(key, cmp string, arg any) filter {
	var f filter
	f.key, f.cmp, f.arg = key, cmp, arg
	return f
}

// FilterEq builds a "key = ?" filter.
func FilterEq(key string, arg any) filter {
	return filter{key: key, cmp: "=", arg: arg}
}

// FilterNotEq builds a "key <> ?" filter.
func FilterNotEq(key string, arg any) filter {
	return filter{key: key, cmp: "<>", arg: arg}
}

// FilterGte builds a "key >= ?" filter.
func FilterGte(key string, arg any) filter {
	return filter{key: key, cmp: ">=", arg: arg}
}

// FilterLte builds a "key <= ?" filter.
func FilterLte(key string, arg any) filter {
	return filter{key: key, cmp: "<=", arg: arg}
}

// FilterIs builds a "key is ?" filter.
func FilterIs(key string, arg any) filter {
	return filter{key: key, cmp: "is", arg: arg}
}

// FilterIsNot builds a "key is not ?" filter.
func FilterIsNot(key string, arg any) filter {
	return filter{key: key, cmp: "is not", arg: arg}
}

// Condition renders the filter as a SQL fragment with a single "?"
// placeholder standing in for the argument. The key and operator are
// interpolated verbatim.
func (f filter) Condition() string {
	lhs, op := f.key, f.cmp
	return fmt.Sprintf("%s %s ?", lhs, op)
}