this repo has no description
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11 "time"
12
13 "github.com/sotangled/tangled/api/tangled"
14 "github.com/sotangled/tangled/knotserver/db"
15 "github.com/sotangled/tangled/knotserver/jsclient"
16 "github.com/sotangled/tangled/log"
17)
18
19func (h *Handle) StartJetstream(ctx context.Context) error {
20 l := h.l.With("component", "jetstream")
21 ctx = log.IntoContext(ctx, l)
22 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
23 dids := []string{}
24
25 lastTimeUs, err := h.getLastTimeUs(ctx)
26 if err != nil {
27 return err
28 }
29
30 h.js = jsclient.NewJetstreamClient(collections, dids)
31 messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
32 if err != nil {
33 return fmt.Errorf("failed to read from jetstream: %w", err)
34 }
35
36 go h.processMessages(ctx, messages)
37
38 return nil
39}
40
41func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) {
42 l := log.FromContext(ctx)
43 lastTimeUs, err := h.db.GetLastTimeUs()
44 if err != nil {
45 l.Info("couldn't get last time us, starting from now")
46 lastTimeUs = time.Now().UnixMicro()
47 }
48
49 // If last time is older than a week, start from now
50 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
51 lastTimeUs = time.Now().UnixMicro()
52 l.Info("last time us is older than a week. discarding that and starting from now")
53 err = h.db.SaveLastTimeUs(lastTimeUs)
54 if err != nil {
55 l.Error("failed to save last time us")
56 }
57 }
58
59 l.Info("found last time_us", "time_us", lastTimeUs)
60 return lastTimeUs, nil
61}
62
63func (h *Handle) processPublicKey(ctx context.Context, did string, record map[string]interface{}) error {
64 l := log.FromContext(ctx)
65 if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
66 l.Error("failed to add public key", "error", err)
67 return fmt.Errorf("failed to add public key: %w", err)
68 }
69 l.Info("added public key from firehose", "did", did)
70 return nil
71}
72
73func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
74 l := log.FromContext(ctx)
75
76 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
77 if err != nil {
78 l.Error("error building endpoint url", "did", did, "error", err.Error())
79 return fmt.Errorf("error building endpoint url: %w", err)
80 }
81
82 resp, err := http.Get(keysEndpoint)
83 if err != nil {
84 l.Error("error getting keys", "did", did, "error", err)
85 return fmt.Errorf("error getting keys: %w", err)
86 }
87 defer resp.Body.Close()
88
89 if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") {
90 l.Error("unexpected content type", "content-type", ct)
91 return fmt.Errorf("unexpected content type: %s", ct)
92 }
93
94 plaintext, err := io.ReadAll(resp.Body)
95 if err != nil {
96 l.Error("error reading response body", "error", err)
97 return fmt.Errorf("error reading response body: %w", err)
98 }
99
100 for _, key := range strings.Split(string(plaintext), "\n") {
101 if key == "" {
102 continue
103 }
104 pk := db.PublicKey{
105 Did: did,
106 }
107 pk.Key = key
108 if err := h.db.AddPublicKey(pk); err != nil {
109 l.Error("failed to add public key", "error", err)
110 return fmt.Errorf("failed to add public key: %w", err)
111 }
112 }
113 return nil
114}
115
116func (h *Handle) processKnotMember(ctx context.Context, did string, record map[string]interface{}) error {
117 l := log.FromContext(ctx)
118
119 if record["domain"] != h.c.Server.Hostname {
120 l.Error("domain mismatch", "domain", record["domain"], "expected", h.c.Server.Hostname)
121 return fmt.Errorf("domain mismatch: %s != %s", record["domain"], h.c.Server.Hostname)
122 }
123
124 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
125 if err != nil || !ok {
126 l.Error("failed to add member", "did", did)
127 return fmt.Errorf("failed to enforce permissions: %w", err)
128 }
129
130 l.Info("adding member")
131 if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
132 l.Error("failed to add member", "error", err)
133 return fmt.Errorf("failed to add member: %w", err)
134 }
135 l.Info("added member from firehose", "member", record["member"])
136
137 if err := h.db.AddDid(did); err != nil {
138 l.Error("failed to add did", "error", err)
139 return fmt.Errorf("failed to add did: %w", err)
140 }
141
142 if err := h.fetchAndAddKeys(ctx, did); err != nil {
143 return fmt.Errorf("failed to fetch and add keys: %w", err)
144 }
145
146 h.js.UpdateDids([]string{did})
147 return nil
148}
149
150func (h *Handle) processMessages(ctx context.Context, messages <-chan []byte) {
151 l := log.FromContext(ctx)
152 l.Info("waiting for knot to be initialized")
153 <-h.init
154 l.Info("initialized jetstream watcher")
155
156 for msg := range messages {
157 var data map[string]interface{}
158 if err := json.Unmarshal(msg, &data); err != nil {
159 l.Error("error unmarshaling message", "error", err)
160 continue
161 }
162
163 if kind, ok := data["kind"].(string); ok && kind == "commit" {
164 commit := data["commit"].(map[string]interface{})
165 did := data["did"].(string)
166 record := commit["record"].(map[string]interface{})
167
168 var processErr error
169 switch commit["collection"].(string) {
170 case tangled.PublicKeyNSID:
171 if err := h.processPublicKey(ctx, did, record); err != nil {
172 processErr = fmt.Errorf("failed to process public key: %w", err)
173 }
174 case tangled.KnotMemberNSID:
175 if err := h.processKnotMember(ctx, did, record); err != nil {
176 processErr = fmt.Errorf("failed to process knot member: %w", err)
177 }
178 }
179
180 if processErr != nil {
181 l.Error("error processing message", "error", processErr)
182 continue
183 }
184
185 lastTimeUs := int64(data["time_us"].(float64))
186 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
187 l.Error("failed to save last time us", "error", err)
188 continue
189 }
190 }
191 }
192}