Monorepo for Tangled
at master 390 lines 11 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/knotserver/db" 19 "tangled.org/core/knotserver/git" 20 "tangled.org/core/log" 21 "tangled.org/core/rbac" 22 "tangled.org/core/workflow" 23) 24 25func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 26 l := log.FromContext(ctx) 27 raw := json.RawMessage(event.Commit.Record) 28 did := event.Did 29 30 var record tangled.PublicKey 31 if err := json.Unmarshal(raw, &record); err != nil { 32 return fmt.Errorf("failed to unmarshal record: %w", err) 33 } 34 35 pk := db.PublicKey{ 36 Did: did, 37 PublicKey: record, 38 } 39 if err := h.db.AddPublicKey(pk); err != nil { 40 l.Error("failed to add public key", "error", err) 41 return fmt.Errorf("failed to add public key: %w", err) 42 } 43 l.Info("added public key from firehose", "did", did) 44 return nil 45} 46 47func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 48 l := log.FromContext(ctx) 49 raw := json.RawMessage(event.Commit.Record) 50 did := event.Did 51 52 var record tangled.KnotMember 53 if err := json.Unmarshal(raw, &record); err != nil { 54 return fmt.Errorf("failed to unmarshal record: %w", err) 55 } 56 57 if record.Domain != h.c.Server.Hostname { 58 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 59 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 60 } 61 62 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 63 if err != nil || !ok { 64 l.Error("failed to add member", "did", did) 65 return fmt.Errorf("failed to enforce permissions: %w", err) 66 } 67 68 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 69 l.Error("failed to add member", "error", err) 70 return fmt.Errorf("failed to add member: %w", err) 71 } 72 l.Info("added member from firehose", "member", record.Subject) 73 74 if err := h.db.AddDid(record.Subject); err != nil { 75 l.Error("failed to add did", "error", err) 76 return fmt.Errorf("failed to add did: %w", err) 77 } 78 h.jc.AddDid(record.Subject) 79 80 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 81 return fmt.Errorf("failed to fetch and add keys: %w", err) 82 } 83 84 return nil 85} 86 87func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 88 raw := json.RawMessage(event.Commit.Record) 89 did := event.Did 90 91 var record tangled.RepoPull 92 if err := json.Unmarshal(raw, &record); err != nil { 93 return fmt.Errorf("failed to unmarshal record: %w", err) 94 } 95 96 l := log.FromContext(ctx) 97 l = l.With("handler", "processPull") 98 l = l.With("did", did) 99 100 if record.Target == nil { 101 return fmt.Errorf("ignoring pull record: target repo is nil") 102 } 103 104 l = l.With("target_repo", record.Target.Repo, "target_repo_did", record.Target.RepoDid) 105 l = l.With("target_branch", record.Target.Branch) 106 107 if record.Source == nil { 108 return fmt.Errorf("ignoring pull record: not a branch-based pull request") 109 } 110 111 if record.Source.Repo != nil || record.Source.RepoDid != nil { 112 return fmt.Errorf("ignoring pull record: fork based pull") 113 } 114 115 var repoPath, ownerDid, repoName, repoDid string 116 switch { 117 case record.Target.RepoDid != nil && *record.Target.RepoDid != "": 118 repoDid = *record.Target.RepoDid 119 var lookupErr error 120 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 121 if lookupErr != nil { 122 return fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 123 } 124 125 case record.Target.Repo != nil: 126 // TODO: get rid of this PDS fetch once all repos have DIDs 127 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo) 128 if parseErr != nil { 129 return fmt.Errorf("failed to parse ATURI: %w", parseErr) 130 } 131 132 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 133 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 134 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 135 } 136 137 xrpcc := xrpc.Client{ 138 Host: ident.PDSEndpoint(), 139 } 140 141 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 142 if getErr != nil { 143 return fmt.Errorf("failed to resolve repo: %w", getErr) 144 } 145 146 repo := resp.Value.Val.(*tangled.Repo) 147 148 if repo.Knot != h.c.Server.Hostname { 149 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 150 } 151 152 ownerDid = ident.DID.String() 153 repoName = repo.Name 154 155 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 156 if didErr != nil { 157 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 158 } 159 160 var lookupErr error 161 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 162 if lookupErr != nil { 163 return fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 164 } 165 166 default: 167 return fmt.Errorf("ignoring pull record: target has neither repo nor repoDid") 168 } 169 170 gr, err := git.Open(repoPath, record.Source.Sha) 171 if err != nil { 172 return fmt.Errorf("failed to open git repository: %w", err) 173 } 174 175 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 176 if err != nil { 177 return fmt.Errorf("failed to open workflow directory: %w", err) 178 } 179 180 var pipeline workflow.RawPipeline 181 for _, e := range workflowDir { 182 if !e.IsFile() { 183 continue 184 } 185 186 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 187 contents, err := gr.RawContent(fpath) 188 if err != nil { 189 continue 190 } 191 192 pipeline = append(pipeline, workflow.RawWorkflow{ 193 Name: e.Name, 194 Contents: contents, 195 }) 196 } 197 198 trigger := tangled.Pipeline_PullRequestTriggerData{ 199 Action: "create", 200 SourceBranch: record.Source.Branch, 201 SourceSha: record.Source.Sha, 202 TargetBranch: record.Target.Branch, 203 } 204 205 compiler := workflow.Compiler{ 206 Trigger: tangled.Pipeline_TriggerMetadata{ 207 Kind: string(workflow.TriggerKindPullRequest), 208 PullRequest: &trigger, 209 Repo: &tangled.Pipeline_TriggerRepo{ 210 Did: ownerDid, 211 Knot: h.c.Server.Hostname, 212 Repo: repoName, 213 RepoDid: repoDid, 214 }, 215 }, 216 } 217 218 cp := compiler.Compile(compiler.Parse(pipeline)) 219 eventJson, err := json.Marshal(cp) 220 if err != nil { 221 return fmt.Errorf("failed to marshal pipeline event: %w", err) 222 } 223 224 // do not run empty pipelines 225 if cp.Workflows == nil { 226 return nil 227 } 228 229 ev := db.Event{ 230 Rkey: TID(), 231 Nsid: tangled.PipelineNSID, 232 EventJson: string(eventJson), 233 } 234 235 return h.db.InsertEvent(ev, h.n) 236} 237 238// duplicated from add collaborator 239func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 240 raw := json.RawMessage(event.Commit.Record) 241 did := event.Did 242 243 var record tangled.RepoCollaborator 244 if err := json.Unmarshal(raw, &record); err != nil { 245 return fmt.Errorf("failed to unmarshal record: %w", err) 246 } 247 248 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 249 if err != nil || subjectId.Handle.IsInvalidHandle() { 250 return err 251 } 252 253 var rbacResource string 254 switch { 255 case record.RepoDid != nil && *record.RepoDid != "": 256 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid) 257 if lookupErr != nil { 258 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr) 259 } 260 if ownerDid != did { 261 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid) 262 } 263 rbacResource = *record.RepoDid 264 265 case record.Repo != nil: 266 // TODO: get rid of this PDS fetch once all repos have DIDs 267 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 268 if parseErr != nil { 269 return parseErr 270 } 271 272 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 273 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 274 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 275 } 276 277 xrpcc := xrpc.Client{ 278 Host: owner.PDSEndpoint(), 279 } 280 281 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 282 if getErr != nil { 283 return getErr 284 } 285 286 repo := resp.Value.Val.(*tangled.Repo) 287 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name) 288 if didErr != nil { 289 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr) 290 } 291 rbacResource = repoDid 292 293 default: 294 return fmt.Errorf("collaborator record has neither repo nor repoDid") 295 } 296 297 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 298 if err != nil { 299 return fmt.Errorf("failed to check permissions: %w", err) 300 } 301 if !ok { 302 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 303 } 304 305 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 306 return err 307 } 308 h.jc.AddDid(subjectId.DID.String()) 309 310 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 311 return err 312 } 313 314 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 315} 316 317func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 318 l := log.FromContext(ctx) 319 320 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 321 if err != nil { 322 l.Error("error building endpoint url", "did", did, "error", err.Error()) 323 return fmt.Errorf("error building endpoint url: %w", err) 324 } 325 326 resp, err := http.Get(keysEndpoint) 327 if err != nil { 328 l.Error("error getting keys", "did", did, "error", err) 329 return fmt.Errorf("error getting keys: %w", err) 330 } 331 defer resp.Body.Close() 332 333 if resp.StatusCode == http.StatusNotFound { 334 l.Info("no keys found for did", "did", did) 335 return nil 336 } 337 338 plaintext, err := io.ReadAll(resp.Body) 339 if err != nil { 340 l.Error("error reading response body", "error", err) 341 return fmt.Errorf("error reading response body: %w", err) 342 } 343 344 for key := range strings.SplitSeq(string(plaintext), "\n") { 345 if key == "" { 346 continue 347 } 348 pk := db.PublicKey{ 349 Did: did, 350 } 351 pk.Key = key 352 if err := h.db.AddPublicKey(pk); err != nil { 353 l.Error("failed to add public key", "error", err) 354 return fmt.Errorf("failed to add public key: %w", err) 355 } 356 } 357 return nil 358} 359 360func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 361 if event.Kind != models.EventKindCommit { 362 return nil 363 } 364 365 var err error 366 defer func() { 367 eventTime := event.TimeUS 368 lastTimeUs := eventTime + 1 369 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 370 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 371 } 372 }() 373 374 switch event.Commit.Collection { 375 case tangled.PublicKeyNSID: 376 err = h.processPublicKey(ctx, event) 377 case tangled.KnotMemberNSID: 378 err = h.processKnotMember(ctx, event) 379 case tangled.RepoPullNSID: 380 err = h.processPull(ctx, event) 381 case tangled.RepoCollaboratorNSID: 382 err = h.processCollaborator(ctx, event) 383 } 384 385 if err != nil { 386 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 387 } 388 389 return nil 390}