Monorepo for Tangled

appview/db: fix orphaned pipelines and triggers on repo deletion

pipelines and triggers were not cleaned up when a repository was
deleted. add repo_at foreign key with on delete cascade to both tables
so the database handles cleanup automatically.
pipeline_statuses also cascades through pipelines, ensuring the full
chain is removed. replace repo_owner/repo_name lookups with repo_at.

Signed-off-by: moshyfawn <email@moshyfawn.dev>

+150 -29
+116 -2
appview/db/db.go
··· 377 377 378 378 repo_owner text not null, 379 379 repo_name text not null, 380 + repo_at text not null, 380 381 381 382 -- every pipeline must be associated with exactly one commit 382 383 sha text not null check (length(sha) = 40), ··· 386 387 trigger_id integer not null, 387 388 388 389 unique(knot, rkey), 389 - foreign key (trigger_id) references triggers(id) on delete cascade 390 + foreign key (trigger_id) references triggers(id) on delete cascade, 391 + foreign key (repo_at) references repos(at_uri) on delete cascade 390 392 ); 391 393 392 394 create table if not exists triggers ( ··· 395 397 396 398 -- top-level fields 397 399 kind text not null, 400 + repo_at text not null, 398 401 399 402 -- pushTriggerData fields 400 403 push_ref text, ··· 405 408 pr_source_branch text, 406 409 pr_target_branch text, 407 410 pr_source_sha text check (length(pr_source_sha) = 40), 408 - pr_action text 411 + pr_action text, 412 + 413 + foreign key (repo_at) references repos(at_uri) on delete cascade 409 414 ); 410 415 411 416 create table if not exists pipeline_statuses ( ··· 1204 1209 `) 1205 1210 return err 1206 1211 }) 1212 + 1213 + conn.ExecContext(ctx, "pragma foreign_keys = off;") 1214 + orm.RunMigration(conn, logger, "add-repo-at-fk-to-pipelines", func(tx *sql.Tx) error { 1215 + _, err := tx.Exec(` 1216 + create table if not exists pipelines_new ( 1217 + id integer primary key autoincrement, 1218 + knot text not null, 1219 + rkey text not null, 1220 + 1221 + repo_owner text not null, 1222 + repo_name text not null, 1223 + repo_at text not null, 1224 + 1225 + sha text not null check (length(sha) = 40), 1226 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 1227 + 1228 + trigger_id integer not null, 1229 + 1230 + unique(knot, rkey), 1231 + foreign key (trigger_id) references triggers(id) on delete cascade, 1232 + foreign key (repo_at) references repos(at_uri) on delete cascade 1233 + ); 1234 + `) 1235 + if err != nil { 1236 + return err 1237 + 
} 1238 + 1239 + _, err = tx.Exec(` 1240 + -- backfill repo_at from repos; orphaned rows are dropped 1241 + insert into pipelines_new (id, knot, rkey, repo_owner, repo_name, repo_at, sha, created, trigger_id) 1242 + select 1243 + p.id, 1244 + p.knot, 1245 + p.rkey, 1246 + p.repo_owner, 1247 + p.repo_name, 1248 + r.at_uri, 1249 + p.sha, 1250 + p.created, 1251 + p.trigger_id 1252 + from pipelines p 1253 + join repos r on p.repo_owner = r.did and p.repo_name = r.name; 1254 + `) 1255 + if err != nil { 1256 + return err 1257 + } 1258 + 1259 + _, err = tx.Exec(`drop table pipelines`) 1260 + if err != nil { 1261 + return err 1262 + } 1263 + 1264 + _, err = tx.Exec(`alter table pipelines_new rename to pipelines`) 1265 + return err 1266 + }) 1267 + 1268 + orm.RunMigration(conn, logger, "add-repo-at-fk-to-triggers", func(tx *sql.Tx) error { 1269 + _, err := tx.Exec(` 1270 + create table if not exists triggers_new ( 1271 + id integer primary key autoincrement, 1272 + kind text not null, 1273 + repo_at text not null, 1274 + 1275 + push_ref text, 1276 + push_new_sha text check (length(push_new_sha) = 40), 1277 + push_old_sha text check (length(push_old_sha) = 40), 1278 + 1279 + pr_source_branch text, 1280 + pr_target_branch text, 1281 + pr_source_sha text check (length(pr_source_sha) = 40), 1282 + pr_action text, 1283 + 1284 + foreign key (repo_at) references repos(at_uri) on delete cascade 1285 + ); 1286 + `) 1287 + if err != nil { 1288 + return err 1289 + } 1290 + 1291 + _, err = tx.Exec(` 1292 + -- backfill repo_at from pipelines; orphaned rows are dropped 1293 + insert into triggers_new (id, kind, repo_at, push_ref, push_new_sha, push_old_sha, pr_source_branch, pr_target_branch, pr_source_sha, pr_action) 1294 + select 1295 + t.id, 1296 + t.kind, 1297 + p.repo_at, 1298 + t.push_ref, 1299 + t.push_new_sha, 1300 + t.push_old_sha, 1301 + t.pr_source_branch, 1302 + t.pr_target_branch, 1303 + t.pr_source_sha, 1304 + t.pr_action 1305 + from triggers t 1306 + join pipelines p on 
t.id = p.trigger_id; 1307 + `) 1308 + if err != nil { 1309 + return err 1310 + } 1311 + 1312 + _, err = tx.Exec(`drop table triggers`) 1313 + if err != nil { 1314 + return err 1315 + } 1316 + 1317 + _, err = tx.Exec(`alter table triggers_new rename to triggers`) 1318 + return err 1319 + }) 1320 + conn.ExecContext(ctx, "pragma foreign_keys = on;") 1207 1321 1208 1322 return &DB{ 1209 1323 db,
+8 -1
appview/db/pipeline.go
··· 26 26 whereClause = " where " + strings.Join(conditions, " and ") 27 27 } 28 28 29 - query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 29 + query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, repo_at, sha, created from pipelines %s`, whereClause) 30 30 31 31 rows, err := e.Query(query, args...) 32 32 ··· 44 44 &pipeline.Knot, 45 45 &pipeline.RepoOwner, 46 46 &pipeline.RepoName, 47 + &pipeline.RepoAt, 47 48 &pipeline.Sha, 48 49 &createdAt, 49 50 ) ··· 71 72 pipeline.Knot, 72 73 pipeline.RepoOwner, 73 74 pipeline.RepoName, 75 + pipeline.RepoAt, 74 76 pipeline.TriggerId, 75 77 pipeline.Sha, 76 78 } ··· 86 88 knot, 87 89 repo_owner, 88 90 repo_name, 91 + repo_at, 89 92 trigger_id, 90 93 sha 91 94 ) values (%s) ··· 99 102 func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 100 103 args := []any{ 101 104 trigger.Kind, 105 + trigger.RepoAt, 102 106 trigger.PushRef, 103 107 trigger.PushNewSha, 104 108 trigger.PushOldSha, ··· 115 119 116 120 query := fmt.Sprintf(`insert or ignore into triggers ( 117 121 kind, 122 + repo_at, 118 123 push_ref, 119 124 push_new_sha, 120 125 push_old_sha, ··· 193 198 p.rkey, 194 199 p.repo_owner, 195 200 p.repo_name, 201 + p.repo_at, 196 202 p.sha, 197 203 p.created, 198 204 t.id, ··· 231 237 &p.Rkey, 232 238 &p.RepoOwner, 233 239 &p.RepoName, 240 + &p.RepoAt, 234 241 &p.Sha, 235 242 &created, 236 243 &p.TriggerId,
+4 -2
appview/models/pipeline.go
··· 19 19 Knot string 20 20 RepoOwner syntax.DID 21 21 RepoName string 22 + RepoAt syntax.ATURI 22 23 TriggerId int 23 24 Sha string 24 25 Created time.Time ··· 127 128 } 128 129 129 130 type Trigger struct { 130 - Id int 131 - Kind workflow.TriggerKind 131 + Id int 132 + Kind workflow.TriggerKind 133 + RepoAt syntax.ATURI 132 134 133 135 // push trigger fields 134 136 PushRef *string
+4 -8
appview/pipelines/pipelines.go
··· 88 88 89 89 filterKind := r.URL.Query().Get("trigger") 90 90 filters := []orm.Filter{ 91 - orm.FilterEq("p.repo_owner", f.Did), 92 - orm.FilterEq("p.repo_name", f.Name), 91 + orm.FilterEq("p.repo_at", f.RepoAt()), 93 92 orm.FilterEq("p.knot", f.Knot), 94 93 } 95 94 switch filterKind { ··· 152 151 ps, err := db.GetPipelineStatuses( 153 152 p.db, 154 153 1, 155 - orm.FilterEq("p.repo_owner", f.Did), 156 - orm.FilterEq("p.repo_name", f.Name), 154 + orm.FilterEq("p.repo_at", f.RepoAt()), 157 155 orm.FilterEq("p.knot", f.Knot), 158 156 orm.FilterEq("p.id", pipelineId), 159 157 ) ··· 219 217 ps, err := db.GetPipelineStatuses( 220 218 p.db, 221 219 1, 222 - orm.FilterEq("p.repo_owner", f.Did), 223 - orm.FilterEq("p.repo_name", f.Name), 220 + orm.FilterEq("p.repo_at", f.RepoAt()), 224 221 orm.FilterEq("p.knot", f.Knot), 225 222 orm.FilterEq("p.id", pipelineId), 226 223 ) ··· 368 365 ps, err := db.GetPipelineStatuses( 369 366 p.db, 370 367 1, 371 - orm.FilterEq("p.repo_owner", f.Did), 372 - orm.FilterEq("p.repo_name", f.Name), 368 + orm.FilterEq("p.repo_at", f.RepoAt()), 373 369 orm.FilterEq("p.knot", f.Knot), 374 370 orm.FilterEq("p.id", pipelineId), 375 371 )
+2 -4
appview/pulls/pulls.go
··· 214 214 ps, err := db.GetPipelineStatuses( 215 215 s.db, 216 216 len(shas), 217 - orm.FilterEq("p.repo_owner", f.Did), 218 - orm.FilterEq("p.repo_name", f.Name), 217 + orm.FilterEq("p.repo_at", f.RepoAt()), 219 218 orm.FilterEq("p.knot", f.Knot), 220 219 orm.FilterIn("p.sha", shas), 221 220 ) ··· 636 635 ps, err := db.GetPipelineStatuses( 637 636 s.db, 638 637 len(shas), 639 - orm.FilterEq("p.repo_owner", f.Did), 640 - orm.FilterEq("p.repo_name", f.Name), 638 + orm.FilterEq("p.repo_at", f.RepoAt()), 641 639 orm.FilterEq("p.knot", f.Knot), 642 640 orm.FilterIn("p.sha", shas), 643 641 )
+1 -2
appview/repo/repo_util.go
··· 103 103 ps, err := db.GetPipelineStatuses( 104 104 d, 105 105 len(shas), 106 - orm.FilterEq("p.repo_owner", repo.Did), 107 - orm.FilterEq("p.repo_name", repo.Name), 106 + orm.FilterEq("p.repo_at", repo.RepoAt()), 108 107 orm.FilterEq("p.knot", repo.Knot), 109 108 orm.FilterIn("p.sha", shas), 110 109 )
+2
appview/state/knotstream.go
··· 246 246 return fmt.Errorf("failed to start txn: %w", err) 247 247 } 248 248 249 + trigger.RepoAt = repos[0].RepoAt() 249 250 triggerId, err := db.AddTrigger(tx, trigger) 250 251 if err != nil { 251 252 return fmt.Errorf("failed to add trigger entry: %w", err) ··· 256 257 Knot: source.Key(), 257 258 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 258 259 RepoName: record.TriggerMetadata.Repo.Repo, 260 + RepoAt: repos[0].RepoAt(), 259 261 TriggerId: int(triggerId), 260 262 Sha: sha, 261 263 }
+13 -10
cmd/populatepipelines/populate_pipelines.go
··· 68 68 log.Fatalf("Invalid repo format: %s (expected: did:plc:xyz/reponame)", *repo) 69 69 } 70 70 71 + // Construct the AT URI for repo_at lookups 72 + repoAt := fmt.Sprintf("at://%s/sh.tangled.repo/%s", did, repoName) 73 + 71 74 db, err := sql.Open("sqlite3", *dbPath) 72 75 if err != nil { 73 76 log.Fatalf("Failed to open database: %v", err) ··· 87 90 88 91 var triggerId int64 89 92 if isPush { 90 - triggerId, err = createPushTrigger(db, branches) 93 + triggerId, err = createPushTrigger(db, repoAt, branches) 91 94 } else { 92 - triggerId, err = createPRTrigger(db, branches) 95 + triggerId, err = createPRTrigger(db, repoAt, branches) 93 96 } 94 97 if err != nil { 95 98 log.Fatalf("Failed to create trigger: %v", err) ··· 147 150 return "", "", false 148 151 } 149 152 150 - func createPushTrigger(db *sql.DB, branches []string) (int64, error) { 153 + func createPushTrigger(db *sql.DB, repoAt string, branches []string) (int64, error) { 151 154 branch := branches[rand.Intn(len(branches))] 152 155 oldSha := generateRandomSha() 153 156 newSha := generateRandomSha() 154 157 155 158 result, err := db.Exec(` 156 - INSERT INTO triggers (kind, push_ref, push_new_sha, push_old_sha) 157 - VALUES (?, ?, ?, ?) 158 - `, "push", "refs/heads/"+branch, newSha, oldSha) 159 + INSERT INTO triggers (kind, repo_at, push_ref, push_new_sha, push_old_sha) 160 + VALUES (?, ?, ?, ?, ?) 
161 + `, "push", repoAt, "refs/heads/"+branch, newSha, oldSha) 159 162 160 163 if err != nil { 161 164 return 0, err ··· 164 167 return result.LastInsertId() 165 168 } 166 169 167 - func createPRTrigger(db *sql.DB, branches []string) (int64, error) { 170 + func createPRTrigger(db *sql.DB, repoAt string, branches []string) (int64, error) { 168 171 targetBranch := branches[0] // Usually main 169 172 sourceBranch := branches[rand.Intn(len(branches)-1)+1] 170 173 sourceSha := generateRandomSha() ··· 172 175 action := actions[rand.Intn(len(actions))] 173 176 174 177 result, err := db.Exec(` 175 - INSERT INTO triggers (kind, pr_source_branch, pr_target_branch, pr_source_sha, pr_action) 176 - VALUES (?, ?, ?, ?, ?) 177 - `, "pull_request", sourceBranch, targetBranch, sourceSha, action) 178 + INSERT INTO triggers (kind, repo_at, pr_source_branch, pr_target_branch, pr_source_sha, pr_action) 179 + VALUES (?, ?, ?, ?, ?, ?) 180 + `, "pull_request", repoAt, sourceBranch, targetBranch, sourceSha, action) 178 181 179 182 if err != nil { 180 183 return 0, err