this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "slices" 12 "strings" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.sh/tangled.sh/core/api/tangled" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/knotserver/db" 22 "tangled.sh/tangled.sh/core/knotserver/git" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/rbac" 25 "tangled.sh/tangled.sh/core/workflow" 26) 27 28func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error { 29 l := log.FromContext(ctx) 30 raw := json.RawMessage(event.Commit.Record) 31 did := event.Did 32 33 var record tangled.PublicKey 34 if err := json.Unmarshal(raw, &record); err != nil { 35 return fmt.Errorf("failed to unmarshal record: %w", err) 36 } 37 38 pk := db.PublicKey{ 39 Did: did, 40 PublicKey: record, 41 } 42 if err := h.db.AddPublicKey(pk); err != nil { 43 l.Error("failed to add public key", "error", err) 44 return fmt.Errorf("failed to add public key: %w", err) 45 } 46 l.Info("added public key from firehose", "did", did) 47 return nil 48} 49 50func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error { 51 l := log.FromContext(ctx) 52 raw := json.RawMessage(event.Commit.Record) 53 did := event.Did 54 55 var record tangled.KnotMember 56 if err := json.Unmarshal(raw, &record); err != nil { 57 return fmt.Errorf("failed to unmarshal record: %w", err) 58 } 59 60 if record.Domain != h.c.Server.Hostname { 61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 62 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 63 } 64 65 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 66 if err != nil || !ok { 67 l.Error("failed to add member", "did", did) 68 return fmt.Errorf("failed to enforce permissions: %w", err) 69 } 70 71 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 72 l.Error("failed to add member", "error", err) 73 return fmt.Errorf("failed to add member: %w", err) 74 } 75 l.Info("added member from firehose", "member", record.Subject) 76 77 if err := h.db.AddDid(did); err != nil { 78 l.Error("failed to add did", "error", err) 79 return fmt.Errorf("failed to add did: %w", err) 80 } 81 h.jc.AddDid(did) 82 83 if err := h.fetchAndAddKeys(ctx, did); err != nil { 84 return fmt.Errorf("failed to fetch and add keys: %w", err) 85 } 86 87 return nil 88} 89 90func (h *Handle) processPull(ctx context.Context, event *models.Event) error { 91 raw := json.RawMessage(event.Commit.Record) 92 did := event.Did 93 94 var record tangled.RepoPull 95 if err := json.Unmarshal(raw, &record); err != nil { 96 return fmt.Errorf("failed to unmarshal record: %w", err) 97 } 98 99 l := log.FromContext(ctx) 100 l = l.With("handler", "processPull") 101 l = l.With("did", did) 102 l = l.With("target_repo", record.TargetRepo) 103 l = l.With("target_branch", record.TargetBranch) 104 105 if record.Source == nil { 106 reason := "not a branch-based pull request" 107 l.Info("ignoring pull record", "reason", reason) 108 return fmt.Errorf("ignoring pull record: %s", reason) 109 } 110 111 if record.Source.Repo != nil { 112 reason := "fork based pull" 113 l.Info("ignoring pull record", "reason", reason) 114 return fmt.Errorf("ignoring pull record: %s", reason) 115 } 116 117 allDids, err := h.db.GetAllDids() 118 if err != nil { 119 return err 120 } 121 122 // presently: we only process PRs from collaborators for pipelines 123 if !slices.Contains(allDids, did) { 124 reason := "not a known did" 125 l.Info("rejecting pull record", "reason", reason) 126 return fmt.Errorf("rejected pull record: %s, %s", reason, did) 127 } 128 129 repoAt, err := syntax.ParseATURI(record.TargetRepo) 130 if err != nil { 131 return err 132 } 133 134 // resolve this aturi to extract the repo record 135 resolver := idresolver.DefaultResolver() 136 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 137 if err != nil || ident.Handle.IsInvalidHandle() { 138 return fmt.Errorf("failed to resolve handle: %w", err) 139 } 140 141 xrpcc := xrpc.Client{ 142 Host: ident.PDSEndpoint(), 143 } 144 145 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 146 if err != nil { 147 return err 148 } 149 150 repo := resp.Value.Val.(*tangled.Repo) 151 152 if repo.Knot != h.c.Server.Hostname { 153 reason := "not this knot" 154 l.Info("rejecting pull record", "reason", reason) 155 return fmt.Errorf("rejected pull record: %s", reason) 156 } 157 158 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 159 if err != nil { 160 return err 161 } 162 163 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 164 if err != nil { 165 return err 166 } 167 168 gr, err := git.Open(repoPath, record.Source.Branch) 169 if err != nil { 170 return err 171 } 172 173 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 174 if err != nil { 175 return err 176 } 177 178 var pipeline workflow.Pipeline 179 for _, e := range workflowDir { 180 if !e.IsFile { 181 continue 182 } 183 184 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 185 contents, err := gr.RawContent(fpath) 186 if err != nil { 187 continue 188 } 189 190 wf, err := workflow.FromFile(e.Name, contents) 191 if err != nil { 192 // TODO: log here, respond to client that is pushing 193 h.l.Error("failed to parse workflow", "err", err, "path", fpath) 194 continue 195 } 196 197 pipeline = append(pipeline, wf) 198 } 199 200 trigger := tangled.Pipeline_PullRequestTriggerData{ 201 Action: "create", 202 SourceBranch: record.Source.Branch, 203 SourceSha: record.Source.Sha, 204 TargetBranch: record.TargetBranch, 205 } 206 207 compiler := workflow.Compiler{ 208 Trigger: tangled.Pipeline_TriggerMetadata{ 209 Kind: string(workflow.TriggerKindPullRequest), 210 PullRequest: &trigger, 211 Repo: &tangled.Pipeline_TriggerRepo{ 212 Did: repo.Owner, 213 Knot: repo.Knot, 214 Repo: repo.Name, 215 }, 216 }, 217 } 218 219 cp := compiler.Compile(pipeline) 220 eventJson, err := json.Marshal(cp) 221 if err != nil { 222 return err 223 } 224 225 // do not run empty pipelines 226 if cp.Workflows == nil { 227 return nil 228 } 229 230 ev := db.Event{ 231 Rkey: TID(), 232 Nsid: tangled.PipelineNSID, 233 EventJson: string(eventJson), 234 } 235 236 return h.db.InsertEvent(ev, h.n) 237} 238 239// duplicated from add collaborator 240func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error { 241 raw := json.RawMessage(event.Commit.Record) 242 did := event.Did 243 244 var record tangled.RepoCollaborator 245 if err := json.Unmarshal(raw, &record); err != nil { 246 return fmt.Errorf("failed to unmarshal record: %w", err) 247 } 248 249 repoAt, err := syntax.ParseATURI(record.Repo) 250 if err != nil { 251 return err 252 } 253 254 resolver := idresolver.DefaultResolver() 255 256 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 257 if err != nil || subjectId.Handle.IsInvalidHandle() { 258 return err 259 } 260 261 // TODO: fix this for good, we need to fetch the record here unfortunately 262 // resolve this aturi to extract the repo record 263 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 264 if err != nil || owner.Handle.IsInvalidHandle() { 265 return fmt.Errorf("failed to resolve handle: %w", err) 266 } 267 268 xrpcc := xrpc.Client{ 269 Host: owner.PDSEndpoint(), 270 } 271 272 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 273 if err != nil { 274 return err 275 } 276 277 repo := resp.Value.Val.(*tangled.Repo) 278 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 279 280 // check perms for this user 281 if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil { 282 return fmt.Errorf("insufficient permissions: %w", err) 283 } 284 285 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 286 return err 287 } 288 h.jc.AddDid(subjectId.DID.String()) 289 290 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 291 return err 292 } 293 294 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 295} 296 297func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 298 l := log.FromContext(ctx) 299 300 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 301 if err != nil { 302 l.Error("error building endpoint url", "did", did, "error", err.Error()) 303 return fmt.Errorf("error building endpoint url: %w", err) 304 } 305 306 resp, err := http.Get(keysEndpoint) 307 if err != nil { 308 l.Error("error getting keys", "did", did, "error", err) 309 return fmt.Errorf("error getting keys: %w", err) 310 } 311 defer resp.Body.Close() 312 313 if resp.StatusCode == http.StatusNotFound { 314 l.Info("no keys found for did", "did", did) 315 return nil 316 } 317 318 plaintext, err := io.ReadAll(resp.Body) 319 if err != nil { 320 l.Error("error reading response body", "error", err) 321 return fmt.Errorf("error reading response body: %w", err) 322 } 323 324 for _, key := range strings.Split(string(plaintext), "\n") { 325 if key == "" { 326 continue 327 } 328 pk := db.PublicKey{ 329 Did: did, 330 } 331 pk.Key = key 332 if err := h.db.AddPublicKey(pk); err != nil { 333 l.Error("failed to add public key", "error", err) 334 return fmt.Errorf("failed to add public key: %w", err) 335 } 336 } 337 return nil 338} 339 340func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 341 if event.Kind != models.EventKindCommit { 342 return nil 343 } 344 345 var err error 346 defer func() { 347 eventTime := event.TimeUS 348 lastTimeUs := eventTime + 1 349 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 350 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 351 } 352 }() 353 354 switch event.Commit.Collection { 355 case tangled.PublicKeyNSID: 356 err = h.processPublicKey(ctx, event) 357 case tangled.KnotMemberNSID: 358 err = h.processKnotMember(ctx, event) 359 case tangled.RepoPullNSID: 360 err = h.processPull(ctx, event) 361 case tangled.RepoCollaboratorNSID: 362 err = h.processCollaborator(ctx, event) 363 } 364 365 if err != nil { 366 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 367 } 368 369 return nil 370}