package state

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"tangled.org/core/api/tangled"
	"tangled.org/core/appview/cache"
	"tangled.org/core/appview/config"
	"tangled.org/core/appview/db"
	"tangled.org/core/appview/models"
	ec "tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/log"
	"tangled.org/core/orm"
	"tangled.org/core/rbac"
	spindle "tangled.org/core/spindle/models"
	"tangled.org/core/workflow"
)

// Spindlestream builds an event consumer that subscribes to every verified
// spindle instance and ingests pipeline records into the appview database.
func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
	logger := log.FromContext(ctx)
	logger = log.SubLogger(logger, "spindlestream")

	// only verified spindles are subscribed to
	spindles, err := db.GetSpindles(
		d,
		orm.FilterIsNot("verified", "null"),
	)
	if err != nil {
		return nil, err
	}

	srcs := make(map[ec.Source]struct{})
	for _, s := range spindles {
		src := ec.NewSpindleSource(s.Instance)
		srcs[src] = struct{}{}
	}

	cache := cache.New(c.Redis.Addr)
	cursorStore := cursor.NewRedisCursorStore(cache)

	cfg := ec.ConsumerConfig{
		Sources:           srcs,
		ProcessFunc:       spindleIngester(ctx, logger, d),
		RetryInterval:     c.Spindlestream.RetryInterval,
		MaxRetryInterval:  c.Spindlestream.MaxRetryInterval,
		ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
		WorkerCount:       c.Spindlestream.WorkerCount,
		QueueSize:         c.Spindlestream.QueueSize,
		Logger:            logger,
		Dev:               c.Core.Dev,
		CursorStore:       &cursorStore,
	}

	return ec.NewConsumer(cfg), nil
}

// spindleIngester dispatches incoming spindle events to the appropriate
// handler based on the record's NSID.
func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
	return func(ctx context.Context, source ec.Source, msg ec.Message) error {
		switch msg.Nsid {
		case tangled.PipelineNSID:
			return ingestPipeline(logger, d, source, msg)
		case tangled.PipelineStatusNSID:
			return ingestPipelineStatus(ctx, logger, d, source, msg)
		}
		return nil
	}
}

// ingestPipeline stores a newly triggered pipeline along with its trigger
// metadata.
func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
	var record tangled.Pipeline
	err := json.Unmarshal(msg.EventJson, &record)
	if err != nil {
		return err
	}

	if record.TriggerMetadata == nil {
		return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
	}

	if record.TriggerMetadata.Repo == nil {
		return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
	}

	// does this repo have a spindle configured?
	repos, err := db.GetRepos(
		d,
		0,
		orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
		orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
	)
	if err != nil {
		return fmt.Errorf("failed to look up repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
	}
	if len(repos) != 1 {
		return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
	}
	if repos[0].Spindle == "" {
		return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
	}

	// trigger info
	var trigger models.Trigger
	var sha string
	trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
	switch trigger.Kind {
	case workflow.TriggerKindPush:
		if record.TriggerMetadata.Push == nil {
			return fmt.Errorf("empty push metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
		}
		trigger.PushRef = &record.TriggerMetadata.Push.Ref
		trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
		trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
		sha = *trigger.PushNewSha
	case workflow.TriggerKindPullRequest:
		if record.TriggerMetadata.PullRequest == nil {
			return fmt.Errorf("empty pull request metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
		}
		trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
		trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
		trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
		trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
		sha = *trigger.PRSourceSha
	}

	tx, err := d.Begin()
	if err != nil {
		return fmt.Errorf("failed to start txn: %w", err)
	}
	// release the transaction on early returns; a rollback after a
	// successful commit is a no-op
	defer tx.Rollback()

	triggerId, err := db.AddTrigger(tx, trigger)
	if err != nil {
		return fmt.Errorf("failed to add trigger entry: %w", err)
	}

	// TODO: we shouldn't even use knot to identify pipelines
	knot := record.TriggerMetadata.Repo.Knot

	pipeline := models.Pipeline{
		Rkey:      msg.Rkey,
		Knot:      knot,
		RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
		RepoName:  record.TriggerMetadata.Repo.Repo,
		TriggerId: int(triggerId),
		Sha:       sha,
	}

	err = db.AddPipeline(tx, pipeline)
	if err != nil {
		return fmt.Errorf("failed to add pipeline: %w", err)
	}

	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("failed to commit txn: %w", err)
	}

	l.Info("added pipeline", "pipeline", pipeline)

	return nil
}

// ingestPipelineStatus records a status update emitted by a spindle for a
// previously ingested pipeline.
func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
	var record tangled.PipelineStatus
	err := json.Unmarshal(msg.EventJson, &record)
	if err != nil {
		return err
	}

	pipelineUri, err := syntax.ParseATURI(record.Pipeline)
	if err != nil {
		return err
	}

	exitCode := 0
	if record.ExitCode != nil {
		exitCode = int(*record.ExitCode)
	}

	// use the record's creation time when it parses and is not in the
	// future; otherwise fall back to time.Now
	created := time.Now()
	if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
		created = t
	}

	status := models.PipelineStatus{
		Spindle:      source.Key(),
		Rkey:         msg.Rkey,
		PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
		PipelineRkey: pipelineUri.RecordKey().String(),
		Created:      created,
		Workflow:     record.Workflow,
		Status:       spindle.StatusKind(record.Status),
		Error:        record.Error,
		ExitCode:     exitCode,
	}

	err = db.AddPipelineStatus(d, status)
	if err != nil {
		return fmt.Errorf("failed to add pipeline status: %w", err)
	}

	return nil
}