Monorepo for Tangled
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/orm"
12)
13
14func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) {
15 var pipelines []models.Pipeline
16
17 var conditions []string
18 var args []any
19 for _, filter := range filters {
20 conditions = append(conditions, filter.Condition())
21 args = append(args, filter.Arg()...)
22 }
23
24 whereClause := ""
25 if conditions != nil {
26 whereClause = " where " + strings.Join(conditions, " and ")
27 }
28
29 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
30
31 rows, err := e.Query(query, args...)
32
33 if err != nil {
34 return nil, err
35 }
36 defer rows.Close()
37
38 for rows.Next() {
39 var pipeline models.Pipeline
40 var createdAt string
41 err = rows.Scan(
42 &pipeline.Id,
43 &pipeline.Rkey,
44 &pipeline.Knot,
45 &pipeline.RepoOwner,
46 &pipeline.RepoName,
47 &pipeline.Sha,
48 &createdAt,
49 )
50 if err != nil {
51 return nil, err
52 }
53
54 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
55 pipeline.Created = t
56 }
57
58 pipelines = append(pipelines, pipeline)
59 }
60
61 if err = rows.Err(); err != nil {
62 return nil, err
63 }
64
65 return pipelines, nil
66}
67
68func AddPipeline(e Execer, pipeline models.Pipeline) error {
69 args := []any{
70 pipeline.Rkey,
71 pipeline.Knot,
72 pipeline.RepoOwner,
73 pipeline.RepoName,
74 pipeline.TriggerId,
75 pipeline.Sha,
76 }
77
78 placeholders := make([]string, len(args))
79 for i := range placeholders {
80 placeholders[i] = "?"
81 }
82
83 query := fmt.Sprintf(`
84 insert or ignore into pipelines (
85 rkey,
86 knot,
87 repo_owner,
88 repo_name,
89 trigger_id,
90 sha
91 ) values (%s)
92 `, strings.Join(placeholders, ","))
93
94 _, err := e.Exec(query, args...)
95
96 return err
97}
98
99func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
100 args := []any{
101 trigger.Kind,
102 trigger.PushRef,
103 trigger.PushNewSha,
104 trigger.PushOldSha,
105 trigger.PRSourceBranch,
106 trigger.PRTargetBranch,
107 trigger.PRSourceSha,
108 trigger.PRAction,
109 }
110
111 placeholders := make([]string, len(args))
112 for i := range placeholders {
113 placeholders[i] = "?"
114 }
115
116 query := fmt.Sprintf(`insert or ignore into triggers (
117 kind,
118 push_ref,
119 push_new_sha,
120 push_old_sha,
121 pr_source_branch,
122 pr_target_branch,
123 pr_source_sha,
124 pr_action
125 ) values (%s)`, strings.Join(placeholders, ","))
126
127 res, err := e.Exec(query, args...)
128 if err != nil {
129 return 0, err
130 }
131
132 return res.LastInsertId()
133}
134
135func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
136 args := []any{
137 status.Spindle,
138 status.Rkey,
139 status.PipelineKnot,
140 status.PipelineRkey,
141 status.Workflow,
142 status.Status,
143 status.Error,
144 status.ExitCode,
145 status.Created.Format(time.RFC3339),
146 }
147
148 placeholders := make([]string, len(args))
149 for i := range placeholders {
150 placeholders[i] = "?"
151 }
152
153 query := fmt.Sprintf(`
154 insert or ignore into pipeline_statuses (
155 spindle,
156 rkey,
157 pipeline_knot,
158 pipeline_rkey,
159 workflow,
160 status,
161 error,
162 exit_code,
163 created
164 ) values (%s)
165 `, strings.Join(placeholders, ","))
166
167 _, err := e.Exec(query, args...)
168 return err
169}
170
171// this is a mega query, but the most useful one:
172// get N pipelines, for each one get the latest status of its N workflows
173//
174// the pipelines table is aliased to `p`
175// the triggers table is aliased to `t`
176func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) {
177 var conditions []string
178 var args []any
179 for _, filter := range filters {
180 conditions = append(conditions, filter.Condition())
181 args = append(args, filter.Arg()...)
182 }
183
184 whereClause := ""
185 if conditions != nil {
186 whereClause = " where " + strings.Join(conditions, " and ")
187 }
188
189 query := fmt.Sprintf(`
190 select
191 p.id,
192 p.knot,
193 p.rkey,
194 p.repo_owner,
195 p.repo_name,
196 p.sha,
197 p.created,
198 t.id,
199 t.kind,
200 t.push_ref,
201 t.push_new_sha,
202 t.push_old_sha,
203 t.pr_source_branch,
204 t.pr_target_branch,
205 t.pr_source_sha,
206 t.pr_action
207 from
208 pipelines p
209 join
210 triggers t ON p.trigger_id = t.id
211 %s
212 order by p.created desc
213 limit %d
214 `, whereClause, limit)
215
216 rows, err := e.Query(query, args...)
217 if err != nil {
218 return nil, err
219 }
220 defer rows.Close()
221
222 pipelines := make(map[syntax.ATURI]models.Pipeline)
223 for rows.Next() {
224 var p models.Pipeline
225 var t models.Trigger
226 var created string
227
228 err := rows.Scan(
229 &p.Id,
230 &p.Knot,
231 &p.Rkey,
232 &p.RepoOwner,
233 &p.RepoName,
234 &p.Sha,
235 &created,
236 &p.TriggerId,
237 &t.Kind,
238 &t.PushRef,
239 &t.PushNewSha,
240 &t.PushOldSha,
241 &t.PRSourceBranch,
242 &t.PRTargetBranch,
243 &t.PRSourceSha,
244 &t.PRAction,
245 )
246 if err != nil {
247 return nil, err
248 }
249
250 p.Created, err = time.Parse(time.RFC3339, created)
251 if err != nil {
252 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
253 }
254
255 t.Id = p.TriggerId
256 p.Trigger = &t
257 p.Statuses = make(map[string]models.WorkflowStatus)
258
259 pipelines[p.AtUri()] = p
260 }
261
262 // get all statuses
263 // the where clause here is of the form:
264 //
265 // where (pipeline_knot = k1 and pipeline_rkey = r1)
266 // or (pipeline_knot = k2 and pipeline_rkey = r2)
267 conditions = nil
268 args = nil
269 for _, p := range pipelines {
270 knotFilter := orm.FilterEq("pipeline_knot", p.Knot)
271 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey)
272 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
273 args = append(args, p.Knot)
274 args = append(args, p.Rkey)
275 }
276 whereClause = ""
277 if conditions != nil {
278 whereClause = "where " + strings.Join(conditions, " or ")
279 }
280 query = fmt.Sprintf(`
281 select
282 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
283 from
284 pipeline_statuses
285 %s
286 `, whereClause)
287
288 rows, err = e.Query(query, args...)
289 if err != nil {
290 return nil, err
291 }
292 defer rows.Close()
293
294 for rows.Next() {
295 var ps models.PipelineStatus
296 var created string
297
298 err := rows.Scan(
299 &ps.ID,
300 &ps.Spindle,
301 &ps.Rkey,
302 &ps.PipelineKnot,
303 &ps.PipelineRkey,
304 &created,
305 &ps.Workflow,
306 &ps.Status,
307 &ps.Error,
308 &ps.ExitCode,
309 )
310 if err != nil {
311 return nil, err
312 }
313
314 ps.Created, err = time.Parse(time.RFC3339, created)
315 if err != nil {
316 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
317 }
318
319 pipelineAt := ps.PipelineAt()
320
321 // extract
322 pipeline, ok := pipelines[pipelineAt]
323 if !ok {
324 continue
325 }
326 statuses, _ := pipeline.Statuses[ps.Workflow]
327 if !ok {
328 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
329 }
330
331 // append
332 statuses.Data = append(statuses.Data, ps)
333
334 // reassign
335 pipeline.Statuses[ps.Workflow] = statuses
336 pipelines[pipelineAt] = pipeline
337 }
338
339 var all []models.Pipeline
340 for _, p := range pipelines {
341 for _, s := range p.Statuses {
342 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
343 if a.Created.After(b.Created) {
344 return 1
345 }
346 if a.Created.Before(b.Created) {
347 return -1
348 }
349 if a.ID > b.ID {
350 return 1
351 }
352 if a.ID < b.ID {
353 return -1
354 }
355 return 0
356 })
357 }
358 all = append(all, p)
359 }
360
361 // sort pipelines by date
362 slices.SortFunc(all, func(a, b models.Pipeline) int {
363 if a.Created.After(b.Created) {
364 return -1
365 }
366 return 1
367 })
368
369 return all, nil
370}
371
372// the pipelines table is aliased to `p`
373// the triggers table is aliased to `t`
374func GetTotalPipelineStatuses(e Execer, filters ...orm.Filter) (int64, error) {
375 var conditions []string
376 var args []any
377 for _, filter := range filters {
378 conditions = append(conditions, filter.Condition())
379 args = append(args, filter.Arg()...)
380 }
381
382 whereClause := ""
383 if conditions != nil {
384 whereClause = " where " + strings.Join(conditions, " and ")
385 }
386
387 query := fmt.Sprintf(`
388 select
389 count(1)
390 from
391 pipelines p
392 join
393 triggers t ON p.trigger_id = t.id
394 %s
395 `, whereClause)
396
397 rows, err := e.Query(query, args...)
398 if err != nil {
399 return 0, err
400 }
401 defer rows.Close()
402
403 for rows.Next() {
404 var count int64
405 err := rows.Scan(&count)
406 if err != nil {
407 return 0, err
408 }
409
410 return count, nil
411 }
412
413 // unreachable
414 return 0, nil
415}