this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "slices"
12 "strings"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.sh/tangled.sh/core/api/tangled"
20 "tangled.sh/tangled.sh/core/idresolver"
21 "tangled.sh/tangled.sh/core/knotserver/db"
22 "tangled.sh/tangled.sh/core/knotserver/git"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/rbac"
25 "tangled.sh/tangled.sh/core/workflow"
26)
27
28func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error {
29 l := log.FromContext(ctx)
30 raw := json.RawMessage(event.Commit.Record)
31 did := event.Did
32
33 var record tangled.PublicKey
34 if err := json.Unmarshal(raw, &record); err != nil {
35 return fmt.Errorf("failed to unmarshal record: %w", err)
36 }
37
38 pk := db.PublicKey{
39 Did: did,
40 PublicKey: record,
41 }
42 if err := h.db.AddPublicKey(pk); err != nil {
43 l.Error("failed to add public key", "error", err)
44 return fmt.Errorf("failed to add public key: %w", err)
45 }
46 l.Info("added public key from firehose", "did", did)
47 return nil
48}
49
50func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error {
51 l := log.FromContext(ctx)
52 raw := json.RawMessage(event.Commit.Record)
53 did := event.Did
54
55 var record tangled.KnotMember
56 if err := json.Unmarshal(raw, &record); err != nil {
57 return fmt.Errorf("failed to unmarshal record: %w", err)
58 }
59
60 if record.Domain != h.c.Server.Hostname {
61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
62 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
63 }
64
65 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
66 if err != nil || !ok {
67 l.Error("failed to add member", "did", did)
68 return fmt.Errorf("failed to enforce permissions: %w", err)
69 }
70
71 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
72 l.Error("failed to add member", "error", err)
73 return fmt.Errorf("failed to add member: %w", err)
74 }
75 l.Info("added member from firehose", "member", record.Subject)
76
77 if err := h.db.AddDid(did); err != nil {
78 l.Error("failed to add did", "error", err)
79 return fmt.Errorf("failed to add did: %w", err)
80 }
81 h.jc.AddDid(did)
82
83 if err := h.fetchAndAddKeys(ctx, did); err != nil {
84 return fmt.Errorf("failed to fetch and add keys: %w", err)
85 }
86
87 return nil
88}
89
90func (h *Handle) processPull(ctx context.Context, event *models.Event) error {
91 raw := json.RawMessage(event.Commit.Record)
92 did := event.Did
93
94 var record tangled.RepoPull
95 if err := json.Unmarshal(raw, &record); err != nil {
96 return fmt.Errorf("failed to unmarshal record: %w", err)
97 }
98
99 l := log.FromContext(ctx)
100 l = l.With("handler", "processPull")
101 l = l.With("did", did)
102 l = l.With("target_repo", record.TargetRepo)
103 l = l.With("target_branch", record.TargetBranch)
104
105 if record.Source == nil {
106 reason := "not a branch-based pull request"
107 l.Info("ignoring pull record", "reason", reason)
108 return fmt.Errorf("ignoring pull record: %s", reason)
109 }
110
111 if record.Source.Repo != nil {
112 reason := "fork based pull"
113 l.Info("ignoring pull record", "reason", reason)
114 return fmt.Errorf("ignoring pull record: %s", reason)
115 }
116
117 allDids, err := h.db.GetAllDids()
118 if err != nil {
119 return err
120 }
121
122 // presently: we only process PRs from collaborators for pipelines
123 if !slices.Contains(allDids, did) {
124 reason := "not a known did"
125 l.Info("rejecting pull record", "reason", reason)
126 return fmt.Errorf("rejected pull record: %s, %s", reason, did)
127 }
128
129 repoAt, err := syntax.ParseATURI(record.TargetRepo)
130 if err != nil {
131 return err
132 }
133
134 // resolve this aturi to extract the repo record
135 resolver := idresolver.DefaultResolver()
136 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
137 if err != nil || ident.Handle.IsInvalidHandle() {
138 return fmt.Errorf("failed to resolve handle: %w", err)
139 }
140
141 xrpcc := xrpc.Client{
142 Host: ident.PDSEndpoint(),
143 }
144
145 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
146 if err != nil {
147 return err
148 }
149
150 repo := resp.Value.Val.(*tangled.Repo)
151
152 if repo.Knot != h.c.Server.Hostname {
153 reason := "not this knot"
154 l.Info("rejecting pull record", "reason", reason)
155 return fmt.Errorf("rejected pull record: %s", reason)
156 }
157
158 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
159 if err != nil {
160 return err
161 }
162
163 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
164 if err != nil {
165 return err
166 }
167
168 gr, err := git.Open(repoPath, record.Source.Branch)
169 if err != nil {
170 return err
171 }
172
173 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
174 if err != nil {
175 return err
176 }
177
178 var pipeline workflow.Pipeline
179 for _, e := range workflowDir {
180 if !e.IsFile {
181 continue
182 }
183
184 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
185 contents, err := gr.RawContent(fpath)
186 if err != nil {
187 continue
188 }
189
190 wf, err := workflow.FromFile(e.Name, contents)
191 if err != nil {
192 // TODO: log here, respond to client that is pushing
193 h.l.Error("failed to parse workflow", "err", err, "path", fpath)
194 continue
195 }
196
197 pipeline = append(pipeline, wf)
198 }
199
200 trigger := tangled.Pipeline_PullRequestTriggerData{
201 Action: "create",
202 SourceBranch: record.Source.Branch,
203 SourceSha: record.Source.Sha,
204 TargetBranch: record.TargetBranch,
205 }
206
207 compiler := workflow.Compiler{
208 Trigger: tangled.Pipeline_TriggerMetadata{
209 Kind: string(workflow.TriggerKindPullRequest),
210 PullRequest: &trigger,
211 Repo: &tangled.Pipeline_TriggerRepo{
212 Did: repo.Owner,
213 Knot: repo.Knot,
214 Repo: repo.Name,
215 },
216 },
217 }
218
219 cp := compiler.Compile(pipeline)
220 eventJson, err := json.Marshal(cp)
221 if err != nil {
222 return err
223 }
224
225 // do not run empty pipelines
226 if cp.Workflows == nil {
227 return nil
228 }
229
230 ev := db.Event{
231 Rkey: TID(),
232 Nsid: tangled.PipelineNSID,
233 EventJson: string(eventJson),
234 }
235
236 return h.db.InsertEvent(ev, h.n)
237}
238
239// duplicated from add collaborator
240func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error {
241 raw := json.RawMessage(event.Commit.Record)
242 did := event.Did
243
244 var record tangled.RepoCollaborator
245 if err := json.Unmarshal(raw, &record); err != nil {
246 return fmt.Errorf("failed to unmarshal record: %w", err)
247 }
248
249 repoAt, err := syntax.ParseATURI(record.Repo)
250 if err != nil {
251 return err
252 }
253
254 resolver := idresolver.DefaultResolver()
255
256 subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
257 if err != nil || subjectId.Handle.IsInvalidHandle() {
258 return err
259 }
260
261 // TODO: fix this for good, we need to fetch the record here unfortunately
262 // resolve this aturi to extract the repo record
263 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
264 if err != nil || owner.Handle.IsInvalidHandle() {
265 return fmt.Errorf("failed to resolve handle: %w", err)
266 }
267
268 xrpcc := xrpc.Client{
269 Host: owner.PDSEndpoint(),
270 }
271
272 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
273 if err != nil {
274 return err
275 }
276
277 repo := resp.Value.Val.(*tangled.Repo)
278 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
279
280 // check perms for this user
281 if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil {
282 return fmt.Errorf("insufficient permissions: %w", err)
283 }
284
285 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
286 return err
287 }
288 h.jc.AddDid(subjectId.DID.String())
289
290 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
291 return err
292 }
293
294 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
295}
296
297func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
298 l := log.FromContext(ctx)
299
300 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
301 if err != nil {
302 l.Error("error building endpoint url", "did", did, "error", err.Error())
303 return fmt.Errorf("error building endpoint url: %w", err)
304 }
305
306 resp, err := http.Get(keysEndpoint)
307 if err != nil {
308 l.Error("error getting keys", "did", did, "error", err)
309 return fmt.Errorf("error getting keys: %w", err)
310 }
311 defer resp.Body.Close()
312
313 if resp.StatusCode == http.StatusNotFound {
314 l.Info("no keys found for did", "did", did)
315 return nil
316 }
317
318 plaintext, err := io.ReadAll(resp.Body)
319 if err != nil {
320 l.Error("error reading response body", "error", err)
321 return fmt.Errorf("error reading response body: %w", err)
322 }
323
324 for _, key := range strings.Split(string(plaintext), "\n") {
325 if key == "" {
326 continue
327 }
328 pk := db.PublicKey{
329 Did: did,
330 }
331 pk.Key = key
332 if err := h.db.AddPublicKey(pk); err != nil {
333 l.Error("failed to add public key", "error", err)
334 return fmt.Errorf("failed to add public key: %w", err)
335 }
336 }
337 return nil
338}
339
340func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
341 if event.Kind != models.EventKindCommit {
342 return nil
343 }
344
345 var err error
346 defer func() {
347 eventTime := event.TimeUS
348 lastTimeUs := eventTime + 1
349 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
350 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
351 }
352 }()
353
354 switch event.Commit.Collection {
355 case tangled.PublicKeyNSID:
356 err = h.processPublicKey(ctx, event)
357 case tangled.KnotMemberNSID:
358 err = h.processKnotMember(ctx, event)
359 case tangled.RepoPullNSID:
360 err = h.processPull(ctx, event)
361 case tangled.RepoCollaboratorNSID:
362 err = h.processCollaborator(ctx, event)
363 }
364
365 if err != nil {
366 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
367 }
368
369 return nil
370}