Monorepo for Tangled
at a1e6f90382debc0c86f094b22ce67608c2243ef9 415 lines 8.1 kB view raw
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/orm" 12) 13 14func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 15 var pipelines []models.Pipeline 16 17 var conditions []string 18 var args []any 19 for _, filter := range filters { 20 conditions = append(conditions, filter.Condition()) 21 args = append(args, filter.Arg()...) 22 } 23 24 whereClause := "" 25 if conditions != nil { 26 whereClause = " where " + strings.Join(conditions, " and ") 27 } 28 29 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 30 31 rows, err := e.Query(query, args...) 32 33 if err != nil { 34 return nil, err 35 } 36 defer rows.Close() 37 38 for rows.Next() { 39 var pipeline models.Pipeline 40 var createdAt string 41 err = rows.Scan( 42 &pipeline.Id, 43 &pipeline.Rkey, 44 &pipeline.Knot, 45 &pipeline.RepoOwner, 46 &pipeline.RepoName, 47 &pipeline.Sha, 48 &createdAt, 49 ) 50 if err != nil { 51 return nil, err 52 } 53 54 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 55 pipeline.Created = t 56 } 57 58 pipelines = append(pipelines, pipeline) 59 } 60 61 if err = rows.Err(); err != nil { 62 return nil, err 63 } 64 65 return pipelines, nil 66} 67 68func AddPipeline(e Execer, pipeline models.Pipeline) error { 69 args := []any{ 70 pipeline.Rkey, 71 pipeline.Knot, 72 pipeline.RepoOwner, 73 pipeline.RepoName, 74 pipeline.TriggerId, 75 pipeline.Sha, 76 } 77 78 placeholders := make([]string, len(args)) 79 for i := range placeholders { 80 placeholders[i] = "?" 81 } 82 83 query := fmt.Sprintf(` 84 insert or ignore into pipelines ( 85 rkey, 86 knot, 87 repo_owner, 88 repo_name, 89 trigger_id, 90 sha 91 ) values (%s) 92 `, strings.Join(placeholders, ",")) 93 94 _, err := e.Exec(query, args...) 95 96 return err 97} 98 99func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 100 args := []any{ 101 trigger.Kind, 102 trigger.PushRef, 103 trigger.PushNewSha, 104 trigger.PushOldSha, 105 trigger.PRSourceBranch, 106 trigger.PRTargetBranch, 107 trigger.PRSourceSha, 108 trigger.PRAction, 109 } 110 111 placeholders := make([]string, len(args)) 112 for i := range placeholders { 113 placeholders[i] = "?" 114 } 115 116 query := fmt.Sprintf(`insert or ignore into triggers ( 117 kind, 118 push_ref, 119 push_new_sha, 120 push_old_sha, 121 pr_source_branch, 122 pr_target_branch, 123 pr_source_sha, 124 pr_action 125 ) values (%s)`, strings.Join(placeholders, ",")) 126 127 res, err := e.Exec(query, args...) 128 if err != nil { 129 return 0, err 130 } 131 132 return res.LastInsertId() 133} 134 135func AddPipelineStatus(e Execer, status models.PipelineStatus) error { 136 args := []any{ 137 status.Spindle, 138 status.Rkey, 139 status.PipelineKnot, 140 status.PipelineRkey, 141 status.Workflow, 142 status.Status, 143 status.Error, 144 status.ExitCode, 145 status.Created.Format(time.RFC3339), 146 } 147 148 placeholders := make([]string, len(args)) 149 for i := range placeholders { 150 placeholders[i] = "?" 151 } 152 153 query := fmt.Sprintf(` 154 insert or ignore into pipeline_statuses ( 155 spindle, 156 rkey, 157 pipeline_knot, 158 pipeline_rkey, 159 workflow, 160 status, 161 error, 162 exit_code, 163 created 164 ) values (%s) 165 `, strings.Join(placeholders, ",")) 166 167 _, err := e.Exec(query, args...) 168 return err 169} 170 171// this is a mega query, but the most useful one: 172// get N pipelines, for each one get the latest status of its N workflows 173// 174// the pipelines table is aliased to `p` 175// the triggers table is aliased to `t` 176func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 177 var conditions []string 178 var args []any 179 for _, filter := range filters { 180 conditions = append(conditions, filter.Condition()) 181 args = append(args, filter.Arg()...) 182 } 183 184 whereClause := "" 185 if conditions != nil { 186 whereClause = " where " + strings.Join(conditions, " and ") 187 } 188 189 query := fmt.Sprintf(` 190 select 191 p.id, 192 p.knot, 193 p.rkey, 194 p.repo_owner, 195 p.repo_name, 196 p.sha, 197 p.created, 198 t.id, 199 t.kind, 200 t.push_ref, 201 t.push_new_sha, 202 t.push_old_sha, 203 t.pr_source_branch, 204 t.pr_target_branch, 205 t.pr_source_sha, 206 t.pr_action 207 from 208 pipelines p 209 join 210 triggers t ON p.trigger_id = t.id 211 %s 212 order by p.created desc 213 limit %d 214 `, whereClause, limit) 215 216 rows, err := e.Query(query, args...) 217 if err != nil { 218 return nil, err 219 } 220 defer rows.Close() 221 222 pipelines := make(map[syntax.ATURI]models.Pipeline) 223 for rows.Next() { 224 var p models.Pipeline 225 var t models.Trigger 226 var created string 227 228 err := rows.Scan( 229 &p.Id, 230 &p.Knot, 231 &p.Rkey, 232 &p.RepoOwner, 233 &p.RepoName, 234 &p.Sha, 235 &created, 236 &p.TriggerId, 237 &t.Kind, 238 &t.PushRef, 239 &t.PushNewSha, 240 &t.PushOldSha, 241 &t.PRSourceBranch, 242 &t.PRTargetBranch, 243 &t.PRSourceSha, 244 &t.PRAction, 245 ) 246 if err != nil { 247 return nil, err 248 } 249 250 p.Created, err = time.Parse(time.RFC3339, created) 251 if err != nil { 252 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 253 } 254 255 t.Id = p.TriggerId 256 p.Trigger = &t 257 p.Statuses = make(map[string]models.WorkflowStatus) 258 259 pipelines[p.AtUri()] = p 260 } 261 262 // get all statuses 263 // the where clause here is of the form: 264 // 265 // where (pipeline_knot = k1 and pipeline_rkey = r1) 266 // or (pipeline_knot = k2 and pipeline_rkey = r2) 267 conditions = nil 268 args = nil 269 for _, p := range pipelines { 270 knotFilter := orm.FilterEq("pipeline_knot", p.Knot) 271 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey) 272 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 273 args = append(args, p.Knot) 274 args = append(args, p.Rkey) 275 } 276 whereClause = "" 277 if conditions != nil { 278 whereClause = "where " + strings.Join(conditions, " or ") 279 } 280 query = fmt.Sprintf(` 281 select 282 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 283 from 284 pipeline_statuses 285 %s 286 `, whereClause) 287 288 rows, err = e.Query(query, args...) 289 if err != nil { 290 return nil, err 291 } 292 defer rows.Close() 293 294 for rows.Next() { 295 var ps models.PipelineStatus 296 var created string 297 298 err := rows.Scan( 299 &ps.ID, 300 &ps.Spindle, 301 &ps.Rkey, 302 &ps.PipelineKnot, 303 &ps.PipelineRkey, 304 &created, 305 &ps.Workflow, 306 &ps.Status, 307 &ps.Error, 308 &ps.ExitCode, 309 ) 310 if err != nil { 311 return nil, err 312 } 313 314 ps.Created, err = time.Parse(time.RFC3339, created) 315 if err != nil { 316 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 317 } 318 319 pipelineAt := ps.PipelineAt() 320 321 // extract 322 pipeline, ok := pipelines[pipelineAt] 323 if !ok { 324 continue 325 } 326 statuses, _ := pipeline.Statuses[ps.Workflow] 327 if !ok { 328 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 329 } 330 331 // append 332 statuses.Data = append(statuses.Data, ps) 333 334 // reassign 335 pipeline.Statuses[ps.Workflow] = statuses 336 pipelines[pipelineAt] = pipeline 337 } 338 339 var all []models.Pipeline 340 for _, p := range pipelines { 341 for _, s := range p.Statuses { 342 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 343 if a.Created.After(b.Created) { 344 return 1 345 } 346 if a.Created.Before(b.Created) { 347 return -1 348 } 349 if a.ID > b.ID { 350 return 1 351 } 352 if a.ID < b.ID { 353 return -1 354 } 355 return 0 356 }) 357 } 358 all = append(all, p) 359 } 360 361 // sort pipelines by date 362 slices.SortFunc(all, func(a, b models.Pipeline) int { 363 if a.Created.After(b.Created) { 364 return -1 365 } 366 return 1 367 }) 368 369 return all, nil 370} 371 372// the pipelines table is aliased to `p` 373// the triggers table is aliased to `t` 374func GetTotalPipelineStatuses(e Execer, filters ...orm.Filter) (int64, error) { 375 var conditions []string 376 var args []any 377 for _, filter := range filters { 378 conditions = append(conditions, filter.Condition()) 379 args = append(args, filter.Arg()...) 380 } 381 382 whereClause := "" 383 if conditions != nil { 384 whereClause = " where " + strings.Join(conditions, " and ") 385 } 386 387 query := fmt.Sprintf(` 388 select 389 count(1) 390 from 391 pipelines p 392 join 393 triggers t ON p.trigger_id = t.id 394 %s 395 `, whereClause) 396 397 rows, err := e.Query(query, args...) 398 if err != nil { 399 return 0, err 400 } 401 defer rows.Close() 402 403 for rows.Next() { 404 var count int64 405 err := rows.Scan(&count) 406 if err != nil { 407 return 0, err 408 } 409 410 return count, nil 411 } 412 413 // unreachable 414 return 0, nil 415}