this repo has no description
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 spindle "tangled.sh/tangled.sh/core/spindle/models"
11)
12
13type Pipeline struct {
14 Id int
15 Rkey string
16 Knot string
17 RepoOwner syntax.DID
18 RepoName string
19 TriggerId int
20 Sha string
21 Created time.Time
22
23 // populate when querying for reverse mappings
24 Trigger *Trigger
25 Statuses map[string]WorkflowStatus
26}
27
28type WorkflowStatus struct {
29 data []PipelineStatus
30}
31
32func (w WorkflowStatus) Latest() PipelineStatus {
33 return w.data[len(w.data)-1]
34}
35
36// time taken by this workflow to reach an "end state"
37func (w WorkflowStatus) TimeTaken() time.Duration {
38 var start, end *time.Time
39 for _, s := range w.data {
40 if s.Status.IsStart() {
41 start = &s.Created
42 }
43 if s.Status.IsFinish() {
44 end = &s.Created
45 }
46 }
47
48 if start != nil && end != nil && end.After(*start) {
49 return end.Sub(*start)
50 }
51
52 return 0
53}
54
55func (p Pipeline) Counts() map[string]int {
56 m := make(map[string]int)
57 for _, w := range p.Statuses {
58 m[w.Latest().Status.String()] += 1
59 }
60 return m
61}
62
// Trigger mirrors a row of the `triggers` table. Every field other than Id
// and Kind is a pointer, so a column can be absent (nil).
type Trigger struct {
	// Id is the row id in the `triggers` table.
	Id int
	// Kind is stored in the `kind` column.
	// NOTE(review): presumably selects which field group below is set; confirm.
	Kind string

	// Fields used by push triggers.
	PushRef    *string
	PushNewSha *string
	PushOldSha *string

	// Fields used by pull request triggers.
	PRSourceBranch *string
	PRTargetBranch *string
	PRSourceSha    *string
	PRAction       *string
}
78
79type PipelineStatus struct {
80 ID int
81 Spindle string
82 Rkey string
83 PipelineKnot string
84 PipelineRkey string
85 Created time.Time
86 Workflow string
87 Status spindle.StatusKind
88 Error *string
89 ExitCode int
90}
91
92func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) {
93 var pipelines []Pipeline
94
95 var conditions []string
96 var args []any
97 for _, filter := range filters {
98 conditions = append(conditions, filter.Condition())
99 args = append(args, filter.Arg()...)
100 }
101
102 whereClause := ""
103 if conditions != nil {
104 whereClause = " where " + strings.Join(conditions, " and ")
105 }
106
107 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
108
109 rows, err := e.Query(query, args...)
110
111 if err != nil {
112 return nil, err
113 }
114 defer rows.Close()
115
116 for rows.Next() {
117 var pipeline Pipeline
118 var createdAt string
119 err = rows.Scan(
120 &pipeline.Id,
121 &pipeline.Rkey,
122 &pipeline.Knot,
123 &pipeline.RepoOwner,
124 &pipeline.RepoName,
125 &pipeline.Sha,
126 &createdAt,
127 )
128 if err != nil {
129 return nil, err
130 }
131
132 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
133 pipeline.Created = t
134 }
135
136 pipelines = append(pipelines, pipeline)
137 }
138
139 if err = rows.Err(); err != nil {
140 return nil, err
141 }
142
143 return pipelines, nil
144}
145
146func AddPipeline(e Execer, pipeline Pipeline) error {
147 args := []any{
148 pipeline.Rkey,
149 pipeline.Knot,
150 pipeline.RepoOwner,
151 pipeline.RepoName,
152 pipeline.TriggerId,
153 pipeline.Sha,
154 }
155
156 placeholders := make([]string, len(args))
157 for i := range placeholders {
158 placeholders[i] = "?"
159 }
160
161 query := fmt.Sprintf(`
162 insert or ignore into pipelines (
163 rkey,
164 knot,
165 repo_owner,
166 repo_name,
167 trigger_id,
168 sha
169 ) values (%s)
170 `, strings.Join(placeholders, ","))
171
172 _, err := e.Exec(query, args...)
173
174 return err
175}
176
177func AddTrigger(e Execer, trigger Trigger) (int64, error) {
178 args := []any{
179 trigger.Kind,
180 trigger.PushRef,
181 trigger.PushNewSha,
182 trigger.PushOldSha,
183 trigger.PRSourceBranch,
184 trigger.PRTargetBranch,
185 trigger.PRSourceSha,
186 trigger.PRAction,
187 }
188
189 placeholders := make([]string, len(args))
190 for i := range placeholders {
191 placeholders[i] = "?"
192 }
193
194 query := fmt.Sprintf(`insert or ignore into triggers (
195 kind,
196 push_ref,
197 push_new_sha,
198 push_old_sha,
199 pr_source_branch,
200 pr_target_branch,
201 pr_source_sha,
202 pr_action
203 ) values (%s)`, strings.Join(placeholders, ","))
204
205 res, err := e.Exec(query, args...)
206 if err != nil {
207 return 0, err
208 }
209
210 return res.LastInsertId()
211}
212
213func AddPipelineStatus(e Execer, status PipelineStatus) error {
214 args := []any{
215 status.Spindle,
216 status.Rkey,
217 status.PipelineKnot,
218 status.PipelineRkey,
219 status.Workflow,
220 status.Status,
221 status.Error,
222 status.ExitCode,
223 }
224
225 placeholders := make([]string, len(args))
226 for i := range placeholders {
227 placeholders[i] = "?"
228 }
229
230 query := fmt.Sprintf(`
231 insert or ignore into pipeline_statuses (
232 spindle,
233 rkey,
234 pipeline_knot,
235 pipeline_rkey,
236 workflow,
237 status,
238 error,
239 exit_code
240 ) values (%s)
241 `, strings.Join(placeholders, ","))
242
243 _, err := e.Exec(query, args...)
244 return err
245}
246
247// this is a mega query, but the most useful one:
248// get N pipelines, for each one get the latest status of its N workflows
249func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
250 var conditions []string
251 var args []any
252 for _, filter := range filters {
253 filter.key = "p." + filter.key // the table is aliased in the query to `p`
254 conditions = append(conditions, filter.Condition())
255 args = append(args, filter.Arg()...)
256 }
257
258 whereClause := ""
259 if conditions != nil {
260 whereClause = " where " + strings.Join(conditions, " and ")
261 }
262
263 query := fmt.Sprintf(`
264 select
265 p.id AS pipeline_id,
266 p.knot,
267 p.rkey,
268 p.repo_owner,
269 p.repo_name,
270 p.sha,
271 p.created,
272 t.id AS trigger_id,
273 t.kind,
274 t.push_ref,
275 t.push_new_sha,
276 t.push_old_sha,
277 t.pr_source_branch,
278 t.pr_target_branch,
279 t.pr_source_sha,
280 t.pr_action
281 from
282 pipelines p
283 join
284 triggers t ON p.trigger_id = t.id
285 %s
286 order by p.created desc
287 `, whereClause)
288
289 rows, err := e.Query(query, args...)
290 if err != nil {
291 return nil, err
292 }
293 defer rows.Close()
294
295 pipelines := make(map[string]Pipeline)
296 for rows.Next() {
297 var p Pipeline
298 var t Trigger
299 var created string
300
301 err := rows.Scan(
302 &p.Id,
303 &p.Knot,
304 &p.Rkey,
305 &p.RepoOwner,
306 &p.RepoName,
307 &p.Sha,
308 &created,
309 &p.TriggerId,
310 &t.Kind,
311 &t.PushRef,
312 &t.PushNewSha,
313 &t.PushOldSha,
314 &t.PRSourceBranch,
315 &t.PRTargetBranch,
316 &t.PRSourceSha,
317 &t.PRAction,
318 )
319 if err != nil {
320 return nil, err
321 }
322
323 // Parse created time manually
324 p.Created, err = time.Parse(time.RFC3339, created)
325 if err != nil {
326 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
327 }
328
329 // Link trigger to pipeline
330 t.Id = p.TriggerId
331 p.Trigger = &t
332 p.Statuses = make(map[string]WorkflowStatus)
333
334 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
335 pipelines[k] = p
336 }
337
338 // get all statuses
339 // the where clause here is of the form:
340 //
341 // where (pipeline_knot = k1 and pipeline_rkey = r1)
342 // or (pipeline_knot = k2 and pipeline_rkey = r2)
343 conditions = nil
344 args = nil
345 for _, p := range pipelines {
346 knotFilter := FilterEq("pipeline_knot", p.Knot)
347 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
348 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
349 args = append(args, p.Knot)
350 args = append(args, p.Rkey)
351 }
352 whereClause = ""
353 if conditions != nil {
354 whereClause = "where " + strings.Join(conditions, " or ")
355 }
356 query = fmt.Sprintf(`
357 select
358 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
359 from
360 pipeline_statuses
361 %s
362 `, whereClause)
363
364 rows, err = e.Query(query, args...)
365 if err != nil {
366 return nil, err
367 }
368 defer rows.Close()
369
370 for rows.Next() {
371 var ps PipelineStatus
372 var created string
373
374 err := rows.Scan(
375 &ps.ID,
376 &ps.Spindle,
377 &ps.Rkey,
378 &ps.PipelineKnot,
379 &ps.PipelineRkey,
380 &created,
381 &ps.Workflow,
382 &ps.Status,
383 &ps.Error,
384 &ps.ExitCode,
385 )
386 if err != nil {
387 return nil, err
388 }
389
390 ps.Created, err = time.Parse(time.RFC3339, created)
391 if err != nil {
392 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
393 }
394
395 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
396
397 // extract
398 pipeline, ok := pipelines[key]
399 if !ok {
400 continue
401 }
402 statuses, _ := pipeline.Statuses[ps.Workflow]
403 if !ok {
404 pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
405 }
406
407 // append
408 statuses.data = append(statuses.data, ps)
409
410 // reassign
411 pipeline.Statuses[ps.Workflow] = statuses
412 pipelines[key] = pipeline
413 }
414
415 var all []Pipeline
416 for _, p := range pipelines {
417 for _, s := range p.Statuses {
418 slices.SortFunc(s.data, func(a, b PipelineStatus) int {
419 if a.Created.After(b.Created) {
420 return 1
421 }
422 return -1
423 })
424 }
425 all = append(all, p)
426 }
427
428 return all, nil
429}