this repo has no description
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "strings" 7 8 _ "github.com/mattn/go-sqlite3" 9 "tangled.org/core/log" 10 "tangled.org/core/orm" 11) 12 13type DB struct { 14 *sql.DB 15} 16 17func Make(ctx context.Context, dbPath string) (*DB, error) { 18 // https://github.com/mattn/go-sqlite3#connection-string 19 opts := []string{ 20 "_foreign_keys=1", 21 "_journal_mode=WAL", 22 "_synchronous=NORMAL", 23 "_auto_vacuum=incremental", 24 } 25 26 logger := log.FromContext(ctx) 27 logger = log.SubLogger(logger, "db") 28 29 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 30 if err != nil { 31 return nil, err 32 } 33 34 conn, err := db.Conn(ctx) 35 if err != nil { 36 return nil, err 37 } 38 defer conn.Close() 39 40 _, err = db.Exec(` 41 create table if not exists _jetstream ( 42 id integer primary key autoincrement, 43 last_time_us integer not null 44 ); 45 46 create table if not exists known_dids ( 47 did text primary key 48 ); 49 50 create table if not exists repos ( 51 id integer primary key autoincrement, 52 knot text not null, 53 owner text not null, 54 name text not null, 55 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 56 57 unique(owner, name) 58 ); 59 60 create table if not exists spindle_members ( 61 -- identifiers for the record 62 id integer primary key autoincrement, 63 did text not null, 64 rkey text not null, 65 66 -- data 67 instance text not null, 68 subject text not null, 69 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 70 71 -- constraints 72 unique (did, instance, subject) 73 ); 74 75 -- status event for a single workflow 76 create table if not exists events ( 77 rkey text not null, 78 nsid text not null, 79 event text not null, -- json 80 created integer not null -- unix nanos 81 ); 82 `) 83 if err != nil { 84 return nil, err 85 } 86 87 // run migrations 88 89 // NOTE: this won't migrate existing records 90 // they will be fetched again with tap instead 91 orm.RunMigration(conn, logger, "add-rkey-to-repos", func(tx *sql.Tx) error { 92 // archive legacy repos (just in case) 93 _, err = tx.Exec(`alter table repos rename to repos_old`) 94 if err != nil { 95 return err 96 } 97 98 _, err := tx.Exec(` 99 create table repos ( 100 -- identifiers 101 id integer primary key autoincrement, 102 did text not null, 103 rkey text not null, 104 at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 105 106 name text not null, 107 knot text not null, 108 109 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 110 unique(did, rkey) 111 ); 112 `) 113 if err != nil { 114 return err 115 } 116 117 return nil 118 }) 119 120 return &DB{db}, nil 121} 122 123func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 124 _, err := d.Exec(` 125 insert into _jetstream (id, last_time_us) 126 values (1, ?) 127 on conflict(id) do update set last_time_us = excluded.last_time_us 128 `, lastTimeUs) 129 return err 130} 131 132func (d *DB) GetLastTimeUs() (int64, error) { 133 var lastTimeUs int64 134 row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 135 err := row.Scan(&lastTimeUs) 136 return lastTimeUs, err 137}