this repo has no description
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 spindle "tangled.sh/tangled.sh/core/spindle/models" 11) 12 13type Pipeline struct { 14 Id int 15 Rkey string 16 Knot string 17 RepoOwner syntax.DID 18 RepoName string 19 TriggerId int 20 Sha string 21 Created time.Time 22 23 // populate when querying for reverse mappings 24 Trigger *Trigger 25 Statuses map[string]WorkflowStatus 26} 27 28type WorkflowStatus struct { 29 data []PipelineStatus 30} 31 32func (w WorkflowStatus) Latest() PipelineStatus { 33 return w.data[len(w.data)-1] 34} 35 36// time taken by this workflow to reach an "end state" 37func (w WorkflowStatus) TimeTaken() time.Duration { 38 var start, end *time.Time 39 for _, s := range w.data { 40 if s.Status.IsStart() { 41 start = &s.Created 42 } 43 if s.Status.IsFinish() { 44 end = &s.Created 45 } 46 } 47 48 if start != nil && end != nil && end.After(*start) { 49 return end.Sub(*start) 50 } 51 52 return 0 53} 54 55func (p Pipeline) Counts() map[string]int { 56 m := make(map[string]int) 57 for _, w := range p.Statuses { 58 m[w.Latest().Status.String()] += 1 59 } 60 return m 61} 62 63type Trigger struct { 64 Id int 65 Kind string 66 67 // push trigger fields 68 PushRef *string 69 PushNewSha *string 70 PushOldSha *string 71 72 // pull request trigger fields 73 PRSourceBranch *string 74 PRTargetBranch *string 75 PRSourceSha *string 76 PRAction *string 77} 78 79type PipelineStatus struct { 80 ID int 81 Spindle string 82 Rkey string 83 PipelineKnot string 84 PipelineRkey string 85 Created time.Time 86 Workflow string 87 Status spindle.StatusKind 88 Error *string 89 ExitCode int 90} 91 92func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { 93 var pipelines []Pipeline 94 95 var conditions []string 96 var args []any 97 for _, filter := range filters { 98 conditions = append(conditions, filter.Condition()) 99 args = append(args, filter.Arg()...) 100 } 101 102 whereClause := "" 103 if conditions != nil { 104 whereClause = " where " + strings.Join(conditions, " and ") 105 } 106 107 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 108 109 rows, err := e.Query(query, args...) 110 111 if err != nil { 112 return nil, err 113 } 114 defer rows.Close() 115 116 for rows.Next() { 117 var pipeline Pipeline 118 var createdAt string 119 err = rows.Scan( 120 &pipeline.Id, 121 &pipeline.Rkey, 122 &pipeline.Knot, 123 &pipeline.RepoOwner, 124 &pipeline.RepoName, 125 &pipeline.Sha, 126 &createdAt, 127 ) 128 if err != nil { 129 return nil, err 130 } 131 132 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 133 pipeline.Created = t 134 } 135 136 pipelines = append(pipelines, pipeline) 137 } 138 139 if err = rows.Err(); err != nil { 140 return nil, err 141 } 142 143 return pipelines, nil 144} 145 146func AddPipeline(e Execer, pipeline Pipeline) error { 147 args := []any{ 148 pipeline.Rkey, 149 pipeline.Knot, 150 pipeline.RepoOwner, 151 pipeline.RepoName, 152 pipeline.TriggerId, 153 pipeline.Sha, 154 } 155 156 placeholders := make([]string, len(args)) 157 for i := range placeholders { 158 placeholders[i] = "?" 159 } 160 161 query := fmt.Sprintf(` 162 insert or ignore into pipelines ( 163 rkey, 164 knot, 165 repo_owner, 166 repo_name, 167 trigger_id, 168 sha 169 ) values (%s) 170 `, strings.Join(placeholders, ",")) 171 172 _, err := e.Exec(query, args...) 173 174 return err 175} 176 177func AddTrigger(e Execer, trigger Trigger) (int64, error) { 178 args := []any{ 179 trigger.Kind, 180 trigger.PushRef, 181 trigger.PushNewSha, 182 trigger.PushOldSha, 183 trigger.PRSourceBranch, 184 trigger.PRTargetBranch, 185 trigger.PRSourceSha, 186 trigger.PRAction, 187 } 188 189 placeholders := make([]string, len(args)) 190 for i := range placeholders { 191 placeholders[i] = "?" 192 } 193 194 query := fmt.Sprintf(`insert or ignore into triggers ( 195 kind, 196 push_ref, 197 push_new_sha, 198 push_old_sha, 199 pr_source_branch, 200 pr_target_branch, 201 pr_source_sha, 202 pr_action 203 ) values (%s)`, strings.Join(placeholders, ",")) 204 205 res, err := e.Exec(query, args...) 206 if err != nil { 207 return 0, err 208 } 209 210 return res.LastInsertId() 211} 212 213func AddPipelineStatus(e Execer, status PipelineStatus) error { 214 args := []any{ 215 status.Spindle, 216 status.Rkey, 217 status.PipelineKnot, 218 status.PipelineRkey, 219 status.Workflow, 220 status.Status, 221 status.Error, 222 status.ExitCode, 223 } 224 225 placeholders := make([]string, len(args)) 226 for i := range placeholders { 227 placeholders[i] = "?" 228 } 229 230 query := fmt.Sprintf(` 231 insert or ignore into pipeline_statuses ( 232 spindle, 233 rkey, 234 pipeline_knot, 235 pipeline_rkey, 236 workflow, 237 status, 238 error, 239 exit_code 240 ) values (%s) 241 `, strings.Join(placeholders, ",")) 242 243 _, err := e.Exec(query, args...) 244 return err 245} 246 247// this is a mega query, but the most useful one: 248// get N pipelines, for each one get the latest status of its N workflows 249func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) { 250 var conditions []string 251 var args []any 252 for _, filter := range filters { 253 filter.key = "p." + filter.key // the table is aliased in the query to `p` 254 conditions = append(conditions, filter.Condition()) 255 args = append(args, filter.Arg()...) 256 } 257 258 whereClause := "" 259 if conditions != nil { 260 whereClause = " where " + strings.Join(conditions, " and ") 261 } 262 263 query := fmt.Sprintf(` 264 select 265 p.id AS pipeline_id, 266 p.knot, 267 p.rkey, 268 p.repo_owner, 269 p.repo_name, 270 p.sha, 271 p.created, 272 t.id AS trigger_id, 273 t.kind, 274 t.push_ref, 275 t.push_new_sha, 276 t.push_old_sha, 277 t.pr_source_branch, 278 t.pr_target_branch, 279 t.pr_source_sha, 280 t.pr_action 281 from 282 pipelines p 283 join 284 triggers t ON p.trigger_id = t.id 285 %s 286 order by p.created desc 287 `, whereClause) 288 289 rows, err := e.Query(query, args...) 290 if err != nil { 291 return nil, err 292 } 293 defer rows.Close() 294 295 pipelines := make(map[string]Pipeline) 296 for rows.Next() { 297 var p Pipeline 298 var t Trigger 299 var created string 300 301 err := rows.Scan( 302 &p.Id, 303 &p.Knot, 304 &p.Rkey, 305 &p.RepoOwner, 306 &p.RepoName, 307 &p.Sha, 308 &created, 309 &p.TriggerId, 310 &t.Kind, 311 &t.PushRef, 312 &t.PushNewSha, 313 &t.PushOldSha, 314 &t.PRSourceBranch, 315 &t.PRTargetBranch, 316 &t.PRSourceSha, 317 &t.PRAction, 318 ) 319 if err != nil { 320 return nil, err 321 } 322 323 // Parse created time manually 324 p.Created, err = time.Parse(time.RFC3339, created) 325 if err != nil { 326 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 327 } 328 329 // Link trigger to pipeline 330 t.Id = p.TriggerId 331 p.Trigger = &t 332 p.Statuses = make(map[string]WorkflowStatus) 333 334 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 335 pipelines[k] = p 336 } 337 338 // get all statuses 339 // the where clause here is of the form: 340 // 341 // where (pipeline_knot = k1 and pipeline_rkey = r1) 342 // or (pipeline_knot = k2 and pipeline_rkey = r2) 343 conditions = nil 344 args = nil 345 for _, p := range pipelines { 346 knotFilter := FilterEq("pipeline_knot", p.Knot) 347 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 348 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 349 args = append(args, p.Knot) 350 args = append(args, p.Rkey) 351 } 352 whereClause = "" 353 if conditions != nil { 354 whereClause = "where " + strings.Join(conditions, " or ") 355 } 356 query = fmt.Sprintf(` 357 select 358 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 359 from 360 pipeline_statuses 361 %s 362 `, whereClause) 363 364 rows, err = e.Query(query, args...) 365 if err != nil { 366 return nil, err 367 } 368 defer rows.Close() 369 370 for rows.Next() { 371 var ps PipelineStatus 372 var created string 373 374 err := rows.Scan( 375 &ps.ID, 376 &ps.Spindle, 377 &ps.Rkey, 378 &ps.PipelineKnot, 379 &ps.PipelineRkey, 380 &created, 381 &ps.Workflow, 382 &ps.Status, 383 &ps.Error, 384 &ps.ExitCode, 385 ) 386 if err != nil { 387 return nil, err 388 } 389 390 ps.Created, err = time.Parse(time.RFC3339, created) 391 if err != nil { 392 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 393 } 394 395 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 396 397 // extract 398 pipeline, ok := pipelines[key] 399 if !ok { 400 continue 401 } 402 statuses, _ := pipeline.Statuses[ps.Workflow] 403 if !ok { 404 pipeline.Statuses[ps.Workflow] = WorkflowStatus{} 405 } 406 407 // append 408 statuses.data = append(statuses.data, ps) 409 410 // reassign 411 pipeline.Statuses[ps.Workflow] = statuses 412 pipelines[key] = pipeline 413 } 414 415 var all []Pipeline 416 for _, p := range pipelines { 417 for _, s := range p.Statuses { 418 slices.SortFunc(s.data, func(a, b PipelineStatus) int { 419 if a.Created.After(b.Created) { 420 return 1 421 } 422 return -1 423 }) 424 } 425 all = append(all, p) 426 } 427 428 return all, nil 429}