Monorepo for Tangled
at push-pmqotzqwskqq 341 lines 9.8 kB view raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "os" 9 "strings" 10 11 securejoin "github.com/cyphar/filepath-securejoin" 12 _ "github.com/mattn/go-sqlite3" 13 "tangled.org/core/log" 14) 15 16type DB struct { 17 db *sql.DB 18 logger *slog.Logger 19} 20 21func Setup(ctx context.Context, dbPath string) (*DB, error) { 22 // https://github.com/mattn/go-sqlite3#connection-string 23 opts := []string{ 24 "_foreign_keys=1", 25 "_journal_mode=WAL", 26 "_synchronous=NORMAL", 27 "_auto_vacuum=incremental", 28 } 29 30 logger := log.FromContext(ctx) 31 logger = log.SubLogger(logger, "db") 32 33 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 34 if err != nil { 35 return nil, err 36 } 37 38 conn, err := db.Conn(ctx) 39 if err != nil { 40 return nil, err 41 } 42 defer conn.Close() 43 44 _, err = conn.ExecContext(ctx, ` 45 create table if not exists known_dids ( 46 did text primary key 47 ); 48 49 create table if not exists public_keys ( 50 id integer primary key autoincrement, 51 did text not null, 52 key text not null, 53 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 54 unique(did, key), 55 foreign key (did) references known_dids(did) on delete cascade 56 ); 57 58 create table if not exists _jetstream ( 59 id integer primary key autoincrement, 60 last_time_us integer not null 61 ); 62 63 create table if not exists events ( 64 rkey text not null, 65 nsid text not null, 66 event text not null, -- json 67 created integer not null default (strftime('%s', 'now')), 68 primary key (rkey, nsid) 69 ); 70 71 create table if not exists repo_keys ( 72 repo_did text primary key, 73 signing_key blob not null, 74 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 75 ); 76 77 create table if not exists repo_at_history ( 78 old_repo_at text primary key, 79 repo_did text not null, 80 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 81 ); 82 83 create table if not exists migrations ( 84 id integer primary key autoincrement, 85 name text unique 86 ); 87 `) 88 if err != nil { 89 return nil, err 90 } 91 92 migrationCheck := func(name string) bool { 93 var count int 94 conn.QueryRowContext(ctx, `SELECT count(1) FROM migrations WHERE name = ?`, name).Scan(&count) 95 return count > 0 96 } 97 98 runMigration := func(name string, fn func() error) error { 99 if migrationCheck(name) { 100 return nil 101 } 102 if err := fn(); err != nil { 103 return fmt.Errorf("migration %q failed: %w", name, err) 104 } 105 _, err := conn.ExecContext(ctx, `INSERT INTO migrations (name) VALUES (?)`, name) 106 if err != nil { 107 return fmt.Errorf("recording migration %q: %w", name, err) 108 } 109 return nil 110 } 111 112 if err := runMigration("add-owner-did-to-repo-keys", func() error { 113 _, mErr := conn.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`) 114 return mErr 115 }); err != nil { 116 return nil, err 117 } 118 119 if err := runMigration("add-repo-name-to-repo-keys", func() error { 120 _, mErr := conn.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`) 121 return mErr 122 }); err != nil { 123 return nil, err 124 } 125 126 if err := runMigration("add-unique-owner-repo-on-repo-keys", func() error { 127 _, mErr := conn.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`) 128 return mErr 129 }); err != nil { 130 return nil, err 131 } 132 133 if err := runMigration("add-key-type-and-nullable-signing-key", func() error { 134 tx, txErr := conn.BeginTx(ctx, nil) 135 if txErr != nil { 136 return txErr 137 } 138 defer tx.Rollback() 139 140 _, mErr := tx.ExecContext(ctx, ` 141 create table repo_keys_new ( 142 repo_did text primary key, 143 signing_key blob, 144 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 145 owner_did text, 146 repo_name text, 147 key_type text not null default 'k256' 148 ); 149 insert into repo_keys_new 150 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256' 151 from repo_keys; 152 drop table repo_keys; 153 alter table repo_keys_new rename to repo_keys; 154 create unique index if not exists idx_repo_keys_owner_repo 155 on repo_keys(owner_did, repo_name); 156 `) 157 if mErr != nil { 158 return mErr 159 } 160 return tx.Commit() 161 }); err != nil { 162 return nil, err 163 } 164 165 if err := runMigration("add-fk-repo-at-history", func() error { 166 tx, txErr := conn.BeginTx(ctx, nil) 167 if txErr != nil { 168 return txErr 169 } 170 defer tx.Rollback() 171 172 _, mErr := tx.ExecContext(ctx, ` 173 create table repo_at_history_new ( 174 old_repo_at text primary key, 175 repo_did text not null, 176 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 177 foreign key (repo_did) references repo_keys(repo_did) on delete cascade 178 ); 179 insert into repo_at_history_new 180 select old_repo_at, repo_did, created_at 181 from repo_at_history; 182 drop table repo_at_history; 183 alter table repo_at_history_new rename to repo_at_history; 184 `) 185 if mErr != nil { 186 return mErr 187 } 188 return tx.Commit() 189 }); err != nil { 190 return nil, err 191 } 192 193 return &DB{ 194 db: db, 195 logger: logger, 196 }, nil 197} 198 199func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName string) error { 200 _, err := d.db.Exec( 201 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, 'k256')`, 202 repoDid, signingKey, ownerDid, repoName, 203 ) 204 return err 205} 206 207func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName string) error { 208 _, err := d.db.Exec( 209 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, NULL, ?, ?, 'web')`, 210 repoDid, ownerDid, repoName, 211 ) 212 return err 213} 214 215func (d *DB) DeleteRepoKey(repoDid string) error { 216 _, err := d.db.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid) 217 return err 218} 219 220func (d *DB) RepoDidExists(repoDid string) (bool, error) { 221 var count int 222 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 223 return count > 0, err 224} 225 226func (d *DB) ResolveAtUri(atUri string) (string, error) { 227 var repoDid string 228 err := d.db.QueryRow( 229 `SELECT repo_did FROM repo_at_history WHERE old_repo_at = ?`, 230 atUri, 231 ).Scan(&repoDid) 232 return repoDid, err 233} 234 235func (d *DB) GetRepoDid(ownerDid, repoName string) (string, error) { 236 var repoDid string 237 err := d.db.QueryRow( 238 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`, 239 ownerDid, repoName, 240 ).Scan(&repoDid) 241 return repoDid, err 242} 243 244func (d *DB) GetRepoSigningKey(repoDid string) ([]byte, error) { 245 var signingKey []byte 246 err := d.db.QueryRow( 247 `SELECT signing_key FROM repo_keys WHERE repo_did = ? AND key_type = 'k256'`, 248 repoDid, 249 ).Scan(&signingKey) 250 if err != nil { 251 return nil, fmt.Errorf("retrieving signing key for %s: %w", repoDid, err) 252 } 253 if signingKey == nil { 254 return nil, fmt.Errorf("signing key for %s is null (did:web repo?)", repoDid) 255 } 256 return signingKey, nil 257} 258 259func (d *DB) GetRepoKeyOwner(repoDid string) (ownerDid string, repoName string, err error) { 260 var nullOwner, nullName sql.NullString 261 err = d.db.QueryRow( 262 `SELECT owner_did, repo_name FROM repo_keys WHERE repo_did = ?`, 263 repoDid, 264 ).Scan(&nullOwner, &nullName) 265 if err != nil { 266 return 267 } 268 if !nullOwner.Valid || !nullName.Valid || nullOwner.String == "" || nullName.String == "" { 269 err = fmt.Errorf("repo_keys row for %s has empty or null owner_did or repo_name", repoDid) 270 return 271 } 272 ownerDid = nullOwner.String 273 repoName = nullName.String 274 return 275} 276 277func (d *DB) ResolveRepoOnDisk(scanPath, ownerDid, repoName string) (repoPath string, repoDid string, err error) { 278 did, lookupErr := d.GetRepoDid(ownerDid, repoName) 279 if lookupErr == nil && did != "" { 280 didPath, joinErr := securejoin.SecureJoin(scanPath, did) 281 if joinErr != nil { 282 d.logger.Error("securejoin failed for repo DID path", 283 "repoDid", did, "scanPath", scanPath, "error", joinErr) 284 } else { 285 if _, statErr := os.Stat(didPath); statErr == nil { 286 return didPath, did, nil 287 } 288 d.logger.Warn("repo DID in database but directory missing on disk, falling back to legacy path", 289 "repoDid", did, "expectedPath", didPath, "owner", ownerDid, "repo", repoName) 290 } 291 } 292 relative, relErr := securejoin.SecureJoin(ownerDid, repoName) 293 if relErr != nil { 294 return "", "", fmt.Errorf("securejoin failed for legacy path %s/%s: %w", ownerDid, repoName, relErr) 295 } 296 fallback, fallbackErr := securejoin.SecureJoin(scanPath, relative) 297 if fallbackErr != nil { 298 return "", "", fmt.Errorf("securejoin failed for legacy path %s/%s: %w", scanPath, relative, fallbackErr) 299 } 300 if _, statErr := os.Stat(fallback); statErr != nil { 301 return "", "", fmt.Errorf("repo not found on disk: %s/%s", ownerDid, repoName) 302 } 303 return fallback, "", nil 304} 305 306func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) { 307 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid) 308 if err != nil { 309 return 310 } 311 312 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid) 313 if joinErr != nil { 314 d.logger.Error("securejoin failed for repo DID path", 315 "repoDid", repoDid, "scanPath", scanPath, "error", joinErr) 316 } else { 317 if _, statErr := os.Stat(didPath); statErr == nil { 318 repoPath = didPath 319 return 320 } 321 d.logger.Warn("repo DID directory missing on disk, falling back to legacy path", 322 "repoDid", repoDid, "expectedPath", didPath, "owner", ownerDid, "repo", repoName) 323 } 324 325 relative, relErr := securejoin.SecureJoin(ownerDid, repoName) 326 if relErr != nil { 327 err = fmt.Errorf("securejoin failed for legacy path %s/%s: %w", ownerDid, repoName, relErr) 328 return 329 } 330 fallback, fallbackErr := securejoin.SecureJoin(scanPath, relative) 331 if fallbackErr != nil { 332 err = fmt.Errorf("securejoin failed for legacy path %s/%s: %w", scanPath, relative, fallbackErr) 333 return 334 } 335 if _, statErr := os.Stat(fallback); statErr != nil { 336 err = fmt.Errorf("repo not found on disk for DID %s: %s", repoDid, fallback) 337 return 338 } 339 repoPath = fallback 340 return 341}