this repo has no description
1package db 2 3import ( 4 "fmt" 5) 6 7type Pipeline struct { 8 Rkey string 9 PipelineJson string 10} 11 12func (d *DB) InsertPipeline(pipeline Pipeline) error { 13 _, err := d.Exec( 14 `insert into pipelines (rkey, nsid, event) values (?, ?, ?)`, 15 pipeline.Rkey, 16 pipeline.PipelineJson, 17 ) 18 19 return err 20} 21 22func (d *DB) GetPipeline(rkey, cursor string) (Pipeline, error) { 23 whereClause := "where rkey = ?" 24 args := []any{rkey} 25 26 if cursor != "" { 27 whereClause += " and rkey > ?" 28 args = append(args, cursor) 29 } 30 31 query := fmt.Sprintf(` 32 select rkey, pipeline 33 from pipelines 34 %s 35 limit 1 36 `, whereClause) 37 38 row := d.QueryRow(query, args...) 39 40 var p Pipeline 41 err := row.Scan(&p.Rkey, &p.PipelineJson) 42 if err != nil { 43 return Pipeline{}, err 44 } 45 46 return p, nil 47} 48 49func (d *DB) GetPipelines(cursor string) ([]Pipeline, error) { 50 whereClause := "" 51 args := []any{} 52 if cursor != "" { 53 whereClause = "where rkey > ?" 54 args = append(args, cursor) 55 } 56 57 query := fmt.Sprintf(` 58 select rkey, nsid, pipeline 59 from pipelines 60 %s 61 order by rkey asc 62 limit 100 63 `, whereClause) 64 65 rows, err := d.Query(query, args...) 66 if err != nil { 67 return nil, err 68 } 69 defer rows.Close() 70 71 var evts []Pipeline 72 for rows.Next() { 73 var ev Pipeline 74 rows.Scan(&ev.Rkey, &ev.PipelineJson) 75 evts = append(evts, ev) 76 } 77 78 if err := rows.Err(); err != nil { 79 return nil, err 80 } 81 82 return evts, nil 83}