this repo has no description
1package db 2 3import ( 4 "encoding/json" 5 "fmt" 6 "time" 7 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/notifier" 10 "tangled.org/core/spindle/models" 11 "tangled.org/core/tid" 12) 13 14type Event struct { 15 Rkey string `json:"rkey"` 16 Nsid string `json:"nsid"` 17 Created int64 `json:"created"` 18 EventJson string `json:"event"` 19} 20 21func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, 25 event.Nsid, 26 event.EventJson, 27 time.Now().UnixNano(), 28 ) 29 30 notifier.NotifyAll() 31 32 return err 33} 34 35func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 whereClause := "" 37 args := []any{} 38 if cursor > 0 { 39 whereClause = "where created > ?" 40 args = append(args, cursor) 41 } 42 43 query := fmt.Sprintf(` 44 select rkey, nsid, event, created 45 from events 46 %s 47 order by created asc 48 limit 100 49 `, whereClause) 50 51 rows, err := d.Query(query, args...) 52 if err != nil { 53 return nil, err 54 } 55 defer rows.Close() 56 57 var evts []Event 58 for rows.Next() { 59 var ev Event 60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 return nil, err 62 } 63 evts = append(evts, ev) 64 } 65 66 if err := rows.Err(); err != nil { 67 return nil, err 68 } 69 70 return evts, nil 71} 72 73func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 74 eventJson, err := json.Marshal(pipeline) 75 if err != nil { 76 return err 77 } 78 event := Event{ 79 Rkey: rkey, 80 Nsid: tangled.PipelineNSID, 81 Created: time.Now().UnixNano(), 82 EventJson: string(eventJson), 83 } 84 return d.insertEvent(event, n) 85} 86 87func (d *DB) createStatusEvent( 88 workflowId models.WorkflowId, 89 statusKind models.StatusKind, 90 workflowError *string, 91 exitCode *int64, 92 n *notifier.Notifier, 93) error { 94 now := time.Now() 95 pipelineAtUri := workflowId.PipelineId.AtUri() 96 s := tangled.PipelineStatus{ 97 CreatedAt: now.Format(time.RFC3339), 98 Error: workflowError, 99 ExitCode: exitCode, 100 Pipeline: string(pipelineAtUri), 101 Workflow: workflowId.Name, 102 Status: string(statusKind), 103 } 104 105 eventJson, err := json.Marshal(s) 106 if err != nil { 107 return err 108 } 109 110 event := Event{ 111 Rkey: tid.TID(), 112 Nsid: tangled.PipelineStatusNSID, 113 Created: now.UnixNano(), 114 EventJson: string(eventJson), 115 } 116 117 return d.insertEvent(event, n) 118 119} 120 121func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) { 122 pipelineAtUri := workflowId.PipelineId.AtUri() 123 124 var eventJson string 125 err := d.QueryRow( 126 ` 127 select 128 event from events 129 where 130 nsid = ? 131 and json_extract(event, '$.pipeline') = ? 132 and json_extract(event, '$.workflow') = ? 133 order by 134 created desc 135 limit 136 1 137 `, 138 tangled.PipelineStatusNSID, 139 string(pipelineAtUri), 140 workflowId.Name, 141 ).Scan(&eventJson) 142 143 if err != nil { 144 return nil, err 145 } 146 147 var status tangled.PipelineStatus 148 if err := json.Unmarshal([]byte(eventJson), &status); err != nil { 149 return nil, err 150 } 151 152 return &status, nil 153} 154 155func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 156 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n) 157} 158 159func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 160 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n) 161} 162 163func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 164 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 165} 166 167func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 168 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 169} 170 171func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 172 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 173} 174 175func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error { 176 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n) 177}