Monorepo for Tangled
at master 567 lines 13 kB view raw
1package db 2 3import ( 4 "database/sql" 5 "fmt" 6 "maps" 7 "slices" 8 "sort" 9 "strings" 10 "time" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/models" 15 "tangled.org/core/appview/pagination" 16 "tangled.org/core/orm" 17) 18 19func PutIssue(tx *sql.Tx, issue *models.Issue) error { 20 var seqRepoDid *string 21 if issue.RepoDid != "" { 22 seqRepoDid = &issue.RepoDid 23 } 24 _, err := tx.Exec(` 25 insert into repo_issue_seqs (repo_at, next_issue_id, repo_did) 26 values (?, 1, ?) 27 on conflict(repo_at) do update set repo_did = coalesce(excluded.repo_did, repo_did) 28 `, issue.RepoAt, seqRepoDid) 29 if err != nil { 30 return err 31 } 32 33 issues, err := GetIssues( 34 tx, 35 orm.FilterEq("did", issue.Did), 36 orm.FilterEq("rkey", issue.Rkey), 37 ) 38 switch { 39 case err != nil: 40 return err 41 case len(issues) == 0: 42 return createNewIssue(tx, issue) 43 case len(issues) != 1: // should be unreachable 44 return fmt.Errorf("invalid number of issues returned: %d", len(issues)) 45 default: 46 // if content is identical, do not edit 47 existingIssue := issues[0] 48 if existingIssue.Title == issue.Title && existingIssue.Body == issue.Body { 49 return nil 50 } 51 52 issue.Id = existingIssue.Id 53 issue.IssueId = existingIssue.IssueId 54 return updateIssue(tx, issue) 55 } 56} 57 58func createNewIssue(tx *sql.Tx, issue *models.Issue) error { 59 // get next issue_id 60 var newIssueId int 61 err := tx.QueryRow(` 62 update repo_issue_seqs 63 set next_issue_id = next_issue_id + 1 64 where repo_at = ? 65 returning next_issue_id - 1 66 `, issue.RepoAt).Scan(&newIssueId) 67 if err != nil { 68 return err 69 } 70 71 var repoDid *string 72 if issue.RepoDid != "" { 73 repoDid = &issue.RepoDid 74 } 75 76 // insert new issue 77 row := tx.QueryRow(` 78 insert into issues (repo_at, repo_did, did, rkey, issue_id, title, body) 79 values (?, ?, ?, ?, ?, ?, ?) 80 returning rowid, issue_id 81 `, issue.RepoAt, repoDid, issue.Did, issue.Rkey, newIssueId, issue.Title, issue.Body) 82 83 err = row.Scan(&issue.Id, &issue.IssueId) 84 if err != nil { 85 return fmt.Errorf("scan row: %w", err) 86 } 87 88 if err := putReferences(tx, issue.AtUri(), issue.References); err != nil { 89 return fmt.Errorf("put reference_links: %w", err) 90 } 91 return nil 92} 93 94func updateIssue(tx *sql.Tx, issue *models.Issue) error { 95 var repoDid *string 96 if issue.RepoDid != "" { 97 repoDid = &issue.RepoDid 98 } 99 _, err := tx.Exec(` 100 update issues 101 set title = ?, body = ?, edited = ?, repo_did = coalesce(?, repo_did) 102 where did = ? and rkey = ? 103 `, issue.Title, issue.Body, time.Now().Format(time.RFC3339), repoDid, issue.Did, issue.Rkey) 104 if err != nil { 105 return err 106 } 107 108 if err := putReferences(tx, issue.AtUri(), issue.References); err != nil { 109 return fmt.Errorf("put reference_links: %w", err) 110 } 111 return nil 112} 113 114func GetIssuesPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]models.Issue, error) { 115 issueMap := make(map[string]*models.Issue) // at-uri -> issue 116 117 var conditions []string 118 var args []any 119 120 for _, filter := range filters { 121 conditions = append(conditions, filter.Condition()) 122 args = append(args, filter.Arg()...) 123 } 124 125 whereClause := "" 126 if conditions != nil { 127 whereClause = " where " + strings.Join(conditions, " and ") 128 } 129 130 pLower := orm.FilterGte("row_num", page.Offset+1) 131 pUpper := orm.FilterLte("row_num", page.Offset+page.Limit) 132 133 pageClause := "" 134 if page.Limit > 0 { 135 args = append(args, pLower.Arg()...) 136 args = append(args, pUpper.Arg()...) 137 pageClause = " where " + pLower.Condition() + " and " + pUpper.Condition() 138 } 139 140 query := fmt.Sprintf( 141 ` 142 select * from ( 143 select 144 id, 145 did, 146 rkey, 147 repo_at, 148 repo_did, 149 issue_id, 150 title, 151 body, 152 open, 153 created, 154 edited, 155 deleted, 156 row_number() over (order by created desc) as row_num 157 from 158 issues 159 %s 160 ) ranked_issues 161 %s 162 `, 163 whereClause, 164 pageClause, 165 ) 166 167 rows, err := e.Query(query, args...) 168 if err != nil { 169 return nil, fmt.Errorf("failed to query issues table: %w", err) 170 } 171 defer rows.Close() 172 173 for rows.Next() { 174 var issue models.Issue 175 var createdAt string 176 var editedAt, deletedAt sql.Null[string] 177 var nullableRepoDid sql.NullString 178 var rowNum int64 179 err := rows.Scan( 180 &issue.Id, 181 &issue.Did, 182 &issue.Rkey, 183 &issue.RepoAt, 184 &nullableRepoDid, 185 &issue.IssueId, 186 &issue.Title, 187 &issue.Body, 188 &issue.Open, 189 &createdAt, 190 &editedAt, 191 &deletedAt, 192 &rowNum, 193 ) 194 if err != nil { 195 return nil, fmt.Errorf("failed to scan issue: %w", err) 196 } 197 if nullableRepoDid.Valid { 198 issue.RepoDid = nullableRepoDid.String 199 } 200 201 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 202 issue.Created = t 203 } 204 205 if editedAt.Valid { 206 if t, err := time.Parse(time.RFC3339, editedAt.V); err == nil { 207 issue.Edited = &t 208 } 209 } 210 211 if deletedAt.Valid { 212 if t, err := time.Parse(time.RFC3339, deletedAt.V); err == nil { 213 issue.Deleted = &t 214 } 215 } 216 217 atUri := issue.AtUri().String() 218 issueMap[atUri] = &issue 219 } 220 221 // collect reverse repos 222 repoAts := make([]string, 0, len(issueMap)) // or just []string{} 223 for _, issue := range issueMap { 224 repoAts = append(repoAts, string(issue.RepoAt)) 225 } 226 227 repos, err := GetRepos(e, 0, orm.FilterIn("at_uri", repoAts)) 228 if err != nil { 229 return nil, fmt.Errorf("failed to build repo mappings: %w", err) 230 } 231 232 repoMap := make(map[string]*models.Repo) 233 for i := range repos { 234 repoMap[string(repos[i].RepoAt())] = &repos[i] 235 } 236 237 for issueAt, i := range issueMap { 238 if r, ok := repoMap[string(i.RepoAt)]; ok { 239 i.Repo = r 240 } else { 241 // do not show up the issue if the repo is deleted 242 // TODO: foreign key where? 243 delete(issueMap, issueAt) 244 } 245 } 246 247 // collect comments 248 issueAts := slices.Collect(maps.Keys(issueMap)) 249 250 comments, err := GetIssueComments(e, orm.FilterIn("issue_at", issueAts)) 251 if err != nil { 252 return nil, fmt.Errorf("failed to query comments: %w", err) 253 } 254 for i := range comments { 255 issueAt := comments[i].IssueAt 256 if issue, ok := issueMap[issueAt]; ok { 257 issue.Comments = append(issue.Comments, comments[i]) 258 } 259 } 260 261 // collect allLabels for each issue 262 allLabels, err := GetLabels(e, orm.FilterIn("subject", issueAts)) 263 if err != nil { 264 return nil, fmt.Errorf("failed to query labels: %w", err) 265 } 266 for issueAt, labels := range allLabels { 267 if issue, ok := issueMap[issueAt.String()]; ok { 268 issue.Labels = labels 269 } 270 } 271 272 // collect references for each issue 273 allReferencs, err := GetReferencesAll(e, orm.FilterIn("from_at", issueAts)) 274 if err != nil { 275 return nil, fmt.Errorf("failed to query reference_links: %w", err) 276 } 277 for issueAt, references := range allReferencs { 278 if issue, ok := issueMap[issueAt.String()]; ok { 279 issue.References = references 280 } 281 } 282 283 var issues []models.Issue 284 for _, i := range issueMap { 285 issues = append(issues, *i) 286 } 287 288 sort.Slice(issues, func(i, j int) bool { 289 return issues[i].Created.After(issues[j].Created) 290 }) 291 292 return issues, nil 293} 294 295func GetIssue(e Execer, repoAt syntax.ATURI, issueId int) (*models.Issue, error) { 296 issues, err := GetIssuesPaginated( 297 e, 298 pagination.Page{}, 299 orm.FilterEq("repo_at", repoAt), 300 orm.FilterEq("issue_id", issueId), 301 ) 302 if err != nil { 303 return nil, err 304 } 305 if len(issues) != 1 { 306 return nil, sql.ErrNoRows 307 } 308 309 return &issues[0], nil 310} 311 312func GetIssues(e Execer, filters ...orm.Filter) ([]models.Issue, error) { 313 return GetIssuesPaginated(e, pagination.Page{}, filters...) 314} 315 316func AddIssueComment(tx *sql.Tx, c models.IssueComment) (int64, error) { 317 result, err := tx.Exec( 318 `insert into issue_comments ( 319 did, 320 rkey, 321 issue_at, 322 body, 323 reply_to, 324 created, 325 edited 326 ) 327 values (?, ?, ?, ?, ?, ?, null) 328 on conflict(did, rkey) do update set 329 issue_at = excluded.issue_at, 330 body = excluded.body, 331 edited = case 332 when 333 issue_comments.issue_at != excluded.issue_at 334 or issue_comments.body != excluded.body 335 or issue_comments.reply_to != excluded.reply_to 336 then ? 337 else issue_comments.edited 338 end`, 339 c.Did, 340 c.Rkey, 341 c.IssueAt, 342 c.Body, 343 c.ReplyTo, 344 c.Created.Format(time.RFC3339), 345 time.Now().Format(time.RFC3339), 346 ) 347 if err != nil { 348 return 0, err 349 } 350 351 id, err := result.LastInsertId() 352 if err != nil { 353 return 0, err 354 } 355 356 if err := putReferences(tx, c.AtUri(), c.References); err != nil { 357 return 0, fmt.Errorf("put reference_links: %w", err) 358 } 359 360 return id, nil 361} 362 363func DeleteIssueComments(e Execer, filters ...orm.Filter) error { 364 var conditions []string 365 var args []any 366 for _, filter := range filters { 367 conditions = append(conditions, filter.Condition()) 368 args = append(args, filter.Arg()...) 369 } 370 371 whereClause := "" 372 if conditions != nil { 373 whereClause = " where " + strings.Join(conditions, " and ") 374 } 375 376 query := fmt.Sprintf(`update issue_comments set body = "", deleted = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') %s`, whereClause) 377 378 _, err := e.Exec(query, args...) 379 return err 380} 381 382func GetIssueComments(e Execer, filters ...orm.Filter) ([]models.IssueComment, error) { 383 commentMap := make(map[string]*models.IssueComment) 384 385 var conditions []string 386 var args []any 387 for _, filter := range filters { 388 conditions = append(conditions, filter.Condition()) 389 args = append(args, filter.Arg()...) 390 } 391 392 whereClause := "" 393 if conditions != nil { 394 whereClause = " where " + strings.Join(conditions, " and ") 395 } 396 397 query := fmt.Sprintf(` 398 select 399 id, 400 did, 401 rkey, 402 issue_at, 403 reply_to, 404 body, 405 created, 406 edited, 407 deleted 408 from 409 issue_comments 410 %s 411 `, whereClause) 412 413 rows, err := e.Query(query, args...) 414 if err != nil { 415 return nil, err 416 } 417 defer rows.Close() 418 419 for rows.Next() { 420 var comment models.IssueComment 421 var created string 422 var rkey, edited, deleted, replyTo sql.Null[string] 423 err := rows.Scan( 424 &comment.Id, 425 &comment.Did, 426 &rkey, 427 &comment.IssueAt, 428 &replyTo, 429 &comment.Body, 430 &created, 431 &edited, 432 &deleted, 433 ) 434 if err != nil { 435 return nil, err 436 } 437 438 // this is a remnant from old times, newer comments always have rkey 439 if rkey.Valid { 440 comment.Rkey = rkey.V 441 } 442 443 if t, err := time.Parse(time.RFC3339, created); err == nil { 444 comment.Created = t 445 } 446 447 if edited.Valid { 448 if t, err := time.Parse(time.RFC3339, edited.V); err == nil { 449 comment.Edited = &t 450 } 451 } 452 453 if deleted.Valid { 454 if t, err := time.Parse(time.RFC3339, deleted.V); err == nil { 455 comment.Deleted = &t 456 } 457 } 458 459 if replyTo.Valid { 460 comment.ReplyTo = &replyTo.V 461 } 462 463 atUri := comment.AtUri().String() 464 commentMap[atUri] = &comment 465 } 466 467 if err = rows.Err(); err != nil { 468 return nil, err 469 } 470 471 // collect references for each comments 472 commentAts := slices.Collect(maps.Keys(commentMap)) 473 allReferencs, err := GetReferencesAll(e, orm.FilterIn("from_at", commentAts)) 474 if err != nil { 475 return nil, fmt.Errorf("failed to query reference_links: %w", err) 476 } 477 for commentAt, references := range allReferencs { 478 if comment, ok := commentMap[commentAt.String()]; ok { 479 comment.References = references 480 } 481 } 482 483 var comments []models.IssueComment 484 for _, c := range commentMap { 485 comments = append(comments, *c) 486 } 487 488 sort.Slice(comments, func(i, j int) bool { 489 return comments[i].Created.After(comments[j].Created) 490 }) 491 492 return comments, nil 493} 494 495func DeleteIssues(tx *sql.Tx, did, rkey string) error { 496 _, err := tx.Exec( 497 `delete from issues 498 where did = ? and rkey = ?`, 499 did, 500 rkey, 501 ) 502 if err != nil { 503 return fmt.Errorf("delete issue: %w", err) 504 } 505 506 uri := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)) 507 err = deleteReferences(tx, uri) 508 if err != nil { 509 return fmt.Errorf("delete reference_links: %w", err) 510 } 511 512 return nil 513} 514 515func CloseIssues(e Execer, filters ...orm.Filter) error { 516 var conditions []string 517 var args []any 518 for _, filter := range filters { 519 conditions = append(conditions, filter.Condition()) 520 args = append(args, filter.Arg()...) 521 } 522 523 whereClause := "" 524 if conditions != nil { 525 whereClause = " where " + strings.Join(conditions, " and ") 526 } 527 528 query := fmt.Sprintf(`update issues set open = 0 %s`, whereClause) 529 _, err := e.Exec(query, args...) 530 return err 531} 532 533func ReopenIssues(e Execer, filters ...orm.Filter) error { 534 var conditions []string 535 var args []any 536 for _, filter := range filters { 537 conditions = append(conditions, filter.Condition()) 538 args = append(args, filter.Arg()...) 539 } 540 541 whereClause := "" 542 if conditions != nil { 543 whereClause = " where " + strings.Join(conditions, " and ") 544 } 545 546 query := fmt.Sprintf(`update issues set open = 1 %s`, whereClause) 547 _, err := e.Exec(query, args...) 548 return err 549} 550 551func GetIssueCount(e Execer, repoAt syntax.ATURI) (models.IssueCount, error) { 552 row := e.QueryRow(` 553 select 554 count(case when open = 1 then 1 end) as open_count, 555 count(case when open = 0 then 1 end) as closed_count 556 from issues 557 where repo_at = ?`, 558 repoAt, 559 ) 560 561 var count models.IssueCount 562 if err := row.Scan(&count.Open, &count.Closed); err != nil { 563 return models.IssueCount{}, err 564 } 565 566 return count, nil 567}