Monorepo for Tangled
1package db
2
3import (
4 "database/sql"
5 "fmt"
6 "slices"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/orm"
13)
14
15func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) {
16 var pipelines []models.Pipeline
17
18 var conditions []string
19 var args []any
20 for _, filter := range filters {
21 conditions = append(conditions, filter.Condition())
22 args = append(args, filter.Arg()...)
23 }
24
25 whereClause := ""
26 if conditions != nil {
27 whereClause = " where " + strings.Join(conditions, " and ")
28 }
29
30 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause)
31
32 rows, err := e.Query(query, args...)
33
34 if err != nil {
35 return nil, err
36 }
37 defer rows.Close()
38
39 for rows.Next() {
40 var pipeline models.Pipeline
41 var createdAt string
42 var repoDid sql.NullString
43 err = rows.Scan(
44 &pipeline.Id,
45 &pipeline.Rkey,
46 &pipeline.Knot,
47 &pipeline.RepoOwner,
48 &pipeline.RepoName,
49 &pipeline.Sha,
50 &createdAt,
51 &repoDid,
52 )
53 if err != nil {
54 return nil, err
55 }
56
57 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
58 pipeline.Created = t
59 }
60 if repoDid.Valid {
61 pipeline.RepoDid = repoDid.String
62 }
63
64 pipelines = append(pipelines, pipeline)
65 }
66
67 if err = rows.Err(); err != nil {
68 return nil, err
69 }
70
71 return pipelines, nil
72}
73
74func AddPipeline(e Execer, pipeline models.Pipeline) error {
75 var repoDid *string
76 if pipeline.RepoDid != "" {
77 repoDid = &pipeline.RepoDid
78 }
79
80 args := []any{
81 pipeline.Rkey,
82 pipeline.Knot,
83 pipeline.RepoOwner,
84 pipeline.RepoName,
85 pipeline.TriggerId,
86 pipeline.Sha,
87 repoDid,
88 }
89
90 placeholders := make([]string, len(args))
91 for i := range placeholders {
92 placeholders[i] = "?"
93 }
94
95 query := fmt.Sprintf(`
96 insert or ignore into pipelines (
97 rkey,
98 knot,
99 repo_owner,
100 repo_name,
101 trigger_id,
102 sha,
103 repo_did
104 ) values (%s)
105 `, strings.Join(placeholders, ","))
106
107 _, err := e.Exec(query, args...)
108
109 return err
110}
111
112func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
113 args := []any{
114 trigger.Kind,
115 trigger.PushRef,
116 trigger.PushNewSha,
117 trigger.PushOldSha,
118 trigger.PRSourceBranch,
119 trigger.PRTargetBranch,
120 trigger.PRSourceSha,
121 trigger.PRAction,
122 }
123
124 placeholders := make([]string, len(args))
125 for i := range placeholders {
126 placeholders[i] = "?"
127 }
128
129 query := fmt.Sprintf(`insert or ignore into triggers (
130 kind,
131 push_ref,
132 push_new_sha,
133 push_old_sha,
134 pr_source_branch,
135 pr_target_branch,
136 pr_source_sha,
137 pr_action
138 ) values (%s)`, strings.Join(placeholders, ","))
139
140 res, err := e.Exec(query, args...)
141 if err != nil {
142 return 0, err
143 }
144
145 return res.LastInsertId()
146}
147
148func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
149 args := []any{
150 status.Spindle,
151 status.Rkey,
152 status.PipelineKnot,
153 status.PipelineRkey,
154 status.Workflow,
155 status.Status,
156 status.Error,
157 status.ExitCode,
158 status.Created.Format(time.RFC3339),
159 }
160
161 placeholders := make([]string, len(args))
162 for i := range placeholders {
163 placeholders[i] = "?"
164 }
165
166 query := fmt.Sprintf(`
167 insert or ignore into pipeline_statuses (
168 spindle,
169 rkey,
170 pipeline_knot,
171 pipeline_rkey,
172 workflow,
173 status,
174 error,
175 exit_code,
176 created
177 ) values (%s)
178 `, strings.Join(placeholders, ","))
179
180 _, err := e.Exec(query, args...)
181 return err
182}
183
184// this is a mega query, but the most useful one:
185// get N pipelines, for each one get the latest status of its N workflows
186//
187// the pipelines table is aliased to `p`
188// the triggers table is aliased to `t`
189func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) {
190 var conditions []string
191 var args []any
192 for _, filter := range filters {
193 conditions = append(conditions, filter.Condition())
194 args = append(args, filter.Arg()...)
195 }
196
197 whereClause := ""
198 if conditions != nil {
199 whereClause = " where " + strings.Join(conditions, " and ")
200 }
201
202 query := fmt.Sprintf(`
203 select
204 p.id,
205 p.knot,
206 p.rkey,
207 p.repo_owner,
208 p.repo_name,
209 p.sha,
210 p.created,
211 p.repo_did,
212 t.id,
213 t.kind,
214 t.push_ref,
215 t.push_new_sha,
216 t.push_old_sha,
217 t.pr_source_branch,
218 t.pr_target_branch,
219 t.pr_source_sha,
220 t.pr_action
221 from
222 pipelines p
223 join
224 triggers t ON p.trigger_id = t.id
225 %s
226 order by p.created desc
227 limit %d
228 `, whereClause, limit)
229
230 rows, err := e.Query(query, args...)
231 if err != nil {
232 return nil, err
233 }
234 defer rows.Close()
235
236 pipelines := make(map[syntax.ATURI]models.Pipeline)
237 for rows.Next() {
238 var p models.Pipeline
239 var t models.Trigger
240 var created string
241 var repoDid sql.NullString
242
243 err := rows.Scan(
244 &p.Id,
245 &p.Knot,
246 &p.Rkey,
247 &p.RepoOwner,
248 &p.RepoName,
249 &p.Sha,
250 &created,
251 &repoDid,
252 &p.TriggerId,
253 &t.Kind,
254 &t.PushRef,
255 &t.PushNewSha,
256 &t.PushOldSha,
257 &t.PRSourceBranch,
258 &t.PRTargetBranch,
259 &t.PRSourceSha,
260 &t.PRAction,
261 )
262 if err != nil {
263 return nil, err
264 }
265
266 p.Created, err = time.Parse(time.RFC3339, created)
267 if err != nil {
268 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
269 }
270 if repoDid.Valid {
271 p.RepoDid = repoDid.String
272 }
273
274 t.Id = p.TriggerId
275 p.Trigger = &t
276 p.Statuses = make(map[string]models.WorkflowStatus)
277
278 pipelines[p.AtUri()] = p
279 }
280
281 // get all statuses
282 // the where clause here is of the form:
283 //
284 // where (pipeline_knot = k1 and pipeline_rkey = r1)
285 // or (pipeline_knot = k2 and pipeline_rkey = r2)
286 conditions = nil
287 args = nil
288 for _, p := range pipelines {
289 knotFilter := orm.FilterEq("pipeline_knot", p.Knot)
290 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey)
291 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
292 args = append(args, p.Knot)
293 args = append(args, p.Rkey)
294 }
295 whereClause = ""
296 if conditions != nil {
297 whereClause = "where " + strings.Join(conditions, " or ")
298 }
299 query = fmt.Sprintf(`
300 select
301 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
302 from
303 pipeline_statuses
304 %s
305 `, whereClause)
306
307 rows, err = e.Query(query, args...)
308 if err != nil {
309 return nil, err
310 }
311 defer rows.Close()
312
313 for rows.Next() {
314 var ps models.PipelineStatus
315 var created string
316
317 err := rows.Scan(
318 &ps.ID,
319 &ps.Spindle,
320 &ps.Rkey,
321 &ps.PipelineKnot,
322 &ps.PipelineRkey,
323 &created,
324 &ps.Workflow,
325 &ps.Status,
326 &ps.Error,
327 &ps.ExitCode,
328 )
329 if err != nil {
330 return nil, err
331 }
332
333 ps.Created, err = time.Parse(time.RFC3339, created)
334 if err != nil {
335 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
336 }
337
338 pipelineAt := ps.PipelineAt()
339
340 // extract
341 pipeline, ok := pipelines[pipelineAt]
342 if !ok {
343 continue
344 }
345 statuses, _ := pipeline.Statuses[ps.Workflow]
346 if !ok {
347 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
348 }
349
350 // append
351 statuses.Data = append(statuses.Data, ps)
352
353 // reassign
354 pipeline.Statuses[ps.Workflow] = statuses
355 pipelines[pipelineAt] = pipeline
356 }
357
358 var all []models.Pipeline
359 for _, p := range pipelines {
360 for _, s := range p.Statuses {
361 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
362 if a.Created.After(b.Created) {
363 return 1
364 }
365 if a.Created.Before(b.Created) {
366 return -1
367 }
368 if a.ID > b.ID {
369 return 1
370 }
371 if a.ID < b.ID {
372 return -1
373 }
374 return 0
375 })
376 }
377 all = append(all, p)
378 }
379
380 // sort pipelines by date
381 slices.SortFunc(all, func(a, b models.Pipeline) int {
382 if a.Created.After(b.Created) {
383 return -1
384 }
385 return 1
386 })
387
388 return all, nil
389}
390
391// the pipelines table is aliased to `p`
392// the triggers table is aliased to `t`
393func GetTotalPipelineStatuses(e Execer, filters ...orm.Filter) (int64, error) {
394 var conditions []string
395 var args []any
396 for _, filter := range filters {
397 conditions = append(conditions, filter.Condition())
398 args = append(args, filter.Arg()...)
399 }
400
401 whereClause := ""
402 if conditions != nil {
403 whereClause = " where " + strings.Join(conditions, " and ")
404 }
405
406 query := fmt.Sprintf(`
407 select
408 count(1)
409 from
410 pipelines p
411 join
412 triggers t ON p.trigger_id = t.id
413 %s
414 `, whereClause)
415
416 rows, err := e.Query(query, args...)
417 if err != nil {
418 return 0, err
419 }
420 defer rows.Close()
421
422 for rows.Next() {
423 var count int64
424 err := rows.Scan(&count)
425 if err != nil {
426 return 0, err
427 }
428
429 return count, nil
430 }
431
432 // unreachable
433 return 0, nil
434}