Monorepo for Tangled
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/knotserver/db"
19 "tangled.org/core/knotserver/git"
20 "tangled.org/core/log"
21 "tangled.org/core/rbac"
22 "tangled.org/core/workflow"
23)
24
25func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error {
26 l := log.FromContext(ctx)
27 raw := json.RawMessage(event.Commit.Record)
28 did := event.Did
29
30 var record tangled.PublicKey
31 if err := json.Unmarshal(raw, &record); err != nil {
32 return fmt.Errorf("failed to unmarshal record: %w", err)
33 }
34
35 pk := db.PublicKey{
36 Did: did,
37 PublicKey: record,
38 }
39 if err := h.db.AddPublicKey(pk); err != nil {
40 l.Error("failed to add public key", "error", err)
41 return fmt.Errorf("failed to add public key: %w", err)
42 }
43 l.Info("added public key from firehose", "did", did)
44 return nil
45}
46
47func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error {
48 l := log.FromContext(ctx)
49 raw := json.RawMessage(event.Commit.Record)
50 did := event.Did
51
52 var record tangled.KnotMember
53 if err := json.Unmarshal(raw, &record); err != nil {
54 return fmt.Errorf("failed to unmarshal record: %w", err)
55 }
56
57 if record.Domain != h.c.Server.Hostname {
58 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
59 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
60 }
61
62 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
63 if err != nil || !ok {
64 l.Error("failed to add member", "did", did)
65 return fmt.Errorf("failed to enforce permissions: %w", err)
66 }
67
68 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
69 l.Error("failed to add member", "error", err)
70 return fmt.Errorf("failed to add member: %w", err)
71 }
72 l.Info("added member from firehose", "member", record.Subject)
73
74 if err := h.db.AddDid(record.Subject); err != nil {
75 l.Error("failed to add did", "error", err)
76 return fmt.Errorf("failed to add did: %w", err)
77 }
78 h.jc.AddDid(record.Subject)
79
80 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
81 return fmt.Errorf("failed to fetch and add keys: %w", err)
82 }
83
84 return nil
85}
86
87func (h *Knot) processPull(ctx context.Context, event *models.Event) error {
88 raw := json.RawMessage(event.Commit.Record)
89 did := event.Did
90
91 var record tangled.RepoPull
92 if err := json.Unmarshal(raw, &record); err != nil {
93 return fmt.Errorf("failed to unmarshal record: %w", err)
94 }
95
96 l := log.FromContext(ctx)
97 l = l.With("handler", "processPull")
98 l = l.With("did", did)
99
100 if record.Target == nil {
101 return fmt.Errorf("ignoring pull record: target repo is nil")
102 }
103
104 l = l.With("target_repo", record.Target.Repo, "target_repo_did", record.Target.RepoDid)
105 l = l.With("target_branch", record.Target.Branch)
106
107 if record.Source == nil {
108 return fmt.Errorf("ignoring pull record: not a branch-based pull request")
109 }
110
111 if record.Source.Repo != nil || record.Source.RepoDid != nil {
112 return fmt.Errorf("ignoring pull record: fork based pull")
113 }
114
115 var repoPath, ownerDid, repoName, repoDid string
116 switch {
117 case record.Target.RepoDid != nil && *record.Target.RepoDid != "":
118 repoDid = *record.Target.RepoDid
119 var lookupErr error
120 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
121 if lookupErr != nil {
122 return fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
123 }
124
125 case record.Target.Repo != nil:
126 // TODO: get rid of this PDS fetch once all repos have DIDs
127 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo)
128 if parseErr != nil {
129 return fmt.Errorf("failed to parse ATURI: %w", parseErr)
130 }
131
132 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
133 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
134 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
135 }
136
137 xrpcc := xrpc.Client{
138 Host: ident.PDSEndpoint(),
139 }
140
141 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
142 if getErr != nil {
143 return fmt.Errorf("failed to resolve repo: %w", getErr)
144 }
145
146 repo := resp.Value.Val.(*tangled.Repo)
147
148 if repo.Knot != h.c.Server.Hostname {
149 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
150 }
151
152 ownerDid = ident.DID.String()
153 repoName = repo.Name
154
155 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
156 if didErr != nil {
157 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
158 }
159
160 var lookupErr error
161 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
162 if lookupErr != nil {
163 return fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
164 }
165
166 default:
167 return fmt.Errorf("ignoring pull record: target has neither repo nor repoDid")
168 }
169
170 gr, err := git.Open(repoPath, record.Source.Sha)
171 if err != nil {
172 return fmt.Errorf("failed to open git repository: %w", err)
173 }
174
175 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
176 if err != nil {
177 return fmt.Errorf("failed to open workflow directory: %w", err)
178 }
179
180 var pipeline workflow.RawPipeline
181 for _, e := range workflowDir {
182 if !e.IsFile() {
183 continue
184 }
185
186 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
187 contents, err := gr.RawContent(fpath)
188 if err != nil {
189 continue
190 }
191
192 pipeline = append(pipeline, workflow.RawWorkflow{
193 Name: e.Name,
194 Contents: contents,
195 })
196 }
197
198 trigger := tangled.Pipeline_PullRequestTriggerData{
199 Action: "create",
200 SourceBranch: record.Source.Branch,
201 SourceSha: record.Source.Sha,
202 TargetBranch: record.Target.Branch,
203 }
204
205 compiler := workflow.Compiler{
206 Trigger: tangled.Pipeline_TriggerMetadata{
207 Kind: string(workflow.TriggerKindPullRequest),
208 PullRequest: &trigger,
209 Repo: &tangled.Pipeline_TriggerRepo{
210 Did: ownerDid,
211 Knot: h.c.Server.Hostname,
212 Repo: repoName,
213 RepoDid: repoDid,
214 },
215 },
216 }
217
218 cp := compiler.Compile(compiler.Parse(pipeline))
219 eventJson, err := json.Marshal(cp)
220 if err != nil {
221 return fmt.Errorf("failed to marshal pipeline event: %w", err)
222 }
223
224 // do not run empty pipelines
225 if cp.Workflows == nil {
226 return nil
227 }
228
229 ev := db.Event{
230 Rkey: TID(),
231 Nsid: tangled.PipelineNSID,
232 EventJson: string(eventJson),
233 }
234
235 return h.db.InsertEvent(ev, h.n)
236}
237
238// duplicated from add collaborator
239func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error {
240 raw := json.RawMessage(event.Commit.Record)
241 did := event.Did
242
243 var record tangled.RepoCollaborator
244 if err := json.Unmarshal(raw, &record); err != nil {
245 return fmt.Errorf("failed to unmarshal record: %w", err)
246 }
247
248 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
249 if err != nil || subjectId.Handle.IsInvalidHandle() {
250 return err
251 }
252
253 var rbacResource string
254 switch {
255 case record.RepoDid != nil && *record.RepoDid != "":
256 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid)
257 if lookupErr != nil {
258 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr)
259 }
260 if ownerDid != did {
261 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid)
262 }
263 rbacResource = *record.RepoDid
264
265 case record.Repo != nil:
266 // TODO: get rid of this PDS fetch once all repos have DIDs
267 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
268 if parseErr != nil {
269 return parseErr
270 }
271
272 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
273 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
274 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
275 }
276
277 xrpcc := xrpc.Client{
278 Host: owner.PDSEndpoint(),
279 }
280
281 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
282 if getErr != nil {
283 return getErr
284 }
285
286 repo := resp.Value.Val.(*tangled.Repo)
287 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name)
288 if didErr != nil {
289 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr)
290 }
291 rbacResource = repoDid
292
293 default:
294 return fmt.Errorf("collaborator record has neither repo nor repoDid")
295 }
296
297 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
298 if err != nil {
299 return fmt.Errorf("failed to check permissions: %w", err)
300 }
301 if !ok {
302 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
303 }
304
305 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
306 return err
307 }
308 h.jc.AddDid(subjectId.DID.String())
309
310 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
311 return err
312 }
313
314 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
315}
316
317func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
318 l := log.FromContext(ctx)
319
320 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
321 if err != nil {
322 l.Error("error building endpoint url", "did", did, "error", err.Error())
323 return fmt.Errorf("error building endpoint url: %w", err)
324 }
325
326 resp, err := http.Get(keysEndpoint)
327 if err != nil {
328 l.Error("error getting keys", "did", did, "error", err)
329 return fmt.Errorf("error getting keys: %w", err)
330 }
331 defer resp.Body.Close()
332
333 if resp.StatusCode == http.StatusNotFound {
334 l.Info("no keys found for did", "did", did)
335 return nil
336 }
337
338 plaintext, err := io.ReadAll(resp.Body)
339 if err != nil {
340 l.Error("error reading response body", "error", err)
341 return fmt.Errorf("error reading response body: %w", err)
342 }
343
344 for key := range strings.SplitSeq(string(plaintext), "\n") {
345 if key == "" {
346 continue
347 }
348 pk := db.PublicKey{
349 Did: did,
350 }
351 pk.Key = key
352 if err := h.db.AddPublicKey(pk); err != nil {
353 l.Error("failed to add public key", "error", err)
354 return fmt.Errorf("failed to add public key: %w", err)
355 }
356 }
357 return nil
358}
359
360func (h *Knot) processMessages(ctx context.Context, event *models.Event) error {
361 if event.Kind != models.EventKindCommit {
362 return nil
363 }
364
365 var err error
366 defer func() {
367 eventTime := event.TimeUS
368 lastTimeUs := eventTime + 1
369 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
370 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
371 }
372 }()
373
374 switch event.Commit.Collection {
375 case tangled.PublicKeyNSID:
376 err = h.processPublicKey(ctx, event)
377 case tangled.KnotMemberNSID:
378 err = h.processKnotMember(ctx, event)
379 case tangled.RepoPullNSID:
380 err = h.processPull(ctx, event)
381 case tangled.RepoCollaboratorNSID:
382 err = h.processCollaborator(ctx, event)
383 }
384
385 if err != nil {
386 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
387 }
388
389 return nil
390}