this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 securejoin "github.com/cyphar/filepath-securejoin"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/knotserver/db"
19 "tangled.org/core/log"
20 "tangled.org/core/rbac"
21)
22
23func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error {
24 l := log.FromContext(ctx)
25 raw := json.RawMessage(event.Commit.Record)
26 did := event.Did
27
28 var record tangled.PublicKey
29 if err := json.Unmarshal(raw, &record); err != nil {
30 return fmt.Errorf("failed to unmarshal record: %w", err)
31 }
32
33 pk := db.PublicKey{
34 Did: did,
35 PublicKey: record,
36 }
37 if err := h.db.AddPublicKey(pk); err != nil {
38 l.Error("failed to add public key", "error", err)
39 return fmt.Errorf("failed to add public key: %w", err)
40 }
41 l.Info("added public key from firehose", "did", did)
42 return nil
43}
44
45func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error {
46 l := log.FromContext(ctx)
47 raw := json.RawMessage(event.Commit.Record)
48 did := event.Did
49
50 var record tangled.KnotMember
51 if err := json.Unmarshal(raw, &record); err != nil {
52 return fmt.Errorf("failed to unmarshal record: %w", err)
53 }
54
55 if record.Domain != h.c.Server.Hostname {
56 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
57 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
58 }
59
60 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
61 if err != nil || !ok {
62 l.Error("failed to add member", "did", did)
63 return fmt.Errorf("failed to enforce permissions: %w", err)
64 }
65
66 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
67 l.Error("failed to add member", "error", err)
68 return fmt.Errorf("failed to add member: %w", err)
69 }
70 l.Info("added member from firehose", "member", record.Subject)
71
72 if err := h.db.AddDid(record.Subject); err != nil {
73 l.Error("failed to add did", "error", err)
74 return fmt.Errorf("failed to add did: %w", err)
75 }
76 h.jc.AddDid(record.Subject)
77
78 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
79 return fmt.Errorf("failed to fetch and add keys: %w", err)
80 }
81
82 return nil
83}
84
85// duplicated from add collaborator
86func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error {
87 raw := json.RawMessage(event.Commit.Record)
88 did := event.Did
89
90 var record tangled.RepoCollaborator
91 if err := json.Unmarshal(raw, &record); err != nil {
92 return fmt.Errorf("failed to unmarshal record: %w", err)
93 }
94
95 repoAt, err := syntax.ParseATURI(record.Repo)
96 if err != nil {
97 return err
98 }
99
100 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
101 if err != nil || subjectId.Handle.IsInvalidHandle() {
102 return err
103 }
104
105 // TODO: fix this for good, we need to fetch the record here unfortunately
106 // resolve this aturi to extract the repo record
107 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
108 if err != nil || owner.Handle.IsInvalidHandle() {
109 return fmt.Errorf("failed to resolve handle: %w", err)
110 }
111
112 xrpcc := xrpc.Client{
113 Host: owner.PDSEndpoint(),
114 }
115
116 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
117 if err != nil {
118 return err
119 }
120
121 repo := resp.Value.Val.(*tangled.Repo)
122 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
123
124 // check perms for this user
125 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
126 if err != nil {
127 return fmt.Errorf("failed to check permissions: %w", err)
128 }
129 if !ok {
130 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo)
131 }
132
133 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
134 return err
135 }
136 h.jc.AddDid(subjectId.DID.String())
137
138 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
139 return err
140 }
141
142 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
143}
144
145func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
146 l := log.FromContext(ctx)
147
148 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
149 if err != nil {
150 l.Error("error building endpoint url", "did", did, "error", err.Error())
151 return fmt.Errorf("error building endpoint url: %w", err)
152 }
153
154 resp, err := http.Get(keysEndpoint)
155 if err != nil {
156 l.Error("error getting keys", "did", did, "error", err)
157 return fmt.Errorf("error getting keys: %w", err)
158 }
159 defer resp.Body.Close()
160
161 if resp.StatusCode == http.StatusNotFound {
162 l.Info("no keys found for did", "did", did)
163 return nil
164 }
165
166 plaintext, err := io.ReadAll(resp.Body)
167 if err != nil {
168 l.Error("error reading response body", "error", err)
169 return fmt.Errorf("error reading response body: %w", err)
170 }
171
172 for key := range strings.SplitSeq(string(plaintext), "\n") {
173 if key == "" {
174 continue
175 }
176 pk := db.PublicKey{
177 Did: did,
178 }
179 pk.Key = key
180 if err := h.db.AddPublicKey(pk); err != nil {
181 l.Error("failed to add public key", "error", err)
182 return fmt.Errorf("failed to add public key: %w", err)
183 }
184 }
185 return nil
186}
187
188func (h *Knot) processMessages(ctx context.Context, event *models.Event) error {
189 if event.Kind != models.EventKindCommit {
190 return nil
191 }
192
193 var err error
194 defer func() {
195 eventTime := event.TimeUS
196 lastTimeUs := eventTime + 1
197 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
198 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
199 }
200 }()
201
202 switch event.Commit.Collection {
203 case tangled.PublicKeyNSID:
204 err = h.processPublicKey(ctx, event)
205 case tangled.KnotMemberNSID:
206 err = h.processKnotMember(ctx, event)
207 case tangled.RepoCollaboratorNSID:
208 err = h.processCollaborator(ctx, event)
209 }
210
211 if err != nil {
212 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
213 }
214
215 return nil
216}