this repo has no description
1package db
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/notifier"
10 "tangled.org/core/spindle/models"
11 "tangled.org/core/tid"
12)
13
// Event mirrors one row of the events table and is what GetEvents hands back
// to callers. The struct tags give the JSON field names used when an Event is
// serialized.
type Event struct {
	// Rkey is the value of the rkey column.
	Rkey string `json:"rkey"`

	// Nsid is the value of the nsid column; the constructors in this file set
	// it to tangled.PipelineNSID or tangled.PipelineStatusNSID.
	Nsid string `json:"nsid"`

	// Created is the value of the created column. Code in this file always
	// produces it with time.Now().UnixNano().
	Created int64 `json:"created"`

	// EventJson is the JSON-encoded payload stored in the event column. Note
	// that it is serialized under the key "event", not "eventJson".
	EventJson string `json:"event"`
}
20
21func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error {
22 _, err := d.Exec(
23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
24 event.Rkey,
25 event.Nsid,
26 event.EventJson,
27 time.Now().UnixNano(),
28 )
29
30 notifier.NotifyAll()
31
32 return err
33}
34
35func (d *DB) GetEvents(cursor int64) ([]Event, error) {
36 whereClause := ""
37 args := []any{}
38 if cursor > 0 {
39 whereClause = "where created > ?"
40 args = append(args, cursor)
41 }
42
43 query := fmt.Sprintf(`
44 select rkey, nsid, event, created
45 from events
46 %s
47 order by created asc
48 limit 100
49 `, whereClause)
50
51 rows, err := d.Query(query, args...)
52 if err != nil {
53 return nil, err
54 }
55 defer rows.Close()
56
57 var evts []Event
58 for rows.Next() {
59 var ev Event
60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil {
61 return nil, err
62 }
63 evts = append(evts, ev)
64 }
65
66 if err := rows.Err(); err != nil {
67 return nil, err
68 }
69
70 return evts, nil
71}
72
73func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error {
74 eventJson, err := json.Marshal(pipeline)
75 if err != nil {
76 return err
77 }
78 event := Event{
79 Rkey: rkey,
80 Nsid: tangled.PipelineNSID,
81 Created: time.Now().UnixNano(),
82 EventJson: string(eventJson),
83 }
84 return d.insertEvent(event, n)
85}
86
87func (d *DB) createStatusEvent(
88 workflowId models.WorkflowId,
89 statusKind models.StatusKind,
90 workflowError *string,
91 exitCode *int64,
92 n *notifier.Notifier,
93) error {
94 now := time.Now()
95 pipelineAtUri := workflowId.PipelineId.AtUri()
96 s := tangled.PipelineStatus{
97 CreatedAt: now.Format(time.RFC3339),
98 Error: workflowError,
99 ExitCode: exitCode,
100 Pipeline: string(pipelineAtUri),
101 Workflow: workflowId.Name,
102 Status: string(statusKind),
103 }
104
105 eventJson, err := json.Marshal(s)
106 if err != nil {
107 return err
108 }
109
110 event := Event{
111 Rkey: tid.TID(),
112 Nsid: tangled.PipelineStatusNSID,
113 Created: now.UnixNano(),
114 EventJson: string(eventJson),
115 }
116
117 return d.insertEvent(event, n)
118
119}
120
121func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
122 pipelineAtUri := workflowId.PipelineId.AtUri()
123
124 var eventJson string
125 err := d.QueryRow(
126 `
127 select
128 event from events
129 where
130 nsid = ?
131 and json_extract(event, '$.pipeline') = ?
132 and json_extract(event, '$.workflow') = ?
133 order by
134 created desc
135 limit
136 1
137 `,
138 tangled.PipelineStatusNSID,
139 string(pipelineAtUri),
140 workflowId.Name,
141 ).Scan(&eventJson)
142
143 if err != nil {
144 return nil, err
145 }
146
147 var status tangled.PipelineStatus
148 if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
149 return nil, err
150 }
151
152 return &status, nil
153}
154
155func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
156 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
157}
158
159func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
160 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n)
161}
162
163func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
164 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
165}
166
167func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
168 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n)
169}
170
171func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
172 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n)
173}
174
175func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error {
176 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n)
177}