Monorepo for Tangled
at master 434 lines 8.5 kB view raw
1package db 2 3import ( 4 "database/sql" 5 "fmt" 6 "slices" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/orm" 13) 14 15func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 16 var pipelines []models.Pipeline 17 18 var conditions []string 19 var args []any 20 for _, filter := range filters { 21 conditions = append(conditions, filter.Condition()) 22 args = append(args, filter.Arg()...) 23 } 24 25 whereClause := "" 26 if conditions != nil { 27 whereClause = " where " + strings.Join(conditions, " and ") 28 } 29 30 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause) 31 32 rows, err := e.Query(query, args...) 33 34 if err != nil { 35 return nil, err 36 } 37 defer rows.Close() 38 39 for rows.Next() { 40 var pipeline models.Pipeline 41 var createdAt string 42 var repoDid sql.NullString 43 err = rows.Scan( 44 &pipeline.Id, 45 &pipeline.Rkey, 46 &pipeline.Knot, 47 &pipeline.RepoOwner, 48 &pipeline.RepoName, 49 &pipeline.Sha, 50 &createdAt, 51 &repoDid, 52 ) 53 if err != nil { 54 return nil, err 55 } 56 57 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 58 pipeline.Created = t 59 } 60 if repoDid.Valid { 61 pipeline.RepoDid = repoDid.String 62 } 63 64 pipelines = append(pipelines, pipeline) 65 } 66 67 if err = rows.Err(); err != nil { 68 return nil, err 69 } 70 71 return pipelines, nil 72} 73 74func AddPipeline(e Execer, pipeline models.Pipeline) error { 75 var repoDid *string 76 if pipeline.RepoDid != "" { 77 repoDid = &pipeline.RepoDid 78 } 79 80 args := []any{ 81 pipeline.Rkey, 82 pipeline.Knot, 83 pipeline.RepoOwner, 84 pipeline.RepoName, 85 pipeline.TriggerId, 86 pipeline.Sha, 87 repoDid, 88 } 89 90 placeholders := make([]string, len(args)) 91 for i := range placeholders { 92 placeholders[i] = "?" 93 } 94 95 query := fmt.Sprintf(` 96 insert or ignore into pipelines ( 97 rkey, 98 knot, 99 repo_owner, 100 repo_name, 101 trigger_id, 102 sha, 103 repo_did 104 ) values (%s) 105 `, strings.Join(placeholders, ",")) 106 107 _, err := e.Exec(query, args...) 108 109 return err 110} 111 112func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 113 args := []any{ 114 trigger.Kind, 115 trigger.PushRef, 116 trigger.PushNewSha, 117 trigger.PushOldSha, 118 trigger.PRSourceBranch, 119 trigger.PRTargetBranch, 120 trigger.PRSourceSha, 121 trigger.PRAction, 122 } 123 124 placeholders := make([]string, len(args)) 125 for i := range placeholders { 126 placeholders[i] = "?" 127 } 128 129 query := fmt.Sprintf(`insert or ignore into triggers ( 130 kind, 131 push_ref, 132 push_new_sha, 133 push_old_sha, 134 pr_source_branch, 135 pr_target_branch, 136 pr_source_sha, 137 pr_action 138 ) values (%s)`, strings.Join(placeholders, ",")) 139 140 res, err := e.Exec(query, args...) 141 if err != nil { 142 return 0, err 143 } 144 145 return res.LastInsertId() 146} 147 148func AddPipelineStatus(e Execer, status models.PipelineStatus) error { 149 args := []any{ 150 status.Spindle, 151 status.Rkey, 152 status.PipelineKnot, 153 status.PipelineRkey, 154 status.Workflow, 155 status.Status, 156 status.Error, 157 status.ExitCode, 158 status.Created.Format(time.RFC3339), 159 } 160 161 placeholders := make([]string, len(args)) 162 for i := range placeholders { 163 placeholders[i] = "?" 164 } 165 166 query := fmt.Sprintf(` 167 insert or ignore into pipeline_statuses ( 168 spindle, 169 rkey, 170 pipeline_knot, 171 pipeline_rkey, 172 workflow, 173 status, 174 error, 175 exit_code, 176 created 177 ) values (%s) 178 `, strings.Join(placeholders, ",")) 179 180 _, err := e.Exec(query, args...) 181 return err 182} 183 184// this is a mega query, but the most useful one: 185// get N pipelines, for each one get the latest status of its N workflows 186// 187// the pipelines table is aliased to `p` 188// the triggers table is aliased to `t` 189func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 190 var conditions []string 191 var args []any 192 for _, filter := range filters { 193 conditions = append(conditions, filter.Condition()) 194 args = append(args, filter.Arg()...) 195 } 196 197 whereClause := "" 198 if conditions != nil { 199 whereClause = " where " + strings.Join(conditions, " and ") 200 } 201 202 query := fmt.Sprintf(` 203 select 204 p.id, 205 p.knot, 206 p.rkey, 207 p.repo_owner, 208 p.repo_name, 209 p.sha, 210 p.created, 211 p.repo_did, 212 t.id, 213 t.kind, 214 t.push_ref, 215 t.push_new_sha, 216 t.push_old_sha, 217 t.pr_source_branch, 218 t.pr_target_branch, 219 t.pr_source_sha, 220 t.pr_action 221 from 222 pipelines p 223 join 224 triggers t ON p.trigger_id = t.id 225 %s 226 order by p.created desc 227 limit %d 228 `, whereClause, limit) 229 230 rows, err := e.Query(query, args...) 231 if err != nil { 232 return nil, err 233 } 234 defer rows.Close() 235 236 pipelines := make(map[syntax.ATURI]models.Pipeline) 237 for rows.Next() { 238 var p models.Pipeline 239 var t models.Trigger 240 var created string 241 var repoDid sql.NullString 242 243 err := rows.Scan( 244 &p.Id, 245 &p.Knot, 246 &p.Rkey, 247 &p.RepoOwner, 248 &p.RepoName, 249 &p.Sha, 250 &created, 251 &repoDid, 252 &p.TriggerId, 253 &t.Kind, 254 &t.PushRef, 255 &t.PushNewSha, 256 &t.PushOldSha, 257 &t.PRSourceBranch, 258 &t.PRTargetBranch, 259 &t.PRSourceSha, 260 &t.PRAction, 261 ) 262 if err != nil { 263 return nil, err 264 } 265 266 p.Created, err = time.Parse(time.RFC3339, created) 267 if err != nil { 268 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 269 } 270 if repoDid.Valid { 271 p.RepoDid = repoDid.String 272 } 273 274 t.Id = p.TriggerId 275 p.Trigger = &t 276 p.Statuses = make(map[string]models.WorkflowStatus) 277 278 pipelines[p.AtUri()] = p 279 } 280 281 // get all statuses 282 // the where clause here is of the form: 283 // 284 // where (pipeline_knot = k1 and pipeline_rkey = r1) 285 // or (pipeline_knot = k2 and pipeline_rkey = r2) 286 conditions = nil 287 args = nil 288 for _, p := range pipelines { 289 knotFilter := orm.FilterEq("pipeline_knot", p.Knot) 290 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey) 291 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 292 args = append(args, p.Knot) 293 args = append(args, p.Rkey) 294 } 295 whereClause = "" 296 if conditions != nil { 297 whereClause = "where " + strings.Join(conditions, " or ") 298 } 299 query = fmt.Sprintf(` 300 select 301 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 302 from 303 pipeline_statuses 304 %s 305 `, whereClause) 306 307 rows, err = e.Query(query, args...) 308 if err != nil { 309 return nil, err 310 } 311 defer rows.Close() 312 313 for rows.Next() { 314 var ps models.PipelineStatus 315 var created string 316 317 err := rows.Scan( 318 &ps.ID, 319 &ps.Spindle, 320 &ps.Rkey, 321 &ps.PipelineKnot, 322 &ps.PipelineRkey, 323 &created, 324 &ps.Workflow, 325 &ps.Status, 326 &ps.Error, 327 &ps.ExitCode, 328 ) 329 if err != nil { 330 return nil, err 331 } 332 333 ps.Created, err = time.Parse(time.RFC3339, created) 334 if err != nil { 335 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 336 } 337 338 pipelineAt := ps.PipelineAt() 339 340 // extract 341 pipeline, ok := pipelines[pipelineAt] 342 if !ok { 343 continue 344 } 345 statuses, _ := pipeline.Statuses[ps.Workflow] 346 if !ok { 347 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 348 } 349 350 // append 351 statuses.Data = append(statuses.Data, ps) 352 353 // reassign 354 pipeline.Statuses[ps.Workflow] = statuses 355 pipelines[pipelineAt] = pipeline 356 } 357 358 var all []models.Pipeline 359 for _, p := range pipelines { 360 for _, s := range p.Statuses { 361 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 362 if a.Created.After(b.Created) { 363 return 1 364 } 365 if a.Created.Before(b.Created) { 366 return -1 367 } 368 if a.ID > b.ID { 369 return 1 370 } 371 if a.ID < b.ID { 372 return -1 373 } 374 return 0 375 }) 376 } 377 all = append(all, p) 378 } 379 380 // sort pipelines by date 381 slices.SortFunc(all, func(a, b models.Pipeline) int { 382 if a.Created.After(b.Created) { 383 return -1 384 } 385 return 1 386 }) 387 388 return all, nil 389} 390 391// the pipelines table is aliased to `p` 392// the triggers table is aliased to `t` 393func GetTotalPipelineStatuses(e Execer, filters ...orm.Filter) (int64, error) { 394 var conditions []string 395 var args []any 396 for _, filter := range filters { 397 conditions = append(conditions, filter.Condition()) 398 args = append(args, filter.Arg()...) 399 } 400 401 whereClause := "" 402 if conditions != nil { 403 whereClause = " where " + strings.Join(conditions, " and ") 404 } 405 406 query := fmt.Sprintf(` 407 select 408 count(1) 409 from 410 pipelines p 411 join 412 triggers t ON p.trigger_id = t.id 413 %s 414 `, whereClause) 415 416 rows, err := e.Query(query, args...) 417 if err != nil { 418 return 0, err 419 } 420 defer rows.Close() 421 422 for rows.Next() { 423 var count int64 424 err := rows.Scan(&count) 425 if err != nil { 426 return 0, err 427 } 428 429 return count, nil 430 } 431 432 // unreachable 433 return 0, nil 434}