this repo has no description
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "log/slog" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/appview/cache" 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 ec "tangled.sh/tangled.sh/core/eventconsumer" 15 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 16 "tangled.sh/tangled.sh/core/log" 17 "tangled.sh/tangled.sh/core/rbac" 18) 19 20func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 21 spindles, err := db.GetSpindles(d) 22 if err != nil { 23 return nil, err 24 } 25 26 srcs := make(map[ec.Source]struct{}) 27 for _, s := range spindles { 28 src := ec.NewSpindleSource(s.Instance) 29 srcs[src] = struct{}{} 30 } 31 32 logger := log.New("spindlestream") 33 cache := cache.New(c.Redis.Addr) 34 cursorStore := cursor.NewRedisCursorStore(cache) 35 36 cfg := ec.ConsumerConfig{ 37 Sources: srcs, 38 ProcessFunc: spindleIngester(ctx, logger, d), 39 RetryInterval: c.Spindlestream.RetryInterval, 40 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 41 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 42 WorkerCount: c.Spindlestream.WorkerCount, 43 QueueSize: c.Spindlestream.QueueSize, 44 Logger: logger, 45 Dev: c.Core.Dev, 46 CursorStore: &cursorStore, 47 } 48 49 return ec.NewConsumer(cfg), nil 50} 51 52func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 53 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 54 switch msg.Nsid { 55 case tangled.PipelineStatusNSID: 56 return ingestPipelineStatus(ctx, logger, d, source, msg) 57 } 58 59 return nil 60 } 61} 62 63func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 64 var record tangled.PipelineStatus 65 err := json.Unmarshal(msg.EventJson, &record) 66 if err != nil { 67 return err 68 } 69 70 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 71 if err != nil { 72 return err 73 } 74 75 exitCode := 0 76 if record.ExitCode != nil { 77 exitCode = int(*record.ExitCode) 78 } 79 80 status := db.PipelineStatus{ 81 Spindle: source.Key(), 82 Rkey: msg.Rkey, 83 PipelineKnot: pipelineUri.Authority().String(), 84 PipelineRkey: pipelineUri.RecordKey().String(), 85 Created: time.Now(), 86 Workflow: record.Workflow, 87 Status: record.Status, 88 Error: record.Error, 89 ExitCode: exitCode, 90 } 91 92 return db.AddPipelineStatus(d, status) 93}