this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/bluesky-social/jetstream/pkg/models" 16 securejoin "github.com/cyphar/filepath-securejoin" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/knotserver/db" 19 "tangled.org/core/log" 20 "tangled.org/core/rbac" 21) 22 23func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 24 l := log.FromContext(ctx) 25 raw := json.RawMessage(event.Commit.Record) 26 did := event.Did 27 28 var record tangled.PublicKey 29 if err := json.Unmarshal(raw, &record); err != nil { 30 return fmt.Errorf("failed to unmarshal record: %w", err) 31 } 32 33 pk := db.PublicKey{ 34 Did: did, 35 PublicKey: record, 36 } 37 if err := h.db.AddPublicKey(pk); err != nil { 38 l.Error("failed to add public key", "error", err) 39 return fmt.Errorf("failed to add public key: %w", err) 40 } 41 l.Info("added public key from firehose", "did", did) 42 return nil 43} 44 45func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 46 l := log.FromContext(ctx) 47 raw := json.RawMessage(event.Commit.Record) 48 did := event.Did 49 50 var record tangled.KnotMember 51 if err := json.Unmarshal(raw, &record); err != nil { 52 return fmt.Errorf("failed to unmarshal record: %w", err) 53 } 54 55 if record.Domain != h.c.Server.Hostname { 56 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 57 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 58 } 59 60 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 61 if err != nil || !ok { 62 l.Error("failed to add member", "did", did) 63 return fmt.Errorf("failed to enforce permissions: %w", err) 64 } 65 66 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 67 l.Error("failed to add member", "error", err) 68 return fmt.Errorf("failed to add member: %w", err) 69 } 70 l.Info("added member from firehose", "member", record.Subject) 71 72 if err := h.db.AddDid(record.Subject); err != nil { 73 l.Error("failed to add did", "error", err) 74 return fmt.Errorf("failed to add did: %w", err) 75 } 76 h.jc.AddDid(record.Subject) 77 78 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 79 return fmt.Errorf("failed to fetch and add keys: %w", err) 80 } 81 82 return nil 83} 84 85// duplicated from add collaborator 86func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 87 raw := json.RawMessage(event.Commit.Record) 88 did := event.Did 89 90 var record tangled.RepoCollaborator 91 if err := json.Unmarshal(raw, &record); err != nil { 92 return fmt.Errorf("failed to unmarshal record: %w", err) 93 } 94 95 repoAt, err := syntax.ParseATURI(record.Repo) 96 if err != nil { 97 return err 98 } 99 100 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 101 if err != nil || subjectId.Handle.IsInvalidHandle() { 102 return err 103 } 104 105 // TODO: fix this for good, we need to fetch the record here unfortunately 106 // resolve this aturi to extract the repo record 107 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 108 if err != nil || owner.Handle.IsInvalidHandle() { 109 return fmt.Errorf("failed to resolve handle: %w", err) 110 } 111 112 xrpcc := xrpc.Client{ 113 Host: owner.PDSEndpoint(), 114 } 115 116 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 117 if err != nil { 118 return err 119 } 120 121 repo := resp.Value.Val.(*tangled.Repo) 122 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 123 124 // check perms for this user 125 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo) 126 if err != nil { 127 return fmt.Errorf("failed to check permissions: %w", err) 128 } 129 if !ok { 130 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo) 131 } 132 133 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 134 return err 135 } 136 h.jc.AddDid(subjectId.DID.String()) 137 138 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 139 return err 140 } 141 142 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 143} 144 145func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 146 l := log.FromContext(ctx) 147 148 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 149 if err != nil { 150 l.Error("error building endpoint url", "did", did, "error", err.Error()) 151 return fmt.Errorf("error building endpoint url: %w", err) 152 } 153 154 resp, err := http.Get(keysEndpoint) 155 if err != nil { 156 l.Error("error getting keys", "did", did, "error", err) 157 return fmt.Errorf("error getting keys: %w", err) 158 } 159 defer resp.Body.Close() 160 161 if resp.StatusCode == http.StatusNotFound { 162 l.Info("no keys found for did", "did", did) 163 return nil 164 } 165 166 plaintext, err := io.ReadAll(resp.Body) 167 if err != nil { 168 l.Error("error reading response body", "error", err) 169 return fmt.Errorf("error reading response body: %w", err) 170 } 171 172 for key := range strings.SplitSeq(string(plaintext), "\n") { 173 if key == "" { 174 continue 175 } 176 pk := db.PublicKey{ 177 Did: did, 178 } 179 pk.Key = key 180 if err := h.db.AddPublicKey(pk); err != nil { 181 l.Error("failed to add public key", "error", err) 182 return fmt.Errorf("failed to add public key: %w", err) 183 } 184 } 185 return nil 186} 187 188func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 189 if event.Kind != models.EventKindCommit { 190 return nil 191 } 192 193 var err error 194 defer func() { 195 eventTime := event.TimeUS 196 lastTimeUs := eventTime + 1 197 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 198 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 199 } 200 }() 201 202 switch event.Commit.Collection { 203 case tangled.PublicKeyNSID: 204 err = h.processPublicKey(ctx, event) 205 case tangled.KnotMemberNSID: 206 err = h.processKnotMember(ctx, event) 207 case tangled.RepoCollaboratorNSID: 208 err = h.processCollaborator(ctx, event) 209 } 210 211 if err != nil { 212 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 213 } 214 215 return nil 216}