Monorepo for Tangled
1package db
2
3import (
4 "database/sql"
5 "fmt"
6 "maps"
7 "slices"
8 "sort"
9 "strings"
10 "time"
11
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pagination"
16 "tangled.org/core/orm"
17)
18
19func PutIssue(tx *sql.Tx, issue *models.Issue) error {
20 var seqRepoDid *string
21 if issue.RepoDid != "" {
22 seqRepoDid = &issue.RepoDid
23 }
24 _, err := tx.Exec(`
25 insert into repo_issue_seqs (repo_at, next_issue_id, repo_did)
26 values (?, 1, ?)
27 on conflict(repo_at) do update set repo_did = coalesce(excluded.repo_did, repo_did)
28 `, issue.RepoAt, seqRepoDid)
29 if err != nil {
30 return err
31 }
32
33 issues, err := GetIssues(
34 tx,
35 orm.FilterEq("did", issue.Did),
36 orm.FilterEq("rkey", issue.Rkey),
37 )
38 switch {
39 case err != nil:
40 return err
41 case len(issues) == 0:
42 return createNewIssue(tx, issue)
43 case len(issues) != 1: // should be unreachable
44 return fmt.Errorf("invalid number of issues returned: %d", len(issues))
45 default:
46 // if content is identical, do not edit
47 existingIssue := issues[0]
48 if existingIssue.Title == issue.Title && existingIssue.Body == issue.Body {
49 return nil
50 }
51
52 issue.Id = existingIssue.Id
53 issue.IssueId = existingIssue.IssueId
54 return updateIssue(tx, issue)
55 }
56}
57
58func createNewIssue(tx *sql.Tx, issue *models.Issue) error {
59 // get next issue_id
60 var newIssueId int
61 err := tx.QueryRow(`
62 update repo_issue_seqs
63 set next_issue_id = next_issue_id + 1
64 where repo_at = ?
65 returning next_issue_id - 1
66 `, issue.RepoAt).Scan(&newIssueId)
67 if err != nil {
68 return err
69 }
70
71 var repoDid *string
72 if issue.RepoDid != "" {
73 repoDid = &issue.RepoDid
74 }
75
76 // insert new issue
77 row := tx.QueryRow(`
78 insert into issues (repo_at, repo_did, did, rkey, issue_id, title, body)
79 values (?, ?, ?, ?, ?, ?, ?)
80 returning rowid, issue_id
81 `, issue.RepoAt, repoDid, issue.Did, issue.Rkey, newIssueId, issue.Title, issue.Body)
82
83 err = row.Scan(&issue.Id, &issue.IssueId)
84 if err != nil {
85 return fmt.Errorf("scan row: %w", err)
86 }
87
88 if err := putReferences(tx, issue.AtUri(), issue.References); err != nil {
89 return fmt.Errorf("put reference_links: %w", err)
90 }
91 return nil
92}
93
94func updateIssue(tx *sql.Tx, issue *models.Issue) error {
95 var repoDid *string
96 if issue.RepoDid != "" {
97 repoDid = &issue.RepoDid
98 }
99 _, err := tx.Exec(`
100 update issues
101 set title = ?, body = ?, edited = ?, repo_did = coalesce(?, repo_did)
102 where did = ? and rkey = ?
103 `, issue.Title, issue.Body, time.Now().Format(time.RFC3339), repoDid, issue.Did, issue.Rkey)
104 if err != nil {
105 return err
106 }
107
108 if err := putReferences(tx, issue.AtUri(), issue.References); err != nil {
109 return fmt.Errorf("put reference_links: %w", err)
110 }
111 return nil
112}
113
114func GetIssuesPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]models.Issue, error) {
115 issueMap := make(map[string]*models.Issue) // at-uri -> issue
116
117 var conditions []string
118 var args []any
119
120 for _, filter := range filters {
121 conditions = append(conditions, filter.Condition())
122 args = append(args, filter.Arg()...)
123 }
124
125 whereClause := ""
126 if conditions != nil {
127 whereClause = " where " + strings.Join(conditions, " and ")
128 }
129
130 pLower := orm.FilterGte("row_num", page.Offset+1)
131 pUpper := orm.FilterLte("row_num", page.Offset+page.Limit)
132
133 pageClause := ""
134 if page.Limit > 0 {
135 args = append(args, pLower.Arg()...)
136 args = append(args, pUpper.Arg()...)
137 pageClause = " where " + pLower.Condition() + " and " + pUpper.Condition()
138 }
139
140 query := fmt.Sprintf(
141 `
142 select * from (
143 select
144 id,
145 did,
146 rkey,
147 repo_at,
148 repo_did,
149 issue_id,
150 title,
151 body,
152 open,
153 created,
154 edited,
155 deleted,
156 row_number() over (order by created desc) as row_num
157 from
158 issues
159 %s
160 ) ranked_issues
161 %s
162 `,
163 whereClause,
164 pageClause,
165 )
166
167 rows, err := e.Query(query, args...)
168 if err != nil {
169 return nil, fmt.Errorf("failed to query issues table: %w", err)
170 }
171 defer rows.Close()
172
173 for rows.Next() {
174 var issue models.Issue
175 var createdAt string
176 var editedAt, deletedAt sql.Null[string]
177 var nullableRepoDid sql.NullString
178 var rowNum int64
179 err := rows.Scan(
180 &issue.Id,
181 &issue.Did,
182 &issue.Rkey,
183 &issue.RepoAt,
184 &nullableRepoDid,
185 &issue.IssueId,
186 &issue.Title,
187 &issue.Body,
188 &issue.Open,
189 &createdAt,
190 &editedAt,
191 &deletedAt,
192 &rowNum,
193 )
194 if err != nil {
195 return nil, fmt.Errorf("failed to scan issue: %w", err)
196 }
197 if nullableRepoDid.Valid {
198 issue.RepoDid = nullableRepoDid.String
199 }
200
201 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
202 issue.Created = t
203 }
204
205 if editedAt.Valid {
206 if t, err := time.Parse(time.RFC3339, editedAt.V); err == nil {
207 issue.Edited = &t
208 }
209 }
210
211 if deletedAt.Valid {
212 if t, err := time.Parse(time.RFC3339, deletedAt.V); err == nil {
213 issue.Deleted = &t
214 }
215 }
216
217 atUri := issue.AtUri().String()
218 issueMap[atUri] = &issue
219 }
220
221 // collect reverse repos
222 repoAts := make([]string, 0, len(issueMap)) // or just []string{}
223 for _, issue := range issueMap {
224 repoAts = append(repoAts, string(issue.RepoAt))
225 }
226
227 repos, err := GetRepos(e, 0, orm.FilterIn("at_uri", repoAts))
228 if err != nil {
229 return nil, fmt.Errorf("failed to build repo mappings: %w", err)
230 }
231
232 repoMap := make(map[string]*models.Repo)
233 for i := range repos {
234 repoMap[string(repos[i].RepoAt())] = &repos[i]
235 }
236
237 for issueAt, i := range issueMap {
238 if r, ok := repoMap[string(i.RepoAt)]; ok {
239 i.Repo = r
240 } else {
241 // do not show up the issue if the repo is deleted
242 // TODO: foreign key where?
243 delete(issueMap, issueAt)
244 }
245 }
246
247 // collect comments
248 issueAts := slices.Collect(maps.Keys(issueMap))
249
250 comments, err := GetIssueComments(e, orm.FilterIn("issue_at", issueAts))
251 if err != nil {
252 return nil, fmt.Errorf("failed to query comments: %w", err)
253 }
254 for i := range comments {
255 issueAt := comments[i].IssueAt
256 if issue, ok := issueMap[issueAt]; ok {
257 issue.Comments = append(issue.Comments, comments[i])
258 }
259 }
260
261 // collect allLabels for each issue
262 allLabels, err := GetLabels(e, orm.FilterIn("subject", issueAts))
263 if err != nil {
264 return nil, fmt.Errorf("failed to query labels: %w", err)
265 }
266 for issueAt, labels := range allLabels {
267 if issue, ok := issueMap[issueAt.String()]; ok {
268 issue.Labels = labels
269 }
270 }
271
272 // collect references for each issue
273 allReferencs, err := GetReferencesAll(e, orm.FilterIn("from_at", issueAts))
274 if err != nil {
275 return nil, fmt.Errorf("failed to query reference_links: %w", err)
276 }
277 for issueAt, references := range allReferencs {
278 if issue, ok := issueMap[issueAt.String()]; ok {
279 issue.References = references
280 }
281 }
282
283 var issues []models.Issue
284 for _, i := range issueMap {
285 issues = append(issues, *i)
286 }
287
288 sort.Slice(issues, func(i, j int) bool {
289 return issues[i].Created.After(issues[j].Created)
290 })
291
292 return issues, nil
293}
294
295func GetIssue(e Execer, repoAt syntax.ATURI, issueId int) (*models.Issue, error) {
296 issues, err := GetIssuesPaginated(
297 e,
298 pagination.Page{},
299 orm.FilterEq("repo_at", repoAt),
300 orm.FilterEq("issue_id", issueId),
301 )
302 if err != nil {
303 return nil, err
304 }
305 if len(issues) != 1 {
306 return nil, sql.ErrNoRows
307 }
308
309 return &issues[0], nil
310}
311
312func GetIssues(e Execer, filters ...orm.Filter) ([]models.Issue, error) {
313 return GetIssuesPaginated(e, pagination.Page{}, filters...)
314}
315
316func AddIssueComment(tx *sql.Tx, c models.IssueComment) (int64, error) {
317 result, err := tx.Exec(
318 `insert into issue_comments (
319 did,
320 rkey,
321 issue_at,
322 body,
323 reply_to,
324 created,
325 edited
326 )
327 values (?, ?, ?, ?, ?, ?, null)
328 on conflict(did, rkey) do update set
329 issue_at = excluded.issue_at,
330 body = excluded.body,
331 edited = case
332 when
333 issue_comments.issue_at != excluded.issue_at
334 or issue_comments.body != excluded.body
335 or issue_comments.reply_to != excluded.reply_to
336 then ?
337 else issue_comments.edited
338 end`,
339 c.Did,
340 c.Rkey,
341 c.IssueAt,
342 c.Body,
343 c.ReplyTo,
344 c.Created.Format(time.RFC3339),
345 time.Now().Format(time.RFC3339),
346 )
347 if err != nil {
348 return 0, err
349 }
350
351 id, err := result.LastInsertId()
352 if err != nil {
353 return 0, err
354 }
355
356 if err := putReferences(tx, c.AtUri(), c.References); err != nil {
357 return 0, fmt.Errorf("put reference_links: %w", err)
358 }
359
360 return id, nil
361}
362
363func DeleteIssueComments(e Execer, filters ...orm.Filter) error {
364 var conditions []string
365 var args []any
366 for _, filter := range filters {
367 conditions = append(conditions, filter.Condition())
368 args = append(args, filter.Arg()...)
369 }
370
371 whereClause := ""
372 if conditions != nil {
373 whereClause = " where " + strings.Join(conditions, " and ")
374 }
375
376 query := fmt.Sprintf(`update issue_comments set body = "", deleted = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') %s`, whereClause)
377
378 _, err := e.Exec(query, args...)
379 return err
380}
381
382func GetIssueComments(e Execer, filters ...orm.Filter) ([]models.IssueComment, error) {
383 commentMap := make(map[string]*models.IssueComment)
384
385 var conditions []string
386 var args []any
387 for _, filter := range filters {
388 conditions = append(conditions, filter.Condition())
389 args = append(args, filter.Arg()...)
390 }
391
392 whereClause := ""
393 if conditions != nil {
394 whereClause = " where " + strings.Join(conditions, " and ")
395 }
396
397 query := fmt.Sprintf(`
398 select
399 id,
400 did,
401 rkey,
402 issue_at,
403 reply_to,
404 body,
405 created,
406 edited,
407 deleted
408 from
409 issue_comments
410 %s
411 `, whereClause)
412
413 rows, err := e.Query(query, args...)
414 if err != nil {
415 return nil, err
416 }
417 defer rows.Close()
418
419 for rows.Next() {
420 var comment models.IssueComment
421 var created string
422 var rkey, edited, deleted, replyTo sql.Null[string]
423 err := rows.Scan(
424 &comment.Id,
425 &comment.Did,
426 &rkey,
427 &comment.IssueAt,
428 &replyTo,
429 &comment.Body,
430 &created,
431 &edited,
432 &deleted,
433 )
434 if err != nil {
435 return nil, err
436 }
437
438 // this is a remnant from old times, newer comments always have rkey
439 if rkey.Valid {
440 comment.Rkey = rkey.V
441 }
442
443 if t, err := time.Parse(time.RFC3339, created); err == nil {
444 comment.Created = t
445 }
446
447 if edited.Valid {
448 if t, err := time.Parse(time.RFC3339, edited.V); err == nil {
449 comment.Edited = &t
450 }
451 }
452
453 if deleted.Valid {
454 if t, err := time.Parse(time.RFC3339, deleted.V); err == nil {
455 comment.Deleted = &t
456 }
457 }
458
459 if replyTo.Valid {
460 comment.ReplyTo = &replyTo.V
461 }
462
463 atUri := comment.AtUri().String()
464 commentMap[atUri] = &comment
465 }
466
467 if err = rows.Err(); err != nil {
468 return nil, err
469 }
470
471 // collect references for each comments
472 commentAts := slices.Collect(maps.Keys(commentMap))
473 allReferencs, err := GetReferencesAll(e, orm.FilterIn("from_at", commentAts))
474 if err != nil {
475 return nil, fmt.Errorf("failed to query reference_links: %w", err)
476 }
477 for commentAt, references := range allReferencs {
478 if comment, ok := commentMap[commentAt.String()]; ok {
479 comment.References = references
480 }
481 }
482
483 var comments []models.IssueComment
484 for _, c := range commentMap {
485 comments = append(comments, *c)
486 }
487
488 sort.Slice(comments, func(i, j int) bool {
489 return comments[i].Created.After(comments[j].Created)
490 })
491
492 return comments, nil
493}
494
495func DeleteIssues(tx *sql.Tx, did, rkey string) error {
496 _, err := tx.Exec(
497 `delete from issues
498 where did = ? and rkey = ?`,
499 did,
500 rkey,
501 )
502 if err != nil {
503 return fmt.Errorf("delete issue: %w", err)
504 }
505
506 uri := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey))
507 err = deleteReferences(tx, uri)
508 if err != nil {
509 return fmt.Errorf("delete reference_links: %w", err)
510 }
511
512 return nil
513}
514
515func CloseIssues(e Execer, filters ...orm.Filter) error {
516 var conditions []string
517 var args []any
518 for _, filter := range filters {
519 conditions = append(conditions, filter.Condition())
520 args = append(args, filter.Arg()...)
521 }
522
523 whereClause := ""
524 if conditions != nil {
525 whereClause = " where " + strings.Join(conditions, " and ")
526 }
527
528 query := fmt.Sprintf(`update issues set open = 0 %s`, whereClause)
529 _, err := e.Exec(query, args...)
530 return err
531}
532
533func ReopenIssues(e Execer, filters ...orm.Filter) error {
534 var conditions []string
535 var args []any
536 for _, filter := range filters {
537 conditions = append(conditions, filter.Condition())
538 args = append(args, filter.Arg()...)
539 }
540
541 whereClause := ""
542 if conditions != nil {
543 whereClause = " where " + strings.Join(conditions, " and ")
544 }
545
546 query := fmt.Sprintf(`update issues set open = 1 %s`, whereClause)
547 _, err := e.Exec(query, args...)
548 return err
549}
550
551func GetIssueCount(e Execer, repoAt syntax.ATURI) (models.IssueCount, error) {
552 row := e.QueryRow(`
553 select
554 count(case when open = 1 then 1 end) as open_count,
555 count(case when open = 0 then 1 end) as closed_count
556 from issues
557 where repo_at = ?`,
558 repoAt,
559 )
560
561 var count models.IssueCount
562 if err := row.Scan(&count.Open, &count.Closed); err != nil {
563 return models.IssueCount{}, err
564 }
565
566 return count, nil
567}