this repo has no description
1package db
2
3import (
4 "context"
5 "database/sql"
6 "strings"
7
8 _ "github.com/mattn/go-sqlite3"
9 "tangled.org/core/log"
10 "tangled.org/core/orm"
11)
12
13type DB struct {
14 *sql.DB
15}
16
17func Make(ctx context.Context, dbPath string) (*DB, error) {
18 // https://github.com/mattn/go-sqlite3#connection-string
19 opts := []string{
20 "_foreign_keys=1",
21 "_journal_mode=WAL",
22 "_synchronous=NORMAL",
23 "_auto_vacuum=incremental",
24 }
25
26 logger := log.FromContext(ctx)
27 logger = log.SubLogger(logger, "db")
28
29 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
30 if err != nil {
31 return nil, err
32 }
33
34 conn, err := db.Conn(ctx)
35 if err != nil {
36 return nil, err
37 }
38 defer conn.Close()
39
40 _, err = db.Exec(`
41 create table if not exists _jetstream (
42 id integer primary key autoincrement,
43 last_time_us integer not null
44 );
45
46 create table if not exists known_dids (
47 did text primary key
48 );
49
50 create table if not exists repos (
51 id integer primary key autoincrement,
52 knot text not null,
53 owner text not null,
54 name text not null,
55 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
56
57 unique(owner, name)
58 );
59
60 create table if not exists spindle_members (
61 -- identifiers for the record
62 id integer primary key autoincrement,
63 did text not null,
64 rkey text not null,
65
66 -- data
67 instance text not null,
68 subject text not null,
69 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
70
71 -- constraints
72 unique (did, instance, subject)
73 );
74
75 -- status event for a single workflow
76 create table if not exists events (
77 rkey text not null,
78 nsid text not null,
79 event text not null, -- json
80 created integer not null -- unix nanos
81 );
82 `)
83 if err != nil {
84 return nil, err
85 }
86
87 // run migrations
88
89 // NOTE: this won't migrate existing records
90 // they will be fetched again with tap instead
91 orm.RunMigration(conn, logger, "add-rkey-to-repos", func(tx *sql.Tx) error {
92 // archive legacy repos (just in case)
93 _, err = tx.Exec(`alter table repos rename to repos_old`)
94 if err != nil {
95 return err
96 }
97
98 _, err := tx.Exec(`
99 create table repos (
100 -- identifiers
101 id integer primary key autoincrement,
102 did text not null,
103 rkey text not null,
104 at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored,
105
106 name text not null,
107 knot text not null,
108
109 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
110 unique(did, rkey)
111 );
112 `)
113 if err != nil {
114 return err
115 }
116
117 return nil
118 })
119
120 return &DB{db}, nil
121}
122
123func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
124 _, err := d.Exec(`
125 insert into _jetstream (id, last_time_us)
126 values (1, ?)
127 on conflict(id) do update set last_time_us = excluded.last_time_us
128 `, lastTimeUs)
129 return err
130}
131
132func (d *DB) GetLastTimeUs() (int64, error) {
133 var lastTimeUs int64
134 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`)
135 err := row.Scan(&lastTimeUs)
136 return lastTimeUs, err
137}