Monorepo for Tangled
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "os"
9 "strings"
10
11 securejoin "github.com/cyphar/filepath-securejoin"
12 _ "github.com/mattn/go-sqlite3"
13 "tangled.org/core/log"
14)
15
// DB bundles the database/sql handle with the logger used by the methods
// defined on it. Values are built by Setup.
type DB struct {
	// db is the handle returned by sql.Open in Setup.
	db *sql.DB
	// logger is the sub-logger Setup derives under the name "db".
	logger *slog.Logger
}
20
21func Setup(ctx context.Context, dbPath string) (*DB, error) {
22 // https://github.com/mattn/go-sqlite3#connection-string
23 opts := []string{
24 "_foreign_keys=1",
25 "_journal_mode=WAL",
26 "_synchronous=NORMAL",
27 "_auto_vacuum=incremental",
28 }
29
30 logger := log.FromContext(ctx)
31 logger = log.SubLogger(logger, "db")
32
33 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
34 if err != nil {
35 return nil, err
36 }
37
38 conn, err := db.Conn(ctx)
39 if err != nil {
40 return nil, err
41 }
42 defer conn.Close()
43
44 _, err = conn.ExecContext(ctx, `
45 create table if not exists known_dids (
46 did text primary key
47 );
48
49 create table if not exists public_keys (
50 id integer primary key autoincrement,
51 did text not null,
52 key text not null,
53 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
54 unique(did, key),
55 foreign key (did) references known_dids(did) on delete cascade
56 );
57
58 create table if not exists _jetstream (
59 id integer primary key autoincrement,
60 last_time_us integer not null
61 );
62
63 create table if not exists events (
64 rkey text not null,
65 nsid text not null,
66 event text not null, -- json
67 created integer not null default (strftime('%s', 'now')),
68 primary key (rkey, nsid)
69 );
70
71 create table if not exists repo_keys (
72 repo_did text primary key,
73 signing_key blob not null,
74 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
75 );
76
77 create table if not exists repo_at_history (
78 old_repo_at text primary key,
79 repo_did text not null,
80 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
81 );
82
83 create table if not exists migrations (
84 id integer primary key autoincrement,
85 name text unique
86 );
87 `)
88 if err != nil {
89 return nil, err
90 }
91
92 migrationCheck := func(name string) bool {
93 var count int
94 conn.QueryRowContext(ctx, `SELECT count(1) FROM migrations WHERE name = ?`, name).Scan(&count)
95 return count > 0
96 }
97
98 runMigration := func(name string, fn func() error) error {
99 if migrationCheck(name) {
100 return nil
101 }
102 if err := fn(); err != nil {
103 return fmt.Errorf("migration %q failed: %w", name, err)
104 }
105 _, err := conn.ExecContext(ctx, `INSERT INTO migrations (name) VALUES (?)`, name)
106 if err != nil {
107 return fmt.Errorf("recording migration %q: %w", name, err)
108 }
109 return nil
110 }
111
112 if err := runMigration("add-owner-did-to-repo-keys", func() error {
113 _, mErr := conn.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`)
114 return mErr
115 }); err != nil {
116 return nil, err
117 }
118
119 if err := runMigration("add-repo-name-to-repo-keys", func() error {
120 _, mErr := conn.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`)
121 return mErr
122 }); err != nil {
123 return nil, err
124 }
125
126 if err := runMigration("add-unique-owner-repo-on-repo-keys", func() error {
127 _, mErr := conn.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`)
128 return mErr
129 }); err != nil {
130 return nil, err
131 }
132
133 if err := runMigration("add-key-type-and-nullable-signing-key", func() error {
134 tx, txErr := conn.BeginTx(ctx, nil)
135 if txErr != nil {
136 return txErr
137 }
138 defer tx.Rollback()
139
140 _, mErr := tx.ExecContext(ctx, `
141 create table repo_keys_new (
142 repo_did text primary key,
143 signing_key blob,
144 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
145 owner_did text,
146 repo_name text,
147 key_type text not null default 'k256'
148 );
149 insert into repo_keys_new
150 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256'
151 from repo_keys;
152 drop table repo_keys;
153 alter table repo_keys_new rename to repo_keys;
154 create unique index if not exists idx_repo_keys_owner_repo
155 on repo_keys(owner_did, repo_name);
156 `)
157 if mErr != nil {
158 return mErr
159 }
160 return tx.Commit()
161 }); err != nil {
162 return nil, err
163 }
164
165 if err := runMigration("add-fk-repo-at-history", func() error {
166 tx, txErr := conn.BeginTx(ctx, nil)
167 if txErr != nil {
168 return txErr
169 }
170 defer tx.Rollback()
171
172 _, mErr := tx.ExecContext(ctx, `
173 create table repo_at_history_new (
174 old_repo_at text primary key,
175 repo_did text not null,
176 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
177 foreign key (repo_did) references repo_keys(repo_did) on delete cascade
178 );
179 insert into repo_at_history_new
180 select old_repo_at, repo_did, created_at
181 from repo_at_history;
182 drop table repo_at_history;
183 alter table repo_at_history_new rename to repo_at_history;
184 `)
185 if mErr != nil {
186 return mErr
187 }
188 return tx.Commit()
189 }); err != nil {
190 return nil, err
191 }
192
193 return &DB{
194 db: db,
195 logger: logger,
196 }, nil
197}
198
199func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error {
200 _, err := d.db.Exec(
201 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, 'k256')`,
202 repoDid, signingKey, ownerDid, repoName,
203 )
204 return err
205}
206
207func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error {
208 _, err := d.db.Exec(
209 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, NULL, ?, ?, 'web')`,
210 repoDid, ownerDid, repoName,
211 )
212 return err
213}
214
215func (d *DB) DeleteRepoKey(repoDid string) error {
216 _, err := d.db.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid)
217 return err
218}
219
220func (d *DB) RepoDidExists(repoDid string) (bool, error) {
221 var count int
222 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count)
223 return count > 0, err
224}
225
226func (d *DB) ResolveAtUri(atUri string) (string, error) {
227 var repoDid string
228 err := d.db.QueryRow(
229 `SELECT repo_did FROM repo_at_history WHERE old_repo_at = ?`,
230 atUri,
231 ).Scan(&repoDid)
232 return repoDid, err
233}
234
235func (d *DB) GetRepoDid(ownerDid, repoName string) (string, error) {
236 var repoDid string
237 err := d.db.QueryRow(
238 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`,
239 ownerDid, repoName,
240 ).Scan(&repoDid)
241 return repoDid, err
242}
243
244func (d *DB) GetRepoSigningKey(repoDid string) ([]byte, error) {
245 var signingKey []byte
246 err := d.db.QueryRow(
247 `SELECT signing_key FROM repo_keys WHERE repo_did = ? AND key_type = 'k256'`,
248 repoDid,
249 ).Scan(&signingKey)
250 if err != nil {
251 return nil, fmt.Errorf("retrieving signing key for %s: %w", repoDid, err)
252 }
253 if signingKey == nil {
254 return nil, fmt.Errorf("signing key for %s is null (did:web repo?)", repoDid)
255 }
256 return signingKey, nil
257}
258
259func (d *DB) GetRepoKeyOwner(repoDid string) (ownerDid string, repoName string, err error) {
260 var nullOwner, nullName sql.NullString
261 err = d.db.QueryRow(
262 `SELECT owner_did, repo_name FROM repo_keys WHERE repo_did = ?`,
263 repoDid,
264 ).Scan(&nullOwner, &nullName)
265 if err != nil {
266 return
267 }
268 if !nullOwner.Valid || !nullName.Valid || nullOwner.String == "" || nullName.String == "" {
269 err = fmt.Errorf("repo_keys row for %s has empty or null owner_did or repo_name", repoDid)
270 return
271 }
272 ownerDid = nullOwner.String
273 repoName = nullName.String
274 return
275}
276
277func (d *DB) ResolveRepoOnDisk(scanPath, ownerDid, repoName string) (repoPath string, repoDid string, err error) {
278 did, lookupErr := d.GetRepoDid(ownerDid, repoName)
279 if lookupErr == nil && did != "" {
280 didPath, joinErr := securejoin.SecureJoin(scanPath, did)
281 if joinErr != nil {
282 d.logger.Error("securejoin failed for repo DID path",
283 "repoDid", did, "scanPath", scanPath, "error", joinErr)
284 } else {
285 if _, statErr := os.Stat(didPath); statErr == nil {
286 return didPath, did, nil
287 }
288 d.logger.Warn("repo DID in database but directory missing on disk, falling back to legacy path",
289 "repoDid", did, "expectedPath", didPath, "owner", ownerDid, "repo", repoName)
290 }
291 }
292 relative, relErr := securejoin.SecureJoin(ownerDid, repoName)
293 if relErr != nil {
294 return "", "", fmt.Errorf("securejoin failed for legacy path %s/%s: %w", ownerDid, repoName, relErr)
295 }
296 fallback, fallbackErr := securejoin.SecureJoin(scanPath, relative)
297 if fallbackErr != nil {
298 return "", "", fmt.Errorf("securejoin failed for legacy path %s/%s: %w", scanPath, relative, fallbackErr)
299 }
300 if _, statErr := os.Stat(fallback); statErr != nil {
301 return "", "", fmt.Errorf("repo not found on disk: %s/%s", ownerDid, repoName)
302 }
303 return fallback, "", nil
304}
305
306func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) {
307 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid)
308 if err != nil {
309 return
310 }
311
312 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid)
313 if joinErr != nil {
314 d.logger.Error("securejoin failed for repo DID path",
315 "repoDid", repoDid, "scanPath", scanPath, "error", joinErr)
316 } else {
317 if _, statErr := os.Stat(didPath); statErr == nil {
318 repoPath = didPath
319 return
320 }
321 d.logger.Warn("repo DID directory missing on disk, falling back to legacy path",
322 "repoDid", repoDid, "expectedPath", didPath, "owner", ownerDid, "repo", repoName)
323 }
324
325 relative, relErr := securejoin.SecureJoin(ownerDid, repoName)
326 if relErr != nil {
327 err = fmt.Errorf("securejoin failed for legacy path %s/%s: %w", ownerDid, repoName, relErr)
328 return
329 }
330 fallback, fallbackErr := securejoin.SecureJoin(scanPath, relative)
331 if fallbackErr != nil {
332 err = fmt.Errorf("securejoin failed for legacy path %s/%s: %w", scanPath, relative, fallbackErr)
333 return
334 }
335 if _, statErr := os.Stat(fallback); statErr != nil {
336 err = fmt.Errorf("repo not found on disk for DID %s: %s", repoDid, fallback)
337 return
338 }
339 repoPath = fallback
340 return
341}