this repo has no description
1package db
2
3import (
4 "fmt"
5)
6
// Pipeline is the in-memory form of one row of the pipelines table as read
// and written by the methods in this file.
type Pipeline struct {
	// Rkey is the row's record key. Listing queries order and paginate on
	// this value.
	Rkey string

	// PipelineJson is the stored pipeline payload, kept as an opaque string.
	// NOTE(review): presumably JSON-encoded, judging by the name; nothing in
	// this file parses or validates it.
	PipelineJson string
}
11
12func (d *DB) InsertPipeline(pipeline Pipeline) error {
13 _, err := d.Exec(
14 `insert into pipelines (rkey, nsid, event) values (?, ?, ?)`,
15 pipeline.Rkey,
16 pipeline.PipelineJson,
17 )
18
19 return err
20}
21
22func (d *DB) GetPipeline(rkey, cursor string) (Pipeline, error) {
23 whereClause := "where rkey = ?"
24 args := []any{rkey}
25
26 if cursor != "" {
27 whereClause += " and rkey > ?"
28 args = append(args, cursor)
29 }
30
31 query := fmt.Sprintf(`
32 select rkey, pipeline
33 from pipelines
34 %s
35 limit 1
36 `, whereClause)
37
38 row := d.QueryRow(query, args...)
39
40 var p Pipeline
41 err := row.Scan(&p.Rkey, &p.PipelineJson)
42 if err != nil {
43 return Pipeline{}, err
44 }
45
46 return p, nil
47}
48
49func (d *DB) GetPipelines(cursor string) ([]Pipeline, error) {
50 whereClause := ""
51 args := []any{}
52 if cursor != "" {
53 whereClause = "where rkey > ?"
54 args = append(args, cursor)
55 }
56
57 query := fmt.Sprintf(`
58 select rkey, nsid, pipeline
59 from pipelines
60 %s
61 order by rkey asc
62 limit 100
63 `, whereClause)
64
65 rows, err := d.Query(query, args...)
66 if err != nil {
67 return nil, err
68 }
69 defer rows.Close()
70
71 var evts []Pipeline
72 for rows.Next() {
73 var ev Pipeline
74 rows.Scan(&ev.Rkey, &ev.PipelineJson)
75 evts = append(evts, ev)
76 }
77
78 if err := rows.Err(); err != nil {
79 return nil, err
80 }
81
82 return evts, nil
83}