this repo has no description
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/cache" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/models" 17 ec "tangled.org/core/eventconsumer" 18 "tangled.org/core/eventconsumer/cursor" 19 "tangled.org/core/log" 20 "tangled.org/core/orm" 21 "tangled.org/core/rbac" 22 spindle "tangled.org/core/spindle/models" 23 "tangled.org/core/workflow" 24) 25 26func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 27 logger := log.FromContext(ctx) 28 logger = log.SubLogger(logger, "spindlestream") 29 30 spindles, err := db.GetSpindles( 31 d, 32 orm.FilterIsNot("verified", "null"), 33 ) 34 if err != nil { 35 return nil, err 36 } 37 38 srcs := make(map[ec.Source]struct{}) 39 for _, s := range spindles { 40 src := ec.NewSpindleSource(s.Instance) 41 srcs[src] = struct{}{} 42 } 43 44 cache := cache.New(c.Redis.Addr) 45 cursorStore := cursor.NewRedisCursorStore(cache) 46 47 cfg := ec.ConsumerConfig{ 48 Sources: srcs, 49 ProcessFunc: spindleIngester(ctx, logger, d), 50 RetryInterval: c.Spindlestream.RetryInterval, 51 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 52 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 53 WorkerCount: c.Spindlestream.WorkerCount, 54 QueueSize: c.Spindlestream.QueueSize, 55 Logger: logger, 56 Dev: c.Core.Dev, 57 CursorStore: &cursorStore, 58 } 59 60 return ec.NewConsumer(cfg), nil 61} 62 63func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 64 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 65 switch msg.Nsid { 66 case tangled.PipelineNSID: 67 return ingestPipeline(logger, d, source, msg) 68 case tangled.PipelineStatusNSID: 69 return ingestPipelineStatus(ctx, logger, d, source, msg) 70 } 71 72 return nil 73 } 74} 75 76func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 77 var record tangled.Pipeline 78 err := json.Unmarshal(msg.EventJson, &record) 79 if err != nil { 80 return err 81 } 82 83 if record.TriggerMetadata == nil { 84 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 85 } 86 87 if record.TriggerMetadata.Repo == nil { 88 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 89 } 90 91 // does this repo have a spindle configured? 92 repos, err := db.GetRepos( 93 d, 94 0, 95 orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 96 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 97 ) 98 if err != nil { 99 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 100 } 101 if len(repos) != 1 { 102 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 103 } 104 if repos[0].Spindle == "" { 105 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 106 } 107 108 // trigger info 109 var trigger models.Trigger 110 var sha string 111 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 112 switch trigger.Kind { 113 case workflow.TriggerKindPush: 114 trigger.PushRef = &record.TriggerMetadata.Push.Ref 115 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 116 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 117 sha = *trigger.PushNewSha 118 case workflow.TriggerKindPullRequest: 119 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 120 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 121 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 122 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 123 sha = *trigger.PRSourceSha 124 } 125 126 tx, err := d.Begin() 127 if err != nil { 128 return fmt.Errorf("failed to start txn: %w", err) 129 } 130 131 triggerId, err := db.AddTrigger(tx, trigger) 132 if err != nil { 133 return fmt.Errorf("failed to add trigger entry: %w", err) 134 } 135 136 // TODO: we shouldn't even use knot to identify pipelines 137 knot := record.TriggerMetadata.Repo.Knot 138 pipeline := models.Pipeline{ 139 Rkey: msg.Rkey, 140 Knot: knot, 141 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 142 RepoName: record.TriggerMetadata.Repo.Repo, 143 TriggerId: int(triggerId), 144 Sha: sha, 145 } 146 147 err = db.AddPipeline(tx, pipeline) 148 if err != nil { 149 return fmt.Errorf("failed to add pipeline: %w", err) 150 } 151 152 err = tx.Commit() 153 if err != nil { 154 return fmt.Errorf("failed to commit txn: %w", err) 155 } 156 157 l.Info("added pipeline", "pipeline", pipeline) 158 159 return nil 160} 161 162func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 163 var record tangled.PipelineStatus 164 err := json.Unmarshal(msg.EventJson, &record) 165 if err != nil { 166 return err 167 } 168 169 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 170 if err != nil { 171 return err 172 } 173 174 exitCode := 0 175 if record.ExitCode != nil { 176 exitCode = int(*record.ExitCode) 177 } 178 179 // pick the record creation time if possible, or use time.Now 180 created := time.Now() 181 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 182 created = t 183 } 184 185 status := models.PipelineStatus{ 186 Spindle: source.Key(), 187 Rkey: msg.Rkey, 188 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 189 PipelineRkey: pipelineUri.RecordKey().String(), 190 Created: created, 191 Workflow: record.Workflow, 192 Status: spindle.StatusKind(record.Status), 193 Error: record.Error, 194 ExitCode: exitCode, 195 } 196 197 err = db.AddPipelineStatus(d, status) 198 if err != nil { 199 return fmt.Errorf("failed to add pipeline status: %w", err) 200 } 201 202 return nil 203}